http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
index 0000000,bdec887..0dbacfb
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
@@@ -1,0 -1,41 +1,41 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.mob.mapreduce;
+ 
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.KeyValue;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.mob.MobFileName;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.mapreduce.Partitioner;
+ 
+ /**
+  * The partitioner for the sweep job.
+  * The key is a mob file name. We bucket by date.
+  */
+ @InterfaceAudience.Private
+ public class MobFilePathHashPartitioner extends Partitioner<Text, KeyValue> {
+ 
+   @Override
+   public int getPartition(Text fileName, KeyValue kv, int numPartitions) {
+     MobFileName mobFileName = MobFileName.create(fileName.toString());
+     String date = mobFileName.getDate();
+     int hash = date.hashCode();
+     return (hash & Integer.MAX_VALUE) % numPartitions;
+   }
+ }
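
The partitioner above buckets by the date portion of the mob file name and masks the sign bit of the hash instead of calling Math.abs(). A minimal sketch of the same arithmetic (the helper partitionFor() below is a made-up name, not part of this commit) shows why: Math.abs(Integer.MIN_VALUE) is still negative, so an abs-then-modulo scheme could yield a negative partition index, while (hash & Integer.MAX_VALUE) % numPartitions cannot.

    // Illustrative only; PartitionIndexDemo and partitionFor() are hypothetical names.
    public class PartitionIndexDemo {
      static int partitionFor(String date, int numPartitions) {
        // bucket by the date string, mirroring MobFilePathHashPartitioner.getPartition()
        return (date.hashCode() & Integer.MAX_VALUE) % numPartitions;
      }

      public static void main(String[] args) {
        System.out.println(partitionFor("20150101", 8)); // always in [0, 8)
      }
    }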

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
index 0000000,5da220f..3a06ad8
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
@@@ -1,0 -1,603 +1,603 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.mob.mapreduce;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.net.InetSocketAddress;
+ import java.util.ArrayList;
+ import java.util.List;
+ import java.util.PriorityQueue;
+ import java.util.Set;
+ import java.util.TreeSet;
+ import java.util.UUID;
+ 
+ import org.apache.commons.lang.StringUtils;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.CommonConfigurationKeys;
+ import org.apache.hadoop.fs.FSDataOutputStream;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hbase.Abortable;
+ import org.apache.hadoop.hbase.HColumnDescriptor;
+ import org.apache.hadoop.hbase.HConstants;
 -import org.apache.hadoop.hbase.KeyValue;
++import org.apache.hadoop.hbase.NoTagsKeyValue;
+ import org.apache.hadoop.hbase.ServerName;
+ import org.apache.hadoop.hbase.TableName;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.client.Scan;
+ import org.apache.hadoop.hbase.io.HFileLink;
+ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+ import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+ import org.apache.hadoop.hbase.master.TableLockManager;
+ import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
+ import org.apache.hadoop.hbase.mob.MobConstants;
+ import org.apache.hadoop.hbase.mob.MobUtils;
+ import org.apache.hadoop.hbase.regionserver.BloomType;
+ import org.apache.hadoop.hbase.regionserver.StoreFile;
+ import org.apache.hadoop.hbase.util.Bytes;
+ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+ import org.apache.hadoop.hbase.util.FSUtils;
+ import org.apache.hadoop.hbase.util.Strings;
+ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+ import org.apache.hadoop.io.IOUtils;
+ import org.apache.hadoop.io.SequenceFile;
+ import org.apache.hadoop.io.SequenceFile.CompressionType;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.io.Writable;
+ import org.apache.hadoop.io.serializer.JavaSerialization;
+ import org.apache.hadoop.io.serializer.WritableSerialization;
+ import org.apache.hadoop.mapreduce.Job;
+ import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+ import org.apache.hadoop.net.DNS;
+ import org.apache.hadoop.security.Credentials;
+ import org.apache.hadoop.security.UserGroupInformation;
+ import org.apache.zookeeper.KeeperException;
+ 
+ /**
+  * The sweep job.
+  * Runs map reduce to merge the smaller mob files into bigger ones and cleans the unused ones.
+  */
+ @InterfaceAudience.Private
+ public class SweepJob {
+ 
+   private final FileSystem fs;
+   private final Configuration conf;
+   private static final Log LOG = LogFactory.getLog(SweepJob.class);
+   static final String SWEEP_JOB_ID = "hbase.mob.sweep.job.id";
+   static final String SWEEP_JOB_SERVERNAME = "hbase.mob.sweep.job.servername";
+   static final String SWEEP_JOB_TABLE_NODE = "hbase.mob.sweep.job.table.node";
+   static final String WORKING_DIR_KEY = "hbase.mob.sweep.job.dir";
+   static final String WORKING_ALLNAMES_FILE_KEY = "hbase.mob.sweep.job.all.file";
+   static final String WORKING_VISITED_DIR_KEY = "hbase.mob.sweep.job.visited.dir";
+   static final String WORKING_ALLNAMES_DIR = "all";
+   static final String WORKING_VISITED_DIR = "visited";
+   public static final String WORKING_FILES_DIR_KEY = "mob.sweep.job.files.dir";
+   // the MOB_SWEEP_JOB_DELAY is ONE_DAY by default. Its value is only changed when testing.
+   public static final String MOB_SWEEP_JOB_DELAY = "hbase.mob.sweep.job.delay";
+   protected static long ONE_DAY = 24 * 60 * 60 * 1000;
+   private long compactionStartTime = EnvironmentEdgeManager.currentTime();
+   public final static String CREDENTIALS_LOCATION = "credentials_location";
+   private CacheConfig cacheConfig;
+   static final int SCAN_CACHING = 10000;
+   private TableLockManager tableLockManager;
+ 
+   public SweepJob(Configuration conf, FileSystem fs) {
+     this.conf = conf;
+     this.fs = fs;
+     // disable the block cache.
+     Configuration copyOfConf = new Configuration(conf);
+     copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+     cacheConfig = new CacheConfig(copyOfConf);
+   }
+ 
+   static ServerName getCurrentServerName(Configuration conf) throws IOException {
+     String hostname = conf.get(
+         "hbase.regionserver.ipc.address",
+         Strings.domainNamePointerToHostName(DNS.getDefaultHost(
+             conf.get("hbase.regionserver.dns.interface", "default"),
+             conf.get("hbase.regionserver.dns.nameserver", "default"))));
+     int port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
+     // Creation of a HSA will force a resolve.
+     InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
+     if (initialIsa.getAddress() == null) {
+       throw new IllegalArgumentException("Failed resolve of " + initialIsa);
+     }
+     return ServerName.valueOf(initialIsa.getHostName(), initialIsa.getPort(),
+         EnvironmentEdgeManager.currentTime());
+   }
+ 
+   /**
+    * Runs MapReduce to do the sweeping on the mob files.
+    * There's a MobReferenceOnlyFilter so that the mappers only get the cells that have mob
+    * references from 'normal' regions' rows.
+    * Runs of the sweep tool on the same column family are mutually exclusive.
+    * The HBase major compaction and the sweep tool on the same column family
+    * are mutually exclusive too.
+    * The synchronization is done by ZooKeeper.
+    * So at the beginning of the run, we need to make sure this sweep tool is the only one
+    * that is currently running on this column family, and that no major compaction of this
+    * column family is in progress.
+    * @param tn The current table name.
+    * @param family The descriptor of the current column family.
+    * @return 0 upon success, 3 if bailing out because another compaction is currently happening,
+    *   or 4 if the MR job was unsuccessful
+    *
+    * @throws IOException
+    * @throws ClassNotFoundException
+    * @throws InterruptedException
+    * @throws KeeperException
+    */
+   public int sweep(TableName tn, HColumnDescriptor family) throws IOException,
+       ClassNotFoundException, InterruptedException, KeeperException {
+     Configuration conf = new Configuration(this.conf);
+     // check whether the current user is the same as the owner of the hbase root
+     String currentUserName = UserGroupInformation.getCurrentUser().getShortUserName();
+     FileStatus[] hbaseRootFileStat = fs.listStatus(new Path(conf.get(HConstants.HBASE_DIR)));
+     if (hbaseRootFileStat.length > 0) {
+       String owner = hbaseRootFileStat[0].getOwner();
+       if (!owner.equals(currentUserName)) {
+         String errorMsg = "The current user[" + currentUserName
+             + "] doesn't have hbase root credentials."
+             + " Please make sure the user is the root of the target HBase";
+         LOG.error(errorMsg);
+         throw new IOException(errorMsg);
+       }
+     } else {
+       LOG.error("The target HBase doesn't exist");
+       throw new IOException("The target HBase doesn't exist");
+     }
+     String familyName = family.getNameAsString();
+     String id = "SweepJob" + UUID.randomUUID().toString().replace("-", "");
+     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, id, new DummyMobAbortable());
+     try {
+       ServerName serverName = getCurrentServerName(conf);
+       tableLockManager = TableLockManager.createTableLockManager(conf, zkw, serverName);
+       TableName lockName = MobUtils.getTableLockName(tn);
+       TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
+       String tableName = tn.getNameAsString();
+       // Try to obtain the lock. Use this lock to synchronize all the query
+       try {
+         lock.acquire();
+       } catch (Exception e) {
+         LOG.warn("Can not lock the table " + tableName
+             + ". The major compaction in HBase may be in-progress or another 
sweep job is running."
+             + " Please re-run the job.");
+         return 3;
+       }
+       Job job = null;
+       try {
+         Scan scan = new Scan();
+         scan.addFamily(family.getName());
+         // Do not retrieve the mob data when scanning
+         scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+         scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+         scan.setCaching(SCAN_CACHING);
+         scan.setCacheBlocks(false);
+         scan.setMaxVersions(family.getMaxVersions());
+         conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+             JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
+         conf.set(SWEEP_JOB_ID, id);
+         conf.set(SWEEP_JOB_SERVERNAME, serverName.toString());
+         String tableLockNode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
+         conf.set(SWEEP_JOB_TABLE_NODE, tableLockNode);
+         job = prepareJob(tn, familyName, scan, conf);
+         job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, familyName);
+         // Record the compaction start time.
+         // In the sweep tool, only the mob file whose modification time is older than
+         // (startTime - delay) could be handled by this tool.
+         // The delay is one day. It could be configured as well, but this is only used
+         // in the test.
+         job.getConfiguration().setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
+             compactionStartTime);
+ 
+         job.setPartitionerClass(MobFilePathHashPartitioner.class);
+         submit(job, tn, familyName);
+         if (job.waitForCompletion(true)) {
+           // Archive the unused mob files.
+           removeUnusedFiles(job, tn, family);
+         } else {
+           System.err.println("Job was not successful");
+           return 4;
+         }
+       } finally {
+         try {
+           cleanup(job, tn, familyName);
+         } finally {
+           try {
+             lock.release();
+           } catch (IOException e) {
+             LOG.error("Failed to release the table lock " + tableName, e);
+           }
+         }
+       }
+     } finally {
+       zkw.close();
+     }
+     return 0;
+   }
+ 
+   /**
+    * Prepares a map reduce job.
+    * @param tn The current table name.
+    * @param familyName The current family name.
+    * @param scan The current scan.
+    * @param conf The current configuration.
+    * @return A map reduce job.
+    * @throws IOException
+    */
+   private Job prepareJob(TableName tn, String familyName, Scan scan, Configuration conf)
+       throws IOException {
+     Job job = Job.getInstance(conf);
+     job.setJarByClass(SweepMapper.class);
+     TableMapReduceUtil.initTableMapperJob(tn.getNameAsString(), scan,
+         SweepMapper.class, Text.class, Writable.class, job);
+ 
+     job.setInputFormatClass(TableInputFormat.class);
+     job.setMapOutputKeyClass(Text.class);
 -    job.setMapOutputValueClass(KeyValue.class);
++    job.setMapOutputValueClass(NoTagsKeyValue.class);
+     job.setReducerClass(SweepReducer.class);
+     job.setOutputFormatClass(NullOutputFormat.class);
+     String jobName = getCustomJobName(this.getClass().getSimpleName(), tn, familyName);
+     job.setJobName(jobName);
+     if (StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
+       String fileLoc = conf.get(CREDENTIALS_LOCATION);
+       Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
+       job.getCredentials().addAll(cred);
+     }
+     return job;
+   }
+ 
+   /**
+    * Gets a customized job name.
+    * It's className-mapperClassName-reducerClassName-tableName-familyName.
+    * @param className The current class name.
+    * @param tableName The current table name.
+    * @param familyName The current family name.
+    * @return The customized job name.
+    */
+   private static String getCustomJobName(String className, TableName tableName, String familyName) {
+     StringBuilder name = new StringBuilder();
+     name.append(className);
+     name.append('-').append(SweepMapper.class.getSimpleName());
+     name.append('-').append(SweepReducer.class.getSimpleName());
+     name.append('-').append(tableName.getNamespaceAsString());
+     name.append('-').append(tableName.getQualifierAsString());
+     name.append('-').append(familyName);
+     return name.toString();
+   }
+ 
+   /**
+    * Submits a job.
+    * @param job The current job.
+    * @param tn The current table name.
+    * @param familyName The current family name.
+    * @throws IOException
+    */
+   private void submit(Job job, TableName tn, String familyName) throws IOException {
+     // delete the temp directory of the mob files in case of a failure in the previous
+     // execution.
+     Path tempDir =
+         new Path(MobUtils.getMobHome(job.getConfiguration()), MobConstants.TEMP_DIR_NAME);
+     Path mobCompactionTempDir =
+         new Path(tempDir, MobConstants.MOB_SWEEP_TOOL_COMPACTION_TEMP_DIR_NAME);
+     Path workingPath = MobUtils.getCompactionWorkingPath(mobCompactionTempDir, job.getJobName());
+     job.getConfiguration().set(WORKING_DIR_KEY, workingPath.toString());
+     // delete the working directory in case it's not deleted by the last run.
+     fs.delete(workingPath, true);
+     // create the working directory.
+     fs.mkdirs(workingPath);
+     // create a sequence file which contains the names of all the existing files.
+     Path workingPathOfFiles = new Path(workingPath, "files");
+     Path workingPathOfNames = new Path(workingPath, "names");
+     job.getConfiguration().set(WORKING_FILES_DIR_KEY, workingPathOfFiles.toString());
+     Path allFileNamesPath = new Path(workingPathOfNames, WORKING_ALLNAMES_DIR);
+     job.getConfiguration().set(WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString());
+     Path visitedFileNamesPath = new Path(workingPathOfNames, WORKING_VISITED_DIR);
+     job.getConfiguration().set(WORKING_VISITED_DIR_KEY, visitedFileNamesPath.toString());
+     // create a directory where the files that contain the names of visited mob files are saved.
+     fs.mkdirs(visitedFileNamesPath);
+     Path mobStorePath = MobUtils.getMobFamilyPath(job.getConfiguration(), tn, familyName);
+     // Find all the files whose creation time are older than one day.
+     // Write those file names to a file.
+     // In each reducer there's a writer; it writes the visited file names to a file which is saved
+     // in WORKING_VISITED_DIR.
+     // After the job is finished, compare those files, then find out the unused mob files and
+     // archive them.
+     FileStatus[] files = fs.listStatus(mobStorePath);
+     Set<String> fileNames = new TreeSet<String>();
+     long mobCompactionDelay = job.getConfiguration().getLong(MOB_SWEEP_JOB_DELAY, ONE_DAY);
+     for (FileStatus fileStatus : files) {
+       if (fileStatus.isFile() && !HFileLink.isHFileLink(fileStatus.getPath())) {
+         if (compactionStartTime - fileStatus.getModificationTime() > mobCompactionDelay) {
+           // only record the potentially unused files older than one day.
+           fileNames.add(fileStatus.getPath().getName());
+         }
+       }
+     }
+     FSDataOutputStream fout = null;
+     SequenceFile.Writer writer = null;
+     try {
+       // create a file that includes all the existing mob files whose creation time is older than
+       // (now - oneDay)
+       fout = fs.create(allFileNamesPath, true);
+       // write the names to a sequence file
+       writer = SequenceFile.createWriter(job.getConfiguration(), fout, String.class, String.class,
+           CompressionType.NONE, null);
+       for (String fileName : fileNames) {
+         writer.append(fileName, MobConstants.EMPTY_STRING);
+       }
+       writer.hflush();
+     } finally {
+       if (writer != null) {
+         IOUtils.closeStream(writer);
+       }
+       if (fout != null) {
+         IOUtils.closeStream(fout);
+       }
+     }
+   }
+ 
+   /**
+    * Gets the unused mob files.
+    * Compares the file which contains all the existing mob files with the visited files,
+    * finds out the unused mob files and archives them.
+    * @param conf The current configuration.
+    * @return The unused mob files.
+    * @throws IOException
+    */
+   List<String> getUnusedFiles(Configuration conf) throws IOException {
+     // find out the unused files and archive them
+     Path allFileNamesPath = new Path(conf.get(WORKING_ALLNAMES_FILE_KEY));
+     SequenceFile.Reader allNamesReader = null;
+     MergeSortReader visitedNamesReader = null;
+     List<String> toBeArchived = new ArrayList<String>();
+     try {
+       allNamesReader = new SequenceFile.Reader(fs, allFileNamesPath, conf);
+       visitedNamesReader = new MergeSortReader(fs, conf,
+           new Path(conf.get(WORKING_VISITED_DIR_KEY)));
+       String nextAll = (String) allNamesReader.next((String) null);
+       String nextVisited = visitedNamesReader.next();
+       do {
+         if (nextAll != null) {
+           if (nextVisited != null) {
+             int compare = nextAll.compareTo(nextVisited);
+             if (compare < 0) {
+               toBeArchived.add(nextAll);
+               nextAll = (String) allNamesReader.next((String) null);
+             } else if (compare > 0) {
+               nextVisited = visitedNamesReader.next();
+             } else {
+               nextAll = (String) allNamesReader.next((String) null);
+               nextVisited = visitedNamesReader.next();
+             }
+           } else {
+             toBeArchived.add(nextAll);
+             nextAll = (String) allNamesReader.next((String) null);
+           }
+         } else {
+           break;
+         }
+       } while (nextAll != null || nextVisited != null);
+     } finally {
+       if (allNamesReader != null) {
+         IOUtils.closeStream(allNamesReader);
+       }
+       if (visitedNamesReader != null) {
+         visitedNamesReader.close();
+       }
+     }
+     return toBeArchived;
+   }
+ 
+   /**
+    * Archives unused mob files.
+    * @param job The current job.
+    * @param tn The current table name.
+    * @param hcd The descriptor of the current column family.
+    * @throws IOException
+    */
+   private void removeUnusedFiles(Job job, TableName tn, HColumnDescriptor hcd) throws IOException {
+     // find out the unused files and archive them
+     List<StoreFile> storeFiles = new ArrayList<StoreFile>();
+     List<String> toBeArchived = getUnusedFiles(job.getConfiguration());
+     // archive them
+     Path mobStorePath = MobUtils
+         .getMobFamilyPath(job.getConfiguration(), tn, hcd.getNameAsString());
+     for (String archiveFileName : toBeArchived) {
+       Path path = new Path(mobStorePath, archiveFileName);
+       storeFiles.add(new StoreFile(fs, path, job.getConfiguration(), cacheConfig, BloomType.NONE));
+     }
+     if (!storeFiles.isEmpty()) {
+       try {
+         MobUtils.removeMobFiles(job.getConfiguration(), fs, tn,
+             FSUtils.getTableDir(MobUtils.getMobHome(conf), tn), hcd.getName(), storeFiles);
+         LOG.info(storeFiles.size() + " unused MOB files are removed");
+       } catch (Exception e) {
+         LOG.error("Failed to archive the store files " + storeFiles, e);
+       }
+     }
+   }
+ 
+   /**
+    * Deletes the working directory.
+    * @param job The current job.
+    * @param familyName The family to cleanup
+    */
+   private void cleanup(Job job, TableName tn, String familyName) {
+     if (job != null) {
+       // delete the working directory
+       Path workingPath = new Path(job.getConfiguration().get(WORKING_DIR_KEY));
+       try {
+         fs.delete(workingPath, true);
+       } catch (IOException e) {
+         LOG.warn("Failed to delete the working directory after sweeping store 
" + familyName
+             + " in the table " + tn.getNameAsString(), e);
+       }
+     }
+   }
+ 
+   /**
+    * A result with index.
+    */
+   private class IndexedResult implements Comparable<IndexedResult> {
+     private int index;
+     private String value;
+ 
+     public IndexedResult(int index, String value) {
+       this.index = index;
+       this.value = value;
+     }
+ 
+     public int getIndex() {
+       return this.index;
+     }
+ 
+     public String getValue() {
+       return this.value;
+     }
+ 
+     @Override
+     public int compareTo(IndexedResult o) {
+       if (this.value == null && o.getValue() == null) {
+         return 0;
+       } else if (o.value == null) {
+         return 1;
+       } else if (this.value == null) {
+         return -1;
+       } else {
+         return this.value.compareTo(o.value);
+       }
+     }
+ 
+     @Override
+     public boolean equals(Object obj) {
+       if (this == obj) {
+         return true;
+       }
+       if (obj == null) {
+         return false;
+       }
+       if (!(obj instanceof IndexedResult)) {
+         return false;
+       }
+       return compareTo((IndexedResult) obj) == 0;
+     }
+ 
+     @Override
+     public int hashCode() {
+       return value.hashCode();
+     }
+   }
+ 
+   /**
+    * Merge sort reader.
+    * It merges and sorts the readers in different sequence files as one where
+    * the results are read in order.
+    */
+   private class MergeSortReader {
+ 
+     private List<SequenceFile.Reader> readers = new ArrayList<SequenceFile.Reader>();
+     private PriorityQueue<IndexedResult> results = new PriorityQueue<IndexedResult>();
+ 
+     public MergeSortReader(FileSystem fs, Configuration conf, Path path) throws IOException {
+       if (fs.exists(path)) {
+         FileStatus[] files = fs.listStatus(path);
+         int index = 0;
+         for (FileStatus file : files) {
+           if (file.isFile()) {
+             SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf);
+             String key = (String) reader.next((String) null);
+             if (key != null) {
+               results.add(new IndexedResult(index, key));
+               readers.add(reader);
+               index++;
+             }
+           }
+         }
+       }
+     }
+ 
+     public String next() throws IOException {
+       IndexedResult result = results.poll();
+       if (result != null) {
+         SequenceFile.Reader reader = readers.get(result.getIndex());
+         String key = (String) reader.next((String) null);
+         if (key != null) {
+           results.add(new IndexedResult(result.getIndex(), key));
+         }
+         return result.getValue();
+       }
+       return null;
+     }
+ 
+     public void close() {
+       for (SequenceFile.Reader reader : readers) {
+         if (reader != null) {
+           IOUtils.closeStream(reader);
+         }
+       }
+     }
+   }
+ 
+   /**
+    * The counter used in sweep job.
+    */
+   public enum SweepCounter {
+ 
+     /**
+      * How many files are read.
+      */
+     INPUT_FILE_COUNT,
+ 
+     /**
+      * How many files need to be merged or cleaned.
+      */
+     FILE_TO_BE_MERGE_OR_CLEAN,
+ 
+     /**
+      * How many files are left after merging.
+      */
+     FILE_AFTER_MERGE_OR_CLEAN,
+ 
+     /**
+      * How many records are updated.
+      */
+     RECORDS_UPDATED,
+   }
+ 
+   public static class DummyMobAbortable implements Abortable {
+ 
+     private boolean abort = false;
+ 
+     public void abort(String why, Throwable e) {
+       abort = true;
+     }
+ 
+     public boolean isAborted() {
+       return abort;
+     }
+ 
+   }
 -}
++}
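
In SweepJob, getUnusedFiles() walks two sorted name streams, the "all file names" sequence file and the merge-sorted "visited" readers, and keeps every name that appears only in the first; removeUnusedFiles() then archives those files. A minimal sketch of that set-difference over plain sorted lists, under the assumption of unique, sorted entries (class and method names below are invented):

    // Illustrative only; SortedDifferenceDemo and unused() are hypothetical names.
    import java.util.ArrayList;
    import java.util.List;

    public class SortedDifferenceDemo {
      // entries of 'all' that never appear in 'visited'; both lists must be sorted
      static List<String> unused(List<String> all, List<String> visited) {
        List<String> result = new ArrayList<String>();
        int i = 0, j = 0;
        while (i < all.size()) {
          if (j >= visited.size() || all.get(i).compareTo(visited.get(j)) < 0) {
            result.add(all.get(i++));   // only in "all" -> candidate for archiving
          } else if (all.get(i).compareTo(visited.get(j)) > 0) {
            j++;                        // visited name with no matching "all" entry, skip it
          } else {
            i++;
            j++;                        // present in both -> still referenced
          }
        }
        return result;
      }
    }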

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
index 0000000,7844359..dea67a4
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
@@@ -1,0 -1,100 +1,100 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.mob.mapreduce;
+ 
+ import java.util.List;
+ import java.util.SortedSet;
+ import java.util.TreeSet;
+ 
 -import org.apache.hadoop.classification.InterfaceAudience;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.master.TableLockManager;
+ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName;
+ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock;
+ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+ import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+ import org.apache.zookeeper.KeeperException;
+ 
+ /**
+  * Tracker on the sweep tool node in zookeeper.
+  * The sweep tool node is an ephemeral one; when the process dies this node is deleted.
+  * At that time the MR might still be running, and if another sweep job is started, two MR
+  * jobs for the same column family would run at the same time.
+  * This tracker watches this ephemeral node; if it's gone or it's not created by the
+  * sweep job that owns the current MR, the current process will be aborted.
+  */
+ @InterfaceAudience.Private
+ public class SweepJobNodeTracker extends ZooKeeperListener {
+ 
+   private String parentNode;
+   private String lockNodePrefix;
+   private String owner;
+   private String lockNode;
+ 
+   public SweepJobNodeTracker(ZooKeeperWatcher watcher, String parentNode, String owner) {
+     super(watcher);
+     this.parentNode = parentNode;
+     this.owner = owner;
+     this.lockNodePrefix = ZKUtil.joinZNode(parentNode, "write-");
+   }
+ 
+   /**
+    * Registers the watcher on the sweep job node.
+    * If there's no such sweep job node, or it's not created by the sweep job that
+    * owns the current MR, the current process will be aborted.
+    * This assumes the table lock uses ZooKeeper. It's a workaround and only used
+    * in the sweep tool, and the sweep tool will be removed after the mob file compaction
+    * is finished.
+    */
+   public void start() throws KeeperException {
+     watcher.registerListener(this);
+     List<String> children = ZKUtil.listChildrenNoWatch(watcher, parentNode);
+     if (children != null && !children.isEmpty()) {
+       // there are locks
+       TreeSet<String> sortedChildren = new TreeSet<String>();
+       sortedChildren.addAll(children);
+       // find all the write locks
+       SortedSet<String> tails = sortedChildren.tailSet(lockNodePrefix);
+       if (!tails.isEmpty()) {
+         for (String tail : tails) {
+           String path = ZKUtil.joinZNode(parentNode, tail);
+           byte[] data = ZKUtil.getDataAndWatch(watcher, path);
+           TableLock lock = TableLockManager.fromBytes(data);
+           ServerName serverName = lock.getLockOwner();
+           org.apache.hadoop.hbase.ServerName sn = org.apache.hadoop.hbase.ServerName.valueOf(
+               serverName.getHostName(), serverName.getPort(), serverName.getStartCode());
+           // compare the server names (host, port and start code), make sure the lock is created
+           if (owner.equals(sn.toString())) {
+             lockNode = path;
+             return;
+           }
+         }
+       }
+     }
+     System.exit(1);
+   }
+ 
+   @Override
+   public void nodeDeleted(String path) {
+     // If the lock node is deleted, abort the current process.
+     if (path.equals(lockNode)) {
+       System.exit(1);
+     }
+   }
+ }
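
The tracker only considers the "write-" children of the table lock znode, relying on TreeSet.tailSet(prefix), which returns every element greater than or equal to the prefix; the later owner comparison is what actually selects the lock belonging to this sweep job. A small stand-alone sketch of that filtering step (the znode names below are invented):

    // Illustrative only; WriteLockFilterDemo and the child names are made up.
    import java.util.TreeSet;

    public class WriteLockFilterDemo {
      public static void main(String[] args) {
        TreeSet<String> children = new TreeSet<String>();
        children.add("read-0000000001");
        children.add("write-0000000002");
        children.add("write-0000000003");
        // tailSet keeps everything >= "write-", i.e. the write locks (and anything sorting after them)
        System.out.println(children.tailSet("write-")); // [write-0000000002, write-0000000003]
      }
    }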

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
index 0000000,559d6db..7ac628c
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
@@@ -1,0 -1,87 +1,87 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.mob.mapreduce;
+ 
+ import java.io.IOException;
+ 
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.Cell;
+ import org.apache.hadoop.hbase.KeyValue;
+ import org.apache.hadoop.hbase.KeyValueUtil;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.client.Result;
+ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+ import org.apache.hadoop.hbase.mapreduce.TableMapper;
+ import org.apache.hadoop.hbase.mob.MobUtils;
+ import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
+ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+ import org.apache.hadoop.io.Text;
+ import org.apache.zookeeper.KeeperException;
+ 
+ /**
+  * The mapper of a sweep job.
+  * Takes the rows from the table and their results and maps to <filename:Text, mobValue:KeyValue>
+  * where mobValue is the actual cell in HBase.
+  */
+ @InterfaceAudience.Private
+ public class SweepMapper extends TableMapper<Text, KeyValue> {
+ 
+   private ZooKeeperWatcher zkw = null;
+ 
+   @Override
+   protected void setup(Context context) throws IOException,
+       InterruptedException {
+     String id = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
+     String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME);
+     String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE);
+     zkw = new ZooKeeperWatcher(context.getConfiguration(), id,
+         new DummyMobAbortable());
+     try {
+       SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner);
+       tracker.start();
+     } catch (KeeperException e) {
+       throw new IOException(e);
+     }
+   }
+ 
+   @Override
+   protected void cleanup(Context context) throws IOException,
+       InterruptedException {
+     if (zkw != null) {
+       zkw.close();
+     }
+   }
+ 
+   @Override
+   public void map(ImmutableBytesWritable r, Result columns, Context context) throws IOException,
+       InterruptedException {
+     if (columns == null) {
+       return;
+     }
+     Cell[] cells = columns.rawCells();
+     if (cells == null || cells.length == 0) {
+       return;
+     }
+     for (Cell c : cells) {
+       if (MobUtils.hasValidMobRefCellValue(c)) {
+         String fileName = MobUtils.getMobFileName(c);
+         context.write(new Text(fileName), KeyValueUtil.ensureKeyValue(c));
+       }
+     }
+   }
+ }
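
The mapper emits one <mob file name, reference KeyValue> pair per valid mob-reference cell. Per the SweepReducer javadoc in this same change, the reference value is laid out as the real value length (a 4-byte int) followed by the mob file name. A self-contained sketch of that layout, assuming plain java.nio in place of HBase's Bytes utility and an invented file name:

    // Illustrative only; the file name and value length below are made up.
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class MobRefValueDemo {
      public static void main(String[] args) {
        String mobFileName = "0123456789abcdef0123456789abcdef2015010100000000";
        ByteBuffer buf = ByteBuffer.allocate(4 + mobFileName.length());
        buf.putInt(1024);                                       // length of the real mob value
        buf.put(mobFileName.getBytes(StandardCharsets.UTF_8));  // then the mob file name
        byte[] refValue = buf.array();

        int realLength = ByteBuffer.wrap(refValue).getInt();    // first 4 bytes
        String name = new String(refValue, 4, refValue.length - 4, StandardCharsets.UTF_8);
        System.out.println(realLength + " -> " + name);
      }
    }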

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
index 0000000,a2dfa29..03a05cc
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
@@@ -1,0 -1,472 +1,472 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.mob.mapreduce;
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+ 
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FSDataOutputStream;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.PathFilter;
+ import org.apache.hadoop.hbase.Cell;
+ import org.apache.hadoop.hbase.HColumnDescriptor;
+ import org.apache.hadoop.hbase.HConstants;
+ import org.apache.hadoop.hbase.InvalidFamilyOperationException;
+ import org.apache.hadoop.hbase.KeyValue;
+ import org.apache.hadoop.hbase.KeyValueUtil;
+ import org.apache.hadoop.hbase.TableName;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.client.Admin;
+ import org.apache.hadoop.hbase.client.BufferedMutator;
+ import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+ import org.apache.hadoop.hbase.client.Connection;
+ import org.apache.hadoop.hbase.client.ConnectionFactory;
+ import org.apache.hadoop.hbase.io.HFileLink;
+ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+ import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+ import org.apache.hadoop.hbase.mob.MobConstants;
+ import org.apache.hadoop.hbase.mob.MobFile;
+ import org.apache.hadoop.hbase.mob.MobFileName;
+ import org.apache.hadoop.hbase.mob.MobUtils;
+ import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
+ import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
+ import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+ import org.apache.hadoop.hbase.regionserver.BloomType;
+ import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
+ import org.apache.hadoop.hbase.regionserver.StoreFile;
+ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+ import org.apache.hadoop.hbase.util.Bytes;
+ import org.apache.hadoop.hbase.util.FSUtils;
+ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+ import org.apache.hadoop.io.IOUtils;
+ import org.apache.hadoop.io.SequenceFile;
+ import org.apache.hadoop.io.SequenceFile.CompressionType;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.io.Writable;
+ import org.apache.hadoop.mapreduce.Reducer;
+ import org.apache.zookeeper.KeeperException;
+ 
+ /**
+  * The reducer of a sweep job.
+  * This reducer merges the small mob files into bigger ones, and writes the visited
+  * names of mob files to a sequence file which is used by the sweep job to delete
+  * the unused mob files.
+  * The key of the input is a file name, the value is a collection of KeyValues
+  * (the value format of a KeyValue is valueLength + fileName) in HBase.
+  * In this reducer, we could know how many cells exist in HBase for a mob file.
+  * If the existCellSize/mobFileSize < compactionRatio, this mob
+  * file needs to be merged.
+  */
+ @InterfaceAudience.Private
+ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
+ 
+   private static final Log LOG = LogFactory.getLog(SweepReducer.class);
+ 
+   private SequenceFile.Writer writer = null;
+   private MemStoreWrapper memstore;
+   private Configuration conf;
+   private FileSystem fs;
+ 
+   private Path familyDir;
+   private CacheConfig cacheConfig;
+   private long compactionBegin;
+   private BufferedMutator table;
+   private HColumnDescriptor family;
+   private long mobCompactionDelay;
+   private Path mobTableDir;
+ 
+   @Override
+   protected void setup(Context context) throws IOException, InterruptedException {
+     this.conf = context.getConfiguration();
+     Connection c = ConnectionFactory.createConnection(this.conf);
+     this.fs = FileSystem.get(conf);
+     // the MOB_SWEEP_JOB_DELAY is ONE_DAY by default. Its value is only changed when testing.
+     mobCompactionDelay = conf.getLong(SweepJob.MOB_SWEEP_JOB_DELAY, SweepJob.ONE_DAY);
+     String tableName = conf.get(TableInputFormat.INPUT_TABLE);
+     String familyName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
+     TableName tn = TableName.valueOf(tableName);
+     this.familyDir = MobUtils.getMobFamilyPath(conf, tn, familyName);
+     Admin admin = c.getAdmin();
+     try {
+       family = admin.getTableDescriptor(tn).getFamily(Bytes.toBytes(familyName));
+       if (family == null) {
+         // this column family might be removed, directly return.
+         throw new InvalidFamilyOperationException("Column family '" + familyName
+             + "' does not exist. It might be removed.");
+       }
+     } finally {
+       try {
+         admin.close();
+       } catch (IOException e) {
+         LOG.warn("Failed to close the HBaseAdmin", e);
+       }
+     }
+     // disable the block cache.
+     Configuration copyOfConf = new Configuration(conf);
+     copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+     this.cacheConfig = new CacheConfig(copyOfConf);
+ 
+     table = c.getBufferedMutator(new BufferedMutatorParams(tn).writeBufferSize(1*1024*1024));
+     memstore = new MemStoreWrapper(context, fs, table, family, new DefaultMemStore(), cacheConfig);
+ 
+     // The start time of the sweep tool.
+     // Only the mob files whose creation time is older than startTime-oneDay will be handled by the
+     // reducer since it brings inconsistency to handle the latest mob files.
+     this.compactionBegin = conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE, 0);
+     mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tn);
+   }
+ 
+   private SweepPartition createPartition(CompactionPartitionId id, Context context)
+     throws IOException {
+     return new SweepPartition(id, context);
+   }
+ 
+   @Override
+   public void run(Context context) throws IOException, InterruptedException {
+     String jobId = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
+     String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME);
+     String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE);
+     ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId,
+         new DummyMobAbortable());
+     FSDataOutputStream fout = null;
+     try {
+       SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner);
+       tracker.start();
+       setup(context);
+       // create a sequence file that contains all the visited file names in this reducer.
+       String dir = this.conf.get(SweepJob.WORKING_VISITED_DIR_KEY);
+       Path nameFilePath = new Path(dir, UUID.randomUUID().toString()
+           .replace("-", MobConstants.EMPTY_STRING));
+       fout = fs.create(nameFilePath, true);
+       writer = SequenceFile.createWriter(context.getConfiguration(), fout, String.class,
+           String.class, CompressionType.NONE, null);
+       CompactionPartitionId id;
+       SweepPartition partition = null;
+       // the mob files which have the same start key and date are in the same partition.
+       while (context.nextKey()) {
+         Text key = context.getCurrentKey();
+         String keyString = key.toString();
+         id = createPartitionId(keyString);
+         if (null == partition || !id.equals(partition.getId())) {
+           // It's the first mob file in the current partition.
+           if (null != partition) {
+             // this mob file is in a different partition from the previous mob file.
+             // directly close.
+             partition.close();
+           }
+           // create a new one
+           partition = createPartition(id, context);
+         }
+         if (partition != null) {
+           // run the partition
+           partition.execute(key, context.getValues());
+         }
+       }
+       if (null != partition) {
+         partition.close();
+       }
+       writer.hflush();
+     } catch (KeeperException e) {
+       throw new IOException(e);
+     } finally {
+       cleanup(context);
+       zkw.close();
+       if (writer != null) {
+         IOUtils.closeStream(writer);
+       }
+       if (fout != null) {
+         IOUtils.closeStream(fout);
+       }
+       if (table != null) {
+         try {
+           table.close();
+         } catch (IOException e) {
+           LOG.warn(e);
+         }
+       }
+     }
+ 
+   }
+ 
+   /**
+    * The mob files which have the same start key and date are in the same partition.
+    * The files in the same partition are merged together into bigger ones.
+    */
+   public class SweepPartition {
+ 
+     private final CompactionPartitionId id;
+     private final Context context;
+     private boolean memstoreUpdated = false;
+     private boolean mergeSmall = false;
+     private final Map<String, MobFileStatus> fileStatusMap = new HashMap<String, MobFileStatus>();
+     private final List<Path> toBeDeleted = new ArrayList<Path>();
+ 
+     public SweepPartition(CompactionPartitionId id, Context context) throws IOException {
+       this.id = id;
+       this.context = context;
+       memstore.setPartitionId(id);
+       init();
+     }
+ 
+     public CompactionPartitionId getId() {
+       return this.id;
+     }
+ 
+     /**
+      * Prepares the map of files.
+      *
+      * @throws IOException
+      */
+     private void init() throws IOException {
+       FileStatus[] fileStats = listStatus(familyDir, id.getStartKey());
+       if (null == fileStats) {
+         return;
+       }
+ 
+       int smallFileCount = 0;
+       float compactionRatio = conf.getFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO,
+           MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO);
+       long compactionMergeableSize = conf.getLong(
+           MobConstants.MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE,
+           MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE);
+       // list the files. Just merge the hfiles, don't merge the hfile links.
+       // prepare the map of mob files. The key is the file name, the value is the file status.
+       for (FileStatus fileStat : fileStats) {
+         MobFileStatus mobFileStatus = null;
+         if (!HFileLink.isHFileLink(fileStat.getPath())) {
+           mobFileStatus = new MobFileStatus(fileStat, compactionRatio, compactionMergeableSize);
+           if (mobFileStatus.needMerge()) {
+             smallFileCount++;
+           }
+           // key is file name (not hfile name), value is hfile status.
+           fileStatusMap.put(fileStat.getPath().getName(), mobFileStatus);
+         }
+       }
+       if (smallFileCount >= 2) {
+         // merge the files only when there is more than one file in the same partition.
+         this.mergeSmall = true;
+       }
+     }
+ 
+     /**
+      * Flushes the data into mob files and store files, and archives the small
+      * files after they're merged.
+      * @throws IOException
+      */
+     public void close() throws IOException {
+       if (null == id) {
+         return;
+       }
+       // flush remain key values into mob files
+       if (memstoreUpdated) {
+         memstore.flushMemStore();
+       }
+       List<StoreFile> storeFiles = new ArrayList<StoreFile>(toBeDeleted.size());
+       // delete small files after compaction
+       for (Path path : toBeDeleted) {
+         LOG.info("[In Partition close] Delete the file " + path + " in 
partition close");
+         storeFiles.add(new StoreFile(fs, path, conf, cacheConfig, 
BloomType.NONE));
+       }
+       if (!storeFiles.isEmpty()) {
+         try {
+           MobUtils.removeMobFiles(conf, fs, table.getName(), mobTableDir, family.getName(),
+               storeFiles);
+           context.getCounter(SweepCounter.FILE_TO_BE_MERGE_OR_CLEAN).increment(storeFiles.size());
+         } catch (IOException e) {
+           LOG.error("Failed to archive the store files " + storeFiles, e);
+         }
+         storeFiles.clear();
+       }
+       fileStatusMap.clear();
+     }
+ 
+     /**
+      * Merges the small mob files into bigger ones.
+      * @param fileName The current mob file name.
+      * @param values The collection of KeyValues in this mob file.
+      * @throws IOException
+      */
+     public void execute(Text fileName, Iterable<KeyValue> values) throws IOException {
+       if (null == values) {
+         return;
+       }
+       MobFileName mobFileName = MobFileName.create(fileName.toString());
+       LOG.info("[In reducer] The file name: " + fileName.toString());
+       MobFileStatus mobFileStat = fileStatusMap.get(mobFileName.getFileName());
+       if (null == mobFileStat) {
+         LOG.info("[In reducer] Cannot find the file, probably this record is 
obsolete");
+         return;
+       }
+       // only handle the files that are older than one day.
+       if (compactionBegin - mobFileStat.getFileStatus().getModificationTime()
+           <= mobCompactionDelay) {
+         return;
+       }
+       // write the hfile name
+       writer.append(mobFileName.getFileName(), MobConstants.EMPTY_STRING);
+       Set<KeyValue> kvs = new HashSet<KeyValue>();
+       for (KeyValue kv : values) {
+         if (kv.getValueLength() > Bytes.SIZEOF_INT) {
+           mobFileStat.addValidSize(Bytes.toInt(kv.getValueArray(), kv.getValueOffset(),
+               Bytes.SIZEOF_INT));
+         }
+         kvs.add(kv.createKeyOnly(false));
+       }
+       // If the mob file is an invalid one or a small one, merge it into new/bigger ones.
+       if (mobFileStat.needClean() || (mergeSmall && mobFileStat.needMerge())) {
+         context.getCounter(SweepCounter.INPUT_FILE_COUNT).increment(1);
+         MobFile file = MobFile.create(fs,
+             new Path(familyDir, mobFileName.getFileName()), conf, cacheConfig);
+         StoreFileScanner scanner = null;
+         file.open();
+         try {
+           scanner = file.getScanner();
+           scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY));
+           Cell cell;
+           while (null != (cell = scanner.next())) {
+             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+             KeyValue keyOnly = kv.createKeyOnly(false);
+             if (kvs.contains(keyOnly)) {
+               // write the KeyValue existing in HBase to the memstore.
+               memstore.addToMemstore(kv);
+               memstoreUpdated = true;
+             }
+           }
+         } finally {
+           if (scanner != null) {
+             scanner.close();
+           }
+           file.close();
+         }
+         toBeDeleted.add(mobFileStat.getFileStatus().getPath());
+       }
+     }
+ 
+     /**
+      * Lists the files with the same prefix.
+      * @param p The file path.
+      * @param prefix The prefix.
+      * @return The files with the same prefix.
+      * @throws IOException
+      */
+     private FileStatus[] listStatus(Path p, String prefix) throws IOException {
+       return fs.listStatus(p, new PathPrefixFilter(prefix));
+     }
+   }
+ 
+   static class PathPrefixFilter implements PathFilter {
+ 
+     private final String prefix;
+ 
+     public PathPrefixFilter(String prefix) {
+       this.prefix = prefix;
+     }
+ 
+     public boolean accept(Path path) {
+       return path.getName().startsWith(prefix, 0);
+     }
+ 
+   }
+ 
+   /**
+    * Creates the partition id.
+    * @param fileNameAsString The current file name, in string.
+    * @return The partition id.
+    */
+   private CompactionPartitionId createPartitionId(String fileNameAsString) {
+     MobFileName fileName = MobFileName.create(fileNameAsString);
+     return new CompactionPartitionId(fileName.getStartKey(), fileName.getDate());
+   }
+ 
+   /**
+    * The mob file status used in the sweep reducer.
+    */
+   private static class MobFileStatus {
+     private FileStatus fileStatus;
+     private int validSize;
+     private long size;
+ 
+     private float compactionRatio = MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO;
+     private long compactionMergeableSize =
+         MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE;
+ 
+     /**
+      * @param fileStatus The current FileStatus.
+      * @param compactionRatio the invalid ratio.
+      * If there are too many cells deleted in a mob file, it's regarded as invalid,
+      * and needs to be written to a new one.
+      * If existingCellSize/fileSize < compactionRatio, it's regarded as an invalid one.
+      * @param compactionMergeableSize If the size of a mob file is less
+      * than this value, it's regarded as a small file and needs to be merged.
+      */
+     public MobFileStatus(FileStatus fileStatus, float compactionRatio,
+         long compactionMergeableSize) {
+       this.fileStatus = fileStatus;
+       this.size = fileStatus.getLen();
+       validSize = 0;
+       this.compactionRatio = compactionRatio;
+       this.compactionMergeableSize = compactionMergeableSize;
+     }
+ 
+     /**
+      * Add size to this file.
+      * @param size The size to be added.
+      */
+     public void addValidSize(int size) {
+       this.validSize += size;
+     }
+ 
+     /**
+      * Whether the mob files need to be cleaned.
+      * If there're too many cells deleted in this mob file, it needs to be cleaned.
+      * @return True if it needs to be cleaned.
+      */
+     public boolean needClean() {
+       return validSize < compactionRatio * size;
+     }
+ 
+     /**
+      * Whether the mob files need to be merged.
+      * If this mob file is too small, it needs to be merged.
+      * @return True if it needs to be merged.
+      */
+     public boolean needMerge() {
+       return this.size < compactionMergeableSize;
+     }
+ 
+     /**
+      * Gets the file status.
+      * @return The file status.
+      */
+     public FileStatus getFileStatus() {
+       return fileStatus;
+     }
+   }
+ }
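
MobFileStatus applies two thresholds: needClean() fires when the bytes still referenced from HBase fall below compactionRatio of the file length, and needMerge() fires when the file itself is smaller than compactionMergeableSize. A tiny sketch of that arithmetic with made-up sizes (the class name and all values below are invented for illustration):

    // Illustrative only; all sizes and thresholds here are invented.
    public class MobFileThresholdDemo {
      public static void main(String[] args) {
        long fileSize = 100L * 1024 * 1024;      // 100 MB mob file
        long validSize = 20L * 1024 * 1024;      // 20 MB still referenced from HBase
        float compactionRatio = 0.5f;            // "invalid" below half of the file size
        long mergeableSize = 128L * 1024 * 1024; // files under 128 MB count as small

        boolean needClean = validSize < compactionRatio * fileSize; // true: mostly dead data
        boolean needMerge = fileSize < mergeableSize;               // true: small file
        System.out.println("needClean=" + needClean + ", needMerge=" + needMerge);
      }
    }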

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
index 0000000,5436554..e20186e
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
@@@ -1,0 -1,119 +1,119 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.mob.mapreduce;
+ 
+ import java.io.IOException;
+ 
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.conf.Configured;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.hbase.HBaseConfiguration;
+ import org.apache.hadoop.hbase.HColumnDescriptor;
+ import org.apache.hadoop.hbase.HTableDescriptor;
+ import org.apache.hadoop.hbase.TableName;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.classification.InterfaceStability;
+ import org.apache.hadoop.hbase.client.HBaseAdmin;
+ import org.apache.hadoop.hbase.util.Bytes;
+ import org.apache.hadoop.util.Tool;
+ import org.apache.hadoop.util.ToolRunner;
+ import org.apache.zookeeper.KeeperException;
+ 
+ import com.google.protobuf.ServiceException;
+ 
+ /**
+  * The sweep tool. It deletes the mob files that are no longer used and merges small mob files
+  * into bigger ones. Each run of this sweep tool handles only one column family. Runs on the
+  * same column family are mutually exclusive, and the major compaction and the sweep tool on
+  * the same column family are mutually exclusive too.
+  */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ public class Sweeper extends Configured implements Tool {
+ 
+   /**
+    * Sweeps the mob files on one column family. It deletes the unused mob files and merges
+    * the small mob files into bigger ones.
+    * @param tableName The current table name in string format.
+    * @param familyName The column family name.
+    * @return 0 if successful, 2 if the job aborted with an exception, 3 if unable to start
+    *   due to another compaction, 4 if the MR job was unsuccessful
+    * @throws IOException
+    * @throws InterruptedException
+    * @throws ClassNotFoundException
+    * @throws KeeperException
+    * @throws ServiceException
+    */
+   int sweepFamily(String tableName, String familyName) throws IOException, InterruptedException,
+       ClassNotFoundException, KeeperException, ServiceException {
+     Configuration conf = getConf();
+     // make sure the target HBase cluster is available.
+     HBaseAdmin.checkHBaseAvailable(conf);
+     HBaseAdmin admin = new HBaseAdmin(conf);
+     try {
+       FileSystem fs = FileSystem.get(conf);
+       TableName tn = TableName.valueOf(tableName);
+       HTableDescriptor htd = admin.getTableDescriptor(tn);
+       HColumnDescriptor family = htd.getFamily(Bytes.toBytes(familyName));
+       if (family == null || !family.isMobEnabled()) {
+           throw new IOException("Column family " + familyName + " is not a 
MOB column family");
+       }
+       SweepJob job = new SweepJob(conf, fs);
+       // Run the sweeping
+       return job.sweep(tn, family);
+     } catch (Exception e) {
+       System.err.println("Job aborted due to exception " + e);
+       return 2; // job failed
+     } finally {
+       try {
+         admin.close();
+       } catch (IOException e) {
+         System.out.println("Failed to close the HBaseAdmin: " + 
e.getMessage());
+       }
+     }
+   }
+ 
+   public static void main(String[] args) throws Exception {
+     Configuration conf = HBaseConfiguration.create();
+     int ret = ToolRunner.run(conf, new Sweeper(), args);
+     System.exit(ret);
+   }
+ 
+   private void printUsage() {
+     System.err.println("Usage:\n" + "--------------------------\n" + 
Sweeper.class.getName()
+         + " tableName familyName");
+     System.err.println(" tableName        The table name");
+     System.err.println(" familyName       The column family name");
+   }
+ 
+   /**
+    * Main method for the tool.
+    * @return 0 if successful, 1 for bad args, 2 if the job aborted with an exception,
+    *   3 if unable to start due to another compaction, 4 if the MR job was unsuccessful
+    */
+   public int run(String[] args) throws Exception {
+     if (args.length != 2) {
+       printUsage();
+       return 1;
+     }
+     String table = args[0];
+     String family = args[1];
+     return sweepFamily(table, family);
+   }
+ }
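
The tool is driven through ToolRunner, so besides the command-line form printed by printUsage()
(tableName and familyName as arguments), it can be invoked programmatically. A minimal sketch,
assuming the HBase client jars are on the classpath; the table and family names are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mob.mapreduce.Sweeper;
    import org.apache.hadoop.util.ToolRunner;

    public class RunSweeper {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Sweeps one MOB-enabled column family; "mytable" and "mobfamily" are placeholders.
        // Return codes follow Sweeper#run: 0 success, 1 bad args, 2 aborted with an exception,
        // 3 unable to start due to another compaction, 4 MR job unsuccessful.
        int ret = ToolRunner.run(conf, new Sweeper(), new String[] { "mytable", "mobfamily" });
        System.exit(ret);
      }
    }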

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
----------------------------------------------------------------------
diff --cc 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
index 519edde,04d2b13..51e1a2d
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
@@@ -71,12 -79,15 +79,15 @@@ public class DefaultStoreEngine extend
      } catch (Exception e) {
        throw new IOException("Unable to load configured compactor '" + 
className + "'", e);
      }
-     className = conf.get(
+   }
+ 
+   protected void createCompactionPolicy(Configuration conf, Store store) throws IOException {
+     String className = conf.get(
 -            DEFAULT_COMPACTION_POLICY_CLASS_KEY, DEFAULT_COMPACTION_POLICY_CLASS.getName());
 +        DEFAULT_COMPACTION_POLICY_CLASS_KEY, DEFAULT_COMPACTION_POLICY_CLASS.getName());
      try {
        compactionPolicy = ReflectionUtils.instantiateWithCustomCtor(className,
 -              new Class[]{Configuration.class, StoreConfigInformation.class},
 -              new Object[]{conf, store});
 +          new Class[] { Configuration.class, StoreConfigInformation.class },
 +          new Object[] { conf, store });
      } catch (Exception e) {
        throw new IOException("Unable to load configured compaction policy '" + 
className + "'", e);
      }

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --cc 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 0000000,8f12656..ad0c4d7
mode 000000,100644..100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@@ -1,0 -1,585 +1,585 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.regionserver;
+ 
+ import java.io.FileNotFoundException;
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Date;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.NavigableSet;
+ import java.util.UUID;
+ import java.util.concurrent.ConcurrentHashMap;
+ 
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hbase.Cell;
+ import org.apache.hadoop.hbase.CellComparator;
+ import org.apache.hadoop.hbase.HColumnDescriptor;
+ import org.apache.hadoop.hbase.HConstants;
+ import org.apache.hadoop.hbase.KeyValue;
+ import org.apache.hadoop.hbase.KeyValue.Type;
+ import org.apache.hadoop.hbase.TableName;
+ import org.apache.hadoop.hbase.Tag;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.client.Scan;
+ import org.apache.hadoop.hbase.filter.Filter;
+ import org.apache.hadoop.hbase.filter.FilterList;
+ import org.apache.hadoop.hbase.io.compress.Compression;
+ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+ import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
+ import org.apache.hadoop.hbase.io.hfile.HFileContext;
+ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+ import org.apache.hadoop.hbase.master.TableLockManager;
+ import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
+ import org.apache.hadoop.hbase.mob.MobCacheConfig;
+ import org.apache.hadoop.hbase.mob.MobConstants;
+ import org.apache.hadoop.hbase.mob.MobFile;
+ import org.apache.hadoop.hbase.mob.MobFileName;
+ import org.apache.hadoop.hbase.mob.MobStoreEngine;
+ import org.apache.hadoop.hbase.mob.MobUtils;
+ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+ import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+ import org.apache.hadoop.hbase.util.Bytes;
+ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+ import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+ import org.apache.hadoop.hbase.util.IdLock;
+ 
+ /**
+  * The store implementation to save MOBs (medium objects). It extends HStore.
+  * When the descriptor of a column family has the value "IS_MOB", the column family is
+  * a mob one. When an HRegion instantiates a store for this column family, an HMobStore is
+  * created. HMobStore is almost the same as HStore except that it uses different types of
+  * scanners. In the getScanner method, a MobStoreScanner or a ReversedMobStoreScanner is
+  * returned. In these scanners, an additional seek in the mob files is performed after the
+  * seek to HBase is done.
+  */
+ @InterfaceAudience.Private
+ public class HMobStore extends HStore {
+   private static final Log LOG = LogFactory.getLog(HMobStore.class);
+   private MobCacheConfig mobCacheConfig;
+   private Path homePath;
+   private Path mobFamilyPath;
+   private volatile long cellsCountCompactedToMob = 0;
+   private volatile long cellsCountCompactedFromMob = 0;
+   private volatile long cellsSizeCompactedToMob = 0;
+   private volatile long cellsSizeCompactedFromMob = 0;
+   private volatile long mobFlushCount = 0;
+   private volatile long mobFlushedCellsCount = 0;
+   private volatile long mobFlushedCellsSize = 0;
+   private volatile long mobScanCellsCount = 0;
+   private volatile long mobScanCellsSize = 0;
+   private HColumnDescriptor family;
+   private TableLockManager tableLockManager;
+   private TableName tableLockName;
+   private Map<String, List<Path>> map = new ConcurrentHashMap<String, 
List<Path>>();
+   private final IdLock keyLock = new IdLock();
+ 
+   public HMobStore(final HRegion region, final HColumnDescriptor family,
+       final Configuration confParam) throws IOException {
+     super(region, family, confParam);
+     this.family = family;
+     this.mobCacheConfig = (MobCacheConfig) cacheConf;
+     this.homePath = MobUtils.getMobHome(conf);
+     this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(),
+         family.getNameAsString());
+     List<Path> locations = new ArrayList<Path>(2);
+     locations.add(mobFamilyPath);
+     TableName tn = region.getTableDesc().getTableName();
+     locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils.getMobRegionInfo(tn)
+         .getEncodedName(), family.getNameAsString()));
+     map.put(Bytes.toString(tn.getName()), locations);
+     if (region.getRegionServerServices() != null) {
+       tableLockManager = region.getRegionServerServices().getTableLockManager();
+       tableLockName = MobUtils.getTableLockName(getTableName());
+     }
+   }
+ 
+   /**
+    * Creates the mob cache config.
+    */
+   @Override
+   protected void createCacheConf(HColumnDescriptor family) {
+     cacheConf = new MobCacheConfig(conf, family);
+   }
+ 
+   /**
+    * Gets current config.
+    */
+   public Configuration getConfiguration() {
+     return this.conf;
+   }
+ 
+   /**
+    * Gets the MobStoreScanner or ReversedMobStoreScanner. In these scanners, an additional
+    * seek in the mob files is performed after the seek in HBase is done.
+    */
+   @Override
+   protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
+       long readPt, KeyValueScanner scanner) throws IOException {
+     if (scanner == null) {
+       if (MobUtils.isRefOnlyScan(scan)) {
+         Filter refOnlyFilter = new MobReferenceOnlyFilter();
+         Filter filter = scan.getFilter();
+         if (filter != null) {
+           scan.setFilter(new FilterList(filter, refOnlyFilter));
+         } else {
+           scan.setFilter(refOnlyFilter);
+         }
+       }
+       scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan,
+           targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
+     }
+     return scanner;
+   }
+ 
+   /**
+    * Creates the mob store engine.
+    */
+   @Override
+   protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, 
Configuration conf,
+       CellComparator cellComparator) throws IOException {
+     MobStoreEngine engine = new MobStoreEngine();
+     engine.createComponents(conf, store, cellComparator);
+     return engine;
+   }
+ 
+   /**
+    * Gets the temp directory.
+    * @return The temp directory.
+    */
+   private Path getTempDir() {
+     return new Path(homePath, MobConstants.TEMP_DIR_NAME);
+   }
+ 
+   /**
+    * Creates the writer for the mob file in temp directory.
+    * @param date The latest date of written cells.
+    * @param maxKeyCount The key count.
+    * @param compression The compression algorithm.
+    * @param startKey The start key.
+    * @return The writer for the mob file.
+    * @throws IOException
+    */
+   public StoreFile.Writer createWriterInTmp(Date date, long maxKeyCount,
+       Compression.Algorithm compression, byte[] startKey) throws IOException {
+     if (startKey == null) {
+       startKey = HConstants.EMPTY_START_ROW;
+     }
+     Path path = getTempDir();
+     return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey);
+   }
+ 
+   /**
+    * Creates the writer for the del file in temp directory.
+    * The del file keeps track of the delete markers. Its name has a suffix _del;
+    * the format is [0-9a-f]+(_del)?.
+    * @param date The latest date of written cells.
+    * @param maxKeyCount The key count.
+    * @param compression The compression algorithm.
+    * @param startKey The start key.
+    * @return The writer for the del file.
+    * @throws IOException
+    */
+   public StoreFile.Writer createDelFileWriterInTmp(Date date, long maxKeyCount,
+       Compression.Algorithm compression, byte[] startKey) throws IOException {
+     if (startKey == null) {
+       startKey = HConstants.EMPTY_START_ROW;
+     }
+     Path path = getTempDir();
+     String suffix = UUID
+         .randomUUID().toString().replaceAll("-", "") + "_del";
+     MobFileName mobFileName = MobFileName.create(startKey, MobUtils.formatDate(date), suffix);
+     return createWriterInTmp(mobFileName, path, maxKeyCount, compression);
+   }
+ 
+   /**
+    * Creates the writer for the mob file in temp directory.
+    * @param date The date string; its format is yyyymmdd.
+    * @param basePath The basic path for a temp directory.
+    * @param maxKeyCount The key count.
+    * @param compression The compression algorithm.
+    * @param startKey The start key.
+    * @return The writer for the mob file.
+    * @throws IOException
+    */
+   public StoreFile.Writer createWriterInTmp(String date, Path basePath, long maxKeyCount,
+       Compression.Algorithm compression, byte[] startKey) throws IOException {
+     MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID()
+         .toString().replaceAll("-", ""));
+     return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression);
+   }
+ 
+   /**
+    * Creates the writer for the mob file in temp directory.
+    * @param mobFileName The mob file name.
+    * @param basePath The basic path for a temp directory.
+    * @param maxKeyCount The key count.
+    * @param compression The compression algorithm.
+    * @return The writer for the mob file.
+    * @throws IOException
+    */
 -  public StoreFile.Writer createWriterInTmp(MobFileName mobFileName, Path basePath, long maxKeyCount,
 -      Compression.Algorithm compression) throws IOException {
++  public StoreFile.Writer createWriterInTmp(MobFileName mobFileName, Path basePath,
++      long maxKeyCount, Compression.Algorithm compression) throws IOException {
+     final CacheConfig writerCacheConf = mobCacheConfig;
+     HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
+         .withIncludesMvcc(true).withIncludesTags(true)
+         .withCompressTags(family.isCompressTags())
+         .withChecksumType(checksumType)
+         .withBytesPerCheckSum(bytesPerChecksum)
+         .withBlockSize(blocksize)
+         .withHBaseCheckSum(true).withDataBlockEncoding(getFamily().getDataBlockEncoding())
+         .withEncryptionContext(cryptoContext)
+         .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
+ 
+     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, region.getFilesystem())
+         .withFilePath(new Path(basePath, mobFileName.getFileName()))
+         .withComparator(CellComparator.COMPARATOR).withBloomType(BloomType.NONE)
+         .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+     return w;
+   }
+ 
+   /**
+    * Commits the mob file.
+    * @param sourceFile The source file.
+    * @param targetPath The directory path where the source file is renamed to.
+    * @throws IOException
+    */
+   public void commitFile(final Path sourceFile, Path targetPath) throws IOException {
+     if (sourceFile == null) {
+       return;
+     }
+     Path dstPath = new Path(targetPath, sourceFile.getName());
+     validateMobFile(sourceFile);
+     String msg = "Renaming flushed file from " + sourceFile + " to " + 
dstPath;
+     LOG.info(msg);
+     Path parent = dstPath.getParent();
+     if (!region.getFilesystem().exists(parent)) {
+       region.getFilesystem().mkdirs(parent);
+     }
+     if (!region.getFilesystem().rename(sourceFile, dstPath)) {
+       throw new IOException("Failed rename of " + sourceFile + " to " + 
dstPath);
+     }
+   }
+ 
+   /**
+    * Validates a mob file by opening and closing it.
+    *
+    * @param path the path to the mob file
+    */
+   private void validateMobFile(Path path) throws IOException {
+     StoreFile storeFile = null;
+     try {
+       storeFile =
+           new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE);
+       storeFile.createReader();
+     } catch (IOException e) {
+       LOG.error("Fail to open mob file[" + path + "], keep it in temp 
directory.", e);
+       throw e;
+     } finally {
+       if (storeFile != null) {
+         storeFile.closeReader(false);
+       }
+     }
+   }
+ 
+   /**
+    * Reads the cell from the mob file, and the read point does not count.
+    * This is used for DefaultMobStoreCompactor where we can read an empty value for the
+    * missing cell.
+    * @param reference The cell found in HBase; its value is a path to a mob file.
+    * @param cacheBlocks Whether the scanner should cache blocks.
+    * @return The cell found in the mob file.
+    * @throws IOException
+    */
+   public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
+     return resolve(reference, cacheBlocks, -1, true);
+   }
+ 
+   /**
+    * Reads the cell from the mob file.
+    * @param reference The cell found in HBase; its value is a path to a mob file.
+    * @param cacheBlocks Whether the scanner should cache blocks.
+    * @param readPt The read point.
+    * @param readEmptyValueOnMobCellMiss Whether to return a null value when the mob file is
+    *        missing or corrupt.
+    * @return The cell found in the mob file.
+    * @throws IOException
+    */
+   public Cell resolve(Cell reference, boolean cacheBlocks, long readPt,
+     boolean readEmptyValueOnMobCellMiss) throws IOException {
+     Cell result = null;
+     if (MobUtils.hasValidMobRefCellValue(reference)) {
+       String fileName = MobUtils.getMobFileName(reference);
+       Tag tableNameTag = MobUtils.getTableNameTag(reference);
+       if (tableNameTag != null) {
+         byte[] tableName = tableNameTag.getValue();
+         String tableNameString = Bytes.toString(tableName);
+         List<Path> locations = map.get(tableNameString);
+         if (locations == null) {
+           IdLock.Entry lockEntry = keyLock.getLockEntry(tableNameString.hashCode());
+           try {
+             locations = map.get(tableNameString);
+             if (locations == null) {
+               locations = new ArrayList<Path>(2);
+               TableName tn = TableName.valueOf(tableName);
+               locations.add(MobUtils.getMobFamilyPath(conf, tn, family.getNameAsString()));
+               locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils
+                   .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
+               map.put(tableNameString, locations);
+             }
+           } finally {
+             keyLock.releaseLockEntry(lockEntry);
+           }
+         }
+         result = readCell(locations, fileName, reference, cacheBlocks, readPt,
+           readEmptyValueOnMobCellMiss);
+       }
+     }
+     if (result == null) {
+       LOG.warn("The KeyValue result is null, assemble a new KeyValue with the 
same row,family,"
+           + "qualifier,timestamp,type and tags but with an empty value to 
return.");
+       result = new KeyValue(reference.getRowArray(), reference.getRowOffset(),
+           reference.getRowLength(), reference.getFamilyArray(), reference.getFamilyOffset(),
+           reference.getFamilyLength(), reference.getQualifierArray(),
+           reference.getQualifierOffset(), reference.getQualifierLength(), reference.getTimestamp(),
+           Type.codeToType(reference.getTypeByte()), HConstants.EMPTY_BYTE_ARRAY,
+           0, 0, reference.getTagsArray(), reference.getTagsOffset(),
+           reference.getTagsLength());
+     }
+     return result;
+   }
+ 
+   /**
+    * Reads the cell from a mob file.
+    * The mob file might be located in different directories.
+    * 1. The working directory.
+    * 2. The archive directory.
+    * Reads the cell from the files located in both of the above directories.
+    * @param locations The possible locations where the mob files are saved.
+    * @param fileName The file to be read.
+    * @param search The cell to be searched.
+    * @param cacheMobBlocks Whether the scanner should cache blocks.
+    * @param readPt the read point.
+    * @param readEmptyValueOnMobCellMiss Whether to return a null value when the mob file is
+    *        missing or corrupt.
+    * @return The found cell. Null if there's no such cell.
+    * @throws IOException
+    */
+   private Cell readCell(List<Path> locations, String fileName, Cell search, 
boolean cacheMobBlocks,
+     long readPt, boolean readEmptyValueOnMobCellMiss) throws IOException {
+     FileSystem fs = getFileSystem();
+     Throwable throwable = null;
+     for (Path location : locations) {
+       MobFile file = null;
+       Path path = new Path(location, fileName);
+       try {
+         file = mobCacheConfig.getMobFileCache().openFile(fs, path, mobCacheConfig);
+         return readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt) : file.readCell(search,
+           cacheMobBlocks);
+       } catch (IOException e) {
+         mobCacheConfig.getMobFileCache().evictFile(fileName);
+         throwable = e;
+         if ((e instanceof FileNotFoundException) ||
+             (e.getCause() instanceof FileNotFoundException)) {
+           LOG.warn("Fail to read the cell, the mob file " + path + " doesn't 
exist", e);
+         } else if (e instanceof CorruptHFileException) {
+           LOG.error("The mob file " + path + " is corrupt", e);
+           break;
+         } else {
+           throw e;
+         }
+       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
+         mobCacheConfig.getMobFileCache().evictFile(fileName);
+         LOG.warn("Fail to read the cell", e);
+         throwable = e;
+       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
+         mobCacheConfig.getMobFileCache().evictFile(fileName);
+         LOG.warn("Fail to read the cell", e);
+         throwable = e;
+       } finally {
+         if (file != null) {
+           mobCacheConfig.getMobFileCache().closeFile(file);
+         }
+       }
+     }
+     LOG.error("The mob file " + fileName + " could not be found in the 
locations " + locations
+       + " or it is corrupt");
+     if (readEmptyValueOnMobCellMiss) {
+       return null;
+     } else if (throwable instanceof IOException) {
+       throw (IOException) throwable;
+     } else {
+       throw new IOException(throwable);
+     }
+   }
+ 
+   /**
+    * Gets the mob file path.
+    * @return The mob file path.
+    */
+   public Path getPath() {
+     return mobFamilyPath;
+   }
+ 
+   /**
+    * The compaction in the mob store.
+    * The cells in this store contain the paths of the mob files. There might be a race
+    * condition between the major compaction and the sweeping of mob files.
+    * In order to avoid this, we need to mutually exclude the running of the major compaction
+    * and the sweeping of mob files.
+    * The minor compaction is not affected.
+    * The major compaction is marked as retainDeleteMarkers when a sweeping is in progress.
+    */
+   @Override
+   public List<StoreFile> compact(CompactionContext compaction,
+       CompactionThroughputController throughputController) throws IOException {
+     // If it's a major compaction, try to find whether a sweeper is running.
+     // If yes, mark the major compaction as retainDeleteMarkers.
+     if (compaction.getRequest().isAllFiles()) {
+       // Use ZooKeeper to coordinate.
+       // 1. Acquire an operation lock.
+       //   1.1. If not acquired, mark the major compaction as retainDeleteMarkers and
+       //        continue the compaction.
+       //   1.2. If the lock is obtained, search the node of sweeping.
+       //      1.2.1. If the node is there, the sweeping is in progress; mark the major
+       //             compaction as retainDeleteMarkers and continue the compaction.
+       //      1.2.2. If the node is not there, add a child to the major compaction node, and
+       //             run the compaction directly.
+       TableLock lock = null;
+       if (tableLockManager != null) {
+         lock = tableLockManager.readLock(tableLockName, "Major compaction in HMobStore");
+       }
+       boolean tableLocked = false;
+       String tableName = getTableName().getNameAsString();
+       if (lock != null) {
+         try {
+           LOG.info("Start to acquire a read lock for the table[" + tableName
+               + "], ready to perform the major compaction");
+           lock.acquire();
+           tableLocked = true;
+         } catch (Exception e) {
+           LOG.error("Fail to lock the table " + tableName, e);
+         }
+       } else {
+         // If the tableLockManager is null, mark the tableLocked as true.
+         tableLocked = true;
+       }
+       try {
+         if (!tableLocked) {
+           LOG.warn("Cannot obtain the table lock, maybe a sweep tool is 
running on this table["
+               + tableName + "], forcing the delete markers to be retained");
+           compaction.getRequest().forceRetainDeleteMarkers();
+         }
+         return super.compact(compaction, throughputController);
+       } finally {
+         if (tableLocked && lock != null) {
+           try {
+             lock.release();
+           } catch (IOException e) {
+             LOG.error("Fail to release the table lock " + tableName, e);
+           }
+         }
+       }
+     } else {
+       // If it's not a major compaction, continue the compaction.
+       return super.compact(compaction, throughputController);
+     }
+   }
+ 
+   public void updateCellsCountCompactedToMob(long count) {
+     cellsCountCompactedToMob += count;
+   }
+ 
+   public long getCellsCountCompactedToMob() {
+     return cellsCountCompactedToMob;
+   }
+ 
+   public void updateCellsCountCompactedFromMob(long count) {
+     cellsCountCompactedFromMob += count;
+   }
+ 
+   public long getCellsCountCompactedFromMob() {
+     return cellsCountCompactedFromMob;
+   }
+ 
+   public void updateCellsSizeCompactedToMob(long size) {
+     cellsSizeCompactedToMob += size;
+   }
+ 
+   public long getCellsSizeCompactedToMob() {
+     return cellsSizeCompactedToMob;
+   }
+ 
+   public void updateCellsSizeCompactedFromMob(long size) {
+     cellsSizeCompactedFromMob += size;
+   }
+ 
+   public long getCellsSizeCompactedFromMob() {
+     return cellsSizeCompactedFromMob;
+   }
+ 
+   public void updateMobFlushCount() {
+     mobFlushCount++;
+   }
+ 
+   public long getMobFlushCount() {
+     return mobFlushCount;
+   }
+ 
+   public void updateMobFlushedCellsCount(long count) {
+     mobFlushedCellsCount += count;
+   }
+ 
+   public long getMobFlushedCellsCount() {
+     return mobFlushedCellsCount;
+   }
+ 
+   public void updateMobFlushedCellsSize(long size) {
+     mobFlushedCellsSize += size;
+   }
+ 
+   public long getMobFlushedCellsSize() {
+     return mobFlushedCellsSize;
+   }
+ 
+   public void updateMobScanCellsCount(long count) {
+     mobScanCellsCount += count;
+   }
+ 
+   public long getMobScanCellsCount() {
+     return mobScanCellsCount;
+   }
+ 
+   public void updateMobScanCellsSize(long size) {
+     mobScanCellsSize += size;
+   }
+ 
+   public long getMobScanCellsSize() {
+     return mobScanCellsSize;
+   }
+ }
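
The writer methods above derive the target file name from the start key, the date string and a
UUID, appending "_del" for del files. A minimal sketch that uses the same MobFileName and
MobUtils calls as the patch; the start key value is a placeholder:

    import java.util.Date;
    import java.util.UUID;
    import org.apache.hadoop.hbase.mob.MobFileName;
    import org.apache.hadoop.hbase.mob.MobUtils;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MobFileNameSketch {
      public static void main(String[] args) {
        byte[] startKey = Bytes.toBytes("row-0001");    // placeholder start key
        String date = MobUtils.formatDate(new Date());  // yyyymmdd, as used by createWriterInTmp
        String uuid = UUID.randomUUID().toString().replaceAll("-", "");

        // A regular mob file name, and a del file name with the "_del" suffix.
        MobFileName mobName = MobFileName.create(startKey, date, uuid);
        MobFileName delName = MobFileName.create(startKey, date, uuid + "_del");

        System.out.println("mob file: " + mobName.getFileName());
        System.out.println("del file: " + delName.getFileName());
      }
    }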

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
