http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
new file mode 100644
index 0000000..c72a0c3
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
@@ -0,0 +1,786 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Collections;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterators;
+
+public class SyncTable extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(SyncTable.class);
+
+  static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
+  static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
+  static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
+  static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
+  static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
+  static final String DRY_RUN_CONF_KEY="sync.table.dry.run";
+
+  Path sourceHashDir;
+  String sourceTableName;
+  String targetTableName;
+
+  String sourceZkCluster;
+  String targetZkCluster;
+  boolean dryRun;
+
+  Counters counters;
+
+  public SyncTable(Configuration conf) {
+    super(conf);
+  }
+
+  public Job createSubmittableJob(String[] args) throws IOException {
+    FileSystem fs = sourceHashDir.getFileSystem(getConf());
+    if (!fs.exists(sourceHashDir)) {
+      throw new IOException("Source hash dir not found: " + sourceHashDir);
+    }
+
+    HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
+    LOG.info("Read source hash manifest: " + tableHash);
+    LOG.info("Read " + tableHash.partitions.size() + " partition keys");
+    if (!tableHash.tableName.equals(sourceTableName)) {
+      LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
+          + tableHash.tableName + " but job is reading from: " + sourceTableName);
+    }
+    if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
+      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
+          + " should be 1 more than the number of partition keys.  However, the manifest file "
+          + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
+          + " found in the partitions file is " + tableHash.partitions.size());
+    }
+
+    Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
+    int dataSubdirCount = 0;
+    for (FileStatus file : fs.listStatus(dataDir)) {
+      if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
+        dataSubdirCount++;
+      }
+    }
+
+    if (dataSubdirCount != tableHash.numHashFiles) {
+      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
+          + " should be 1 more than the number of partition keys.  However, the number of data dirs"
+          + " found is " + dataSubdirCount + " but the number of partition keys"
+          + " found in the partitions file is " + tableHash.partitions.size());
+    }
+
+    Job job = Job.getInstance(getConf(),getConf().get("mapreduce.job.name",
+        "syncTable_" + sourceTableName + "-" + targetTableName));
+    Configuration jobConf = job.getConfiguration();
+    job.setJarByClass(HashTable.class);
+    jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
+    jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
+    jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
+    if (sourceZkCluster != null) {
+      jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
+    }
+    if (targetZkCluster != null) {
+      jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
+    }
+    jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
+
+    TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
+        SyncMapper.class, null, null, job);
+
+    job.setNumReduceTasks(0);
+
+    if (dryRun) {
+      job.setOutputFormatClass(NullOutputFormat.class);
+    } else {
+      // No reducers.  Just write straight to table.  Call initTableReducerJob
+      // because it sets up the TableOutputFormat.
+      TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null,
+          targetZkCluster, null, null);
+
+      // would be nice to add an option for bulk load instead
+    }
+
+    // Obtain an authentication token, for the specified cluster, on behalf of the current user
+    if (sourceZkCluster != null) {
+      Configuration peerConf =
+          HBaseConfiguration.createClusterConf(job.getConfiguration(), sourceZkCluster);
+      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
+    }
+    return job;
+  }
+
+  public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
+    Path sourceHashDir;
+
+    Connection sourceConnection;
+    Connection targetConnection;
+    Table sourceTable;
+    Table targetTable;
+    boolean dryRun;
+
+    HashTable.TableHash sourceTableHash;
+    HashTable.TableHash.Reader sourceHashReader;
+    ImmutableBytesWritable currentSourceHash;
+    ImmutableBytesWritable nextSourceKey;
+    HashTable.ResultHasher targetHasher;
+
+    Throwable mapperException;
+
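+    /** Counters published by the job to summarize how the source and target tables compare. */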
+    public static enum Counter {BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS,
+      SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS, DIFFERENTCELLVALUES,
+      MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED, RANGESNOTMATCHED};
+
+    @Override
+    protected void setup(Context context) throws IOException {
+
+      Configuration conf = context.getConfiguration();
+      sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
+      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
+      targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY,
+          TableOutputFormat.OUTPUT_CONF_PREFIX);
+      sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
+      targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
+      dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
+
+      sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
+      LOG.info("Read source hash manifest: " + sourceTableHash);
+      LOG.info("Read " + sourceTableHash.partitions.size() + " partition 
keys");
+
+      TableSplit split = (TableSplit) context.getInputSplit();
+      ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());
+
+      sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
+      findNextKeyHashPair();
+
+      // create a hasher, but don't start it right away
+      // instead, find the first hash batch at or after the start row
+      // and skip any rows that come before.  they will be caught by the previous task
+      targetHasher = new HashTable.ResultHasher();
+    }
+
+    private static Connection openConnection(Configuration conf, String zkClusterConfKey,
+                                             String configPrefix)
+      throws IOException {
+        String zkCluster = conf.get(zkClusterConfKey);
+        Configuration clusterConf = HBaseConfiguration.createClusterConf(conf,
+            zkCluster, configPrefix);
+        return ConnectionFactory.createConnection(clusterConf);
+    }
+
+    private static Table openTable(Connection connection, Configuration conf,
+        String tableNameConfKey) throws IOException {
+      return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
+    }
+
+    /**
+     * Attempt to read the next source key/hash pair.
+     * If there are no more, set nextSourceKey to null
+     */
+    private void findNextKeyHashPair() throws IOException {
+      boolean hasNext = sourceHashReader.next();
+      if (hasNext) {
+        nextSourceKey = sourceHashReader.getCurrentKey();
+      } else {
+        // no more keys - last hash goes to the end
+        nextSourceKey = null;
+      }
+    }
+
+    @Override
+    protected void map(ImmutableBytesWritable key, Result value, Context context)
+        throws IOException, InterruptedException {
+      try {
+        // first, finish any hash batches that end before the scanned row
+        while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
+          moveToNextBatch(context);
+        }
+
+        // next, add the scanned row (as long as we've reached the first batch)
+        if (targetHasher.isBatchStarted()) {
+          targetHasher.hashResult(value);
+        }
+      } catch (Throwable t) {
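+        // remember the first failure so cleanup() skips finishing the remaining hash
+        // ranges and rethrows it after the tables and connections are closed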
+        mapperException = t;
+        Throwables.propagateIfInstanceOf(t, IOException.class);
+        Throwables.propagateIfInstanceOf(t, InterruptedException.class);
+        Throwables.propagate(t);
+      }
+    }
+
+    /**
+     * If there is an open hash batch, complete it and sync if there are diffs.
+     * Start a new batch, and seek to read the next source key/hash pair.
+     */
+    private void moveToNextBatch(Context context) throws IOException, InterruptedException {
+      if (targetHasher.isBatchStarted()) {
+        finishBatchAndCompareHashes(context);
+      }
+      targetHasher.startBatch(nextSourceKey);
+      currentSourceHash = sourceHashReader.getCurrentHash();
+
+      findNextKeyHashPair();
+    }
+
+    /**
+     * Finish the currently open hash batch.
+     * Compare the target hash to the given source hash.
+     * If they do not match, then sync the covered key range.
+     */
+    private void finishBatchAndCompareHashes(Context context)
+        throws IOException, InterruptedException {
+      targetHasher.finishBatch();
+      context.getCounter(Counter.BATCHES).increment(1);
+      if (targetHasher.getBatchSize() == 0) {
+        context.getCounter(Counter.EMPTY_BATCHES).increment(1);
+      }
+      ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
+      if (targetHash.equals(currentSourceHash)) {
+        context.getCounter(Counter.HASHES_MATCHED).increment(1);
+      } else {
+        context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);
+
+        ImmutableBytesWritable stopRow = nextSourceKey == null
+                                          ? new ImmutableBytesWritable(sourceTableHash.stopRow)
+                                          : nextSourceKey;
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Hash mismatch.  Key range: " + 
toHex(targetHasher.getBatchStartKey())
+              + " to " + toHex(stopRow)
+              + " sourceHash: " + toHex(currentSourceHash)
+              + " targetHash: " + toHex(targetHash));
+        }
+
+        syncRange(context, targetHasher.getBatchStartKey(), stopRow);
+      }
+    }
+    private static String toHex(ImmutableBytesWritable bytes) {
+      return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
+    }
+
+    private static final CellScanner EMPTY_CELL_SCANNER
+      = new CellScanner(Collections.<Result>emptyIterator());
+
+    /**
+     * Rescan the given range directly from the source and target tables.
+     * Count and log differences, and if this is not a dry run, output Puts and Deletes
+     * to make the target table match the source table for this range
+     */
+    private void syncRange(Context context, ImmutableBytesWritable startRow,
+        ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
+      Scan scan = sourceTableHash.initScan();
+      scan.setStartRow(startRow.copyBytes());
+      scan.setStopRow(stopRow.copyBytes());
+
+      ResultScanner sourceScanner = sourceTable.getScanner(scan);
+      CellScanner sourceCells = new CellScanner(sourceScanner.iterator());
+
+      ResultScanner targetScanner = targetTable.getScanner(new Scan(scan));
+      CellScanner targetCells = new CellScanner(targetScanner.iterator());
+
+      boolean rangeMatched = true;
+      byte[] nextSourceRow = sourceCells.nextRow();
+      byte[] nextTargetRow = targetCells.nextRow();
+      while(nextSourceRow != null || nextTargetRow != null) {
+        boolean rowMatched;
+        int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
+        if (rowComparison < 0) {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow));
+          }
+          context.getCounter(Counter.TARGETMISSINGROWS).increment(1);
+
+          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
+          nextSourceRow = sourceCells.nextRow();  // advance only source to next row
+        } else if (rowComparison > 0) {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow));
+          }
+          context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);
+
+          rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
+          nextTargetRow = targetCells.nextRow();  // advance only target to next row
+        } else {
+          // current row is the same on both sides, compare cell by cell
+          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
+          nextSourceRow = sourceCells.nextRow();
+          nextTargetRow = targetCells.nextRow();
+        }
+
+        if (!rowMatched) {
+          rangeMatched = false;
+        }
+      }
+
+      sourceScanner.close();
+      targetScanner.close();
+
+      context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
+        .increment(1);
+    }
+
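+    /**
+     * Steps through the Results of a scanner one row at a time, exposing the cells of each
+     * row in order even when a single row is split across multiple Results (for example,
+     * when scan batching is enabled).
+     */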
+    private static class CellScanner {
+      private final Iterator<Result> results;
+
+      private byte[] currentRow;
+      private Result currentRowResult;
+      private int nextCellInRow;
+
+      private Result nextRowResult;
+
+      public CellScanner(Iterator<Result> results) {
+        this.results = results;
+      }
+
+      /**
+       * Advance to the next row and return its row key.
+       * Returns null iff there are no more rows.
+       */
+      public byte[] nextRow() {
+        if (nextRowResult == null) {
+          // no cached row - check scanner for more
+          while (results.hasNext()) {
+            nextRowResult = results.next();
+            Cell nextCell = nextRowResult.rawCells()[0];
+            if (currentRow == null
+                || !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(),
+                nextCell.getRowOffset(), nextCell.getRowLength())) {
+              // found next row
+              break;
+            } else {
+              // found another result from current row, keep scanning
+              nextRowResult = null;
+            }
+          }
+
+          if (nextRowResult == null) {
+            // end of data, no more rows
+            currentRowResult = null;
+            currentRow = null;
+            return null;
+          }
+        }
+
+        // advance to cached result for next row
+        currentRowResult = nextRowResult;
+        nextCellInRow = 0;
+        currentRow = currentRowResult.getRow();
+        nextRowResult = null;
+        return currentRow;
+      }
+
+      /**
+       * Returns the next Cell in the current row or null iff none remain.
+       */
+      public Cell nextCellInRow() {
+        if (currentRowResult == null) {
+          // nothing left in current row
+          return null;
+        }
+
+        Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
+        nextCellInRow++;
+        if (nextCellInRow == currentRowResult.size()) {
+          if (results.hasNext()) {
+            Result result = results.next();
+            Cell cell = result.rawCells()[0];
+            if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
+                cell.getRowOffset(), cell.getRowLength())) {
+              // result is part of current row
+              currentRowResult = result;
+              nextCellInRow = 0;
+            } else {
+              // result is part of next row, cache it
+              nextRowResult = result;
+              // current row is complete
+              currentRowResult = null;
+            }
+          } else {
+            // end of data
+            currentRowResult = null;
+          }
+        }
+        return nextCell;
+      }
+    }
+
+    /**
+     * Compare the cells for the given row from the source and target tables.
+     * Count and log any differences.
+     * If not a dry run, output a Put and/or Delete needed to sync the target table
+     * to match the source table.
+     */
+    private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
+        CellScanner targetCells) throws IOException, InterruptedException {
+      Put put = null;
+      Delete delete = null;
+      long matchingCells = 0;
+      boolean matchingRow = true;
+      Cell sourceCell = sourceCells.nextCellInRow();
+      Cell targetCell = targetCells.nextCellInRow();
+      while (sourceCell != null || targetCell != null) {
+
+        int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
+        if (cellKeyComparison < 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Target missing cell: " + sourceCell);
+          }
+          context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
+          matchingRow = false;
+
+          if (!dryRun) {
+            if (put == null) {
+              put = new Put(rowKey);
+            }
+            put.add(sourceCell);
+          }
+
+          sourceCell = sourceCells.nextCellInRow();
+        } else if (cellKeyComparison > 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Source missing cell: " + targetCell);
+          }
+          context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
+          matchingRow = false;
+
+          if (!dryRun) {
+            if (delete == null) {
+              delete = new Delete(rowKey);
+            }
+            // add a tombstone to exactly match the target cell that is missing on the source
+            delete.addColumn(CellUtil.cloneFamily(targetCell),
+                CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp());
+          }
+
+          targetCell = targetCells.nextCellInRow();
+        } else {
+          // the cell keys are equal, now check values
+          if (CellUtil.matchingValue(sourceCell, targetCell)) {
+            matchingCells++;
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Different values: ");
+              LOG.debug("  source cell: " + sourceCell
+                  + " value: " + Bytes.toHex(sourceCell.getValueArray(),
+                      sourceCell.getValueOffset(), sourceCell.getValueLength()));
+              LOG.debug("  target cell: " + targetCell
+                  + " value: " + Bytes.toHex(targetCell.getValueArray(),
+                      targetCell.getValueOffset(), targetCell.getValueLength()));
+            }
+            context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
+            matchingRow = false;
+
+            if (!dryRun) {
+              // overwrite target cell
+              if (put == null) {
+                put = new Put(rowKey);
+              }
+              put.add(sourceCell);
+            }
+          }
+          sourceCell = sourceCells.nextCellInRow();
+          targetCell = targetCells.nextCellInRow();
+        }
+
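+        // flush periodically: once a pending Put or Delete has accumulated scanBatch cells,
+        // write it out so very wide rows do not buffer an unbounded number of mutations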
+        if (!dryRun && sourceTableHash.scanBatch > 0) {
+          if (put != null && put.size() >= sourceTableHash.scanBatch) {
+            context.write(new ImmutableBytesWritable(rowKey), put);
+            put = null;
+          }
+          if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
+            context.write(new ImmutableBytesWritable(rowKey), delete);
+            delete = null;
+          }
+        }
+      }
+
+      if (!dryRun) {
+        if (put != null) {
+          context.write(new ImmutableBytesWritable(rowKey), put);
+        }
+        if (delete != null) {
+          context.write(new ImmutableBytesWritable(rowKey), delete);
+        }
+      }
+
+      if (matchingCells > 0) {
+        context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
+      }
+      if (matchingRow) {
+        context.getCounter(Counter.MATCHINGROWS).increment(1);
+        return true;
+      } else {
+        context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
+        return false;
+      }
+    }
+
+    /**
+     * Compare the given row keys.
+     * Nulls sort after non-nulls.
+     */
+    private static int compareRowKeys(byte[] r1, byte[] r2) {
+      if (r1 == null) {
+        return 1;  // source missing row
+      } else if (r2 == null) {
+        return -1; // target missing row
+      } else {
+        // We only sync non-META tables, so we can directly do what CellComparator does
+        // internally; the call never needs to go to MetaCellComparator.
+        return Bytes.compareTo(r1, 0, r1.length, r2, 0, r2.length);
+      }
+    }
+
+    /**
+     * Compare families, qualifiers, and timestamps of the given Cells.
+     * They are assumed to be of the same row.
+     * Nulls are after non-nulls.
+     */
+    private static int compareCellKeysWithinRow(Cell c1, Cell c2) {
+      if (c1 == null) {
+        return 1; // source missing cell
+      }
+      if (c2 == null) {
+        return -1; // target missing cell
+      }
+
+      int result = CellComparator.compareFamilies(c1, c2);
+      if (result != 0) {
+        return result;
+      }
+
+      result = CellComparator.compareQualifiers(c1, c2);
+      if (result != 0) {
+        return result;
+      }
+
+      // note timestamp comparison is inverted - more recent cells first
+      return CellComparator.compareTimestamps(c1, c2);
+    }
+
+    @Override
+    protected void cleanup(Context context)
+        throws IOException, InterruptedException {
+      if (mapperException == null) {
+        try {
+          finishRemainingHashRanges(context);
+        } catch (Throwable t) {
+          mapperException = t;
+        }
+      }
+
+      try {
+        sourceTable.close();
+        targetTable.close();
+        sourceConnection.close();
+        targetConnection.close();
+      } catch (Throwable t) {
+        if (mapperException == null) {
+          mapperException = t;
+        } else {
+          LOG.error("Suppressing exception from closing tables", t);
+        }
+      }
+
+      // propagate first exception
+      if (mapperException != null) {
+        Throwables.propagateIfInstanceOf(mapperException, IOException.class);
+        Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
+        Throwables.propagate(mapperException);
+      }
+    }
+
+    private void finishRemainingHashRanges(Context context) throws IOException,
+        InterruptedException {
+      TableSplit split = (TableSplit) context.getInputSplit();
+      byte[] splitEndRow = split.getEndRow();
+      boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);
+
+      // if there are more hash batches that begin before the end of this split, move to them
+      while (nextSourceKey != null
+          && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) {
+        moveToNextBatch(context);
+      }
+
+      if (targetHasher.isBatchStarted()) {
+        // need to complete the final open hash batch
+
+        if ((nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0)
+              || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))) {
+          // the open hash range continues past the end of this region
+          // add a scan to complete the current hash range
+          Scan scan = sourceTableHash.initScan();
+          scan.setStartRow(splitEndRow);
+          if (nextSourceKey == null) {
+            scan.setStopRow(sourceTableHash.stopRow);
+          } else {
+            scan.setStopRow(nextSourceKey.copyBytes());
+          }
+
+          ResultScanner targetScanner = null;
+          try {
+            targetScanner = targetTable.getScanner(scan);
+            for (Result row : targetScanner) {
+              targetHasher.hashResult(row);
+            }
+          } finally {
+            if (targetScanner != null) {
+              targetScanner.close();
+            }
+          }
+        } // else current batch ends exactly at split end row
+
+        finishBatchAndCompareHashes(context);
+      }
+    }
+  }
+
+  private static final int NUM_ARGS = 3;
+  private static void printUsage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+      System.err.println();
+    }
+    System.err.println("Usage: SyncTable [options] <sourcehashdir> 
<sourcetable> <targettable>");
+    System.err.println();
+    System.err.println("Options:");
+
+    System.err.println(" sourcezkcluster  ZK cluster key of the source table");
+    System.err.println("                  (defaults to cluster in classpath's 
config)");
+    System.err.println(" targetzkcluster  ZK cluster key of the target table");
+    System.err.println("                  (defaults to cluster in classpath's 
config)");
+    System.err.println(" dryrun           if true, output counters but no 
writes");
+    System.err.println("                  (defaults to false)");
+    System.err.println();
+    System.err.println("Args:");
+    System.err.println(" sourcehashdir    path to HashTable output dir for 
source table");
+    System.err.println("                  (see 
org.apache.hadoop.hbase.mapreduce.HashTable)");
+    System.err.println(" sourcetable      Name of the source table to sync 
from");
+    System.err.println(" targettable      Name of the target table to sync 
to");
+    System.err.println();
+    System.err.println("Examples:");
+    System.err.println(" For a dry run SyncTable of tableA from a remote 
source cluster");
+    System.err.println(" to a local target cluster:");
+    System.err.println(" $ hbase " +
+        "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
+        + " 
--sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
+        + " hdfs://nn:9000/hashes/tableA tableA tableA");
+  }
+
+  private boolean doCommandLine(final String[] args) {
+    if (args.length < NUM_ARGS) {
+      printUsage(null);
+      return false;
+    }
+    try {
+      sourceHashDir = new Path(args[args.length - 3]);
+      sourceTableName = args[args.length - 2];
+      targetTableName = args[args.length - 1];
+
+      for (int i = 0; i < args.length - NUM_ARGS; i++) {
+        String cmd = args[i];
+        if (cmd.equals("-h") || cmd.startsWith("--h")) {
+          printUsage(null);
+          return false;
+        }
+
+        final String sourceZkClusterKey = "--sourcezkcluster=";
+        if (cmd.startsWith(sourceZkClusterKey)) {
+          sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
+          continue;
+        }
+
+        final String targetZkClusterKey = "--targetzkcluster=";
+        if (cmd.startsWith(targetZkClusterKey)) {
+          targetZkCluster = cmd.substring(targetZkClusterKey.length());
+          continue;
+        }
+
+        final String dryRunKey = "--dryrun=";
+        if (cmd.startsWith(dryRunKey)) {
+          dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
+          continue;
+        }
+
+        printUsage("Invalid argument '" + cmd + "'");
+        return false;
+      }
+
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      printUsage("Can't start because " + e.getMessage());
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Main entry point.
+   */
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
+    System.exit(ret);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
+    if (!doCommandLine(otherArgs)) {
+      return 1;
+    }
+
+    Job job = createSubmittableJob(otherArgs);
+    if (!job.waitForCompletion(true)) {
+      LOG.info("Map-reduce job failed!");
+      return 1;
+    }
+    counters = job.getCounters();
+    return 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
new file mode 100644
index 0000000..63868da
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
@@ -0,0 +1,294 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Convert HBase tabular data into a format that is consumable by Map/Reduce.
+ */
+@InterfaceAudience.Public
+public class TableInputFormat extends TableInputFormatBase
+implements Configurable {
+
+  @SuppressWarnings("hiding")
+  private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
+
+  /** Job parameter that specifies the input table. */
+  public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
+  /**
+   * If specified, use start keys of this table to split.
+   * This is useful when you are preparing data for bulkload.
+   */
+  private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
+  /** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
+   * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
+   */
+  public static final String SCAN = "hbase.mapreduce.scan";
+  /** Scan start row */
+  public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
+  /** Scan stop row */
+  public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
+  /** Column Family to Scan */
+  public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
+  /** Space delimited list of columns and column families to scan. */
+  public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
+  /** The timestamp used to filter columns with a specific timestamp. */
+  public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
+  /** The starting timestamp used to filter columns with a specific range of versions. */
+  public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
+  /** The ending timestamp used to filter columns with a specific range of versions. */
+  public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
+  /** The maximum number of version to return. */
+  public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
+  /** Set to false to disable server-side caching of blocks for this scan. */
+  public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
+  /** The number of rows for caching that will be passed to scanners. */
+  public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
+  /** Set the maximum number of values to return for each call to next(). */
+  public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
+  /** Specify if we have to shuffle the map tasks. */
+  public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";
+
+  /** The configuration. */
+  private Configuration conf = null;
+
+  /**
+   * Returns the current configuration.
+   *
+   * @return The current configuration.
+   * @see org.apache.hadoop.conf.Configurable#getConf()
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Sets the configuration. This is used to set the details for the table to
+   * be scanned.
+   *
+   * @param configuration  The configuration to set.
+   * @see org.apache.hadoop.conf.Configurable#setConf(
+   *   org.apache.hadoop.conf.Configuration)
+   */
+  @Override
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
+    justification="Intentional")
+  public void setConf(Configuration configuration) {
+    this.conf = configuration;
+
+    Scan scan = null;
+
+    if (conf.get(SCAN) != null) {
+      try {
+        scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
+      } catch (IOException e) {
+        LOG.error("An error occurred.", e);
+      }
+    } else {
+      try {
+        scan = createScanFromConfiguration(conf);
+      } catch (Exception e) {
+          LOG.error(StringUtils.stringifyException(e));
+      }
+    }
+
+    setScan(scan);
+  }
+
+  /**
+   * Sets up a {@link Scan} instance, applying settings from the configuration property
+   * constants defined in {@code TableInputFormat}.  This allows specifying things such as:
+   * <ul>
+   *   <li>start and stop rows</li>
+   *   <li>column qualifiers or families</li>
+   *   <li>timestamps or timerange</li>
+   *   <li>scanner caching and batch size</li>
+   * </ul>
+   */
+  public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
+    Scan scan = new Scan();
+
+    if (conf.get(SCAN_ROW_START) != null) {
+      scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
+    }
+
+    if (conf.get(SCAN_ROW_STOP) != null) {
+      scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
+    }
+
+    if (conf.get(SCAN_COLUMNS) != null) {
+      addColumns(scan, conf.get(SCAN_COLUMNS));
+    }
+
+    for (String columnFamily : conf.getTrimmedStrings(SCAN_COLUMN_FAMILY)) {
+      scan.addFamily(Bytes.toBytes(columnFamily));
+    }
+
+    if (conf.get(SCAN_TIMESTAMP) != null) {
+      scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
+    }
+
+    if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
+      scan.setTimeRange(
+          Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
+          Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
+    }
+
+    if (conf.get(SCAN_MAXVERSIONS) != null) {
+      scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
+    }
+
+    if (conf.get(SCAN_CACHEDROWS) != null) {
+      scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
+    }
+
+    if (conf.get(SCAN_BATCHSIZE) != null) {
+      scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
+    }
+
+    // false by default, full table scans generate too much BC churn
+    scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
+
+    return scan;
+  }
+
+  @Override
+  protected void initialize(JobContext context) throws IOException {
+    // Do we have to worry about mis-matches between the Configuration from setConf and the one
+    // in this context?
+    TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
+    try {
+      initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);
+    } catch (Exception e) {
+      LOG.error(StringUtils.stringifyException(e));
+    }
+  }
+
+  /**
+   * Parses a combined family and qualifier and adds either both or just the
+   * family in case there is no qualifier. This assumes the older colon
+   * divided notation, e.g. "family:qualifier".
+   *
+   * @param scan The Scan to update.
+   * @param familyAndQualifier family and qualifier
+   * @throws IllegalArgumentException When familyAndQualifier is invalid.
+   */
+  private static void addColumn(Scan scan, byte[] familyAndQualifier) {
+    byte [][] fq = KeyValue.parseColumn(familyAndQualifier);
+    if (fq.length == 1) {
+      scan.addFamily(fq[0]);
+    } else if (fq.length == 2) {
+      scan.addColumn(fq[0], fq[1]);
+    } else {
+      throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
+    }
+  }
+
+  /**
+   * Adds an array of columns specified using old format, family:qualifier.
+   * <p>
+   * Overrides previous calls to {@link Scan#addColumn(byte[], byte[])} for any families in the
+   * input.
+   *
+   * @param scan The Scan to update.
+   * @param columns array of columns, formatted as <code>family:qualifier</code>
+   * @see Scan#addColumn(byte[], byte[])
+   */
+  public static void addColumns(Scan scan, byte [][] columns) {
+    for (byte[] column : columns) {
+      addColumn(scan, column);
+    }
+  }
+
+  /**
+   * Calculates the splits that will serve as input for the map tasks. The
+   * number of splits matches the number of regions in a table. Splits are shuffled if
+   * required.
+   * @param context  The current job context.
+   * @return The list of input splits.
+   * @throws IOException When creating the list of splits fails.
+   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
+   *   org.apache.hadoop.mapreduce.JobContext)
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    List<InputSplit> splits = super.getSplits(context);
+    if ((conf.get(SHUFFLE_MAPS) != null) && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase(Locale.ROOT))) {
+      Collections.shuffle(splits);
+    }
+    return splits;
+  }
+
+  /**
+   * Convenience method to parse a string representation of an array of column specifiers.
+   *
+   * @param scan The Scan to update.
+   * @param columns  The columns to parse.
+   */
+  private static void addColumns(Scan scan, String columns) {
+    String[] cols = columns.split(" ");
+    for (String col : cols) {
+      addColumn(scan, Bytes.toBytes(col));
+    }
+  }
+
+  @Override
+  protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
+    if (conf.get(SPLIT_TABLE) != null) {
+      TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE));
+      try (Connection conn = ConnectionFactory.createConnection(getConf())) {
+        try (RegionLocator rl = conn.getRegionLocator(splitTableName)) {
+          return rl.getStartEndKeys();
+        }
+      }
+    }
+
+    return super.getStartEndKeys();
+  }
+
+  /**
+   * Sets split table in map-reduce job.
+   */
+  public static void configureSplitTable(Job job, TableName tableName) {
+    job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
new file mode 100644
index 0000000..fb38ebe
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
@@ -0,0 +1,652 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Addressing;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName},
+ * and a {@link Scan} instance that defines the input columns etc. Subclasses may use
+ * other TableRecordReader implementations.
+ *
+ * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
+ * function properly. Each of the entry points to this class used by the MapReduce framework,
+ * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
+ * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
+ * retrieving the necessary configuration information. If your subclass overrides either of these
+ * methods, either call the parent version or call initialize yourself.
+ *
+ * <p>
+ * An example of a subclass:
+ * <pre>
+ *   class ExampleTIF extends TableInputFormatBase {
+ *
+ *     {@literal @}Override
+ *     protected void initialize(JobContext context) throws IOException {
+ *       // We are responsible for the lifecycle of this connection until we hand it over in
+ *       // initializeTable.
+ *       Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
+ *              job.getConfiguration()));
+ *       TableName tableName = TableName.valueOf("exampleTable");
+ *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
+ *       initializeTable(connection, tableName);
+ *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ *         Bytes.toBytes("columnB") };
+ *       // optional, by default we'll get everything for the table.
+ *       Scan scan = new Scan();
+ *       for (byte[] family : inputColumns) {
+ *         scan.addFamily(family);
+ *       }
+ *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+ *       scan.setFilter(exampleFilter);
+ *       setScan(scan);
+ *     }
+ *   }
+ * </pre>
+ */
+@InterfaceAudience.Public
+public abstract class TableInputFormatBase
+extends InputFormat<ImmutableBytesWritable, Result> {
+
+  /** Specify if we enable auto-balance for input in M/R jobs.*/
+  public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance";
+  /** Specify the ratio for data skew in M/R jobs; it goes well with enabling the
+   * hbase.mapreduce.input.autobalance property.*/
+  public static final String INPUT_AUTOBALANCE_MAXSKEWRATIO = "hbase.mapreduce.input.autobalance" +
+          ".maxskewratio";
+  /** Specify if the row key in table is text (ASCII between 32~126),
+   * default is true. False means the table is using binary row key*/
+  public static final String TABLE_ROW_TEXTKEY = "hbase.table.row.textkey";
+
+  private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
+
+  private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
+      "initialized. Ensure you call initializeTable either in your constructor or initialize " +
+      "method";
+  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
+            " previous error. Please look at the previous logs lines from" +
+            " the task's full log for more details.";
+
+  /** Holds the details for the internal scanner.
+   *
+   * @see Scan */
+  private Scan scan = null;
+  /** The {@link Admin}. */
+  private Admin admin;
+  /** The {@link Table} to scan. */
+  private Table table;
+  /** The {@link RegionLocator} of the table. */
+  private RegionLocator regionLocator;
+  /** The reader scanning the table, can be a custom one. */
+  private TableRecordReader tableRecordReader = null;
+  /** The underlying {@link Connection} of the table. */
+  private Connection connection;
+
+
+  /** The reverse DNS lookup cache mapping: IPAddress => HostName */
+  private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<>();
+
+  /**
+   * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses
+   * the default.
+   *
+   * @param split  The split to work with.
+   * @param context  The current context.
+   * @return The newly created record reader.
+   * @throws IOException When creating the reader fails.
+   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
+   *   org.apache.hadoop.mapreduce.InputSplit,
+   *   org.apache.hadoop.mapreduce.TaskAttemptContext)
+   */
+  @Override
+  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
+      InputSplit split, TaskAttemptContext context)
+  throws IOException {
+    // Just in case a subclass is relying on JobConfigurable magic.
+    if (table == null) {
+      initialize(context);
+    }
+    // null check in case our child overrides getTable to not throw.
+    try {
+      if (getTable() == null) {
+        // initialize() must not have been implemented in the subclass.
+        throw new IOException(INITIALIZATION_ERROR);
+      }
+    } catch (IllegalStateException exception) {
+      throw new IOException(INITIALIZATION_ERROR, exception);
+    }
+    TableSplit tSplit = (TableSplit) split;
+    LOG.info("Input split length: " + 
StringUtils.humanReadableInt(tSplit.getLength()) + " bytes.");
+    final TableRecordReader trr =
+        this.tableRecordReader != null ? this.tableRecordReader : new TableRecordReader();
+    Scan sc = new Scan(this.scan);
+    sc.setStartRow(tSplit.getStartRow());
+    sc.setStopRow(tSplit.getEndRow());
+    trr.setScan(sc);
+    trr.setTable(getTable());
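+    // wrap the reader so that close() also releases the table resources held by this
+    // input format (the anonymous reader's close() calls closeTable())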
+    return new RecordReader<ImmutableBytesWritable, Result>() {
+
+      @Override
+      public void close() throws IOException {
+        trr.close();
+        closeTable();
+      }
+
+      @Override
+      public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
+        return trr.getCurrentKey();
+      }
+
+      @Override
+      public Result getCurrentValue() throws IOException, InterruptedException {
+        return trr.getCurrentValue();
+      }
+
+      @Override
+      public float getProgress() throws IOException, InterruptedException {
+        return trr.getProgress();
+      }
+
+      @Override
+      public void initialize(InputSplit inputsplit, TaskAttemptContext context) throws IOException,
+          InterruptedException {
+        trr.initialize(inputsplit, context);
+      }
+
+      @Override
+      public boolean nextKeyValue() throws IOException, InterruptedException {
+        return trr.nextKeyValue();
+      }
+    };
+  }
+
+  protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
+    return getRegionLocator().getStartEndKeys();
+  }
+
+  /**
+   * Calculates the splits that will serve as input for the map tasks. The
+   * number of splits matches the number of regions in a table.
+   *
+   * @param context  The current job context.
+   * @return The list of input splits.
+   * @throws IOException When creating the list of splits fails.
+   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
+   *   org.apache.hadoop.mapreduce.JobContext)
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    boolean closeOnFinish = false;
+
+    // Just in case a subclass is relying on JobConfigurable magic.
+    if (table == null) {
+      initialize(context);
+      closeOnFinish = true;
+    }
+
+    // null check in case our child overrides getTable to not throw.
+    try {
+      if (getTable() == null) {
+        // initialize() must not have been implemented in the subclass.
+        throw new IOException(INITIALIZATION_ERROR);
+      }
+    } catch (IllegalStateException exception) {
+      throw new IOException(INITIALIZATION_ERROR, exception);
+    }
+
+    try {
+      RegionSizeCalculator sizeCalculator =
+          new RegionSizeCalculator(getRegionLocator(), getAdmin());
+
+      TableName tableName = getTable().getName();
+
+      Pair<byte[][], byte[][]> keys = getStartEndKeys();
+      if (keys == null || keys.getFirst() == null ||
+          keys.getFirst().length == 0) {
+        HRegionLocation regLoc =
+            getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
+        if (null == regLoc) {
+          throw new IOException("Expecting at least one region.");
+        }
+        List<InputSplit> splits = new ArrayList<>(1);
+        long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
+        TableSplit split = new TableSplit(tableName, scan,
+            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
+                .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
+        splits.add(split);
+        return splits;
+      }
+      List<InputSplit> splits = new ArrayList<>(keys.getFirst().length);
+      for (int i = 0; i < keys.getFirst().length; i++) {
+        if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+          continue;
+        }
+
+        byte[] startRow = scan.getStartRow();
+        byte[] stopRow = scan.getStopRow();
+        // determine if the given start and stop keys fall into the region
+        if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
+            Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+            (stopRow.length == 0 ||
+             Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
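+          // the split covers the intersection of the scan's row range and this region's key range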
+          byte[] splitStart = startRow.length == 0 ||
+            Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
+              keys.getFirst()[i] : startRow;
+          byte[] splitStop = (stopRow.length == 0 ||
+            Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
+            keys.getSecond()[i].length > 0 ?
+              keys.getSecond()[i] : stopRow;
+
+          HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false);
+          // The below InetSocketAddress creation does a name resolution.
+          InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
+          if (isa.isUnresolved()) {
+            LOG.warn("Failed resolve " + isa);
+          }
+          InetAddress regionAddress = isa.getAddress();
+          String regionLocation;
+          regionLocation = reverseDNS(regionAddress);
+
+          byte[] regionName = location.getRegionInfo().getRegionName();
+          String encodedRegionName = location.getRegionInfo().getEncodedName();
+          long regionSize = sizeCalculator.getRegionSize(regionName);
+          TableSplit split = new TableSplit(tableName, scan,
+            splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
+          splits.add(split);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("getSplits: split -> " + i + " -> " + split);
+          }
+        }
+      }
+      //The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled.
+      boolean enableAutoBalance = context.getConfiguration()
+        .getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false);
+      if (enableAutoBalance) {
+        long totalRegionSize = 0;
+        for (int i = 0; i < splits.size(); i++) {
+          TableSplit ts = (TableSplit) splits.get(i);
+          totalRegionSize += ts.getLength();
+        }
+        long averageRegionSize = totalRegionSize / splits.size();
+        // the averageRegionSize must be positive.
+        if (averageRegionSize <= 0) {
+          LOG.warn("The averageRegionSize is not positive: " + averageRegionSize +
+              ", set it to 1.");
+          averageRegionSize = 1;
+        }
+        return calculateRebalancedSplits(splits, context, averageRegionSize);
+      } else {
+        return splits;
+      }
+    } finally {
+      if (closeOnFinish) {
+        closeTable();
+      }
+    }
+  }
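For context, here is a minimal driver sketch showing how a job could opt into the auto-balance branch above. Only the "hbase.mapreduce.input.autobalance" key is taken from the comment in this method; the job name and the rest of the setup are illustrative and not part of this patch.

// Hypothetical driver snippet (not part of this patch): enable split auto-balancing
// before the job is submitted, so getSplits() takes the rebalancing branch above.
Configuration conf = HBaseConfiguration.create();
conf.setBoolean("hbase.mapreduce.input.autobalance", true);
Job job = Job.getInstance(conf, "scan-with-balanced-splits");
// TableMapReduceUtil.initTableMapperJob(...) and the usual mapper wiring would follow here.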
+
+  String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
+    String hostName = this.reverseDNSCacheMap.get(ipAddress);
+    if (hostName == null) {
+      String ipAddressString = null;
+      try {
+        ipAddressString = DNS.reverseDns(ipAddress, null);
+      } catch (Exception e) {
+        // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from
+        // the name service. Also, in case of ipv6, we need to use the InetAddress since resolving
+        // reverse DNS using jndi doesn't work well with ipv6 addresses.
+        ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
+      }
+      if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress);
+      hostName = Strings.domainNamePointerToHostName(ipAddressString);
+      this.reverseDNSCacheMap.put(ipAddress, hostName);
+    }
+    return hostName;
+  }
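A standalone illustration of the fallback path in reverseDNS(): when the JNDI lookup throws, the plain InetAddress resolution below is used instead. The loopback address is only an example input; the output depends on the local resolver configuration.

// Sketch of the fallback used above, outside the cache and JNDI plumbing.
InetAddress ip = InetAddress.getByName("127.0.0.1");
String host = InetAddress.getByName(ip.getHostAddress()).getHostName();
System.out.println(host);   // typically "localhost" for the loopback address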
+
+  /**
+   * Calculates the number of MapReduce input splits for the map tasks. The number of
+   * MapReduce input splits depends on the average region size and the "data skew ratio"
+   * the user sets in the configuration.
+   *
+   * @param list  The list of input splits before balancing.
+   * @param context  The current job context.
+   * @param average  The average size of all regions.
+   * @return The list of input splits.
+   * @throws IOException When creating the list of splits fails.
+   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
+   *   org.apache.hadoop.mapreduce.JobContext)
+   */
+  private List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context,
+      long average) throws IOException {
+    List<InputSplit> resultList = new ArrayList<>();
+    Configuration conf = context.getConfiguration();
+    //The default data skew ratio is 3
+    long dataSkewRatio = conf.getLong(INPUT_AUTOBALANCE_MAXSKEWRATIO, 3);
+    //It determines which mode to use: text key mode or binary key mode. The default is text mode.
+    boolean isTextKey = context.getConfiguration().getBoolean(TABLE_ROW_TEXTKEY, true);
+    long dataSkewThreshold = dataSkewRatio * average;
+    int count = 0;
+    while (count < list.size()) {
+      TableSplit ts = (TableSplit)list.get(count);
+      TableName tableName = ts.getTable();
+      String regionLocation = ts.getRegionLocation();
+      String encodedRegionName = ts.getEncodedRegionName();
+      long regionSize = ts.getLength();
+      if (regionSize >= dataSkewThreshold) {
+        // if the current region size is larger than the data skew threshold,
+        // split the region into two MapReduce input splits.
+        byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey);
+        if (Arrays.equals(ts.getEndRow(), splitKey)) {
+          // Not splitting since the end key is the same as the split key
+          resultList.add(ts);
+        } else {
+          // Set the size of each child TableSplit to half of the region size; the exact sizes
+          // of the MapReduce input splits are not far off.
+          TableSplit t1 = new TableSplit(tableName, scan, ts.getStartRow(), splitKey,
+              regionLocation, regionSize / 2);
+          TableSplit t2 = new TableSplit(tableName, scan, splitKey, ts.getEndRow(), regionLocation,
+              regionSize - regionSize / 2);
+          resultList.add(t1);
+          resultList.add(t2);
+        }
+        count++;
+      } else if (regionSize >= average) {
+        // if the region size is between the average size and the data skew threshold,
+        // make this region as one MapReduce input split.
+        resultList.add(ts);
+        count++;
+      } else {
+        // if the total size of several small contiguous regions is less than the average
+        // region size, combine them into one MapReduce input split.
+        long totalSize = regionSize;
+        byte[] splitStartKey = ts.getStartRow();
+        byte[] splitEndKey = ts.getEndRow();
+        count++;
+        for (; count < list.size(); count++) {
+          TableSplit nextRegion = (TableSplit)list.get(count);
+          long nextRegionSize = nextRegion.getLength();
+          if (totalSize + nextRegionSize <= dataSkewThreshold) {
+            totalSize = totalSize + nextRegionSize;
+            splitEndKey = nextRegion.getEndRow();
+          } else {
+            break;
+          }
+        }
+        TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey,
+                regionLocation, encodedRegionName, totalSize);
+        resultList.add(t);
+      }
+    }
+    return resultList;
+  }
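A worked example of the thresholds used in calculateRebalancedSplits(), with illustrative numbers rather than real region sizes:

// Illustrative numbers only; units are arbitrary (think MB).
long average = 1000L;                              // average region size
long dataSkewRatio = 3L;                           // default skew ratio
long dataSkewThreshold = dataSkewRatio * average;  // = 3000
// A region of size 4000 (>= threshold) is split into two input splits of ~2000 each.
// A region of size 1500 (>= average, < threshold) becomes exactly one input split.
// Consecutive regions of size 300 are concatenated as long as the running total stays <= 3000.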
+
+  /**
+   * Select a split point in the region. The selection of the split point is based on a
+   * uniform distribution assumption for the keys in a region.
+   * Here are some examples:
+   *
+   * <table>
+   *   <tr>
+   *     <th>start key</th>
+   *     <th>end key</th>
+   *     <th>is text</th>
+   *     <th>split point</th>
+   *   </tr>
+   *   <tr>
+   *     <td>'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f', 'g'</td>
+   *     <td>'a', 'a', 'a', 'f', 'f', 'f'</td>
+   *     <td>true</td>
+   *     <td>'a', 'a', 'a', 'd', 'd', -78, 50, -77, 51</td>
+   *   </tr>
+   *   <tr>
+   *     <td>'1', '1', '1', '0', '0', '0'</td>
+   *     <td>'1', '1', '2', '5', '7', '9', '0'</td>
+   *     <td>true</td>
+   *     <td>'1', '1', '1', -78, -77, -76, -104</td>
+   *   </tr>
+   *   <tr>
+   *     <td>'1', '1', '1', '0'</td>
+   *     <td>'1', '1', '2', '0'</td>
+   *     <td>true</td>
+   *     <td>'1', '1', '1', -80</td>
+   *   </tr>
+   *   <tr>
+   *     <td>13, -19, 126, 127</td>
+   *     <td>13, -19, 127, 0</td>
+   *     <td>false</td>
+   *     <td>13, -19, 126, -65</td>
+   *   </tr>
+   * </table>
+   *
+   * This function is declared "public static" to make it easier to test.
+   *
+   * @param start Start key of the region
+   * @param end End key of the region
+   * @param isText Whether to use text key mode or binary key mode
+   * @return The split point in the region.
+   */
+  @InterfaceAudience.Private
+  public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) {
+    byte upperLimitByte;
+    byte lowerLimitByte;
+    //Use text mode or binary mode.
+    if (isText) {
+      // The range of printable ASCII characters is [32,126]; the lower limit is space
+      // and the upper limit is '~'.
+      upperLimitByte = '~';
+      lowerLimitByte = ' ';
+    } else {
+      upperLimitByte = -1;
+      lowerLimitByte = 0;
+    }
+    // For special case
+    // Example 1 : startkey=null, endkey="hhhqqqwww", splitKey="h"
+    // Example 2 (text key mode): startKey="ffffaaa", endKey=null, splitkey="f~~~~~~"
+    if (start.length == 0 && end.length == 0) {
+      return new byte[]{(byte) ((lowerLimitByte + upperLimitByte) / 2)};
+    }
+    if (start.length == 0 && end.length != 0) {
+      return new byte[]{ end[0] };
+    }
+    if (start.length != 0 && end.length == 0) {
+      byte[] result = new byte[start.length];
+      result[0] = start[0];
+      for (int k = 1; k < start.length; k++) {
+        result[k] = upperLimitByte;
+      }
+      return result;
+    }
+    return Bytes.split(start, end, false, 1)[1];
+  }
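Usage sketch for getSplitKey(), reusing the first row of the example table in its javadoc; the class name TableInputFormatBase is taken from the log message and javadoc elsewhere in this file.

// The returned key sorts between start and end; per the javadoc table it begins with "aaad".
byte[] start = Bytes.toBytes("aaabcdefg");
byte[] end = Bytes.toBytes("aaafff");
byte[] splitPoint = TableInputFormatBase.getSplitKey(start, end, true);
System.out.println(Bytes.toStringBinary(splitPoint));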
+
+  /**
+   * Test if the given region is to be included in the InputSplit while splitting
+   * the regions of a table.
+   * <p>
+   * This optimization is effective when there is a specific reason to exclude an entire region
+   * from the M-R job (and hence, not contributing to the InputSplit), given the start and end
+   * keys of the same. <br>
+   * Useful when we need to remember the last-processed top record and revisit the
+   * [last, current) interval for M-R processing, continuously. In addition to reducing the
+   * number of InputSplits, it reduces the load on the region server as well, due to the
+   * ordering of the keys. <br>
+   * <br>
+   * Note: It is possible that <code>endKey.length() == 0</code> for the last (recent) region.
+   * <br>
+   * Override this method if you want to bulk exclude regions altogether from M-R. By default,
+   * no region is excluded (i.e. all regions are included).
+   *
+   * @param startKey Start key of the region
+   * @param endKey End key of the region
+   * @return true, if this region needs to be included as part of the input (default).
+   */
+  protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
+    return true;
+  }
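A hedged sketch of the kind of bulk exclusion the javadoc above describes: a subclass that skips regions already covered by a remembered key. The class name and the key value are hypothetical; only the override pattern is the point.

// Hypothetical subclass; TableInputFormat extends TableInputFormatBase in hbase-mapreduce.
public class ResumingTableInputFormat extends TableInputFormat {
  // Illustrative marker for the last row already processed by a previous job run.
  private static final byte[] LAST_PROCESSED_KEY = Bytes.toBytes("row-000500");

  @Override
  protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
    // Keep the last (recent) region, whose end key is empty, and any region whose end key
    // sorts strictly after the remembered key; everything earlier is excluded from the job.
    return endKey.length == 0 || Bytes.compareTo(endKey, LAST_PROCESSED_KEY) > 0;
  }
}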
+
+  /**
+   * Allows subclasses to get the {@link RegionLocator}.
+   */
+  protected RegionLocator getRegionLocator() {
+    if (regionLocator == null) {
+      throw new IllegalStateException(NOT_INITIALIZED);
+    }
+    return regionLocator;
+  }
+
+  /**
+   * Allows subclasses to get the {@link Table}.
+   */
+  protected Table getTable() {
+    if (table == null) {
+      throw new IllegalStateException(NOT_INITIALIZED);
+    }
+    return table;
+  }
+
+  /**
+   * Allows subclasses to get the {@link Admin}.
+   */
+  protected Admin getAdmin() {
+    if (admin == null) {
+      throw new IllegalStateException(NOT_INITIALIZED);
+    }
+    return admin;
+  }
+
+  /**
+   * Allows subclasses to initialize the table information.
+   *
+   * @param connection  The Connection to the HBase cluster. MUST be unmanaged. We will close it.
+   * @param tableName  The {@link TableName} of the table to process.
+   * @throws IOException
+   */
+  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
+    if (this.table != null || this.connection != null) {
+      LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
+          "reference; TableInputFormatBase will not close these old references when done.");
+    }
+    this.table = connection.getTable(tableName);
+    this.regionLocator = connection.getRegionLocator(tableName);
+    this.admin = connection.getAdmin();
+    this.connection = connection;
+  }
+
+  /**
+   * Gets the scan defining the actual details like columns etc.
+   *
+   * @return The internal scan instance.
+   */
+  public Scan getScan() {
+    if (this.scan == null) this.scan = new Scan();
+    return scan;
+  }
+
+  /**
+   * Sets the scan defining the actual details like columns etc.
+   *
+   * @param scan  The scan to set.
+   */
+  public void setScan(Scan scan) {
+    this.scan = scan;
+  }
+
+  /**
+   * Allows subclasses to set the {@link TableRecordReader}.
+   *
+   * @param tableRecordReader A different {@link TableRecordReader}
+   *   implementation.
+   */
+  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+    this.tableRecordReader = tableRecordReader;
+  }
+
+  /**
+   * Handle subclass specific set up.
+   * Each of the entry points used by the MapReduce framework,
+   * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
+   * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
+   * retrieving the necessary configuration information and calling
+   * {@link #initializeTable(Connection, TableName)}.
+   *
+   * Subclasses should implement their initialize call such that it is safe to call multiple times.
+   * The current TableInputFormatBase implementation relies on a non-null table reference to decide
+   * if an initialize call is needed, but this behavior may change in the future. In particular,
+   * it is critical that initializeTable not be called multiple times since this will leak
+   * Connection instances.
+   *
+   */
+  protected void initialize(JobContext context) throws IOException {
+  }
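A minimal sketch (not the shipped TableInputFormat) of an initialize() implementation that is safe to call repeatedly, as the javadoc above requires; the table name and column family are placeholders.

// Guard field so repeated initialize() calls do not call initializeTable() twice
// and leak Connection instances.
private boolean initialized = false;

@Override
protected void initialize(JobContext context) throws IOException {
  if (initialized) {
    return;
  }
  Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
  initializeTable(connection, TableName.valueOf("example_table"));   // placeholder table name
  Scan scan = new Scan();
  scan.addFamily(Bytes.toBytes("cf"));                               // placeholder column family
  setScan(scan);
  initialized = true;
}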
+
+  /**
+   * Close the Table and related objects that were initialized via
+   * {@link #initializeTable(Connection, TableName)}.
+   *
+   * @throws IOException
+   */
+  protected void closeTable() throws IOException {
+    close(admin, table, regionLocator, connection);
+    admin = null;
+    table = null;
+    regionLocator = null;
+    connection = null;
+  }
+
+  private void close(Closeable... closables) throws IOException {
+    for (Closeable c : closables) {
+      if(c != null) { c.close(); }
+    }
+  }
+
+}
