Repository: hbase
Updated Branches:
  refs/heads/hbase-11339 9cf46dcbe -> 84e957c87


http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
new file mode 100644
index 0000000..04fe359
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
@@ -0,0 +1,506 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.InvalidFamilyOperationException;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFile;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper.DummyMobAbortable;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The reducer of a sweep job.
+ * This reducer merges the small mob files into bigger ones, and writes the names of the
+ * visited mob files to a sequence file which the sweep job later uses to delete the
+ * unused mob files.
+ * The input key is a file name; the value is a collection of KeyValues, where each
+ * KeyValue is the actual cell in HBase (its value format is valueLength + fileName).
+ * From this, the reducer knows how many cells still exist in HBase for each mob file.
+ * If existCellSize/mobFileSize < compactionRatio, the mob file needs to be merged.
+ */
+@InterfaceAudience.Private
+public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
+
+  private static final Log LOG = LogFactory.getLog(SweepReducer.class);
+
+  private SequenceFile.Writer writer = null;
+  private MemStoreWrapper memstore;
+  private Configuration conf;
+  private FileSystem fs;
+
+  private Path familyDir;
+  private CacheConfig cacheConfig;
+  private long compactionBegin;
+  private HTable table;
+  private HColumnDescriptor family;
+  private long mobCompactionDelay;
+  private Path mobTableDir;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    this.conf = context.getConfiguration();
+    this.fs = FileSystem.get(conf);
+    // the MOB_COMPACTION_DELAY is ONE_DAY by default. Its value is only changed when testing.
+    mobCompactionDelay = conf.getLong(SweepJob.MOB_COMPACTION_DELAY, SweepJob.ONE_DAY);
+    String tableName = conf.get(TableInputFormat.INPUT_TABLE);
+    String familyName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
+    TableName tn = TableName.valueOf(tableName);
+    this.familyDir = MobUtils.getMobFamilyPath(conf, tn, familyName);
+    HBaseAdmin admin = new HBaseAdmin(this.conf);
+    try {
+      family = admin.getTableDescriptor(tn).getFamily(Bytes.toBytes(familyName));
+      if (family == null) {
+        // the column family might have been removed; abort the setup.
+        throw new InvalidFamilyOperationException("Column family '" + familyName
+            + "' does not exist. It might have been removed.");
+      }
+    } finally {
+      try {
+        admin.close();
+      } catch (IOException e) {
+        LOG.warn("Fail to close the HBaseAdmin", e);
+      }
+    }
+    // disable the block cache.
+    Configuration copyOfConf = new Configuration(conf);
+    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.00001f);
+    this.cacheConfig = new CacheConfig(copyOfConf);
+
+    table = new HTable(this.conf, Bytes.toBytes(tableName));
+    table.setAutoFlush(false, false);
+
+    table.setWriteBufferSize(1 * 1024 * 1024); // 1MB
+    memstore = new MemStoreWrapper(context, fs, table, family, new DefaultMemStore(), cacheConfig);
+
+    // The start time of the sweep tool.
+    // Only the mob files whose creation time is older than startTime-oneDay will be
+    // handled by the reducer, since handling the latest mob files brings inconsistency.
+    this.compactionBegin = conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE, 0);
+    mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tn);
+  }
+
+  private SweepPartition createPartition(SweepPartitionId id, Context context) throws IOException {
+    return new SweepPartition(id, context);
+  }
+
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    String jobId = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
+    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEPER_NODE);
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId,
+        new DummyMobAbortable());
+    try {
+      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, jobId);
+      tracker.start();
+      setup(context);
+      // create a sequence file that contains all the visited file names in this reducer.
+      String dir = this.conf.get(SweepJob.WORKING_VISITED_DIR_KEY);
+      Path nameFilePath = new Path(dir, UUID.randomUUID().toString()
+          .replace("-", MobConstants.EMPTY_STRING));
+      if (!fs.exists(nameFilePath)) {
+        // close the returned stream right away; the SequenceFile writer below re-opens the file.
+        fs.create(nameFilePath, true).close();
+      }
+      writer = SequenceFile.createWriter(fs, context.getConfiguration(), nameFilePath,
+          String.class, String.class);
+      SweepPartitionId id;
+      SweepPartition partition = null;
+      // the mob files which have the same start key and date are in the same partition.
+      while (context.nextKey()) {
+        Text key = context.getCurrentKey();
+        String keyString = key.toString();
+        id = SweepPartitionId.create(keyString);
+        if (null == partition || !id.equals(partition.getId())) {
+          // It's the first mob file of a new partition.
+          if (null != partition) {
+            // this mob file is in a different partition from the previous one,
+            // so close the previous partition first.
+            partition.close();
+          }
+          // create a new one
+          partition = createPartition(id, context);
+        }
+        if (partition != null) {
+          // run the partition
+          partition.execute(key, context.getValues());
+        }
+      }
+      if (null != partition) {
+        partition.close();
+      }
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    } finally {
+      cleanup(context);
+      zkw.close();
+      if (writer != null) {
+        IOUtils.closeStream(writer);
+      }
+      if (table != null) {
+        try {
+          table.close();
+        } catch (IOException e) {
+          LOG.warn(e);
+        }
+      }
+    }
+
+  }
+
+  /**
+   * The mob files which have the same start key and date are in the same partition.
+   * The files in the same partition are merged together into bigger ones.
+   */
+  public class SweepPartition {
+
+    private final SweepPartitionId id;
+    private final Context context;
+    private boolean memstoreUpdated = false;
+    private boolean mergeSmall = false;
+    private final Map<String, MobFileStatus> fileStatusMap = new HashMap<String, MobFileStatus>();
+    private final List<Path> toBeDeleted = new ArrayList<Path>();
+
+    public SweepPartition(SweepPartitionId id, Context context) throws IOException {
+      this.id = id;
+      this.context = context;
+      memstore.setPartitionId(id);
+      init();
+    }
+
+    public SweepPartitionId getId() {
+      return this.id;
+    }
+
+    /**
+     * Prepares the map of files.
+     *
+     * @throws IOException
+     */
+    private void init() throws IOException {
+      FileStatus[] fileStats = listStatus(familyDir, id.getStartKey());
+      if (null == fileStats) {
+        return;
+      }
+
+      int smallFileCount = 0;
+      float compactionRatio = conf.getFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO,
+          MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO);
+      long compactionMergeableSize = conf.getLong(
+          MobConstants.MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE,
+          MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE);
+      // list the files. Just merge the hfiles, don't merge the hfile links.
+      // prepare the map of mob files. The key is the file name, the value is the file status.
+      for (FileStatus fileStat : fileStats) {
+        MobFileStatus mobFileStatus = null;
+        if (!HFileLink.isHFileLink(fileStat.getPath())) {
+          mobFileStatus = new MobFileStatus(fileStat, compactionRatio, compactionMergeableSize);
+          if (mobFileStatus.needMerge()) {
+            smallFileCount++;
+          }
+          // key is file name (not hfile name), value is hfile status.
+          fileStatusMap.put(fileStat.getPath().getName(), mobFileStatus);
+        }
+      }
+      if (smallFileCount >= 2) {
+        // merge the files only when there are at least two small files in the same partition.
+        this.mergeSmall = true;
+      }
+    }
+
+    /**
+     * Flushes the data into mob files and store files, and archives the small
+     * files after they're merged.
+     * @throws IOException
+     */
+    public void close() throws IOException {
+      if (null == id) {
+        return;
+      }
+      // flush the remaining key values into mob files
+      if (memstoreUpdated) {
+        memstore.flushMemStore();
+      }
+      List<StoreFile> storeFiles = new ArrayList<StoreFile>(toBeDeleted.size());
+      // archive the small files after compaction
+      for (Path path : toBeDeleted) {
+        LOG.info("[In Partition close] Delete the file " + path + " in partition close");
+        storeFiles.add(new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE));
+      }
+      if (!storeFiles.isEmpty()) {
+        try {
+          MobUtils.removeMobFiles(conf, fs, table.getName(), mobTableDir, family.getName(),
+              storeFiles);
+          context.getCounter(SweepCounter.FILE_TO_BE_MERGE_OR_CLEAN).increment(storeFiles.size());
+        } catch (IOException e) {
+          LOG.error("Failed to archive the store files " + storeFiles, e);
+        }
+        storeFiles.clear();
+      }
+      fileStatusMap.clear();
+    }
+
+    /**
+     * Merges the small mob files into bigger ones.
+     * @param fileName The current mob file name.
+     * @param values The collection of KeyValues in this mob file.
+     * @throws IOException
+     */
+    public void execute(Text fileName, Iterable<KeyValue> values) throws IOException {
+      if (null == values) {
+        return;
+      }
+      MobFileName mobFileName = MobFileName.create(fileName.toString());
+      LOG.info("[In reducer] The file name: " + fileName.toString());
+      MobFileStatus mobFileStat = fileStatusMap.get(mobFileName.getFileName());
+      if (null == mobFileStat) {
+        LOG.info("[In reducer] Cannot find the file, probably this record is 
obsolete");
+        return;
+      }
+      // only handle the files that are older than one day.
+      if (compactionBegin - mobFileStat.getFileStatus().getModificationTime()
+          <= mobCompactionDelay) {
+        return;
+      }
+      // write the hfile name
+      writer.append(mobFileName.getFileName(), MobConstants.EMPTY_STRING);
+      Set<KeyValue> kvs = new HashSet<KeyValue>();
+      for (KeyValue kv : values) {
+        if (kv.getValueLength() > Bytes.SIZEOF_INT) {
+          mobFileStat.addValidSize(Bytes.toInt(kv.getValueArray(), kv.getValueOffset(),
+              Bytes.SIZEOF_INT));
+        }
+        kvs.add(kv.createKeyOnly(false));
+      }
+      // If the mob file is an invalid one or a small one, merge it into new, bigger ones.
+      if (mobFileStat.needClean() || (mergeSmall && mobFileStat.needMerge())) {
+        context.getCounter(SweepCounter.INPUT_FILE_COUNT).increment(1);
+        MobFile file = MobFile.create(fs,
+            new Path(familyDir, mobFileName.getFileName()), conf, cacheConfig);
+        StoreFileScanner scanner = null;
+        try {
+          scanner = file.getScanner();
+          scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY));
+          Cell cell;
+          while (null != (cell = scanner.next())) {
+            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+            KeyValue keyOnly = kv.createKeyOnly(false);
+            if (kvs.contains(keyOnly)) {
+              // write the KeyValue existing in HBase to the memstore.
+              memstore.addToMemstore(kv);
+              memstoreUpdated = true;
+            }
+          }
+        } finally {
+          if (scanner != null) {
+            scanner.close();
+          }
+        }
+        toBeDeleted.add(mobFileStat.getFileStatus().getPath());
+      }
+    }
+
+    /**
+     * Lists the files with the same prefix.
+     * @param p The file path.
+     * @param prefix The prefix.
+     * @return The files with the same prefix.
+     * @throws IOException
+     */
+    private FileStatus[] listStatus(Path p, String prefix) throws IOException {
+      return fs.listStatus(p, new PathPrefixFilter(prefix));
+    }
+  }
+
+  static class PathPrefixFilter implements PathFilter {
+
+    private final String prefix;
+
+    public PathPrefixFilter(String prefix) {
+      this.prefix = prefix;
+    }
+
+    public boolean accept(Path path) {
+      return path.getName().startsWith(prefix, 0);
+    }
+
+  }
+
+  /**
+   * The sweep partition id.
+   * It consists of the start key and date.
+   * The start key is a hex string of the checksum of a region start key.
+   * The date is the latest timestamp of cells in a mob file.
+   */
+  public static class SweepPartitionId {
+    private String date;
+    private String startKey;
+
+    public SweepPartitionId(MobFileName fileName) {
+      this.date = fileName.getDate();
+      this.startKey = fileName.getStartKey();
+    }
+
+    public SweepPartitionId(String date, String startKey) {
+      this.date = date;
+      this.startKey = startKey;
+    }
+
+    public static SweepPartitionId create(String key) {
+      return new SweepPartitionId(MobFileName.create(key));
+    }
+
+    @Override
+    public boolean equals(Object anObject) {
+      if (this == anObject) {
+        return true;
+      }
+      if (anObject instanceof SweepPartitionId) {
+        SweepPartitionId another = (SweepPartitionId) anObject;
+        if (this.date.equals(another.getDate()) && this.startKey.equals(another.getStartKey())) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      // keep hashCode consistent with equals, which compares date and startKey.
+      return 31 * date.hashCode() + startKey.hashCode();
+    }
+
+    public String getDate() {
+      return this.date;
+    }
+
+    public String getStartKey() {
+      return this.startKey;
+    }
+
+    public void setDate(String date) {
+      this.date = date;
+    }
+
+    public void setStartKey(String startKey) {
+      this.startKey = startKey;
+    }
+  }
+
+  /**
+   * The mob file status used in the sweep reducer.
+   */
+  private static class MobFileStatus {
+    private FileStatus fileStatus;
+    private int validSize;
+    private long size;
+
+    private float compactionRatio = MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO;
+    private long compactionMergeableSize =
+        MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE;
+
+    /**
+     * @param fileStatus The current FileStatus.
+     * @param compactionRatio The invalid ratio.
+     * If too many cells are deleted in a mob file, it's regarded as invalid,
+     * and its valid cells need to be written to a new file.
+     * If existingCellSize/fileSize < compactionRatio, it's regarded as an invalid one.
+     * @param compactionMergeableSize If the size of a mob file is less
+     * than this value, it's regarded as a small file and needs to be merged.
+     */
+    public MobFileStatus(FileStatus fileStatus, float compactionRatio,
+        long compactionMergeableSize) {
+      this.fileStatus = fileStatus;
+      this.size = fileStatus.getLen();
+      validSize = 0;
+      this.compactionRatio = compactionRatio;
+      this.compactionMergeableSize = compactionMergeableSize;
+    }
+
+    /**
+     * Add size to this file.
+     * @param size The size to be added.
+     */
+    public void addValidSize(int size) {
+      this.validSize += size;
+    }
+
+    /**
+     * Whether the mob file needs to be cleaned.
+     * If too many cells are deleted in this mob file, it needs to be cleaned.
+     * @return True if it needs to be cleaned.
+     */
+    public boolean needClean() {
+      return validSize < compactionRatio * size;
+    }
+
+    /**
+     * Whether the mob file needs to be merged.
+     * If this mob file is too small, it needs to be merged.
+     * @return True if it needs to be merged.
+     */
+    public boolean needMerge() {
+      return this.size < compactionMergeableSize;
+    }
+
+    /**
+     * Gets the file status.
+     * @return The file status.
+     */
+    public FileStatus getFileStatus() {
+      return fileStatus;
+    }
+  }
+}
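
A quick standalone restatement of the rule the reducer applies, since it is spread across
the class javadoc and MobFileStatus above. This is a plain-Java sketch with illustrative
names, not part of the patch:

// Standalone sketch of the sweep selection rule; mirrors MobFileStatus above.
public class SweepSelectionSketch {

  // A file is "invalid" when too few of its bytes are still referenced from HBase.
  static boolean needClean(long validSize, long fileSize, float compactionRatio) {
    return validSize < compactionRatio * fileSize;
  }

  // A file is "small" when it is below the mergeable-size threshold.
  static boolean needMerge(long fileSize, long mergeableSize) {
    return fileSize < mergeableSize;
  }

  public static void main(String[] args) {
    long fileSize = 128L * 1024 * 1024; // a 128MB mob file
    long validSize = 10L * 1024 * 1024; // only 10MB of it still referenced
    System.out.println(needClean(validSize, fileSize, 0.5f));    // true -> rewrite the file
    System.out.println(needMerge(fileSize, 192L * 1024 * 1024)); // true -> merge the file
  }
}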

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
new file mode 100644
index 0000000..d71dc83
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * The sweep tool. It deletes the mob files that are not used and merges the small mob
+ * files into bigger ones. Each run of this sweep tool only handles one column family.
+ * The runs on the same column family are mutually exclusive, and the major compaction
+ * and the sweep tool on the same column family are mutually exclusive too.
+ */
+@InterfaceAudience.Public
+public class Sweeper extends Configured implements Tool {
+
+  /**
+   * Sweeps the mob files on one column family. It deletes the unused mob files and
+   * merges the small mob files into bigger ones.
+   * @param tableName The current table name in string format.
+   * @param familyName The column family name.
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   * @throws KeeperException
+   * @throws ServiceException
+   */
+  void sweepFamily(String tableName, String familyName) throws IOException, InterruptedException,
+      ClassNotFoundException, KeeperException, ServiceException {
+    Configuration conf = getConf();
+    // make sure the target HBase cluster is available.
+    HBaseAdmin.checkHBaseAvailable(conf);
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      TableName tn = TableName.valueOf(tableName);
+      HTableDescriptor htd = admin.getTableDescriptor(tn);
+      HColumnDescriptor family = htd.getFamily(Bytes.toBytes(familyName));
+      if (family == null || !MobUtils.isMobFamily(family)) {
+        throw new IOException("Column family " + familyName + " is not a MOB column family");
+      }
+      SweepJob job = new SweepJob(conf, fs);
+      // Run the sweeping
+      job.sweep(tn, family);
+    } finally {
+      try {
+        admin.close();
+      } catch (IOException e) {
+        System.out.println("Fail to close the HBaseAdmin: " + e.getMessage());
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    ToolRunner.run(conf, new Sweeper(), args);
+  }
+
+  private void printUsage() {
+    System.err.println("Usage:\n" + "--------------------------\n" + 
Sweeper.class.getName()
+        + " tableName familyName");
+    System.err.println(" tableName        The table name");
+    System.err.println(" familyName       The column family name");
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length != 2) {
+      printUsage();
+      return 1;
+    }
+    String table = args[0];
+    String family = args[1];
+    sweepFamily(table, family);
+    return 0;
+  }
+}
\ No newline at end of file
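
For reference, a hedged sketch of driving the tool programmatically; the table and family
arguments ("t1", "f1") are placeholders. It is equivalent to running
org.apache.hadoop.hbase.mob.mapreduce.Sweeper from the command line with two arguments:

// Hypothetical driver for the Sweeper tool above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mob.mapreduce.Sweeper;
import org.apache.hadoop.util.ToolRunner;

public class RunSweeper {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // args are tableName and familyName, validated by Sweeper#run.
    int exit = ToolRunner.run(conf, new Sweeper(), new String[] { "t1", "f1" });
    System.exit(exit);
  }
}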

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 9c6f34e..071b5fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -18,13 +18,17 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 import java.util.NavigableSet;
 import java.util.UUID;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -32,7 +36,10 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -44,7 +51,10 @@ import org.apache.hadoop.hbase.mob.MobFile;
 import org.apache.hadoop.hbase.mob.MobFileName;
 import org.apache.hadoop.hbase.mob.MobStoreEngine;
 import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * The store implementation to save MOBs (medium objects), it extends the HStore.
@@ -68,6 +78,7 @@ public class HMobStore extends HStore {
   private MobCacheConfig mobCacheConfig;
   private Path homePath;
   private Path mobFamilyPath;
+  private List<Path> mobDirLocations;
 
   public HMobStore(final HRegion region, final HColumnDescriptor family,
       final Configuration confParam) throws IOException {
@@ -76,6 +87,11 @@ public class HMobStore extends HStore {
     this.homePath = MobUtils.getMobHome(conf);
     this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(),
         family.getNameAsString());
+    mobDirLocations = new ArrayList<Path>();
+    mobDirLocations.add(mobFamilyPath);
+    TableName tn = region.getTableDesc().getTableName();
+    mobDirLocations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils
+        .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
   }
 
   /**
@@ -87,6 +103,13 @@ public class HMobStore extends HStore {
   }
 
   /**
+   * Gets current config.
+   */
+  public Configuration getConfiguration() {
+    return this.conf;
+  }
+
+  /**
    * Gets the MobStoreScanner or MobReversedStoreScanner. In these scanners, additional
    * seeks in the mob files should be performed after the seek in HBase is done.
    */
@@ -94,6 +117,15 @@ public class HMobStore extends HStore {
   protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
       long readPt, KeyValueScanner scanner) throws IOException {
     if (scanner == null) {
+      if (MobUtils.isRefOnlyScan(scan)) {
+        Filter refOnlyFilter = new MobReferenceOnlyFilter();
+        Filter filter = scan.getFilter();
+        if (filter != null) {
+          scan.setFilter(new FilterList(filter, refOnlyFilter));
+        } else {
+          scan.setFilter(refOnlyFilter);
+        }
+      }
+      scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan,
+          targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
     }
@@ -219,30 +251,10 @@ public class HMobStore extends HStore {
    */
   public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
     Cell result = null;
-    if (MobUtils.isValidMobRefCellValue(reference)) {
+    if (MobUtils.hasValidMobRefCellValue(reference)) {
       String fileName = MobUtils.getMobFileName(reference);
-      Path targetPath = new Path(mobFamilyPath, fileName);
-      MobFile file = null;
-      try {
-        file = mobCacheConfig.getMobFileCache().openFile(region.getFilesystem(), targetPath,
-            mobCacheConfig);
-        result = file.readCell(reference, cacheBlocks);
-      } catch (IOException e) {
-        LOG.error("Fail to open/read the mob file " + targetPath.toString(), 
e);
-      } catch (NullPointerException e) {
-        // When delete the file during the scan, the hdfs getBlockRange will
-        // throw NullPointerException, catch it and manage it.
-        LOG.error("Fail to read the mob file " + targetPath.toString()
-            + " since it's already deleted", e);
-      } finally {
-        if (file != null) {
-          mobCacheConfig.getMobFileCache().closeFile(file);
-        }
-      }
-    } else {
-      LOG.warn("Invalid reference to mob, " + reference.getValueLength() + " 
bytes is too short");
+      result = readCell(fileName, reference, cacheBlocks);
     }
-
     if (result == null) {
       LOG.warn("The KeyValue result is null, assemble a new KeyValue with the 
same row,family,"
           + "qualifier,timestamp,type and tags but with an empty value to 
return.");
@@ -258,10 +270,132 @@ public class HMobStore extends HStore {
   }
 
   /**
+   * Reads the cell from a mob file.
+   * The mob file might be located in different directories.
+   * 1. The working directory.
+   * 2. The archive directory.
+   * Reads the cell from the files located in both of the above directories.
+   * @param fileName The file to be read.
+   * @param search The cell to be searched.
+   * @param cacheMobBlocks Whether the scanner should cache blocks.
+   * @return The found cell. Null if there's no such cell.
+   * @throws IOException
+   */
+  private Cell readCell(String fileName, Cell search, boolean cacheMobBlocks) throws IOException {
+    FileSystem fs = getFileSystem();
+    for (Path location : mobDirLocations) {
+      MobFile file = null;
+      Path path = new Path(location, fileName);
+      try {
+        file = mobCacheConfig.getMobFileCache().openFile(fs, path, mobCacheConfig);
+        return file.readCell(search, cacheMobBlocks);
+      } catch (IOException e) {
+        mobCacheConfig.getMobFileCache().evictFile(fileName);
+        if (e instanceof FileNotFoundException) {
+          LOG.warn("Fail to read the cell, the mob file " + path + " doesn't 
exist", e);
+        } else {
+          throw e;
+        }
+      } finally {
+        if (file != null) {
+          mobCacheConfig.getMobFileCache().closeFile(file);
+        }
+      }
+    }
+    LOG.error("The mob file " + fileName + " could not be found in the 
locations "
+        + mobDirLocations);
+    return null;
+  }
+
+  /**
    * Gets the mob file path.
    * @return The mob file path.
    */
   public Path getPath() {
     return mobFamilyPath;
   }
+
+
+  /**
+   * The compaction in the mob store.
+   * The cells in this store contain the paths of the mob files. There might be a race
+   * condition between a major compaction and the sweeping of mob files.
+   * To avoid this, the runs of a major compaction and of the mob file sweeping must be
+   * mutually exclusive.
+   * Minor compactions are not affected.
+   * A major compaction is converted to a minor one when a sweeping is in progress.
+   */
+  @Override
+  public List<StoreFile> compact(CompactionContext compaction) throws 
IOException {
+    // If it's major compaction, try to find whether there's a sweeper is 
running
+    // If yes, change the major compaction to a minor one.
+    if (compaction.getRequest().isMajor()) {
+      // Use Zookeeper to coordinate.
+      // 1. Acquire an operation lock.
+      //   1.1. If the lock cannot be obtained, convert the major compaction to a minor
+      //        one and continue the compaction.
+      //   1.2. If the lock is obtained, search for the sweeping node.
+      //      1.2.1. If the node is there, a sweeping is in progress; convert the major
+      //             compaction to a minor one and continue the compaction.
+      //      1.2.2. If the node is not there, add a child to the major compaction node,
+      //             and run the compaction directly.
+      String compactionName = UUID.randomUUID().toString().replaceAll("-", "");
+      MobZookeeper zk = null;
+      try {
+        zk = MobZookeeper.newInstance(this.conf, compactionName);
+      } catch (KeeperException e) {
+        LOG.error("Cannot connect to the zookeeper, ready to perform the minor 
compaction instead",
+            e);
+        // change the major compaction into a minor one
+        compaction.getRequest().setIsMajor(false, false);
+        return super.compact(compaction);
+      }
+      boolean major = false;
+      try {
+        // try to acquire the operation lock.
+        if (zk.lockColumnFamily(getTableName().getNameAsString(), getFamily().getNameAsString())) {
+          try {
+            LOG.info("Obtain the lock for the store[" + this
+                + "], ready to perform the major compaction");
+            // check the sweeping node to find out whether the sweeping is in 
progress.
+            boolean hasSweeper = 
zk.isSweeperZNodeExist(getTableName().getNameAsString(),
+                getFamily().getNameAsString());
+            if (!hasSweeper) {
+              // if not, add a child to the major compaction node of this 
store.
+              major = 
zk.addMajorCompactionZNode(getTableName().getNameAsString(), getFamily()
+                  .getNameAsString(), compactionName);
+            }
+          } catch (Exception e) {
+            LOG.error("Fail to handle the Zookeeper", e);
+          } finally {
+            // release the operation lock
+            zk.unlockColumnFamily(getTableName().getNameAsString(), 
getFamily().getNameAsString());
+          }
+        }
+        try {
+          if (major) {
+            return super.compact(compaction);
+          } else {
+            LOG.warn("Cannot obtain the lock or a sweep tool is running on 
this store["
+                + this + "], ready to perform the minor compaction instead");
+            // change the major compaction into a minor one
+            compaction.getRequest().setIsMajor(false, false);
+            return super.compact(compaction);
+          }
+        } finally {
+          if (major) {
+            try {
+              zk.deleteMajorCompactionZNode(getTableName().getNameAsString(), getFamily()
+                  .getNameAsString(), compactionName);
+            } catch (KeeperException e) {
+              LOG.error("Failed to delete the compaction znode " + compactionName, e);
+            }
+          }
+        }
+      } finally {
+        zk.close();
+      }
+    } else {
+      // If it's not a major compaction, continue the compaction.
+      return super.compact(compaction);
+    }
+  }
 }
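
The readCell change above introduces a lookup-with-fallback pattern over several candidate
directories. A minimal plain-JDK sketch of that pattern, under the assumption that a missing
file simply means "look in the next location" (the file may have been archived):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.List;

public class FallbackReadSketch {
  // Try each candidate directory in order; rethrow anything but a missing file.
  static byte[] readFirstFound(List<Path> locations, String fileName) throws IOException {
    for (Path dir : locations) {
      try {
        return Files.readAllBytes(dir.resolve(fileName));
      } catch (NoSuchFileException e) {
        // Not in this directory; fall through to the next candidate.
      }
    }
    return null; // not found anywhere, mirroring readCell's null return
  }
}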

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java
new file mode 100644
index 0000000..10aea24
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.mob.MobUtils;
+
+/**
+ * A filter that returns the cells which have mob reference tags. It's a server-side filter.
+ */
+@InterfaceAudience.Private
+class MobReferenceOnlyFilter extends FilterBase {
+
+  @Override
+  public ReturnCode filterKeyValue(Cell cell) {
+    if (null != cell) {
+      // a cell with a mob reference tag is included.
+      if (MobUtils.isMobReferenceCell(cell)) {
+        return ReturnCode.INCLUDE;
+      }
+    }
+    return ReturnCode.SKIP;
+  }
+}
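
The filter is attached in HMobStore#createScanner above by wrapping it together with any
user-supplied filter. A short sketch of that composition; the method name is illustrative:

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;

public class ComposeFilterSketch {
  static void addServerSideFilter(Scan scan, Filter extra) {
    Filter existing = scan.getFilter();
    // FilterList defaults to MUST_PASS_ALL, so the user's filter still applies.
    scan.setFilter(existing == null ? extra : new FilterList(existing, extra));
  }
}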

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java
new file mode 100644
index 0000000..7cba86c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestExpiredMobFileCleaner {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static TableName tableName = TableName.valueOf("TestExpiredMobFileCleaner");
+  private final static String family = "family";
+  private final static byte[] row1 = Bytes.toBytes("row1");
+  private final static byte[] row2 = Bytes.toBytes("row2");
+  private final static byte[] qf = Bytes.toBytes("qf");
+
+  private static HTable table;
+  private static Admin admin;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    admin.disableTable(tableName);
+    admin.deleteTable(tableName);
+    admin.close();
+    TEST_UTIL.shutdownMiniCluster();
+    TEST_UTIL.getTestFileSystem().delete(TEST_UTIL.getDataTestDir(), true);
+  }
+
+  private void init() throws Exception {
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
+    hcd.setMaxVersions(4);
+    desc.addFamily(hcd);
+
+    admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(desc);
+    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    table.setAutoFlush(false, false);
+  }
+
+  private void modifyColumnExpiryDays(int expireDays) throws Exception {
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
+    // set the TTL to the given number of days so that some rows expire
+    int timeToLive = expireDays * secondsOfDay();
+    hcd.setTimeToLive(timeToLive);
+
+    admin.modifyColumn(tableName, hcd);
+  }
+
+  private void putKVAndFlush(HTable table, byte[] row, byte[] value, long ts)
+      throws Exception {
+
+    Put put = new Put(row, ts);
+    put.add(Bytes.toBytes(family), qf, value);
+    table.put(put);
+
+    table.flushCommits();
+    admin.flush(tableName);
+  }
+
+  /**
+   * Creates a 3 day old hfile and a 1 day old hfile, then sets the expiry to 2 days.
+   * Verifies that the 3 day old hfile is removed but the 1 day old one is still present
+   * after the expiry-based cleaner is run.
+   */
+  @Test
+  public void testCleaner() throws Exception {
+    init();
+
+    Path mobDirPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+
+    byte[] dummyData = makeDummyData(600);
+    long ts = System.currentTimeMillis() - 3 * secondsOfDay() * 1000; // 3 days before
+    putKVAndFlush(table, row1, dummyData, ts);
+    FileStatus[] firstFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
+    //the first mob file
+    assertEquals("Before cleanup without delay 1", 1, firstFiles.length);
+    String firstFile = firstFiles[0].getPath().getName();
+
+    ts = System.currentTimeMillis() - 1 * secondsOfDay() * 1000; // 1 day before
+    putKVAndFlush(table, row2, dummyData, ts);
+    FileStatus[] secondFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
+    //now there are 2 mob files
+    assertEquals("Before cleanup without delay 2", 2, secondFiles.length);
+    String f1 = secondFiles[0].getPath().getName();
+    String f2 = secondFiles[1].getPath().getName();
+    String secondFile = f1.equals(firstFile) ? f2 : f1;
+
+    modifyColumnExpiryDays(2); // ttl = 2, make the first row expired
+
+    //run the cleaner
+    String[] args = new String[2];
+    args[0] = tableName.getNameAsString();
+    args[1] = family;
+    ToolRunner.run(TEST_UTIL.getConfiguration(), new ExpiredMobFileCleaner(), args);
+
+    FileStatus[] filesAfterClean = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
+    String lastFile = filesAfterClean[0].getPath().getName();
+    //the first mob file is removed
+    assertEquals("After cleanup without delay 1", 1, filesAfterClean.length);
+    assertEquals("After cleanup without delay 2", secondFile, lastFile);
+  }
+
+  private Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) {
+    Path p = new Path(MobUtils.getMobRegionPath(conf, tableName), familyName);
+    return p;
+  }
+
+  private int secondsOfDay() {
+    return 24 * 3600;
+  }
+
+  private byte[] makeDummyData(int size) {
+    byte [] dummyData = new byte[size];
+    new Random().nextBytes(dummyData);
+    return dummyData;
+  }
+}
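
The expiry arithmetic the test relies on, reduced to a plain-JDK sketch: a cell is expired
once its timestamp is older than now minus the TTL in milliseconds. Names are illustrative:

public class TtlSketch {
  static boolean isExpired(long cellTs, long nowMs, int ttlSeconds) {
    return cellTs < nowMs - ttlSeconds * 1000L; // long multiply avoids int overflow
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long threeDaysAgo = now - 3L * 24 * 3600 * 1000;
    System.out.println(isExpired(threeDaysAgo, now, 2 * 24 * 3600)); // true: gets cleaned
  }
}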

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
new file mode 100644
index 0000000..e0b9a83
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestMobSweepJob {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    TEST_UTIL.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+        JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private void writeFileNames(FileSystem fs, Configuration conf, Path path,
+      String[] filesNames) throws IOException {
+    // write the names to a sequence file
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
+        String.class, String.class);
+    try {
+      for (String fileName : filesNames) {
+        writer.append(fileName, MobConstants.EMPTY_STRING);
+      }
+    } finally {
+      IOUtils.closeStream(writer);
+    }
+  }
+
+  @Test
+  public void testSweeperJobWithOutUnusedFile() throws Exception {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Configuration configuration = new Configuration(
+        TEST_UTIL.getConfiguration());
+    Path vistiedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        "/hbase/mobcompaction/SweepJob/working/names/0/visited");
+    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        "/hbase/mobcompaction/SweepJob/working/names/0/all");
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
+        vistiedFileNamesPath.toString());
+    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
+        allFileNamesPath.toString());
+
+    writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
+        "2", "3", "4", "5", "6"});
+
+    Path r0 = new Path(vistiedFileNamesPath, "r0");
+    writeFileNames(fs, configuration, r0, new String[] { "1",
+        "2", "3"});
+    Path r1 = new Path(vistiedFileNamesPath, "r1");
+    writeFileNames(fs, configuration, r1, new String[] { "1", "4", "5"});
+    Path r2 = new Path(vistiedFileNamesPath, "r2");
+    writeFileNames(fs, configuration, r2, new String[] { "2", "3", "6"});
+
+    SweepJob sweepJob = new SweepJob(configuration, fs);
+    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
+
+    assertEquals(0, toBeArchived.size());
+  }
+
+  @Test
+  public void testSweeperJobWithUnusedFile() throws Exception {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Configuration configuration = new Configuration(
+        TEST_UTIL.getConfiguration());
+    Path vistiedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        "/hbase/mobcompaction/SweepJob/working/names/1/visited");
+    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        "/hbase/mobcompaction/SweepJob/working/names/1/all");
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
+        vistiedFileNamesPath.toString());
+    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
+        allFileNamesPath.toString());
+
+    writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
+        "2", "3", "4", "5", "6"});
+
+    Path r0 = new Path(vistiedFileNamesPath, "r0");
+    writeFileNames(fs, configuration, r0, new String[] { "1",
+        "2", "3"});
+    Path r1 = new Path(vistiedFileNamesPath, "r1");
+    writeFileNames(fs, configuration, r1, new String[] { "1", "5"});
+    Path r2 = new Path(vistiedFileNamesPath, "r2");
+    writeFileNames(fs, configuration, r2, new String[] { "2", "3"});
+
+    SweepJob sweepJob = new SweepJob(configuration, fs);
+    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
+
+    assertEquals(2, toBeArchived.size());
+    assertArrayEquals(new String[] { "4", "6" }, toBeArchived.toArray(new String[0]));
+  }
+
+  @Test
+  public void testSweeperJobWithRedundantFile() throws Exception {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Configuration configuration = new Configuration(
+        TEST_UTIL.getConfiguration());
+    Path vistiedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        "/hbase/mobcompaction/SweepJob/working/names/2/visited");
+    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        "/hbase/mobcompaction/SweepJob/working/names/2/all");
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
+        vistiedFileNamesPath.toString());
+    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
+        allFileNamesPath.toString());
+
+    writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
+        "2", "3", "4", "5", "6"});
+
+    Path r0 = new Path(vistiedFileNamesPath, "r0");
+    writeFileNames(fs, configuration, r0, new String[] { "1",
+        "2", "3"});
+    Path r1 = new Path(vistiedFileNamesPath, "r1");
+    writeFileNames(fs, configuration, r1, new String[] { "1", "5", "6", "7"});
+    Path r2 = new Path(vistiedFileNamesPath, "r2");
+    writeFileNames(fs, configuration, r2, new String[] { "2", "3", "4"});
+
+    SweepJob sweepJob = new SweepJob(configuration, fs);
+    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
+
+    assertEquals(0, toBeArchived.size());
+  }
+}
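
All three tests exercise one core computation: the unused files are the set difference
allNames \ visitedNames, and visited names that never appeared in allNames (the "redundant
file" case) are simply ignored. A plain-JDK sketch with illustrative names:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class UnusedFilesSketch {
  static List<String> unused(Set<String> all, Set<String> visited) {
    return all.stream().filter(n -> !visited.contains(n)).sorted().collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Set<String> all = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6"));
    Set<String> visited = new HashSet<>(Arrays.asList("1", "2", "3", "5"));
    System.out.println(unused(all, visited)); // [4, 6] -> to be archived
  }
}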

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
new file mode 100644
index 0000000..a7e2538
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+@Category(SmallTests.class)
+public class TestMobSweepMapper {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void TestMap() throws Exception {
+    String prefix = "0000";
+    final String fileName = "19691231f2cd014ea28f42788214560a21a44cef";
+    final String mobFilePath = prefix + fileName;
+
+    ImmutableBytesWritable r = new ImmutableBytesWritable(Bytes.toBytes("r"));
+    final KeyValue[] kvList = new KeyValue[1];
+    kvList[0] = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"),
+            Bytes.toBytes("column"), Bytes.toBytes(mobFilePath));
+
+    Result columns = mock(Result.class);
+    when(columns.raw()).thenReturn(kvList);
+
+    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+    configuration.set(SweepJob.SWEEP_JOB_ID, "1");
+    configuration.set(SweepJob.SWEEPER_NODE, "/hbase/MOB/testSweepMapper:family-sweeper");
+
+    MobZookeeper zk = MobZookeeper.newInstance(configuration, "1");
+    zk.addSweeperZNode("testSweepMapper", "family", Bytes.toBytes("1"));
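+    // create the sweeper znode referenced by the SWEEPER_NODE setting above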
+
+    Mapper<ImmutableBytesWritable, Result, Text, KeyValue>.Context ctx =
+            mock(Mapper.Context.class);
+    when(ctx.getConfiguration()).thenReturn(configuration);
+    SweepMapper map = new SweepMapper();
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        Text text = (Text) invocation.getArguments()[0];
+        KeyValue kv = (KeyValue) invocation.getArguments()[1];
+
+        assertEquals(fileName, Bytes.toString(text.getBytes(), 0, text.getLength()));
+        assertEquals(0, Bytes.compareTo(kv.getKey(), kvList[0].getKey()));
+
+        return null;
+      }
+    }).when(ctx).write(any(Text.class), any(KeyValue.class));
+
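+    // run the mapper once; the Answer above verifies what it writes to the context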
+    map.map(r, columns, ctx);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
new file mode 100644
index 0000000..0f4c3ff
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Matchers;
+
+@Category(MediumTests.class)
+public class TestMobSweepReducer {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static String tableName = "testSweepReducer";
+  private final static String row = "row";
+  private final static String family = "family";
+  private final static String qf = "qf";
+  private static HTable table;
+  private static Admin admin;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Before
+  public void setUp() throws Exception {
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
+    hcd.setMaxVersions(4);
+    desc.addFamily(hcd);
+
+    admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(desc);
+    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    admin.disableTable(TableName.valueOf(tableName));
+    admin.deleteTable(TableName.valueOf(tableName));
+    admin.close();
+  }
+
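+  // reads every key from a SequenceFile of String keys (requires JavaSerialization
+  // to be registered in io.serializations)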
+  private List<String> getKeyFromSequenceFile(FileSystem fs, Path path,
+                                              Configuration conf) throws Exception {
+    List<String> list = new ArrayList<String>();
+    SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
+
+    String next = (String) reader.next((String) null);
+    while (next != null) {
+      list.add(next);
+      next = (String) reader.next((String) null);
+    }
+    reader.close();
+    return list;
+  }
+
+  @Test
+  public void testRun() throws Exception {
+    byte[] mobValueBytes = new byte[100];
+
+    //get the path where the mob files reside
+    Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(),
+        TableName.valueOf(tableName), family);
+
+    Put put = new Put(Bytes.toBytes(row));
+    put.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
+    Put put2 = new Put(Bytes.toBytes(row + "ignore"));
+    put2.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
+    table.put(put);
+    table.put(put2);
+    table.flushCommits();
+    admin.flush(TableName.valueOf(tableName));
+
+    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    //check that exactly one mob file has been generated
+    assertEquals(1, fileStatuses.length);
+
+    String mobFile1 = fileStatuses[0].getPath().getName();
+
+    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+    configuration.setFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO, 0.6f);
+    configuration.setStrings(TableInputFormat.INPUT_TABLE, tableName);
+    configuration.setStrings(TableInputFormat.SCAN_COLUMN_FAMILY, family);
+    configuration.setStrings(SweepJob.WORKING_VISITED_DIR_KEY, "jobWorkingNamesDir");
+    configuration.setStrings(SweepJob.WORKING_FILES_DIR_KEY, "compactionFileDir");
+    configuration.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+            JavaSerialization.class.getName());
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, "compactionVisitedDir");
+    configuration.setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
+        System.currentTimeMillis() + 24 * 3600 * 1000);
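+    // (assumption) a start date one day ahead keeps the freshly flushed mob file
+    // inside the sweep window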
+
+    configuration.set(SweepJob.SWEEP_JOB_ID, "1");
+    configuration.set(SweepJob.SWEEPER_NODE, "/hbase/MOB/testSweepReducer:family-sweeper");
+
+    MobZookeeper zk = MobZookeeper.newInstance(configuration, "1");
+    zk.addSweeperZNode(tableName, family, Bytes.toBytes("1"));
+
+    //share one counter instance across all mocked getCounter() calls
+    Counter counter = new GenericCounter();
+    Reducer<Text, KeyValue, Writable, Writable>.Context ctx =
+            mock(Reducer.Context.class);
+    when(ctx.getConfiguration()).thenReturn(configuration);
+    when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter);
+    when(ctx.nextKey()).thenReturn(true).thenReturn(false);
+    when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1));
+
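+    // fake a mob reference cell: a value-length prefix followed by the mob file name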
+    byte[] refBytes = Bytes.toBytes(mobFile1);
+    long valueLength = refBytes.length;
+    byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes);
+    KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family),
+            Bytes.toBytes(qf), 1, KeyValue.Type.Put, newValue);
+    List<KeyValue> list = new ArrayList<KeyValue>();
+    list.add(kv2);
+
+    when(ctx.getValues()).thenReturn(list);
+
+    SweepReducer reducer = new SweepReducer();
+    reducer.run(ctx);
+
+    FileStatus[] fileStatuses2 = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    String mobFile2 = fileStatuses2[0].getPath().getName();
+    //a new mob file has been generated and the old one archived
+    assertEquals(1, fileStatuses2.length);
+    assertEquals(false, mobFile2.equalsIgnoreCase(mobFile1));
+
+    //check the sequence file of visited mob files written by the job
+    String workingPath = configuration.get("mob.compaction.visited.dir");
+    FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(new Path(workingPath));
+    Set<String> files = new TreeSet<String>();
+    for (FileStatus st : statuses) {
+      files.addAll(getKeyFromSequenceFile(TEST_UTIL.getTestFileSystem(),
+              st.getPath(), configuration));
+    }
+    assertEquals(1, files.size());
+    assertEquals(true, files.contains(mobFile1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
new file mode 100644
index 0000000..c43bceb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestMobSweeper {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private String tableName;
+  private final static String row = "row_";
+  private final static String family = "family";
+  private final static String column = "column";
+  private static HTable table;
+  private static Admin admin;
+
+  private Random random = new Random();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+
+    TEST_UTIL.startMiniCluster();
+
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Before
+  public void setUp() throws Exception {
+    long tid = System.currentTimeMillis();
+    tableName = "testSweeper" + tid;
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
+    hcd.setMaxVersions(4);
+    desc.addFamily(hcd);
+
+    admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(desc);
+    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    table.setAutoFlush(false);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    admin.disableTable(TableName.valueOf(tableName));
+    admin.deleteTable(TableName.valueOf(tableName));
+    admin.close();
+  }
+
+  private Path getMobFamilyPath(Configuration conf, String tableNameStr,
+                                String familyName) {
+    return new Path(MobUtils.getMobRegionPath(conf, TableName.valueOf(tableNameStr)),
+        familyName);
+  }
+
+  private String mergeString(Set<String> set) {
+    StringBuilder sb = new StringBuilder();
+    for (String s : set) {
+      sb.append(s);
+    }
+    return sb.toString();
+  }
+
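+  // writes 'count' mob-sized rows, flushing every 'flushStep' puts so each
+  // flush produces a mob file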
+  private void generateMobTable(int count, int flushStep)
+          throws IOException, InterruptedException {
+    if (count <= 0 || flushStep <= 0) {
+      return;
+    }
+    for (int i = 0; i < count; i++) {
+      byte[] mobVal = new byte[101 * 1024];
+      random.nextBytes(mobVal);
+
+      Put put = new Put(Bytes.toBytes(row + i));
+      put.add(Bytes.toBytes(family), Bytes.toBytes(column), mobVal);
+      table.put(put);
+      if (i % flushStep == 0) {
+        table.flushCommits();
+        admin.flush(TableName.valueOf(tableName));
+      }
+    }
+    table.flushCommits();
+    admin.flush(TableName.valueOf(tableName));
+  }
+
+  @Test
+  public void testSweeper() throws Exception {
+    int count = 10;
+    //generate 10 mob files in the table created in setUp
+    generateMobTable(count, 1);
+
+    //get the mob files
+    Path mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    // mobFilesSet stores the original mob file names
+    TreeSet<String> mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    //scan the table raw to retrieve the mob file references
+    Scan scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+    ResultScanner rs = table.getScanner(scan);
+    TreeSet<String> mobFilesScanned = new TreeSet<String>();
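+    // a reference value is an int length prefix followed by the mob file name;
+    // strip the prefix to recover the file name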
+    for (Result res : rs) {
+      byte[] valueBytes = res.getValue(Bytes.toBytes(family),
+          Bytes.toBytes(column));
+      mobFilesScanned.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
+          valueBytes.length - Bytes.SIZEOF_INT));
+    }
+
+    //there should be 10 mob files
+    assertEquals(10, mobFilesScanned.size());
+    //check that the scanned references match the mob files on disk
+    assertEquals(mergeString(mobFilesSet), mergeString(mobFilesScanned));
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(SweepJob.MOB_COMPACTION_DELAY, 24 * 60 * 60 * 1000);
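+    // a day-long compaction delay means the just-flushed mob files are too new to sweep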
+
+    String[] args = new String[2];
+    args[0] = tableName;
+    args[1] = family;
+    ToolRunner.run(conf, new Sweeper(), args);
+
+    mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    assertEquals(10, mobFilesSet.size());
+
+    scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+    rs = table.getScanner(scan);
+    TreeSet<String> mobFilesScannedAfterJob = new TreeSet<String>();
+    for (Result res : rs) {
+      byte[] valueBytes = res.getValue(Bytes.toBytes(family), Bytes.toBytes(
+          column));
+      mobFilesScannedAfterJob.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
+          valueBytes.length - Bytes.SIZEOF_INT));
+    }
+
+    assertEquals(10, mobFilesScannedAfterJob.size());
+
+    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    assertEquals(10, mobFilesSet.size());
+    assertEquals(true, mobFilesScannedAfterJob.iterator().next()
+            .equalsIgnoreCase(mobFilesSet.iterator().next()));
+  }
+
+  @Test
+  public void testCompactionDelaySweeper() throws Exception {
+    int count = 10;
+    //generate 10 mob files in the table created in setUp
+    generateMobTable(count, 1);
+
+    //get the mob files
+    Path mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    // mobFilesSet stores the original mob file names
+    TreeSet<String> mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    //scan the table raw to retrieve the mob file references
+    Scan scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+    ResultScanner rs = table.getScanner(scan);
+    TreeSet<String> mobFilesScanned = new TreeSet<String>();
+    for (Result res : rs) {
+      byte[] valueBytes = res.getValue(Bytes.toBytes(family),
+              Bytes.toBytes(column));
+      mobFilesScanned.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
+          valueBytes.length - Bytes.SIZEOF_INT));
+    }
+
+    //there should be 10 mob files
+    assertEquals(10, mobFilesScanned.size());
+    //check that the scanned references match the mob files on disk
+    assertEquals(mergeString(mobFilesSet), mergeString(mobFilesScanned));
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(SweepJob.MOB_COMPACTION_DELAY, 0);
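+    // with zero delay every mob file is eligible, so the sweeper should merge
+    // all ten files into one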
+
+    String[] args = new String[2];
+    args[0] = tableName;
+    args[1] = family;
+    ToolRunner.run(conf, new Sweeper(), args);
+
+    mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    assertEquals(1, mobFilesSet.size());
+
+    scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+    rs = table.getScanner(scan);
+    TreeSet<String> mobFilesScannedAfterJob = new TreeSet<String>();
+    for (Result res : rs) {
+      byte[] valueBytes = res.getValue(Bytes.toBytes(family), Bytes.toBytes(
+              column));
+      mobFilesScannedAfterJob.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
+          valueBytes.length - Bytes.SIZEOF_INT));
+    }
+
+    assertEquals(1, mobFilesScannedAfterJob.size());
+
+    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    assertEquals(1, mobFilesSet.size());
+    assertEquals(true, mobFilesScannedAfterJob.iterator().next()
+            .equalsIgnoreCase(mobFilesSet.iterator().next()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
index f8d6ce4..51f4de3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
@@ -53,10 +53,13 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -70,7 +73,7 @@ public class TestMobCompaction {
   @Rule
   public TestName name = new TestName();
   static final Log LOG = LogFactory.getLog(TestMobCompaction.class.getName());
-  private HBaseTestingUtility UTIL = null;
+  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
   private Configuration conf = null;
 
   private HRegion region = null;
@@ -84,14 +87,22 @@ public class TestMobCompaction {
   private final byte[] STARTROW = Bytes.toBytes(START_KEY);
   private int compactionThreshold;
 
-  private void init(long mobThreshold) throws Exception {
-    this.mobCellThreshold = mobThreshold;
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    UTIL.startMiniCluster(1);
+  }
 
-    UTIL = HBaseTestingUtility.createLocalHTU();
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
 
+  private void init(long mobThreshold) throws Exception {
+    this.mobCellThreshold = mobThreshold;
     conf = UTIL.getConfiguration();
     compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
-
     htd = UTIL.createTableDescriptor(name.getMethodName());
     hcd = new HColumnDescriptor(COLUMN_FAMILY);
     hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
@@ -158,7 +169,8 @@ public class TestMobCompaction {
     region.getTableDesc().getFamily(COLUMN_FAMILY).setValue(
         MobConstants.MOB_THRESHOLD, Bytes.toBytes(500L));
     region.initialize();
-    region.compactStores(true);
+    region.compactStores();
+
     assertEquals("After compaction: store files", 1, countStoreFiles());
     assertEquals("After compaction: mob file count", compactionThreshold, 
countMobFiles());
     assertEquals("After compaction: referenced mob file count", 0, 
countReferencedMobFiles());
@@ -307,7 +319,7 @@ public class TestMobCompaction {
         if (!MobUtils.isMobReferenceCell(kv)) {
           continue;
         }
-        if (!MobUtils.isValidMobRefCellValue(kv)) {
+        if (!MobUtils.hasValidMobRefCellValue(kv)) {
           continue;
         }
         int size = MobUtils.getMobValueLength(kv);

http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
index 69b9c8f..87147d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
@@ -22,12 +22,14 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.TableName;
@@ -40,6 +42,8 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -120,6 +124,7 @@ public class TestMobStoreScanner {
          testGetFromMemStore(false);
     testGetReferences(false);
     testMobThreshold(false);
+    testGetFromArchive(false);
   }
 
   @Test
@@ -128,6 +133,7 @@ public class TestMobStoreScanner {
          testGetFromMemStore(true);
     testGetReferences(true);
     testMobThreshold(true);
+    testGetFromArchive(true);
   }
 
   public void testGetFromFiles(boolean reversed) throws Exception {
@@ -282,6 +288,72 @@ public class TestMobStoreScanner {
     results.close();
   }
 
+  public void testGetFromArchive(boolean reversed) throws Exception {
+    String TN = "testGetFromArchive" + reversed;
+    setUp(defaultThreshold, TN);
+    long ts1 = System.currentTimeMillis();
+    long ts2 = ts1 + 1;
+    long ts3 = ts1 + 2;
+    byte[] value = generateMobValue((int) defaultThreshold + 1);
+    // Put some data
+    Put put1 = new Put(row1);
+    put1.add(family, qf1, ts3, value);
+    put1.add(family, qf2, ts2, value);
+    put1.add(family, qf3, ts1, value);
+    table.put(put1);
+
+    table.flushCommits();
+    admin.flush(TN);
+
+    // Get the files in the mob path
+    Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
+        TableName.valueOf(TN)), hcd.getNameAsString());
+    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+    FileStatus[] files = fs.listStatus(mobFamilyPath);
+
+    // Get the archive path
+    Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
+    Path tableDir = FSUtils.getTableDir(rootDir, TableName.valueOf(TN));
+    HRegionInfo regionInfo = MobUtils.getMobRegionInfo(TableName.valueOf(TN));
+    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(),
+        regionInfo, tableDir, family);
+
+    // Move the files from mob path to archive path
+    fs.mkdirs(storeArchiveDir);
+    int fileCount = 0;
+    for (FileStatus file : files) {
+      fileCount++;
+      Path filePath = file.getPath();
+      Path src = new Path(mobFamilyPath, filePath.getName());
+      Path dst = new Path(storeArchiveDir, filePath.getName());
+      fs.rename(src, dst);
+    }
+
+    // Verify the move succeeded
+    FileStatus[] files1 = fs.listStatus(mobFamilyPath);
+    Assert.assertEquals(0, files1.length);
+    FileStatus[] files2 = fs.listStatus(storeArchiveDir);
+    Assert.assertEquals(fileCount, files2.length);
+
+    // Scan from archive
+    Scan scan = new Scan();
+    setScan(scan, reversed, false);
+    ResultScanner results = table.getScanner(scan);
+    int count = 0;
+    for (Result res : results) {
+      List<Cell> cells = res.listCells();
+      for (Cell cell : cells) {
+        // Verify the value
+        Assert.assertEquals(Bytes.toString(value),
+            Bytes.toString(CellUtil.cloneValue(cell)));
+        count++;
+      }
+    }
+    results.close();
+    Assert.assertEquals(3, count);
+  }
+
   /**
    * Assert the value is not store in mob.
    */
