http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java new file mode 100644 index 0000000..acf6ff8 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -0,0 +1,700 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce.replication; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableSnapshotScanner; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.PrefixFilter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat; +import org.apache.hadoop.hbase.mapreduce.TableMapper; +import org.apache.hadoop.hbase.mapreduce.TableSplit; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl; +import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.mapreduce.InputSplit; +import 
org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; + +/** + * This map-only job compares the data from a local table with a remote one. + * Every cell is compared and must have exactly the same keys (even timestamp) + * as well as same value. It is possible to restrict the job by time range and + * families. The peer id that's provided must match the one given when the + * replication stream was setup. + * <p> + * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The reason + * for a why a row is different is shown in the map's log. + */ +public class VerifyReplication extends Configured implements Tool { + + private static final Log LOG = + LogFactory.getLog(VerifyReplication.class); + + public final static String NAME = "verifyrep"; + private final static String PEER_CONFIG_PREFIX = NAME + ".peer."; + long startTime = 0; + long endTime = Long.MAX_VALUE; + int batch = -1; + int versions = -1; + String tableName = null; + String families = null; + String delimiter = ""; + String peerId = null; + String rowPrefixes = null; + int sleepMsBeforeReCompare = 0; + boolean verbose = false; + boolean includeDeletedCells = false; + //Source table snapshot name + String sourceSnapshotName = null; + //Temp location in source cluster to restore source snapshot + String sourceSnapshotTmpDir = null; + //Peer table snapshot name + String peerSnapshotName = null; + //Temp location in peer cluster to restore peer snapshot + String peerSnapshotTmpDir = null; + //Peer cluster Hadoop FS address + String peerFSAddress = null; + //Peer cluster HBase root dir location + String peerHBaseRootAddress = null; + + + private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + + /** + * Map-only comparator for 2 tables + */ + public static class Verifier + extends TableMapper<ImmutableBytesWritable, Put> { + + + + public static enum Counters { + GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS} + + private Connection sourceConnection; + private Table sourceTable; + private Connection replicatedConnection; + private Table replicatedTable; + private ResultScanner replicatedScanner; + private Result currentCompareRowInPeerTable; + private int sleepMsBeforeReCompare; + private String delimiter = ""; + private boolean verbose = false; + private int batch = -1; + + /** + * Map method that compares every scanned row with the equivalent from + * a distant cluster. + * @param row The current table row key. + * @param value The columns. + * @param context The current context. + * @throws IOException When something is broken with the data. 
+ */ + @Override + public void map(ImmutableBytesWritable row, final Result value, + Context context) + throws IOException { + if (replicatedScanner == null) { + Configuration conf = context.getConfiguration(); + sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0); + delimiter = conf.get(NAME + ".delimiter", ""); + verbose = conf.getBoolean(NAME +".verbose", false); + batch = conf.getInt(NAME + ".batch", -1); + final Scan scan = new Scan(); + if (batch > 0) { + scan.setBatch(batch); + } + scan.setCacheBlocks(false); + scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1)); + long startTime = conf.getLong(NAME + ".startTime", 0); + long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE); + String families = conf.get(NAME + ".families", null); + if(families != null) { + String[] fams = families.split(","); + for(String fam : fams) { + scan.addFamily(Bytes.toBytes(fam)); + } + } + boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false); + scan.setRaw(includeDeletedCells); + String rowPrefixes = conf.get(NAME + ".rowPrefixes", null); + setRowPrefixFilter(scan, rowPrefixes); + scan.setTimeRange(startTime, endTime); + int versions = conf.getInt(NAME+".versions", -1); + LOG.info("Setting number of version inside map as: " + versions); + if (versions >= 0) { + scan.setMaxVersions(versions); + } + TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); + sourceConnection = ConnectionFactory.createConnection(conf); + sourceTable = sourceConnection.getTable(tableName); + + final InputSplit tableSplit = context.getInputSplit(); + + String zkClusterKey = conf.get(NAME + ".peerQuorumAddress"); + Configuration peerConf = HBaseConfiguration.createClusterConf(conf, + zkClusterKey, PEER_CONFIG_PREFIX); + + replicatedConnection = ConnectionFactory.createConnection(peerConf); + replicatedTable = replicatedConnection.getTable(tableName); + scan.setStartRow(value.getRow()); + + byte[] endRow = null; + if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) { + endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegionInfo() + .getEndKey(); + } else { + endRow = ((TableSplit) tableSplit).getEndRow(); + } + + scan.setStopRow(endRow); + + String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null); + if (peerSnapshotName != null) { + String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null); + String peerFSAddress = conf.get(NAME + ".peerFSAddress", null); + String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null); + FileSystem.setDefaultUri(peerConf, peerFSAddress); + FSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress)); + LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:" + + peerSnapshotTmpDir + " peer root uri:" + FSUtils.getRootDir(peerConf) + + " peerFSAddress:" + peerFSAddress); + + replicatedScanner = new TableSnapshotScanner(peerConf, + new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan); + } else { + replicatedScanner = replicatedTable.getScanner(scan); + } + currentCompareRowInPeerTable = replicatedScanner.next(); + } + while (true) { + if (currentCompareRowInPeerTable == null) { + // reach the region end of peer table, row only in source table + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); + break; + } + int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow()); + if (rowCmpRet == 0) { + // rowkey is same, need to compare 
the content of the row + try { + Result.compareResults(value, currentCompareRowInPeerTable); + context.getCounter(Counters.GOODROWS).increment(1); + if (verbose) { + LOG.info("Good row key: " + delimiter + + Bytes.toStringBinary(value.getRow()) + delimiter); + } + } catch (Exception e) { + logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value); + } + currentCompareRowInPeerTable = replicatedScanner.next(); + break; + } else if (rowCmpRet < 0) { + // row only exists in source table + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); + break; + } else { + // row only exists in peer table + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, + currentCompareRowInPeerTable); + currentCompareRowInPeerTable = replicatedScanner.next(); + } + } + } + + private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) { + if (sleepMsBeforeReCompare > 0) { + Threads.sleep(sleepMsBeforeReCompare); + try { + Result sourceResult = sourceTable.get(new Get(row.getRow())); + Result replicatedResult = replicatedTable.get(new Get(row.getRow())); + Result.compareResults(sourceResult, replicatedResult); + if (!sourceResult.isEmpty()) { + context.getCounter(Counters.GOODROWS).increment(1); + if (verbose) { + LOG.info("Good row key (with recompare): " + delimiter + Bytes.toStringBinary(row.getRow()) + + delimiter); + } + } + return; + } catch (Exception e) { + LOG.error("recompare fail after sleep, rowkey=" + delimiter + + Bytes.toStringBinary(row.getRow()) + delimiter); + } + } + context.getCounter(counter).increment(1); + context.getCounter(Counters.BADROWS).increment(1); + LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) + + delimiter); + } + + @Override + protected void cleanup(Context context) { + if (replicatedScanner != null) { + try { + while (currentCompareRowInPeerTable != null) { + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, + currentCompareRowInPeerTable); + currentCompareRowInPeerTable = replicatedScanner.next(); + } + } catch (Exception e) { + LOG.error("fail to scan peer table in cleanup", e); + } finally { + replicatedScanner.close(); + replicatedScanner = null; + } + } + + if (sourceTable != null) { + try { + sourceTable.close(); + } catch (IOException e) { + LOG.error("fail to close source table in cleanup", e); + } + } + if(sourceConnection != null){ + try { + sourceConnection.close(); + } catch (Exception e) { + LOG.error("fail to close source connection in cleanup", e); + } + } + + if(replicatedTable != null){ + try{ + replicatedTable.close(); + } catch (Exception e) { + LOG.error("fail to close replicated table in cleanup", e); + } + } + if(replicatedConnection != null){ + try { + replicatedConnection.close(); + } catch (Exception e) { + LOG.error("fail to close replicated connection in cleanup", e); + } + } + } + } + + private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig( + final Configuration conf, String peerId) throws IOException { + ZooKeeperWatcher localZKW = null; + ReplicationPeerZKImpl peer = null; + try { + localZKW = new ZooKeeperWatcher(conf, "VerifyReplication", + new Abortable() { + @Override public void abort(String why, Throwable e) {} + @Override public boolean isAborted() {return false;} + }); + + ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW); + rp.init(); + + Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId); + if 
(pair == null) { + throw new IOException("Couldn't get peer conf!"); + } + + return pair; + } catch (ReplicationException e) { + throw new IOException( + "An error occurred while trying to connect to the remove peer cluster", e); + } finally { + if (peer != null) { + peer.close(); + } + if (localZKW != null) { + localZKW.close(); + } + } + } + + /** + * Sets up the actual job. + * + * @param conf The current configuration. + * @param args The command line parameters. + * @return The newly created job. + * @throws java.io.IOException When setting up the job fails. + */ + public Job createSubmittableJob(Configuration conf, String[] args) + throws IOException { + if (!doCommandLine(args)) { + return null; + } + conf.set(NAME+".peerId", peerId); + conf.set(NAME+".tableName", tableName); + conf.setLong(NAME+".startTime", startTime); + conf.setLong(NAME+".endTime", endTime); + conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare); + conf.set(NAME + ".delimiter", delimiter); + conf.setInt(NAME + ".batch", batch); + conf.setBoolean(NAME +".verbose", verbose); + conf.setBoolean(NAME +".includeDeletedCells", includeDeletedCells); + if (families != null) { + conf.set(NAME+".families", families); + } + if (rowPrefixes != null){ + conf.set(NAME+".rowPrefixes", rowPrefixes); + } + + Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf, peerId); + ReplicationPeerConfig peerConfig = peerConfigPair.getFirst(); + String peerQuorumAddress = peerConfig.getClusterKey(); + LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " + + peerConfig.getConfiguration()); + conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); + HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX, + peerConfig.getConfiguration().entrySet()); + + conf.setInt(NAME + ".versions", versions); + LOG.info("Number of version: " + versions); + + //Set Snapshot specific parameters + if (peerSnapshotName != null) { + conf.set(NAME + ".peerSnapshotName", peerSnapshotName); + conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir); + conf.set(NAME + ".peerFSAddress", peerFSAddress); + conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress); + + // This is to create HDFS delegation token for peer cluster in case of secured + conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress); + } + + Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); + job.setJarByClass(VerifyReplication.class); + + Scan scan = new Scan(); + scan.setTimeRange(startTime, endTime); + scan.setRaw(includeDeletedCells); + scan.setCacheBlocks(false); + if (batch > 0) { + scan.setBatch(batch); + } + if (versions >= 0) { + scan.setMaxVersions(versions); + LOG.info("Number of versions set to " + versions); + } + if(families != null) { + String[] fams = families.split(","); + for(String fam : fams) { + scan.addFamily(Bytes.toBytes(fam)); + } + } + + setRowPrefixFilter(scan, rowPrefixes); + + if (sourceSnapshotName != null) { + Path snapshotTempPath = new Path(sourceSnapshotTmpDir); + LOG.info( + "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir); + TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null, + null, job, true, snapshotTempPath); + } else { + TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job); + } + Configuration peerClusterConf = peerConfigPair.getSecond(); + // Obtain the auth token from peer cluster + 
TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf); + + job.setOutputFormatClass(NullOutputFormat.class); + job.setNumReduceTasks(0); + return job; + } + + private static void setRowPrefixFilter(Scan scan, String rowPrefixes) { + if (rowPrefixes != null && !rowPrefixes.isEmpty()) { + String[] rowPrefixArray = rowPrefixes.split(","); + Arrays.sort(rowPrefixArray); + FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); + for (String prefix : rowPrefixArray) { + Filter filter = new PrefixFilter(Bytes.toBytes(prefix)); + filterList.addFilter(filter); + } + scan.setFilter(filterList); + byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]); + byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length -1]); + setStartAndStopRows(scan, startPrefixRow, lastPrefixRow); + } + } + + private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) { + scan.setStartRow(startPrefixRow); + byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1), + new byte[]{(byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1)}); + scan.setStopRow(stopRow); + } + + @VisibleForTesting + public boolean doCommandLine(final String[] args) { + if (args.length < 2) { + printUsage(null); + return false; + } + try { + for (int i = 0; i < args.length; i++) { + String cmd = args[i]; + if (cmd.equals("-h") || cmd.startsWith("--h")) { + printUsage(null); + return false; + } + + final String startTimeArgKey = "--starttime="; + if (cmd.startsWith(startTimeArgKey)) { + startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); + continue; + } + + final String endTimeArgKey = "--endtime="; + if (cmd.startsWith(endTimeArgKey)) { + endTime = Long.parseLong(cmd.substring(endTimeArgKey.length())); + continue; + } + + final String includeDeletedCellsArgKey = "--raw"; + if (cmd.equals(includeDeletedCellsArgKey)) { + includeDeletedCells = true; + continue; + } + + final String versionsArgKey = "--versions="; + if (cmd.startsWith(versionsArgKey)) { + versions = Integer.parseInt(cmd.substring(versionsArgKey.length())); + continue; + } + + final String batchArgKey = "--batch="; + if (cmd.startsWith(batchArgKey)) { + batch = Integer.parseInt(cmd.substring(batchArgKey.length())); + continue; + } + + final String familiesArgKey = "--families="; + if (cmd.startsWith(familiesArgKey)) { + families = cmd.substring(familiesArgKey.length()); + continue; + } + + final String rowPrefixesKey = "--row-prefixes="; + if (cmd.startsWith(rowPrefixesKey)){ + rowPrefixes = cmd.substring(rowPrefixesKey.length()); + continue; + } + + final String delimiterArgKey = "--delimiter="; + if (cmd.startsWith(delimiterArgKey)) { + delimiter = cmd.substring(delimiterArgKey.length()); + continue; + } + + final String sleepToReCompareKey = "--recomparesleep="; + if (cmd.startsWith(sleepToReCompareKey)) { + sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length())); + continue; + } + final String verboseKey = "--verbose"; + if (cmd.startsWith(verboseKey)) { + verbose = true; + continue; + } + + final String sourceSnapshotNameArgKey = "--sourceSnapshotName="; + if (cmd.startsWith(sourceSnapshotNameArgKey)) { + sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length()); + continue; + } + + final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir="; + if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) { + sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length()); + continue; + } + + final String 
peerSnapshotNameArgKey = "--peerSnapshotName="; + if (cmd.startsWith(peerSnapshotNameArgKey)) { + peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length()); + continue; + } + + final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir="; + if (cmd.startsWith(peerSnapshotTmpDirArgKey)) { + peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length()); + continue; + } + + final String peerFSAddressArgKey = "--peerFSAddress="; + if (cmd.startsWith(peerFSAddressArgKey)) { + peerFSAddress = cmd.substring(peerFSAddressArgKey.length()); + continue; + } + + final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress="; + if (cmd.startsWith(peerHBaseRootAddressArgKey)) { + peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length()); + continue; + } + + if (cmd.startsWith("--")) { + printUsage("Invalid argument '" + cmd + "'"); + return false; + } + + if (i == args.length-2) { + peerId = cmd; + } + + if (i == args.length-1) { + tableName = cmd; + } + } + + if ((sourceSnapshotName != null && sourceSnapshotTmpDir == null) + || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)) { + printUsage("Source snapshot name and snapshot temp location should be provided" + + " to use snapshots in source cluster"); + return false; + } + + if (peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null + || peerHBaseRootAddress != null) { + if (peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null + || peerHBaseRootAddress == null) { + printUsage( + "Peer snapshot name, peer snapshot temp location, Peer HBase root address and " + + "peer FSAddress should be provided to use snapshots in peer cluster"); + return false; + } + } + + // This is to avoid making recompare calls to source/peer tables when snapshots are used + if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) { + printUsage( + "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are immutable"); + return false; + } + + } catch (Exception e) { + e.printStackTrace(); + printUsage("Can't start because " + e.getMessage()); + return false; + } + return true; + } + + /* + * @param errorMsg Error message. Can be null. 
+ */ + private static void printUsage(final String errorMsg) { + if (errorMsg != null && errorMsg.length() > 0) { + System.err.println("ERROR: " + errorMsg); + } + System.err.println("Usage: verifyrep [--starttime=X]" + + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " + + "[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] " + + "[--peerSnapshotTmpDir=S] [--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid> <tablename>"); + System.err.println(); + System.err.println("Options:"); + System.err.println(" starttime beginning of the time range"); + System.err.println(" without endtime means from starttime to forever"); + System.err.println(" endtime end of the time range"); + System.err.println(" versions number of cell versions to verify"); + System.err.println(" batch batch count for scan, " + + "note that result row counts will no longer be actual number of rows when you use this option"); + System.err.println(" raw includes raw scan if given in options"); + System.err.println(" families comma-separated list of families to copy"); + System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on "); + System.err.println(" delimiter the delimiter used in display around rowkey"); + System.err.println(" recomparesleep milliseconds to sleep before recompare row, " + + "default value is 0 which disables the recompare."); + System.err.println(" verbose logs row keys of good rows"); + System.err.println(" sourceSnapshotName Source Snapshot Name"); + System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot"); + System.err.println(" peerSnapshotName Peer Snapshot Name"); + System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot"); + System.err.println(" peerFSAddress Peer cluster Hadoop FS address"); + System.err.println(" peerHBaseRootAddress Peer cluster HBase root location"); + System.err.println(); + System.err.println("Args:"); + System.err.println(" peerid Id of the peer used for verification, must match the one given for replication"); + System.err.println(" tablename Name of the table to verify"); + System.err.println(); + System.err.println("Examples:"); + System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 "); + System.err.println(" $ hbase " + + "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" + + " --starttime=1265875194289 --endtime=1265878794289 5 TestTable "); + } + + @Override + public int run(String[] args) throws Exception { + Configuration conf = this.getConf(); + Job job = createSubmittableJob(conf, args); + if (job != null) { + return job.waitForCompletion(true) ? 0 : 1; + } + return 1; + } + + /** + * Main entry point. + * + * @param args The command line parameters. + * @throws Exception When running the job fails. + */ + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args); + System.exit(res); + } +}
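The tool above is normally driven through ToolRunner (see main()), which only surfaces an exit code. A caller that also wants the Verifier counters can build the job directly with the public createSubmittableJob() method instead. The sketch below is illustrative and not part of the patch: the class name VerifyReplicationExample is hypothetical, and the peer id "5", table "TestTable" and time window are just the placeholder values from the usage example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical driver, not part of this commit: runs verifyrep and reads its counters.
public class VerifyReplicationExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    VerifyReplication verifyRep = new VerifyReplication();
    verifyRep.setConf(conf);
    // Placeholder arguments mirroring the usage example: a one-hour window, peer "5", table "TestTable".
    String[] toolArgs = { "--starttime=1265875194289", "--endtime=1265878794289", "5", "TestTable" };
    Job job = verifyRep.createSubmittableJob(conf, toolArgs);
    if (job == null || !job.waitForCompletion(true)) {
      System.exit(1);   // bad arguments, or the map-reduce job itself failed
    }
    Counters counters = job.getCounters();
    long good = counters.findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue();
    long bad = counters.findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue();
    System.out.println("GOODROWS=" + good + ", BADROWS=" + bad);
    System.exit(bad == 0 ? 0 : 1);
  }
}

A non-zero BADROWS total is broken down further by the ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS and CONTENT_DIFFERENT_ROWS counters, and the offending row keys are written to the mappers' logs by logFailRowAndIncreaseCounter().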
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java new file mode 100644 index 0000000..eb9a5f7 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java @@ -0,0 +1,470 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.mapreduce.JobUtil; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.LineReader; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/* + * The CompactionTool allows to execute a compaction specifying a: + * <ul> + * <li>table folder (all regions and families will be compacted) + * <li>region folder (all families 
in the region will be compacted) + * <li>family folder (the store files will be compacted) + * </ul> + */ [email protected](HBaseInterfaceAudience.TOOLS) +public class CompactionTool extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(CompactionTool.class); + + private final static String CONF_TMP_DIR = "hbase.tmp.dir"; + private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once"; + private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major"; + private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete"; + private final static String CONF_COMPLETE_COMPACTION = "hbase.hstore.compaction.complete"; + + /** + * Class responsible to execute the Compaction on the specified path. + * The path can be a table, region or family directory. + */ + private static class CompactionWorker { + private final boolean keepCompactedFiles; + private final boolean deleteCompacted; + private final Configuration conf; + private final FileSystem fs; + private final Path tmpDir; + + public CompactionWorker(final FileSystem fs, final Configuration conf) { + this.conf = conf; + this.keepCompactedFiles = !conf.getBoolean(CONF_COMPLETE_COMPACTION, true); + this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false); + this.tmpDir = new Path(conf.get(CONF_TMP_DIR)); + this.fs = fs; + } + + /** + * Execute the compaction on the specified path. + * + * @param path Directory path on which to run compaction. + * @param compactOnce Execute just a single step of compaction. + * @param major Request major compaction. + */ + public void compact(final Path path, final boolean compactOnce, final boolean major) throws IOException { + if (isFamilyDir(fs, path)) { + Path regionDir = path.getParent(); + Path tableDir = regionDir.getParent(); + TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir); + HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir); + compactStoreFiles(tableDir, htd, hri, + path.getName(), compactOnce, major); + } else if (isRegionDir(fs, path)) { + Path tableDir = path.getParent(); + TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir); + compactRegion(tableDir, htd, path, compactOnce, major); + } else if (isTableDir(fs, path)) { + compactTable(path, compactOnce, major); + } else { + throw new IOException( + "Specified path is not a table, region or family directory. path=" + path); + } + } + + private void compactTable(final Path tableDir, final boolean compactOnce, final boolean major) + throws IOException { + TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir); + for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) { + compactRegion(tableDir, htd, regionDir, compactOnce, major); + } + } + + private void compactRegion(final Path tableDir, final TableDescriptor htd, + final Path regionDir, final boolean compactOnce, final boolean major) + throws IOException { + HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir); + for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) { + compactStoreFiles(tableDir, htd, hri, familyDir.getName(), compactOnce, major); + } + } + + /** + * Execute the actual compaction job. + * If the compact once flag is not specified, execute the compaction until + * no more compactions are needed. Uses the Configuration settings provided. 
+ */ + private void compactStoreFiles(final Path tableDir, final TableDescriptor htd, + final HRegionInfo hri, final String familyName, final boolean compactOnce, + final boolean major) throws IOException { + HStore store = getStore(conf, fs, tableDir, htd, hri, familyName, tmpDir); + LOG.info("Compact table=" + htd.getTableName() + + " region=" + hri.getRegionNameAsString() + + " family=" + familyName); + if (major) { + store.triggerMajorCompaction(); + } + do { + CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null); + if (compaction == null) break; + List<StoreFile> storeFiles = + store.compact(compaction, NoLimitThroughputController.INSTANCE); + if (storeFiles != null && !storeFiles.isEmpty()) { + if (keepCompactedFiles && deleteCompacted) { + for (StoreFile storeFile: storeFiles) { + fs.delete(storeFile.getPath(), false); + } + } + } + } while (store.needsCompaction() && !compactOnce); + } + + /** + * Create a "mock" HStore that uses the tmpDir specified by the user and + * the store dir to compact as source. + */ + private static HStore getStore(final Configuration conf, final FileSystem fs, + final Path tableDir, final TableDescriptor htd, final HRegionInfo hri, + final String familyName, final Path tempDir) throws IOException { + HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri) { + @Override + public Path getTempDir() { + return tempDir; + } + }; + HRegion region = new HRegion(regionFs, null, conf, htd, null); + return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf); + } + } + + private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException { + Path regionInfo = new Path(path, HRegionFileSystem.REGION_INFO_FILE); + return fs.exists(regionInfo); + } + + private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException { + return FSTableDescriptors.getTableInfoPath(fs, path) != null; + } + + private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException { + return isRegionDir(fs, path.getParent()); + } + + private static class CompactionMapper + extends Mapper<LongWritable, Text, NullWritable, NullWritable> { + private CompactionWorker compactor = null; + private boolean compactOnce = false; + private boolean major = false; + + @Override + public void setup(Context context) { + Configuration conf = context.getConfiguration(); + compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false); + major = conf.getBoolean(CONF_COMPACT_MAJOR, false); + + try { + FileSystem fs = FileSystem.get(conf); + this.compactor = new CompactionWorker(fs, conf); + } catch (IOException e) { + throw new RuntimeException("Could not get the input FileSystem", e); + } + } + + @Override + public void map(LongWritable key, Text value, Context context) + throws InterruptedException, IOException { + Path path = new Path(value.toString()); + this.compactor.compact(path, compactOnce, major); + } + } + + /** + * Input format that uses store files block location as input split locality. + */ + private static class CompactionInputFormat extends TextInputFormat { + @Override + protected boolean isSplitable(JobContext context, Path file) { + return true; + } + + /** + * Returns a split for each store files directory using the block location + * of each file as locality reference. 
+ */ + @Override + public List<InputSplit> getSplits(JobContext job) throws IOException { + List<InputSplit> splits = new ArrayList<>(); + List<FileStatus> files = listStatus(job); + + Text key = new Text(); + for (FileStatus file: files) { + Path path = file.getPath(); + FileSystem fs = path.getFileSystem(job.getConfiguration()); + LineReader reader = new LineReader(fs.open(path)); + long pos = 0; + int n; + try { + while ((n = reader.readLine(key)) > 0) { + String[] hosts = getStoreDirHosts(fs, path); + splits.add(new FileSplit(path, pos, n, hosts)); + pos += n; + } + } finally { + reader.close(); + } + } + + return splits; + } + + /** + * return the top hosts of the store files, used by the Split + */ + private static String[] getStoreDirHosts(final FileSystem fs, final Path path) + throws IOException { + FileStatus[] files = FSUtils.listStatus(fs, path); + if (files == null) { + return new String[] {}; + } + + HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); + for (FileStatus hfileStatus: files) { + HDFSBlocksDistribution storeFileBlocksDistribution = + FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen()); + hdfsBlocksDistribution.add(storeFileBlocksDistribution); + } + + List<String> hosts = hdfsBlocksDistribution.getTopHosts(); + return hosts.toArray(new String[hosts.size()]); + } + + /** + * Create the input file for the given directories to compact. + * The file is a TextFile with each line corrisponding to a + * store files directory to compact. + */ + public static void createInputFile(final FileSystem fs, final Path path, + final Set<Path> toCompactDirs) throws IOException { + // Extract the list of store dirs + List<Path> storeDirs = new LinkedList<>(); + for (Path compactDir: toCompactDirs) { + if (isFamilyDir(fs, compactDir)) { + storeDirs.add(compactDir); + } else if (isRegionDir(fs, compactDir)) { + for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) { + storeDirs.add(familyDir); + } + } else if (isTableDir(fs, compactDir)) { + // Lookup regions + for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) { + for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) { + storeDirs.add(familyDir); + } + } + } else { + throw new IOException( + "Specified path is not a table, region or family directory. path=" + compactDir); + } + } + + // Write Input File + FSDataOutputStream stream = fs.create(path); + LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact."); + try { + final byte[] newLine = Bytes.toBytes("\n"); + for (Path storeDir: storeDirs) { + stream.write(Bytes.toBytes(storeDir.toString())); + stream.write(newLine); + } + } finally { + stream.close(); + } + } + } + + /** + * Execute compaction, using a Map-Reduce job. 
+ */ + private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs, + final boolean compactOnce, final boolean major) throws Exception { + Configuration conf = getConf(); + conf.setBoolean(CONF_COMPACT_ONCE, compactOnce); + conf.setBoolean(CONF_COMPACT_MAJOR, major); + + Job job = new Job(conf); + job.setJobName("CompactionTool"); + job.setJarByClass(CompactionTool.class); + job.setMapperClass(CompactionMapper.class); + job.setInputFormatClass(CompactionInputFormat.class); + job.setOutputFormatClass(NullOutputFormat.class); + job.setMapSpeculativeExecution(false); + job.setNumReduceTasks(0); + + // add dependencies (including HBase ones) + TableMapReduceUtil.addDependencyJars(job); + + Path stagingDir = JobUtil.getStagingDir(conf); + try { + // Create input file with the store dirs + Path inputPath = new Path(stagingDir, "compact-"+ EnvironmentEdgeManager.currentTime()); + CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs); + CompactionInputFormat.addInputPath(job, inputPath); + + // Initialize credential for secure cluster + TableMapReduceUtil.initCredentials(job); + + // Start the MR Job and wait + return job.waitForCompletion(true) ? 0 : 1; + } finally { + fs.delete(stagingDir, true); + } + } + + /** + * Execute compaction, from this client, one path at the time. + */ + private int doClient(final FileSystem fs, final Set<Path> toCompactDirs, + final boolean compactOnce, final boolean major) throws IOException { + CompactionWorker worker = new CompactionWorker(fs, getConf()); + for (Path path: toCompactDirs) { + worker.compact(path, compactOnce, major); + } + return 0; + } + + @Override + public int run(String[] args) throws Exception { + Set<Path> toCompactDirs = new HashSet<>(); + boolean compactOnce = false; + boolean major = false; + boolean mapred = false; + + Configuration conf = getConf(); + FileSystem fs = FileSystem.get(conf); + + try { + for (int i = 0; i < args.length; ++i) { + String opt = args[i]; + if (opt.equals("-compactOnce")) { + compactOnce = true; + } else if (opt.equals("-major")) { + major = true; + } else if (opt.equals("-mapred")) { + mapred = true; + } else if (!opt.startsWith("-")) { + Path path = new Path(opt); + FileStatus status = fs.getFileStatus(path); + if (!status.isDirectory()) { + printUsage("Specified path is not a directory. path=" + path); + return 1; + } + toCompactDirs.add(path); + } else { + printUsage(); + } + } + } catch (Exception e) { + printUsage(e.getMessage()); + return 1; + } + + if (toCompactDirs.isEmpty()) { + printUsage("No directories to compact specified."); + return 1; + } + + // Execute compaction! + if (mapred) { + return doMapReduce(fs, toCompactDirs, compactOnce, major); + } else { + return doClient(fs, toCompactDirs, compactOnce, major); + } + } + + private void printUsage() { + printUsage(null); + } + + private void printUsage(final String message) { + if (message != null && message.length() > 0) { + System.err.println(message); + } + System.err.println("Usage: java " + this.getClass().getName() + " \\"); + System.err.println(" [-compactOnce] [-major] [-mapred] [-D<property=value>]* files..."); + System.err.println(); + System.err.println("Options:"); + System.err.println(" mapred Use MapReduce to run compaction."); + System.err.println(" compactOnce Execute just one compaction step. (default: while needed)"); + System.err.println(" major Trigger major compaction."); + System.err.println(); + System.err.println("Note: -D properties will be applied to the conf used. 
"); + System.err.println("For example: "); + System.err.println(" To preserve input files, pass -D"+CONF_COMPLETE_COMPACTION+"=false"); + System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false"); + System.err.println(" To set tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR"); + System.err.println(); + System.err.println("Examples:"); + System.err.println(" To compact the full 'TestTable' using MapReduce:"); + System.err.println(" $ hbase " + this.getClass().getName() + " -mapred hdfs:///hbase/data/default/TestTable"); + System.err.println(); + System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':"); + System.err.println(" $ hbase " + this.getClass().getName() + " hdfs:///hbase/data/default/TestTable/abc/x"); + } + + public static void main(String[] args) throws Exception { + System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args)); + } +}
