http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java deleted file mode 100644 index c72a0c3..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java +++ /dev/null @@ -1,786 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.util.Iterator; -import java.util.Collections; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.Counters; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.apache.hadoop.util.GenericOptionsParser; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterators; - -public class SyncTable extends Configured implements Tool { - - private static final Log LOG = LogFactory.getLog(SyncTable.class); - - static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir"; - static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name"; - static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name"; - static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster"; - static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster"; - static final String 
DRY_RUN_CONF_KEY="sync.table.dry.run"; - - Path sourceHashDir; - String sourceTableName; - String targetTableName; - - String sourceZkCluster; - String targetZkCluster; - boolean dryRun; - - Counters counters; - - public SyncTable(Configuration conf) { - super(conf); - } - - public Job createSubmittableJob(String[] args) throws IOException { - FileSystem fs = sourceHashDir.getFileSystem(getConf()); - if (!fs.exists(sourceHashDir)) { - throw new IOException("Source hash dir not found: " + sourceHashDir); - } - - HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir); - LOG.info("Read source hash manifest: " + tableHash); - LOG.info("Read " + tableHash.partitions.size() + " partition keys"); - if (!tableHash.tableName.equals(sourceTableName)) { - LOG.warn("Table name mismatch - manifest indicates hash was taken from: " - + tableHash.tableName + " but job is reading from: " + sourceTableName); - } - if (tableHash.numHashFiles != tableHash.partitions.size() + 1) { - throw new RuntimeException("Hash data appears corrupt. The number of of hash files created" - + " should be 1 more than the number of partition keys. However, the manifest file " - + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys" - + " found in the partitions file is " + tableHash.partitions.size()); - } - - Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR); - int dataSubdirCount = 0; - for (FileStatus file : fs.listStatus(dataDir)) { - if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) { - dataSubdirCount++; - } - } - - if (dataSubdirCount != tableHash.numHashFiles) { - throw new RuntimeException("Hash data appears corrupt. The number of of hash files created" - + " should be 1 more than the number of partition keys. However, the number of data dirs" - + " found is " + dataSubdirCount + " but the number of partition keys" - + " found in the partitions file is " + tableHash.partitions.size()); - } - - Job job = Job.getInstance(getConf(),getConf().get("mapreduce.job.name", - "syncTable_" + sourceTableName + "-" + targetTableName)); - Configuration jobConf = job.getConfiguration(); - job.setJarByClass(HashTable.class); - jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString()); - jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName); - jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName); - if (sourceZkCluster != null) { - jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster); - } - if (targetZkCluster != null) { - jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster); - } - jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun); - - TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(), - SyncMapper.class, null, null, job); - - job.setNumReduceTasks(0); - - if (dryRun) { - job.setOutputFormatClass(NullOutputFormat.class); - } else { - // No reducers. Just write straight to table. Call initTableReducerJob - // because it sets up the TableOutputFormat. 
- TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null, - targetZkCluster, null, null); - - // would be nice to add an option for bulk load instead - } - - // Obtain an authentication token, for the specified cluster, on behalf of the current user - if (sourceZkCluster != null) { - Configuration peerConf = - HBaseConfiguration.createClusterConf(job.getConfiguration(), sourceZkCluster); - TableMapReduceUtil.initCredentialsForCluster(job, peerConf); - } - return job; - } - - public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> { - Path sourceHashDir; - - Connection sourceConnection; - Connection targetConnection; - Table sourceTable; - Table targetTable; - boolean dryRun; - - HashTable.TableHash sourceTableHash; - HashTable.TableHash.Reader sourceHashReader; - ImmutableBytesWritable currentSourceHash; - ImmutableBytesWritable nextSourceKey; - HashTable.ResultHasher targetHasher; - - Throwable mapperException; - - public static enum Counter {BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS, - SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS, DIFFERENTCELLVALUES, - MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED, RANGESNOTMATCHED}; - - @Override - protected void setup(Context context) throws IOException { - - Configuration conf = context.getConfiguration(); - sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY)); - sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null); - targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY, - TableOutputFormat.OUTPUT_CONF_PREFIX); - sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY); - targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY); - dryRun = conf.getBoolean(SOURCE_TABLE_CONF_KEY, false); - - sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir); - LOG.info("Read source hash manifest: " + sourceTableHash); - LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys"); - - TableSplit split = (TableSplit) context.getInputSplit(); - ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow()); - - sourceHashReader = sourceTableHash.newReader(conf, splitStartKey); - findNextKeyHashPair(); - - // create a hasher, but don't start it right away - // instead, find the first hash batch at or after the start row - // and skip any rows that come before. they will be caught by the previous task - targetHasher = new HashTable.ResultHasher(); - } - - private static Connection openConnection(Configuration conf, String zkClusterConfKey, - String configPrefix) - throws IOException { - String zkCluster = conf.get(zkClusterConfKey); - Configuration clusterConf = HBaseConfiguration.createClusterConf(conf, - zkCluster, configPrefix); - return ConnectionFactory.createConnection(clusterConf); - } - - private static Table openTable(Connection connection, Configuration conf, - String tableNameConfKey) throws IOException { - return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey))); - } - - /** - * Attempt to read the next source key/hash pair. 
- * If there are no more, set nextSourceKey to null - */ - private void findNextKeyHashPair() throws IOException { - boolean hasNext = sourceHashReader.next(); - if (hasNext) { - nextSourceKey = sourceHashReader.getCurrentKey(); - } else { - // no more keys - last hash goes to the end - nextSourceKey = null; - } - } - - @Override - protected void map(ImmutableBytesWritable key, Result value, Context context) - throws IOException, InterruptedException { - try { - // first, finish any hash batches that end before the scanned row - while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) { - moveToNextBatch(context); - } - - // next, add the scanned row (as long as we've reached the first batch) - if (targetHasher.isBatchStarted()) { - targetHasher.hashResult(value); - } - } catch (Throwable t) { - mapperException = t; - Throwables.propagateIfInstanceOf(t, IOException.class); - Throwables.propagateIfInstanceOf(t, InterruptedException.class); - Throwables.propagate(t); - } - } - - /** - * If there is an open hash batch, complete it and sync if there are diffs. - * Start a new batch, and seek to read the - */ - private void moveToNextBatch(Context context) throws IOException, InterruptedException { - if (targetHasher.isBatchStarted()) { - finishBatchAndCompareHashes(context); - } - targetHasher.startBatch(nextSourceKey); - currentSourceHash = sourceHashReader.getCurrentHash(); - - findNextKeyHashPair(); - } - - /** - * Finish the currently open hash batch. - * Compare the target hash to the given source hash. - * If they do not match, then sync the covered key range. - */ - private void finishBatchAndCompareHashes(Context context) - throws IOException, InterruptedException { - targetHasher.finishBatch(); - context.getCounter(Counter.BATCHES).increment(1); - if (targetHasher.getBatchSize() == 0) { - context.getCounter(Counter.EMPTY_BATCHES).increment(1); - } - ImmutableBytesWritable targetHash = targetHasher.getBatchHash(); - if (targetHash.equals(currentSourceHash)) { - context.getCounter(Counter.HASHES_MATCHED).increment(1); - } else { - context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1); - - ImmutableBytesWritable stopRow = nextSourceKey == null - ? new ImmutableBytesWritable(sourceTableHash.stopRow) - : nextSourceKey; - - if (LOG.isDebugEnabled()) { - LOG.debug("Hash mismatch. Key range: " + toHex(targetHasher.getBatchStartKey()) - + " to " + toHex(stopRow) - + " sourceHash: " + toHex(currentSourceHash) - + " targetHash: " + toHex(targetHash)); - } - - syncRange(context, targetHasher.getBatchStartKey(), stopRow); - } - } - private static String toHex(ImmutableBytesWritable bytes) { - return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength()); - } - - private static final CellScanner EMPTY_CELL_SCANNER - = new CellScanner(Collections.<Result>emptyIterator()); - - /** - * Rescan the given range directly from the source and target tables. 
- * Count and log differences, and if this is not a dry run, output Puts and Deletes - * to make the target table match the source table for this range - */ - private void syncRange(Context context, ImmutableBytesWritable startRow, - ImmutableBytesWritable stopRow) throws IOException, InterruptedException { - Scan scan = sourceTableHash.initScan(); - scan.setStartRow(startRow.copyBytes()); - scan.setStopRow(stopRow.copyBytes()); - - ResultScanner sourceScanner = sourceTable.getScanner(scan); - CellScanner sourceCells = new CellScanner(sourceScanner.iterator()); - - ResultScanner targetScanner = targetTable.getScanner(new Scan(scan)); - CellScanner targetCells = new CellScanner(targetScanner.iterator()); - - boolean rangeMatched = true; - byte[] nextSourceRow = sourceCells.nextRow(); - byte[] nextTargetRow = targetCells.nextRow(); - while(nextSourceRow != null || nextTargetRow != null) { - boolean rowMatched; - int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow); - if (rowComparison < 0) { - if (LOG.isInfoEnabled()) { - LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow)); - } - context.getCounter(Counter.TARGETMISSINGROWS).increment(1); - - rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER); - nextSourceRow = sourceCells.nextRow(); // advance only source to next row - } else if (rowComparison > 0) { - if (LOG.isInfoEnabled()) { - LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow)); - } - context.getCounter(Counter.SOURCEMISSINGROWS).increment(1); - - rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells); - nextTargetRow = targetCells.nextRow(); // advance only target to next row - } else { - // current row is the same on both sides, compare cell by cell - rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells); - nextSourceRow = sourceCells.nextRow(); - nextTargetRow = targetCells.nextRow(); - } - - if (!rowMatched) { - rangeMatched = false; - } - } - - sourceScanner.close(); - targetScanner.close(); - - context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED) - .increment(1); - } - - private static class CellScanner { - private final Iterator<Result> results; - - private byte[] currentRow; - private Result currentRowResult; - private int nextCellInRow; - - private Result nextRowResult; - - public CellScanner(Iterator<Result> results) { - this.results = results; - } - - /** - * Advance to the next row and return its row key. - * Returns null iff there are no more rows. - */ - public byte[] nextRow() { - if (nextRowResult == null) { - // no cached row - check scanner for more - while (results.hasNext()) { - nextRowResult = results.next(); - Cell nextCell = nextRowResult.rawCells()[0]; - if (currentRow == null - || !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(), - nextCell.getRowOffset(), nextCell.getRowLength())) { - // found next row - break; - } else { - // found another result from current row, keep scanning - nextRowResult = null; - } - } - - if (nextRowResult == null) { - // end of data, no more rows - currentRowResult = null; - currentRow = null; - return null; - } - } - - // advance to cached result for next row - currentRowResult = nextRowResult; - nextCellInRow = 0; - currentRow = currentRowResult.getRow(); - nextRowResult = null; - return currentRow; - } - - /** - * Returns the next Cell in the current row or null iff none remain. 
- */ - public Cell nextCellInRow() { - if (currentRowResult == null) { - // nothing left in current row - return null; - } - - Cell nextCell = currentRowResult.rawCells()[nextCellInRow]; - nextCellInRow++; - if (nextCellInRow == currentRowResult.size()) { - if (results.hasNext()) { - Result result = results.next(); - Cell cell = result.rawCells()[0]; - if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(), - cell.getRowOffset(), cell.getRowLength())) { - // result is part of current row - currentRowResult = result; - nextCellInRow = 0; - } else { - // result is part of next row, cache it - nextRowResult = result; - // current row is complete - currentRowResult = null; - } - } else { - // end of data - currentRowResult = null; - } - } - return nextCell; - } - } - - /** - * Compare the cells for the given row from the source and target tables. - * Count and log any differences. - * If not a dry run, output a Put and/or Delete needed to sync the target table - * to match the source table. - */ - private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells, - CellScanner targetCells) throws IOException, InterruptedException { - Put put = null; - Delete delete = null; - long matchingCells = 0; - boolean matchingRow = true; - Cell sourceCell = sourceCells.nextCellInRow(); - Cell targetCell = targetCells.nextCellInRow(); - while (sourceCell != null || targetCell != null) { - - int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell); - if (cellKeyComparison < 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Target missing cell: " + sourceCell); - } - context.getCounter(Counter.TARGETMISSINGCELLS).increment(1); - matchingRow = false; - - if (!dryRun) { - if (put == null) { - put = new Put(rowKey); - } - put.add(sourceCell); - } - - sourceCell = sourceCells.nextCellInRow(); - } else if (cellKeyComparison > 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Source missing cell: " + targetCell); - } - context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1); - matchingRow = false; - - if (!dryRun) { - if (delete == null) { - delete = new Delete(rowKey); - } - // add a tombstone to exactly match the target cell that is missing on the source - delete.addColumn(CellUtil.cloneFamily(targetCell), - CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp()); - } - - targetCell = targetCells.nextCellInRow(); - } else { - // the cell keys are equal, now check values - if (CellUtil.matchingValue(sourceCell, targetCell)) { - matchingCells++; - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Different values: "); - LOG.debug(" source cell: " + sourceCell - + " value: " + Bytes.toHex(sourceCell.getValueArray(), - sourceCell.getValueOffset(), sourceCell.getValueLength())); - LOG.debug(" target cell: " + targetCell - + " value: " + Bytes.toHex(targetCell.getValueArray(), - targetCell.getValueOffset(), targetCell.getValueLength())); - } - context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1); - matchingRow = false; - - if (!dryRun) { - // overwrite target cell - if (put == null) { - put = new Put(rowKey); - } - put.add(sourceCell); - } - } - sourceCell = sourceCells.nextCellInRow(); - targetCell = targetCells.nextCellInRow(); - } - - if (!dryRun && sourceTableHash.scanBatch > 0) { - if (put != null && put.size() >= sourceTableHash.scanBatch) { - context.write(new ImmutableBytesWritable(rowKey), put); - put = null; - } - if (delete != null && delete.size() >= sourceTableHash.scanBatch) { - context.write(new ImmutableBytesWritable(rowKey), 
delete); - delete = null; - } - } - } - - if (!dryRun) { - if (put != null) { - context.write(new ImmutableBytesWritable(rowKey), put); - } - if (delete != null) { - context.write(new ImmutableBytesWritable(rowKey), delete); - } - } - - if (matchingCells > 0) { - context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells); - } - if (matchingRow) { - context.getCounter(Counter.MATCHINGROWS).increment(1); - return true; - } else { - context.getCounter(Counter.ROWSWITHDIFFS).increment(1); - return false; - } - } - - /** - * Compare row keys of the given Result objects. - * Nulls are after non-nulls. - */ - private static int compareRowKeys(byte[] r1, byte[] r2) { - if (r1 == null) { - return 1; // source missing row - } else if (r2 == null) { - return -1; // target missing row - } else { - // Sync is run on non-META tables only, so we can directly do what CellComparator does - // internally for data tables; the call never goes to MetaCellComparator. - return Bytes.compareTo(r1, 0, r1.length, r2, 0, r2.length); - } - } - - /** - * Compare families, qualifiers, and timestamps of the given Cells. - * They are assumed to be of the same row. - * Nulls are after non-nulls. - */ - private static int compareCellKeysWithinRow(Cell c1, Cell c2) { - if (c1 == null) { - return 1; // source missing cell - } - if (c2 == null) { - return -1; // target missing cell - } - - int result = CellComparator.compareFamilies(c1, c2); - if (result != 0) { - return result; - } - - result = CellComparator.compareQualifiers(c1, c2); - if (result != 0) { - return result; - } - - // note timestamp comparison is inverted - more recent cells first - return CellComparator.compareTimestamps(c1, c2); - } - - @Override - protected void cleanup(Context context) - throws IOException, InterruptedException { - if (mapperException == null) { - try { - finishRemainingHashRanges(context); - } catch (Throwable t) { - mapperException = t; - } - } - - try { - sourceTable.close(); - targetTable.close(); - sourceConnection.close(); - targetConnection.close(); - } catch (Throwable t) { - if (mapperException == null) { - mapperException = t; - } else { - LOG.error("Suppressing exception from closing tables", t); - } - } - - // propagate first exception - if (mapperException != null) { - Throwables.propagateIfInstanceOf(mapperException, IOException.class); - Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class); - Throwables.propagate(mapperException); - } - } - - private void finishRemainingHashRanges(Context context) throws IOException, - InterruptedException { - TableSplit split = (TableSplit) context.getInputSplit(); - byte[] splitEndRow = split.getEndRow(); - boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow); - - // if there are more hash batches that begin before the end of this split, move to them - while (nextSourceKey != null - && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) { - moveToNextBatch(context); - } - - if (targetHasher.isBatchStarted()) { - // need to complete the final open hash batch - - if ((nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0) - || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))) { - // the open hash range continues past the end of this region - // add a scan to complete the current hash range - Scan scan = sourceTableHash.initScan(); - scan.setStartRow(splitEndRow); - if (nextSourceKey == null) { - scan.setStopRow(sourceTableHash.stopRow); - } else { - scan.setStopRow(nextSourceKey.copyBytes()); - } - - ResultScanner
targetScanner = null; - try { - targetScanner = targetTable.getScanner(scan); - for (Result row : targetScanner) { - targetHasher.hashResult(row); - } - } finally { - if (targetScanner != null) { - targetScanner.close(); - } - } - } // else current batch ends exactly at split end row - - finishBatchAndCompareHashes(context); - } - } - } - - private static final int NUM_ARGS = 3; - private static void printUsage(final String errorMsg) { - if (errorMsg != null && errorMsg.length() > 0) { - System.err.println("ERROR: " + errorMsg); - System.err.println(); - } - System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>"); - System.err.println(); - System.err.println("Options:"); - - System.err.println(" sourcezkcluster ZK cluster key of the source table"); - System.err.println(" (defaults to cluster in classpath's config)"); - System.err.println(" targetzkcluster ZK cluster key of the target table"); - System.err.println(" (defaults to cluster in classpath's config)"); - System.err.println(" dryrun if true, output counters but no writes"); - System.err.println(" (defaults to false)"); - System.err.println(); - System.err.println("Args:"); - System.err.println(" sourcehashdir path to HashTable output dir for source table"); - System.err.println(" (see org.apache.hadoop.hbase.mapreduce.HashTable)"); - System.err.println(" sourcetable Name of the source table to sync from"); - System.err.println(" targettable Name of the target table to sync to"); - System.err.println(); - System.err.println("Examples:"); - System.err.println(" For a dry run SyncTable of tableA from a remote source cluster"); - System.err.println(" to a local target cluster:"); - System.err.println(" $ hbase " + - "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true" - + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase" - + " hdfs://nn:9000/hashes/tableA tableA tableA"); - } - - private boolean doCommandLine(final String[] args) { - if (args.length < NUM_ARGS) { - printUsage(null); - return false; - } - try { - sourceHashDir = new Path(args[args.length - 3]); - sourceTableName = args[args.length - 2]; - targetTableName = args[args.length - 1]; - - for (int i = 0; i < args.length - NUM_ARGS; i++) { - String cmd = args[i]; - if (cmd.equals("-h") || cmd.startsWith("--h")) { - printUsage(null); - return false; - } - - final String sourceZkClusterKey = "--sourcezkcluster="; - if (cmd.startsWith(sourceZkClusterKey)) { - sourceZkCluster = cmd.substring(sourceZkClusterKey.length()); - continue; - } - - final String targetZkClusterKey = "--targetzkcluster="; - if (cmd.startsWith(targetZkClusterKey)) { - targetZkCluster = cmd.substring(targetZkClusterKey.length()); - continue; - } - - final String dryRunKey = "--dryrun="; - if (cmd.startsWith(dryRunKey)) { - dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length())); - continue; - } - - printUsage("Invalid argument '" + cmd + "'"); - return false; - } - - - } catch (Exception e) { - e.printStackTrace(); - printUsage("Can't start because " + e.getMessage()); - return false; - } - return true; - } - - /** - * Main entry point. 
- */ - public static void main(String[] args) throws Exception { - int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args); - System.exit(ret); - } - - @Override - public int run(String[] args) throws Exception { - String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); - if (!doCommandLine(otherArgs)) { - return 1; - } - - Job job = createSubmittableJob(otherArgs); - if (!job.waitForCompletion(true)) { - LOG.info("Map-reduce job failed!"); - return 1; - } - counters = job.getCounters(); - return 0; - } - -}
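For context on the tool removed above: SyncTable is normally driven through ToolRunner, with options first and then the three positional arguments parsed by doCommandLine(). Below is a minimal sketch of a programmatic driver, assuming a hash directory previously produced by HashTable; the quorum string, HDFS path, and table names are hypothetical placeholders, not values taken from this commit.

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.SyncTable;
    import org.apache.hadoop.util.ToolRunner;

    public class SyncTableDriver {
      public static void main(String[] args) throws Exception {
        // Options come first, then the three positional args parsed by doCommandLine():
        // <sourcehashdir> <sourcetable> <targettable>. All values here are hypothetical.
        String[] syncArgs = {
            "--dryrun=true",                                  // count diffs, write nothing
            "--sourcezkcluster=zk1.example.com:2181:/hbase",  // hypothetical source quorum
            "hdfs://nn:9000/hashes/tableA",                   // output dir of a prior HashTable run
            "tableA",                                         // source table
            "tableA"                                          // target table
        };
        System.exit(ToolRunner.run(new SyncTable(HBaseConfiguration.create()), syncArgs));
      }
    }

With --dryrun=true the job sets NullOutputFormat and only populates the SyncMapper counters (HASHES_MATCHED, ROWSWITHDIFFS, and so on); dropping the flag lets the mapper emit the Puts and Deletes shown in syncRowCells above.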
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java deleted file mode 100644 index 63868da..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java +++ /dev/null @@ -1,294 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Locale; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.util.StringUtils; - -/** - * Convert HBase tabular data into a format that is consumable by Map/Reduce. - */ -@InterfaceAudience.Public -public class TableInputFormat extends TableInputFormatBase -implements Configurable { - - @SuppressWarnings("hiding") - private static final Log LOG = LogFactory.getLog(TableInputFormat.class); - - /** Job parameter that specifies the input table. */ - public static final String INPUT_TABLE = "hbase.mapreduce.inputtable"; - /** - * If specified, use start keys of this table to split. - * This is useful when you are preparing data for bulkload. - */ - private static final String SPLIT_TABLE = "hbase.mapreduce.splittable"; - /** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified. - * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
- */ - public static final String SCAN = "hbase.mapreduce.scan"; - /** Scan start row */ - public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start"; - /** Scan stop row */ - public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop"; - /** Column Family to Scan */ - public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family"; - /** Space delimited list of columns and column families to scan. */ - public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns"; - /** The timestamp used to filter columns with a specific timestamp. */ - public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp"; - /** The starting timestamp used to filter columns with a specific range of versions. */ - public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start"; - /** The ending timestamp used to filter columns with a specific range of versions. */ - public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end"; - /** The maximum number of versions to return. */ - public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions"; - /** Set to false to disable server-side caching of blocks for this scan. */ - public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks"; - /** The number of rows for caching that will be passed to scanners. */ - public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows"; - /** Set the maximum number of values to return for each call to next(). */ - public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize"; - /** Specify if we have to shuffle the map tasks. */ - public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps"; - - /** The configuration. */ - private Configuration conf = null; - - /** - * Returns the current configuration. - * - * @return The current configuration. - * @see org.apache.hadoop.conf.Configurable#getConf() - */ - @Override - public Configuration getConf() { - return conf; - } - - /** - * Sets the configuration. This is used to set the details for the table to - * be scanned. - * - * @param configuration The configuration to set. - * @see org.apache.hadoop.conf.Configurable#setConf( - * org.apache.hadoop.conf.Configuration) - */ - @Override - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION", - justification="Intentional") - public void setConf(Configuration configuration) { - this.conf = configuration; - - Scan scan = null; - - if (conf.get(SCAN) != null) { - try { - scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN)); - } catch (IOException e) { - LOG.error("An error occurred.", e); - } - } else { - try { - scan = createScanFromConfiguration(conf); - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - } - } - - setScan(scan); - } - - /** - * Sets up a {@link Scan} instance, applying settings from the configuration property - * constants defined in {@code TableInputFormat}.
This allows specifying things such as: - * <ul> - * <li>start and stop rows</li> - * <li>column qualifiers or families</li> - * <li>timestamps or timerange</li> - * <li>scanner caching and batch size</li> - * </ul> - */ - public static Scan createScanFromConfiguration(Configuration conf) throws IOException { - Scan scan = new Scan(); - - if (conf.get(SCAN_ROW_START) != null) { - scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START))); - } - - if (conf.get(SCAN_ROW_STOP) != null) { - scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP))); - } - - if (conf.get(SCAN_COLUMNS) != null) { - addColumns(scan, conf.get(SCAN_COLUMNS)); - } - - for (String columnFamily : conf.getTrimmedStrings(SCAN_COLUMN_FAMILY)) { - scan.addFamily(Bytes.toBytes(columnFamily)); - } - - if (conf.get(SCAN_TIMESTAMP) != null) { - scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP))); - } - - if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) { - scan.setTimeRange( - Long.parseLong(conf.get(SCAN_TIMERANGE_START)), - Long.parseLong(conf.get(SCAN_TIMERANGE_END))); - } - - if (conf.get(SCAN_MAXVERSIONS) != null) { - scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS))); - } - - if (conf.get(SCAN_CACHEDROWS) != null) { - scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS))); - } - - if (conf.get(SCAN_BATCHSIZE) != null) { - scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE))); - } - - // false by default, full table scans generate too much BC churn - scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false))); - - return scan; - } - - @Override - protected void initialize(JobContext context) throws IOException { - // Do we have to worry about mis-matches between the Configuration from setConf and the one - // in this context? - TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE)); - try { - initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName); - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - } - } - - /** - * Parses a combined family and qualifier and adds either both or just the - * family in case there is no qualifier. This assumes the older colon - * divided notation, e.g. "family:qualifier". - * - * @param scan The Scan to update. - * @param familyAndQualifier family and qualifier - * @throws IllegalArgumentException When familyAndQualifier is invalid. - */ - private static void addColumn(Scan scan, byte[] familyAndQualifier) { - byte [][] fq = KeyValue.parseColumn(familyAndQualifier); - if (fq.length == 1) { - scan.addFamily(fq[0]); - } else if (fq.length == 2) { - scan.addColumn(fq[0], fq[1]); - } else { - throw new IllegalArgumentException("Invalid familyAndQualifier provided."); - } - } - - /** - * Adds an array of columns specified using old format, family:qualifier. - * <p> - * Overrides previous calls to {@link Scan#addColumn(byte[], byte[])}for any families in the - * input. - * - * @param scan The Scan to update. - * @param columns array of columns, formatted as <code>family:qualifier</code> - * @see Scan#addColumn(byte[], byte[]) - */ - public static void addColumns(Scan scan, byte [][] columns) { - for (byte[] column : columns) { - addColumn(scan, column); - } - } - - /** - * Calculates the splits that will serve as input for the map tasks. The - * number of splits matches the number of regions in a table. Splits are shuffled if - * required. - * @param context The current job context. - * @return The list of input splits. 
- * @throws IOException When creating the list of splits fails. - * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( - * org.apache.hadoop.mapreduce.JobContext) - */ - @Override - public List<InputSplit> getSplits(JobContext context) throws IOException { - List<InputSplit> splits = super.getSplits(context); - if ((conf.get(SHUFFLE_MAPS) != null) && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase(Locale.ROOT))) { - Collections.shuffle(splits); - } - return splits; - } - - /** - * Convenience method to parse a string representation of an array of column specifiers. - * - * @param scan The Scan to update. - * @param columns The columns to parse. - */ - private static void addColumns(Scan scan, String columns) { - String[] cols = columns.split(" "); - for (String col : cols) { - addColumn(scan, Bytes.toBytes(col)); - } - } - - @Override - protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException { - if (conf.get(SPLIT_TABLE) != null) { - TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE)); - try (Connection conn = ConnectionFactory.createConnection(getConf())) { - try (RegionLocator rl = conn.getRegionLocator(splitTableName)) { - return rl.getStartEndKeys(); - } - } - } - - return super.getStartEndKeys(); - } - - /** - * Sets split table in map-reduce job. - */ - public static void configureSplitTable(Job job, TableName tableName) { - job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString()); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java deleted file mode 100644 index ce1928e6..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ /dev/null @@ -1,653 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.Closeable; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Addressing; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.RegionSizeCalculator; -import org.apache.hadoop.hbase.util.Strings; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.net.DNS; -import org.apache.hadoop.util.StringUtils; - -/** - * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName}, - * a {@link Scan} instance that defines the input columns, etc. Subclasses may use - * other TableRecordReader implementations. - * - * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to - * function properly. Each of the entry points to this class used by the MapReduce framework, - * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)}, - * will call {@link #initialize(JobContext)} as a convenient centralized location to handle - * retrieving the necessary configuration information. If your subclass overrides either of these - * methods, either call the parent version or call initialize yourself. - * - * <p> - * An example of a subclass: - * <pre> - * class ExampleTIF extends TableInputFormatBase { - * - * {@literal @}Override - * protected void initialize(JobContext context) throws IOException { - * // We are responsible for the lifecycle of this connection until we hand it over in - * // initializeTable. - * Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create( - * context.getConfiguration())); - * TableName tableName = TableName.valueOf("exampleTable"); - * // mandatory. once passed here, TableInputFormatBase will handle closing the connection. - * initializeTable(connection, tableName); - * byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), - * Bytes.toBytes("columnB") }; - * // optional, by default we'll get everything for the table.
- * Scan scan = new Scan(); - * for (byte[] family : inputColumns) { - * scan.addFamily(family); - * } - * Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*")); - * scan.setFilter(exampleFilter); - * setScan(scan); - * } - * } - * </pre> - */ -@InterfaceAudience.Public -public abstract class TableInputFormatBase -extends InputFormat<ImmutableBytesWritable, Result> { - - /** Specify if we enable auto-balance for input in M/R jobs.*/ - public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance"; - /** Specify the ratio for data skew in M/R jobs; it goes together with enabling the - * hbase.mapreduce.input.autobalance property.*/ - public static final String INPUT_AUTOBALANCE_MAXSKEWRATIO = "hbase.mapreduce.input.autobalance" + - ".maxskewratio"; - /** Specify whether the row keys in the table are text (ASCII between 32 and 126); - * default is true. False means the table uses binary row keys.*/ - public static final String TABLE_ROW_TEXTKEY = "hbase.table.row.textkey"; - - private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class); - - private static final String NOT_INITIALIZED = "The input format instance has not been properly " + - "initialized. Ensure you call initializeTable either in your constructor or initialize " + - "method"; - private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" + - " previous error. Please look at the previous logs lines from" + - " the task's full log for more details."; - - /** Holds the details for the internal scanner. - * - * @see Scan */ - private Scan scan = null; - /** The {@link Admin}. */ - private Admin admin; - /** The {@link Table} to scan. */ - private Table table; - /** The {@link RegionLocator} of the table. */ - private RegionLocator regionLocator; - /** The reader scanning the table, can be a custom one. */ - private TableRecordReader tableRecordReader = null; - /** The underlying {@link Connection} of the table. */ - private Connection connection; - - - /** The reverse DNS lookup cache mapping: IPAddress => HostName */ - private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<>(); - - /** - * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses - * the default. - * - * @param split The split to work with. - * @param context The current context. - * @return The newly created record reader. - * @throws IOException When creating the reader fails. - * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader( - * org.apache.hadoop.mapreduce.InputSplit, - * org.apache.hadoop.mapreduce.TaskAttemptContext) - */ - @Override - public RecordReader<ImmutableBytesWritable, Result> createRecordReader( - InputSplit split, TaskAttemptContext context) - throws IOException { - // Just in case a subclass is relying on JobConfigurable magic. - if (table == null) { - initialize(context); - } - // null check in case our child overrides getTable to not throw. - try { - if (getTable() == null) { - // initialize() must not have been implemented in the subclass. - throw new IOException(INITIALIZATION_ERROR); - } - } catch (IllegalStateException exception) { - throw new IOException(INITIALIZATION_ERROR, exception); - } - TableSplit tSplit = (TableSplit) split; - LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes."); - final TableRecordReader trr = - this.tableRecordReader != null ?
this.tableRecordReader : new TableRecordReader(); - Scan sc = new Scan(this.scan); - sc.setStartRow(tSplit.getStartRow()); - sc.setStopRow(tSplit.getEndRow()); - trr.setScan(sc); - trr.setTable(getTable()); - return new RecordReader<ImmutableBytesWritable, Result>() { - - @Override - public void close() throws IOException { - trr.close(); - closeTable(); - } - - @Override - public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException { - return trr.getCurrentKey(); - } - - @Override - public Result getCurrentValue() throws IOException, InterruptedException { - return trr.getCurrentValue(); - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return trr.getProgress(); - } - - @Override - public void initialize(InputSplit inputsplit, TaskAttemptContext context) throws IOException, - InterruptedException { - trr.initialize(inputsplit, context); - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - return trr.nextKeyValue(); - } - }; - } - - protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException { - return getRegionLocator().getStartEndKeys(); - } - - /** - * Calculates the splits that will serve as input for the map tasks. The - * number of splits matches the number of regions in a table. - * - * @param context The current job context. - * @return The list of input splits. - * @throws IOException When creating the list of splits fails. - * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( - * org.apache.hadoop.mapreduce.JobContext) - */ - @Override - public List<InputSplit> getSplits(JobContext context) throws IOException { - boolean closeOnFinish = false; - - // Just in case a subclass is relying on JobConfigurable magic. - if (table == null) { - initialize(context); - closeOnFinish = true; - } - - // null check in case our child overrides getTable to not throw. - try { - if (getTable() == null) { - // initialize() must not have been implemented in the subclass. 
- throw new IOException(INITIALIZATION_ERROR); - } - } catch (IllegalStateException exception) { - throw new IOException(INITIALIZATION_ERROR, exception); - } - - try { - RegionSizeCalculator sizeCalculator = - new RegionSizeCalculator(getRegionLocator(), getAdmin()); - - TableName tableName = getTable().getName(); - - Pair<byte[][], byte[][]> keys = getStartEndKeys(); - if (keys == null || keys.getFirst() == null || - keys.getFirst().length == 0) { - HRegionLocation regLoc = - getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); - if (null == regLoc) { - throw new IOException("Expecting at least one region."); - } - List<InputSplit> splits = new ArrayList<>(1); - long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName()); - TableSplit split = new TableSplit(tableName, scan, - HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc - .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize); - splits.add(split); - return splits; - } - List<InputSplit> splits = new ArrayList<>(keys.getFirst().length); - for (int i = 0; i < keys.getFirst().length; i++) { - if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { - continue; - } - - byte[] startRow = scan.getStartRow(); - byte[] stopRow = scan.getStopRow(); - // determine if the given start and stop keys fall into the region - if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || - Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && - (stopRow.length == 0 || - Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { - byte[] splitStart = startRow.length == 0 || - Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? - keys.getFirst()[i] : startRow; - byte[] splitStop = (stopRow.length == 0 || - Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && - keys.getSecond()[i].length > 0 ? - keys.getSecond()[i] : stopRow; - - HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false); - // The below InetSocketAddress creation does a name resolution. - InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort()); - if (isa.isUnresolved()) { - LOG.warn("Failed to resolve " + isa); - } - InetAddress regionAddress = isa.getAddress(); - String regionLocation; - regionLocation = reverseDNS(regionAddress); - - byte[] regionName = location.getRegionInfo().getRegionName(); - String encodedRegionName = location.getRegionInfo().getEncodedName(); - long regionSize = sizeCalculator.getRegionSize(regionName); - TableSplit split = new TableSplit(tableName, scan, - splitStart, splitStop, regionLocation, encodedRegionName, regionSize); - splits.add(split); - if (LOG.isDebugEnabled()) { - LOG.debug("getSplits: split -> " + i + " -> " + split); - } - } - } - //The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled. - boolean enableAutoBalance = context.getConfiguration() - .getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false); - if (enableAutoBalance) { - long totalRegionSize=0; - for (int i = 0; i < splits.size(); i++){ - TableSplit ts = (TableSplit)splits.get(i); - totalRegionSize += ts.getLength(); - } - long averageRegionSize = totalRegionSize / splits.size(); - // the averageRegionSize must be positive.
- if (averageRegionSize <= 0) { - LOG.warn("The averageRegionSize is not positive: "+ averageRegionSize + ", " + - "set it to 1."); - averageRegionSize = 1; - } - return calculateRebalancedSplits(splits, context, averageRegionSize); - } else { - return splits; - } - } finally { - if (closeOnFinish) { - closeTable(); - } - } - } - - String reverseDNS(InetAddress ipAddress) throws UnknownHostException { - String hostName = this.reverseDNSCacheMap.get(ipAddress); - if (hostName == null) { - String ipAddressString = null; - try { - ipAddressString = DNS.reverseDns(ipAddress, null); - } catch (Exception e) { - // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the - // name service. Also, in case of ipv6, we need to use the InetAddress since resolving - // reverse DNS using jndi doesn't work well with ipv6 addresses. - ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName(); - } - if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress); - hostName = Strings.domainNamePointerToHostName(ipAddressString); - this.reverseDNSCacheMap.put(ipAddress, hostName); - } - return hostName; - } - - /** - * Calculates the number of MapReduce input splits for the map tasks. The number of - * MapReduce input splits depends on the average region size and the "data skew ratio" the user - * sets in configuration. - * - * @param list The list of input splits before balance. - * @param context The current job context. - * @param average The average size of all regions. - * @return The list of input splits. - * @throws IOException When creating the list of splits fails. - * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( - * org.apache.hadoop.mapreduce.JobContext) - */ - private List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context, - long average) throws IOException { - List<InputSplit> resultList = new ArrayList<>(); - Configuration conf = context.getConfiguration(); - //The default data skew ratio is 3 - long dataSkewRatio = conf.getLong(INPUT_AUTOBALANCE_MAXSKEWRATIO, 3); - //It determines which mode to use: text key mode or binary key mode. The default is text mode. - boolean isTextKey = context.getConfiguration().getBoolean(TABLE_ROW_TEXTKEY, true); - long dataSkewThreshold = dataSkewRatio * average; - int count = 0; - while (count < list.size()) { - TableSplit ts = (TableSplit)list.get(count); - TableName tableName = ts.getTable(); - String regionLocation = ts.getRegionLocation(); - String encodedRegionName = ts.getEncodedRegionName(); - long regionSize = ts.getLength(); - if (regionSize >= dataSkewThreshold) { - // if the current region size is larger than the data skew threshold, - // split the region into two MapReduce input splits. - byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey); - if (Arrays.equals(ts.getEndRow(), splitKey)) { - // Not splitting since the end key is the same as the split key - resultList.add(ts); - } else { - //Set the size of each child TableSplit to 1/2 of the region size; the exact size of the - // MapReduce input splits is not far off.
- TableSplit t1 = new TableSplit(tableName, scan, ts.getStartRow(), splitKey, - regionLocation, regionSize / 2); - TableSplit t2 = new TableSplit(tableName, scan, splitKey, ts.getEndRow(), regionLocation, - regionSize - regionSize / 2); - resultList.add(t1); - resultList.add(t2); - } - count++; - } else if (regionSize >= average) { - // if the region size is between the average size and the data skew threshold size, - // make this region one MapReduce input split. - resultList.add(ts); - count++; - } else { - // if the total size of several small contiguous regions is less than the average region size, - // combine them into one MapReduce input split. - long totalSize = regionSize; - byte[] splitStartKey = ts.getStartRow(); - byte[] splitEndKey = ts.getEndRow(); - count++; - for (; count < list.size(); count++) { - TableSplit nextRegion = (TableSplit)list.get(count); - long nextRegionSize = nextRegion.getLength(); - if (totalSize + nextRegionSize <= dataSkewThreshold) { - totalSize = totalSize + nextRegionSize; - splitEndKey = nextRegion.getEndRow(); - } else { - break; - } - } - TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, - regionLocation, encodedRegionName, totalSize); - resultList.add(t); - } - } - return resultList; - } - - /** - * Select a split point in the region. The selection of the split point is based on a uniform - * distribution assumption for the keys in a region. - * Here are some examples: - * - * <table> - * <tr> - * <th>start key</th> - * <th>end key</th> - * <th>is text</th> - * <th>split point</th> - * </tr> - * <tr> - * <td>'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f', 'g'</td> - * <td>'a', 'a', 'a', 'f', 'f', 'f'</td> - * <td>true</td> - * <td>'a', 'a', 'a', 'd', 'd', -78, 50, -77, 51</td> - * </tr> - * <tr> - * <td>'1', '1', '1', '0', '0', '0'</td> - * <td>'1', '1', '2', '5', '7', '9', '0'</td> - * <td>true</td> - * <td>'1', '1', '1', -78, -77, -76, -104</td> - * </tr> - * <tr> - * <td>'1', '1', '1', '0'</td> - * <td>'1', '1', '2', '0'</td> - * <td>true</td> - * <td>'1', '1', '1', -80</td> - * </tr> - * <tr> - * <td>13, -19, 126, 127</td> - * <td>13, -19, 127, 0</td> - * <td>false</td> - * <td>13, -19, 126, -65</td> - * </tr> - * </table> - * - * This method is public static to make it easier to test. - * - * @param start Start key of the region - * @param end End key of the region - * @param isText It determines whether to use text key mode or binary key mode - * @return The split point in the region. - */ - @InterfaceAudience.Private - public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) { - byte upperLimitByte; - byte lowerLimitByte; - //Use text mode or binary mode. - if (isText) { - //The range of the text char set in ASCII is [32,126]; the lower limit is space and the upper - // limit is '~'.
- upperLimitByte = '~'; - lowerLimitByte = ' '; - } else { - upperLimitByte = -1; - lowerLimitByte = 0; - } - // For special cases: - // Example 1 : startkey=null, endkey="hhhqqqwww", splitKey="h" - // Example 2 (text key mode): startKey="ffffaaa", endKey=null, splitkey="f~~~~~~" - if (start.length == 0 && end.length == 0){ - return new byte[]{(byte) ((lowerLimitByte + upperLimitByte) / 2)}; - } - if (start.length == 0 && end.length != 0){ - return new byte[]{ end[0] }; - } - if (start.length != 0 && end.length == 0){ - byte[] result =new byte[start.length]; - result[0]=start[0]; - for (int k = 1; k < start.length; k++){ - result[k] = upperLimitByte; - } - return result; - } - return Bytes.split(start, end, false, 1)[1]; - } - - /** - * Test if the given region is to be included in the InputSplit while splitting - * the regions of a table. - * <p> - * This optimization is effective when there is a specific reason to exclude an entire region from the M-R job, - * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br> - * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R processing, - * continuously. In addition to reducing InputSplits, it reduces the load on the region server as well, due to the ordering of the keys. - * <br> - * <br> - * Note: It is possible that <code>endKey.length() == 0</code>, for the last (recent) region. - * <br> - * Override this method if you want to bulk exclude regions altogether from M-R. By default, no region is excluded (i.e. all regions are included). - * - * - * @param startKey Start key of the region - * @param endKey End key of the region - * @return true, if this region needs to be included as part of the input (default). - * - */ - protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) { - return true; - } - - /** - * Allows subclasses to get the {@link RegionLocator}. - */ - protected RegionLocator getRegionLocator() { - if (regionLocator == null) { - throw new IllegalStateException(NOT_INITIALIZED); - } - return regionLocator; - } - - /** - * Allows subclasses to get the {@link Table}. - */ - protected Table getTable() { - if (table == null) { - throw new IllegalStateException(NOT_INITIALIZED); - } - return table; - } - - /** - * Allows subclasses to get the {@link Admin}. - */ - protected Admin getAdmin() { - if (admin == null) { - throw new IllegalStateException(NOT_INITIALIZED); - } - return admin; - } - - /** - * Allows subclasses to initialize the table information. - * - * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close. - * @param tableName The {@link TableName} of the table to process. - * @throws IOException - */ - protected void initializeTable(Connection connection, TableName tableName) throws IOException { - if (this.table != null || this.connection != null) { - LOG.warn("initializeTable called multiple times. Overwriting connection and table " + - "reference; TableInputFormatBase will not close these old references when done."); - } - this.table = connection.getTable(tableName); - this.regionLocator = connection.getRegionLocator(tableName); - this.admin = connection.getAdmin(); - this.connection = connection; - } - - /** - * Gets the scan defining the actual details like columns etc. - * - * @return The internal scan instance.
- */ - public Scan getScan() { - if (this.scan == null) this.scan = new Scan(); - return scan; - } - - /** - * Sets the scan defining the actual details like columns etc. - * - * @param scan The scan to set. - */ - public void setScan(Scan scan) { - this.scan = scan; - } - - /** - * Allows subclasses to set the {@link TableRecordReader}. - * - * @param tableRecordReader A different {@link TableRecordReader} - * implementation. - */ - protected void setTableRecordReader(TableRecordReader tableRecordReader) { - this.tableRecordReader = tableRecordReader; - } - - /** - * Handle subclass specific set up. - * Each of the entry points used by the MapReduce framework, - * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)}, - * will call {@link #initialize(JobContext)} as a convenient centralized location to handle - * retrieving the necessary configuration information and calling - * {@link #initializeTable(Connection, TableName)}. - * - * Subclasses should implement their initialize call such that it is safe to call multiple times. - * The current TableInputFormatBase implementation relies on a non-null table reference to decide - * if an initialize call is needed, but this behavior may change in the future. In particular, - * it is critical that initializeTable not be called multiple times since this will leak - * Connection instances. - * - */ - protected void initialize(JobContext context) throws IOException { - } - - /** - * Close the Table and related objects that were initialized via - * {@link #initializeTable(Connection, TableName)}. - * - * @throws IOException - */ - protected void closeTable() throws IOException { - close(admin, table, regionLocator, connection); - admin = null; - table = null; - regionLocator = null; - connection = null; - } - - private void close(Closeable... closables) throws IOException { - for (Closeable c : closables) { - if(c != null) { c.close(); } - } - } - -}
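Both deleted input formats are configured purely through Configuration keys rather than a builder API. As a rough illustration of how a client job could set the SCAN_* properties defined in TableInputFormat above, here is a hedged sketch; the table name and key/value choices are hypothetical, and only public constants from the deleted source are used.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.hadoop.mapreduce.Job;

    public class ScanConfigSketch {
      public static Job createJob() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Keys are the constants defined in TableInputFormat above; values are hypothetical.
        conf.set(TableInputFormat.INPUT_TABLE, "exampleTable");
        conf.set(TableInputFormat.SCAN_ROW_START, "row-0000");  // decoded via Bytes.toBytesBinary
        conf.set(TableInputFormat.SCAN_ROW_STOP, "row-9999");
        conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "cf");
        conf.set(TableInputFormat.SCAN_CACHEDROWS, "500");      // scanner caching
        Job job = Job.getInstance(conf, "scan-config-sketch");
        job.setInputFormatClass(TableInputFormat.class);
        return job;
      }
    }

Note that setConf/createScanFromConfiguration above builds the Scan from these keys only when the serialized SCAN property is absent, and row start/stop values are decoded with Bytes.toBytesBinary, so plain ASCII strings round-trip unchanged.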
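Similarly, since getSplitKey in the deleted TableInputFormatBase is deliberately public static for testability, its documented examples can be exercised directly. A minimal, hypothetical check against the first row of its javadoc table:

    import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
    import org.apache.hadoop.hbase.util.Bytes;

    public class SplitKeyCheck {
      public static void main(String[] args) {
        // Text-key mode: the midpoint of "aaabcdefg".."aaafff" per the javadoc table above.
        byte[] splitPoint = TableInputFormatBase.getSplitKey(
            Bytes.toBytes("aaabcdefg"), Bytes.toBytes("aaafff"), true);
        System.out.println(Bytes.toStringBinary(splitPoint));
      }
    }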
