http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java deleted file mode 100644 index 8bb266e..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ /dev/null @@ -1,700 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce.replication; - -import java.io.IOException; -import java.util.Arrays; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.TableSnapshotScanner; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.PrefixFilter; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; -import org.apache.hadoop.hbase.mapreduce.TableMapper; -import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat; -import org.apache.hadoop.hbase.mapreduce.TableSplit; -import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationFactory; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl; -import org.apache.hadoop.hbase.replication.ReplicationPeers; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; 
-import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; - -/** - * This map-only job compares the data from a local table with a remote one. - * Every cell is compared and must have exactly the same keys (even timestamp) - * as well as same value. It is possible to restrict the job by time range and - * families. The peer id that's provided must match the one given when the - * replication stream was setup. - * <p> - * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The reason - * for a why a row is different is shown in the map's log. - */ -public class VerifyReplication extends Configured implements Tool { - - private static final Log LOG = - LogFactory.getLog(VerifyReplication.class); - - public final static String NAME = "verifyrep"; - private final static String PEER_CONFIG_PREFIX = NAME + ".peer."; - long startTime = 0; - long endTime = Long.MAX_VALUE; - int batch = -1; - int versions = -1; - String tableName = null; - String families = null; - String delimiter = ""; - String peerId = null; - String rowPrefixes = null; - int sleepMsBeforeReCompare = 0; - boolean verbose = false; - boolean includeDeletedCells = false; - //Source table snapshot name - String sourceSnapshotName = null; - //Temp location in source cluster to restore source snapshot - String sourceSnapshotTmpDir = null; - //Peer table snapshot name - String peerSnapshotName = null; - //Temp location in peer cluster to restore peer snapshot - String peerSnapshotTmpDir = null; - //Peer cluster Hadoop FS address - String peerFSAddress = null; - //Peer cluster HBase root dir location - String peerHBaseRootAddress = null; - - - private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; - - /** - * Map-only comparator for 2 tables - */ - public static class Verifier - extends TableMapper<ImmutableBytesWritable, Put> { - - - - public static enum Counters { - GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS} - - private Connection sourceConnection; - private Table sourceTable; - private Connection replicatedConnection; - private Table replicatedTable; - private ResultScanner replicatedScanner; - private Result currentCompareRowInPeerTable; - private int sleepMsBeforeReCompare; - private String delimiter = ""; - private boolean verbose = false; - private int batch = -1; - - /** - * Map method that compares every scanned row with the equivalent from - * a distant cluster. - * @param row The current table row key. - * @param value The columns. - * @param context The current context. - * @throws IOException When something is broken with the data. 
- */ - @Override - public void map(ImmutableBytesWritable row, final Result value, - Context context) - throws IOException { - if (replicatedScanner == null) { - Configuration conf = context.getConfiguration(); - sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0); - delimiter = conf.get(NAME + ".delimiter", ""); - verbose = conf.getBoolean(NAME +".verbose", false); - batch = conf.getInt(NAME + ".batch", -1); - final Scan scan = new Scan(); - if (batch > 0) { - scan.setBatch(batch); - } - scan.setCacheBlocks(false); - scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1)); - long startTime = conf.getLong(NAME + ".startTime", 0); - long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE); - String families = conf.get(NAME + ".families", null); - if(families != null) { - String[] fams = families.split(","); - for(String fam : fams) { - scan.addFamily(Bytes.toBytes(fam)); - } - } - boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false); - scan.setRaw(includeDeletedCells); - String rowPrefixes = conf.get(NAME + ".rowPrefixes", null); - setRowPrefixFilter(scan, rowPrefixes); - scan.setTimeRange(startTime, endTime); - int versions = conf.getInt(NAME+".versions", -1); - LOG.info("Setting number of version inside map as: " + versions); - if (versions >= 0) { - scan.setMaxVersions(versions); - } - TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); - sourceConnection = ConnectionFactory.createConnection(conf); - sourceTable = sourceConnection.getTable(tableName); - - final InputSplit tableSplit = context.getInputSplit(); - - String zkClusterKey = conf.get(NAME + ".peerQuorumAddress"); - Configuration peerConf = HBaseConfiguration.createClusterConf(conf, - zkClusterKey, PEER_CONFIG_PREFIX); - - replicatedConnection = ConnectionFactory.createConnection(peerConf); - replicatedTable = replicatedConnection.getTable(tableName); - scan.setStartRow(value.getRow()); - - byte[] endRow = null; - if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) { - endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegionInfo() - .getEndKey(); - } else { - endRow = ((TableSplit) tableSplit).getEndRow(); - } - - scan.setStopRow(endRow); - - String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null); - if (peerSnapshotName != null) { - String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null); - String peerFSAddress = conf.get(NAME + ".peerFSAddress", null); - String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null); - FileSystem.setDefaultUri(peerConf, peerFSAddress); - FSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress)); - LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:" - + peerSnapshotTmpDir + " peer root uri:" + FSUtils.getRootDir(peerConf) - + " peerFSAddress:" + peerFSAddress); - - replicatedScanner = new TableSnapshotScanner(peerConf, - new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan); - } else { - replicatedScanner = replicatedTable.getScanner(scan); - } - currentCompareRowInPeerTable = replicatedScanner.next(); - } - while (true) { - if (currentCompareRowInPeerTable == null) { - // reach the region end of peer table, row only in source table - logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); - break; - } - int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow()); - if (rowCmpRet == 0) { - // rowkey is same, need to compare 
the content of the row - try { - Result.compareResults(value, currentCompareRowInPeerTable); - context.getCounter(Counters.GOODROWS).increment(1); - if (verbose) { - LOG.info("Good row key: " + delimiter - + Bytes.toStringBinary(value.getRow()) + delimiter); - } - } catch (Exception e) { - logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value); - } - currentCompareRowInPeerTable = replicatedScanner.next(); - break; - } else if (rowCmpRet < 0) { - // row only exists in source table - logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); - break; - } else { - // row only exists in peer table - logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, - currentCompareRowInPeerTable); - currentCompareRowInPeerTable = replicatedScanner.next(); - } - } - } - - private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) { - if (sleepMsBeforeReCompare > 0) { - Threads.sleep(sleepMsBeforeReCompare); - try { - Result sourceResult = sourceTable.get(new Get(row.getRow())); - Result replicatedResult = replicatedTable.get(new Get(row.getRow())); - Result.compareResults(sourceResult, replicatedResult); - if (!sourceResult.isEmpty()) { - context.getCounter(Counters.GOODROWS).increment(1); - if (verbose) { - LOG.info("Good row key (with recompare): " + delimiter + Bytes.toStringBinary(row.getRow()) - + delimiter); - } - } - return; - } catch (Exception e) { - LOG.error("recompare fail after sleep, rowkey=" + delimiter + - Bytes.toStringBinary(row.getRow()) + delimiter); - } - } - context.getCounter(counter).increment(1); - context.getCounter(Counters.BADROWS).increment(1); - LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) + - delimiter); - } - - @Override - protected void cleanup(Context context) { - if (replicatedScanner != null) { - try { - while (currentCompareRowInPeerTable != null) { - logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, - currentCompareRowInPeerTable); - currentCompareRowInPeerTable = replicatedScanner.next(); - } - } catch (Exception e) { - LOG.error("fail to scan peer table in cleanup", e); - } finally { - replicatedScanner.close(); - replicatedScanner = null; - } - } - - if (sourceTable != null) { - try { - sourceTable.close(); - } catch (IOException e) { - LOG.error("fail to close source table in cleanup", e); - } - } - if(sourceConnection != null){ - try { - sourceConnection.close(); - } catch (Exception e) { - LOG.error("fail to close source connection in cleanup", e); - } - } - - if(replicatedTable != null){ - try{ - replicatedTable.close(); - } catch (Exception e) { - LOG.error("fail to close replicated table in cleanup", e); - } - } - if(replicatedConnection != null){ - try { - replicatedConnection.close(); - } catch (Exception e) { - LOG.error("fail to close replicated connection in cleanup", e); - } - } - } - } - - private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig( - final Configuration conf, String peerId) throws IOException { - ZooKeeperWatcher localZKW = null; - ReplicationPeerZKImpl peer = null; - try { - localZKW = new ZooKeeperWatcher(conf, "VerifyReplication", - new Abortable() { - @Override public void abort(String why, Throwable e) {} - @Override public boolean isAborted() {return false;} - }); - - ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW); - rp.init(); - - Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId); - if 
(pair == null) { - throw new IOException("Couldn't get peer conf!"); - } - - return pair; - } catch (ReplicationException e) { - throw new IOException( - "An error occurred while trying to connect to the remove peer cluster", e); - } finally { - if (peer != null) { - peer.close(); - } - if (localZKW != null) { - localZKW.close(); - } - } - } - - /** - * Sets up the actual job. - * - * @param conf The current configuration. - * @param args The command line parameters. - * @return The newly created job. - * @throws java.io.IOException When setting up the job fails. - */ - public Job createSubmittableJob(Configuration conf, String[] args) - throws IOException { - if (!doCommandLine(args)) { - return null; - } - conf.set(NAME+".peerId", peerId); - conf.set(NAME+".tableName", tableName); - conf.setLong(NAME+".startTime", startTime); - conf.setLong(NAME+".endTime", endTime); - conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare); - conf.set(NAME + ".delimiter", delimiter); - conf.setInt(NAME + ".batch", batch); - conf.setBoolean(NAME +".verbose", verbose); - conf.setBoolean(NAME +".includeDeletedCells", includeDeletedCells); - if (families != null) { - conf.set(NAME+".families", families); - } - if (rowPrefixes != null){ - conf.set(NAME+".rowPrefixes", rowPrefixes); - } - - Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf, peerId); - ReplicationPeerConfig peerConfig = peerConfigPair.getFirst(); - String peerQuorumAddress = peerConfig.getClusterKey(); - LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " + - peerConfig.getConfiguration()); - conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); - HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX, - peerConfig.getConfiguration().entrySet()); - - conf.setInt(NAME + ".versions", versions); - LOG.info("Number of version: " + versions); - - //Set Snapshot specific parameters - if (peerSnapshotName != null) { - conf.set(NAME + ".peerSnapshotName", peerSnapshotName); - conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir); - conf.set(NAME + ".peerFSAddress", peerFSAddress); - conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress); - - // This is to create HDFS delegation token for peer cluster in case of secured - conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress); - } - - Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); - job.setJarByClass(VerifyReplication.class); - - Scan scan = new Scan(); - scan.setTimeRange(startTime, endTime); - scan.setRaw(includeDeletedCells); - scan.setCacheBlocks(false); - if (batch > 0) { - scan.setBatch(batch); - } - if (versions >= 0) { - scan.setMaxVersions(versions); - LOG.info("Number of versions set to " + versions); - } - if(families != null) { - String[] fams = families.split(","); - for(String fam : fams) { - scan.addFamily(Bytes.toBytes(fam)); - } - } - - setRowPrefixFilter(scan, rowPrefixes); - - if (sourceSnapshotName != null) { - Path snapshotTempPath = new Path(sourceSnapshotTmpDir); - LOG.info( - "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir); - TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null, - null, job, true, snapshotTempPath); - } else { - TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job); - } - Configuration peerClusterConf = peerConfigPair.getSecond(); - // Obtain the auth token from peer cluster - 
TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf); - - job.setOutputFormatClass(NullOutputFormat.class); - job.setNumReduceTasks(0); - return job; - } - - private static void setRowPrefixFilter(Scan scan, String rowPrefixes) { - if (rowPrefixes != null && !rowPrefixes.isEmpty()) { - String[] rowPrefixArray = rowPrefixes.split(","); - Arrays.sort(rowPrefixArray); - FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); - for (String prefix : rowPrefixArray) { - Filter filter = new PrefixFilter(Bytes.toBytes(prefix)); - filterList.addFilter(filter); - } - scan.setFilter(filterList); - byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]); - byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length -1]); - setStartAndStopRows(scan, startPrefixRow, lastPrefixRow); - } - } - - private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) { - scan.setStartRow(startPrefixRow); - byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1), - new byte[]{(byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1)}); - scan.setStopRow(stopRow); - } - - @VisibleForTesting - public boolean doCommandLine(final String[] args) { - if (args.length < 2) { - printUsage(null); - return false; - } - try { - for (int i = 0; i < args.length; i++) { - String cmd = args[i]; - if (cmd.equals("-h") || cmd.startsWith("--h")) { - printUsage(null); - return false; - } - - final String startTimeArgKey = "--starttime="; - if (cmd.startsWith(startTimeArgKey)) { - startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); - continue; - } - - final String endTimeArgKey = "--endtime="; - if (cmd.startsWith(endTimeArgKey)) { - endTime = Long.parseLong(cmd.substring(endTimeArgKey.length())); - continue; - } - - final String includeDeletedCellsArgKey = "--raw"; - if (cmd.equals(includeDeletedCellsArgKey)) { - includeDeletedCells = true; - continue; - } - - final String versionsArgKey = "--versions="; - if (cmd.startsWith(versionsArgKey)) { - versions = Integer.parseInt(cmd.substring(versionsArgKey.length())); - continue; - } - - final String batchArgKey = "--batch="; - if (cmd.startsWith(batchArgKey)) { - batch = Integer.parseInt(cmd.substring(batchArgKey.length())); - continue; - } - - final String familiesArgKey = "--families="; - if (cmd.startsWith(familiesArgKey)) { - families = cmd.substring(familiesArgKey.length()); - continue; - } - - final String rowPrefixesKey = "--row-prefixes="; - if (cmd.startsWith(rowPrefixesKey)){ - rowPrefixes = cmd.substring(rowPrefixesKey.length()); - continue; - } - - final String delimiterArgKey = "--delimiter="; - if (cmd.startsWith(delimiterArgKey)) { - delimiter = cmd.substring(delimiterArgKey.length()); - continue; - } - - final String sleepToReCompareKey = "--recomparesleep="; - if (cmd.startsWith(sleepToReCompareKey)) { - sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length())); - continue; - } - final String verboseKey = "--verbose"; - if (cmd.startsWith(verboseKey)) { - verbose = true; - continue; - } - - final String sourceSnapshotNameArgKey = "--sourceSnapshotName="; - if (cmd.startsWith(sourceSnapshotNameArgKey)) { - sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length()); - continue; - } - - final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir="; - if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) { - sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length()); - continue; - } - - final String 
peerSnapshotNameArgKey = "--peerSnapshotName="; - if (cmd.startsWith(peerSnapshotNameArgKey)) { - peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length()); - continue; - } - - final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir="; - if (cmd.startsWith(peerSnapshotTmpDirArgKey)) { - peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length()); - continue; - } - - final String peerFSAddressArgKey = "--peerFSAddress="; - if (cmd.startsWith(peerFSAddressArgKey)) { - peerFSAddress = cmd.substring(peerFSAddressArgKey.length()); - continue; - } - - final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress="; - if (cmd.startsWith(peerHBaseRootAddressArgKey)) { - peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length()); - continue; - } - - if (cmd.startsWith("--")) { - printUsage("Invalid argument '" + cmd + "'"); - return false; - } - - if (i == args.length-2) { - peerId = cmd; - } - - if (i == args.length-1) { - tableName = cmd; - } - } - - if ((sourceSnapshotName != null && sourceSnapshotTmpDir == null) - || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)) { - printUsage("Source snapshot name and snapshot temp location should be provided" - + " to use snapshots in source cluster"); - return false; - } - - if (peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null - || peerHBaseRootAddress != null) { - if (peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null - || peerHBaseRootAddress == null) { - printUsage( - "Peer snapshot name, peer snapshot temp location, Peer HBase root address and " - + "peer FSAddress should be provided to use snapshots in peer cluster"); - return false; - } - } - - // This is to avoid making recompare calls to source/peer tables when snapshots are used - if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) { - printUsage( - "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are immutable"); - return false; - } - - } catch (Exception e) { - e.printStackTrace(); - printUsage("Can't start because " + e.getMessage()); - return false; - } - return true; - } - - /* - * @param errorMsg Error message. Can be null. 
- */ - private static void printUsage(final String errorMsg) { - if (errorMsg != null && errorMsg.length() > 0) { - System.err.println("ERROR: " + errorMsg); - } - System.err.println("Usage: verifyrep [--starttime=X]" + - " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " + - "[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] " - + "[--peerSnapshotTmpDir=S] [--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid> <tablename>"); - System.err.println(); - System.err.println("Options:"); - System.err.println(" starttime beginning of the time range"); - System.err.println(" without endtime means from starttime to forever"); - System.err.println(" endtime end of the time range"); - System.err.println(" versions number of cell versions to verify"); - System.err.println(" batch batch count for scan, " + - "note that result row counts will no longer be actual number of rows when you use this option"); - System.err.println(" raw includes raw scan if given in options"); - System.err.println(" families comma-separated list of families to copy"); - System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on "); - System.err.println(" delimiter the delimiter used in display around rowkey"); - System.err.println(" recomparesleep milliseconds to sleep before recompare row, " + - "default value is 0 which disables the recompare."); - System.err.println(" verbose logs row keys of good rows"); - System.err.println(" sourceSnapshotName Source Snapshot Name"); - System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot"); - System.err.println(" peerSnapshotName Peer Snapshot Name"); - System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot"); - System.err.println(" peerFSAddress Peer cluster Hadoop FS address"); - System.err.println(" peerHBaseRootAddress Peer cluster HBase root location"); - System.err.println(); - System.err.println("Args:"); - System.err.println(" peerid Id of the peer used for verification, must match the one given for replication"); - System.err.println(" tablename Name of the table to verify"); - System.err.println(); - System.err.println("Examples:"); - System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 "); - System.err.println(" $ hbase " + - "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" + - " --starttime=1265875194289 --endtime=1265878794289 5 TestTable "); - } - - @Override - public int run(String[] args) throws Exception { - Configuration conf = this.getConf(); - Job job = createSubmittableJob(conf, args); - if (job != null) { - return job.waitForCompletion(true) ? 0 : 1; - } - return 1; - } - - /** - * Main entry point. - * - * @param args The command line parameters. - * @throws Exception When running the job fails. - */ - public static void main(String[] args) throws Exception { - int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args); - System.exit(res); - } -}
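A note on the --row-prefixes handling in the deleted VerifyReplication above: setRowPrefixFilter() installs one PrefixFilter per prefix, and setStartAndStopRows() merely bounds the scanned range, starting at the lexicographically smallest prefix and stopping just past the largest one (its last byte incremented). Below is a minimal standalone sketch of that bound calculation, plain JDK only; the class name and sample prefixes are illustrative, and, like the deleted helper, it does not special-case a trailing 0xFF byte.

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class PrefixScanBoundsSketch {
      // Mirrors the deleted setRowPrefixFilter()/setStartAndStopRows() bounding:
      // sort the prefixes, start at the smallest, stop just past the largest.
      static byte[][] prefixBounds(String rowPrefixes) {
        String[] prefixes = rowPrefixes.split(",");
        Arrays.sort(prefixes);
        byte[] start = prefixes[0].getBytes(StandardCharsets.UTF_8);
        byte[] stop = prefixes[prefixes.length - 1].getBytes(StandardCharsets.UTF_8);
        stop[stop.length - 1]++;   // same idea as the deleted code: last byte + 1, e.g. "def" -> "deg"
        return new byte[][] { start, stop };
      }

      public static void main(String[] args) {
        byte[][] bounds = prefixBounds("def,abc");
        // Prints "abc deg": the scan range is [abc, deg); exact prefix matching
        // is still done by the PrefixFilter list, the bounds only limit the scan.
        System.out.println(new String(bounds[0], StandardCharsets.UTF_8) + " "
            + new String(bounds[1], StandardCharsets.UTF_8));
      }
    }
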
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java deleted file mode 100644 index eb9a5f7..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java +++ /dev/null @@ -1,470 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.HDFSBlocksDistribution; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.mapreduce.JobUtil; -import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; -import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.FSTableDescriptors; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.apache.hadoop.util.LineReader; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -/* - * The CompactionTool allows to execute a compaction specifying a: - * <ul> - * <li>table folder (all regions and families will be compacted) - * <li>region folder (all families in the 
region will be compacted) - * <li>family folder (the store files will be compacted) - * </ul> - */ [email protected](HBaseInterfaceAudience.TOOLS) -public class CompactionTool extends Configured implements Tool { - private static final Log LOG = LogFactory.getLog(CompactionTool.class); - - private final static String CONF_TMP_DIR = "hbase.tmp.dir"; - private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once"; - private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major"; - private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete"; - private final static String CONF_COMPLETE_COMPACTION = "hbase.hstore.compaction.complete"; - - /** - * Class responsible to execute the Compaction on the specified path. - * The path can be a table, region or family directory. - */ - private static class CompactionWorker { - private final boolean keepCompactedFiles; - private final boolean deleteCompacted; - private final Configuration conf; - private final FileSystem fs; - private final Path tmpDir; - - public CompactionWorker(final FileSystem fs, final Configuration conf) { - this.conf = conf; - this.keepCompactedFiles = !conf.getBoolean(CONF_COMPLETE_COMPACTION, true); - this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false); - this.tmpDir = new Path(conf.get(CONF_TMP_DIR)); - this.fs = fs; - } - - /** - * Execute the compaction on the specified path. - * - * @param path Directory path on which to run compaction. - * @param compactOnce Execute just a single step of compaction. - * @param major Request major compaction. - */ - public void compact(final Path path, final boolean compactOnce, final boolean major) throws IOException { - if (isFamilyDir(fs, path)) { - Path regionDir = path.getParent(); - Path tableDir = regionDir.getParent(); - TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir); - HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir); - compactStoreFiles(tableDir, htd, hri, - path.getName(), compactOnce, major); - } else if (isRegionDir(fs, path)) { - Path tableDir = path.getParent(); - TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir); - compactRegion(tableDir, htd, path, compactOnce, major); - } else if (isTableDir(fs, path)) { - compactTable(path, compactOnce, major); - } else { - throw new IOException( - "Specified path is not a table, region or family directory. path=" + path); - } - } - - private void compactTable(final Path tableDir, final boolean compactOnce, final boolean major) - throws IOException { - TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir); - for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) { - compactRegion(tableDir, htd, regionDir, compactOnce, major); - } - } - - private void compactRegion(final Path tableDir, final TableDescriptor htd, - final Path regionDir, final boolean compactOnce, final boolean major) - throws IOException { - HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir); - for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) { - compactStoreFiles(tableDir, htd, hri, familyDir.getName(), compactOnce, major); - } - } - - /** - * Execute the actual compaction job. - * If the compact once flag is not specified, execute the compaction until - * no more compactions are needed. Uses the Configuration settings provided. 
- */ - private void compactStoreFiles(final Path tableDir, final TableDescriptor htd, - final HRegionInfo hri, final String familyName, final boolean compactOnce, - final boolean major) throws IOException { - HStore store = getStore(conf, fs, tableDir, htd, hri, familyName, tmpDir); - LOG.info("Compact table=" + htd.getTableName() + - " region=" + hri.getRegionNameAsString() + - " family=" + familyName); - if (major) { - store.triggerMajorCompaction(); - } - do { - CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null); - if (compaction == null) break; - List<StoreFile> storeFiles = - store.compact(compaction, NoLimitThroughputController.INSTANCE); - if (storeFiles != null && !storeFiles.isEmpty()) { - if (keepCompactedFiles && deleteCompacted) { - for (StoreFile storeFile: storeFiles) { - fs.delete(storeFile.getPath(), false); - } - } - } - } while (store.needsCompaction() && !compactOnce); - } - - /** - * Create a "mock" HStore that uses the tmpDir specified by the user and - * the store dir to compact as source. - */ - private static HStore getStore(final Configuration conf, final FileSystem fs, - final Path tableDir, final TableDescriptor htd, final HRegionInfo hri, - final String familyName, final Path tempDir) throws IOException { - HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri) { - @Override - public Path getTempDir() { - return tempDir; - } - }; - HRegion region = new HRegion(regionFs, null, conf, htd, null); - return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf); - } - } - - private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException { - Path regionInfo = new Path(path, HRegionFileSystem.REGION_INFO_FILE); - return fs.exists(regionInfo); - } - - private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException { - return FSTableDescriptors.getTableInfoPath(fs, path) != null; - } - - private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException { - return isRegionDir(fs, path.getParent()); - } - - private static class CompactionMapper - extends Mapper<LongWritable, Text, NullWritable, NullWritable> { - private CompactionWorker compactor = null; - private boolean compactOnce = false; - private boolean major = false; - - @Override - public void setup(Context context) { - Configuration conf = context.getConfiguration(); - compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false); - major = conf.getBoolean(CONF_COMPACT_MAJOR, false); - - try { - FileSystem fs = FileSystem.get(conf); - this.compactor = new CompactionWorker(fs, conf); - } catch (IOException e) { - throw new RuntimeException("Could not get the input FileSystem", e); - } - } - - @Override - public void map(LongWritable key, Text value, Context context) - throws InterruptedException, IOException { - Path path = new Path(value.toString()); - this.compactor.compact(path, compactOnce, major); - } - } - - /** - * Input format that uses store files block location as input split locality. - */ - private static class CompactionInputFormat extends TextInputFormat { - @Override - protected boolean isSplitable(JobContext context, Path file) { - return true; - } - - /** - * Returns a split for each store files directory using the block location - * of each file as locality reference. 
- */ - @Override - public List<InputSplit> getSplits(JobContext job) throws IOException { - List<InputSplit> splits = new ArrayList<>(); - List<FileStatus> files = listStatus(job); - - Text key = new Text(); - for (FileStatus file: files) { - Path path = file.getPath(); - FileSystem fs = path.getFileSystem(job.getConfiguration()); - LineReader reader = new LineReader(fs.open(path)); - long pos = 0; - int n; - try { - while ((n = reader.readLine(key)) > 0) { - String[] hosts = getStoreDirHosts(fs, path); - splits.add(new FileSplit(path, pos, n, hosts)); - pos += n; - } - } finally { - reader.close(); - } - } - - return splits; - } - - /** - * return the top hosts of the store files, used by the Split - */ - private static String[] getStoreDirHosts(final FileSystem fs, final Path path) - throws IOException { - FileStatus[] files = FSUtils.listStatus(fs, path); - if (files == null) { - return new String[] {}; - } - - HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); - for (FileStatus hfileStatus: files) { - HDFSBlocksDistribution storeFileBlocksDistribution = - FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen()); - hdfsBlocksDistribution.add(storeFileBlocksDistribution); - } - - List<String> hosts = hdfsBlocksDistribution.getTopHosts(); - return hosts.toArray(new String[hosts.size()]); - } - - /** - * Create the input file for the given directories to compact. - * The file is a TextFile with each line corrisponding to a - * store files directory to compact. - */ - public static void createInputFile(final FileSystem fs, final Path path, - final Set<Path> toCompactDirs) throws IOException { - // Extract the list of store dirs - List<Path> storeDirs = new LinkedList<>(); - for (Path compactDir: toCompactDirs) { - if (isFamilyDir(fs, compactDir)) { - storeDirs.add(compactDir); - } else if (isRegionDir(fs, compactDir)) { - for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) { - storeDirs.add(familyDir); - } - } else if (isTableDir(fs, compactDir)) { - // Lookup regions - for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) { - for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) { - storeDirs.add(familyDir); - } - } - } else { - throw new IOException( - "Specified path is not a table, region or family directory. path=" + compactDir); - } - } - - // Write Input File - FSDataOutputStream stream = fs.create(path); - LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact."); - try { - final byte[] newLine = Bytes.toBytes("\n"); - for (Path storeDir: storeDirs) { - stream.write(Bytes.toBytes(storeDir.toString())); - stream.write(newLine); - } - } finally { - stream.close(); - } - } - } - - /** - * Execute compaction, using a Map-Reduce job. 
- */ - private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs, - final boolean compactOnce, final boolean major) throws Exception { - Configuration conf = getConf(); - conf.setBoolean(CONF_COMPACT_ONCE, compactOnce); - conf.setBoolean(CONF_COMPACT_MAJOR, major); - - Job job = new Job(conf); - job.setJobName("CompactionTool"); - job.setJarByClass(CompactionTool.class); - job.setMapperClass(CompactionMapper.class); - job.setInputFormatClass(CompactionInputFormat.class); - job.setOutputFormatClass(NullOutputFormat.class); - job.setMapSpeculativeExecution(false); - job.setNumReduceTasks(0); - - // add dependencies (including HBase ones) - TableMapReduceUtil.addDependencyJars(job); - - Path stagingDir = JobUtil.getStagingDir(conf); - try { - // Create input file with the store dirs - Path inputPath = new Path(stagingDir, "compact-"+ EnvironmentEdgeManager.currentTime()); - CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs); - CompactionInputFormat.addInputPath(job, inputPath); - - // Initialize credential for secure cluster - TableMapReduceUtil.initCredentials(job); - - // Start the MR Job and wait - return job.waitForCompletion(true) ? 0 : 1; - } finally { - fs.delete(stagingDir, true); - } - } - - /** - * Execute compaction, from this client, one path at the time. - */ - private int doClient(final FileSystem fs, final Set<Path> toCompactDirs, - final boolean compactOnce, final boolean major) throws IOException { - CompactionWorker worker = new CompactionWorker(fs, getConf()); - for (Path path: toCompactDirs) { - worker.compact(path, compactOnce, major); - } - return 0; - } - - @Override - public int run(String[] args) throws Exception { - Set<Path> toCompactDirs = new HashSet<>(); - boolean compactOnce = false; - boolean major = false; - boolean mapred = false; - - Configuration conf = getConf(); - FileSystem fs = FileSystem.get(conf); - - try { - for (int i = 0; i < args.length; ++i) { - String opt = args[i]; - if (opt.equals("-compactOnce")) { - compactOnce = true; - } else if (opt.equals("-major")) { - major = true; - } else if (opt.equals("-mapred")) { - mapred = true; - } else if (!opt.startsWith("-")) { - Path path = new Path(opt); - FileStatus status = fs.getFileStatus(path); - if (!status.isDirectory()) { - printUsage("Specified path is not a directory. path=" + path); - return 1; - } - toCompactDirs.add(path); - } else { - printUsage(); - } - } - } catch (Exception e) { - printUsage(e.getMessage()); - return 1; - } - - if (toCompactDirs.isEmpty()) { - printUsage("No directories to compact specified."); - return 1; - } - - // Execute compaction! - if (mapred) { - return doMapReduce(fs, toCompactDirs, compactOnce, major); - } else { - return doClient(fs, toCompactDirs, compactOnce, major); - } - } - - private void printUsage() { - printUsage(null); - } - - private void printUsage(final String message) { - if (message != null && message.length() > 0) { - System.err.println(message); - } - System.err.println("Usage: java " + this.getClass().getName() + " \\"); - System.err.println(" [-compactOnce] [-major] [-mapred] [-D<property=value>]* files..."); - System.err.println(); - System.err.println("Options:"); - System.err.println(" mapred Use MapReduce to run compaction."); - System.err.println(" compactOnce Execute just one compaction step. (default: while needed)"); - System.err.println(" major Trigger major compaction."); - System.err.println(); - System.err.println("Note: -D properties will be applied to the conf used. 
"); - System.err.println("For example: "); - System.err.println(" To preserve input files, pass -D"+CONF_COMPLETE_COMPACTION+"=false"); - System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false"); - System.err.println(" To set tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR"); - System.err.println(); - System.err.println("Examples:"); - System.err.println(" To compact the full 'TestTable' using MapReduce:"); - System.err.println(" $ hbase " + this.getClass().getName() + " -mapred hdfs:///hbase/data/default/TestTable"); - System.err.println(); - System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':"); - System.err.println(" $ hbase " + this.getClass().getName() + " hdfs:///hbase/data/default/TestTable/abc/x"); - } - - public static void main(String[] args) throws Exception { - System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args)); - } -}
