Author: jdcryans
Date: Fri Oct 29 00:09:56 2010
New Revision: 1028558
URL: http://svn.apache.org/viewvc?rev=1028558&view=rev
Log:
HBASE-3013 Tool to verify data in two clusters
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Result.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
Modified: hbase/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1028558&r1=1028557&r2=1028558&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Oct 29 00:09:56 2010
@@ -1108,6 +1108,7 @@ Release 0.21.0 - Unreleased
test writing
HBASE-2201 JRuby shell for replication
HBASE-2946 Increment multiple columns in a row at once
+ HBASE-3013 Tool to verify data in two clusters
OPTIMIZATIONS
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Result.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Result.java?rev=1028558&r1=1028557&r2=1028558&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Result.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Result.java Fri
Oct 29 00:09:56 2010
@@ -615,4 +615,31 @@ public class Result implements Writable
}
return results;
}
+
+  /**
+   * Does a deep comparison of two Results, down to the byte arrays.
+   * The results must hold the same number of KeyValues and, position by
+   * position in sorted order, each pair must agree both on
+   * {@code KeyValue.equals} and on its value bytes.
+   * @param res1 first result to compare
+   * @param res2 second result to compare, null is reported as a missing row
+   * @throws Exception Every difference is throwing an exception
+   */
+  public static void compareResults(Result res1, Result res2)
+      throws Exception {
+    if (res2 == null) {
+      throw new Exception("There wasn't enough rows, we stopped at "
+          + Bytes.toString(res1.getRow()));
+    }
+    if (res1.size() != res2.size()) {
+      throw new Exception("This row doesn't have the same number of KVs: "
+          + res1.toString() + " compared to " + res2.toString());
+    }
+    KeyValue[] ourKVs = res1.sorted();
+    KeyValue[] replicatedKVs = res2.sorted();
+    for (int i = 0; i < res1.size(); i++) {
+      // Either mismatch on its own is a difference, hence OR and not AND:
+      // requiring both checks to fail would let a cell whose value alone
+      // changed (or whose key alone changed) pass as identical.
+      if (!ourKVs[i].equals(replicatedKVs[i]) ||
+          !Bytes.equals(ourKVs[i].getValue(), replicatedKVs[i].getValue())) {
+        throw new Exception("This result was different: "
+            + res1.toString() + " compared to " + res2.toString());
+      }
+    }
+  }
}
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java?rev=1028558&r1=1028557&r2=1028558&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java Fri
Oct 29 00:09:56 2010
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.mapreduce;
+import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.util.ProgramDriver;
/**
@@ -41,6 +42,10 @@ public class Driver {
"Complete a bulk data load.");
pgd.addClass(CopyTable.NAME, CopyTable.class,
"Export a table from local cluster to peer cluster");
+ pgd.addClass(VerifyReplication.NAME, VerifyReplication.class, "Compare" +
+ " the data from tables in two different clusters. WARNING: It" +
+ " doesn't work for incrementColumnValues'd cells since the" +
+ " timestamp is changed after being appended to the log.");
pgd.driver(args);
}
}
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java?rev=1028558&view=auto
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
(added)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
Fri Oct 29 00:09:56 2010
@@ -0,0 +1,280 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce.replication;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This map-only job compares the data from a local table with a remote one.
+ * Every cell is compared and must have exactly the same keys (even timestamp)
+ * as well as same value. It is possible to restrict the job by time range and
+ * families. The peer id that's provided must match the one given when the
+ * replication stream was setup.
+ * <p>
+ * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The
+ * reason why a row is different is shown in the map's log.
+ */
+public class VerifyReplication {
+
+ private static final Log LOG =
+ LogFactory.getLog(VerifyReplication.class);
+
+ public final static String NAME = "verifyrep";
+ static long startTime = 0;
+ static long endTime = 0;
+ static String tableName = null;
+ static String families = null;
+ static String peerId = null;
+
+  /**
+   * Map-only comparator for 2 tables. Each row handed to the mapper is
+   * compared with the next row read from a scanner opened on the peer
+   * cluster's table; the outcome is recorded in the GOODROWS / BADROWS
+   * counters.
+   */
+  public static class Verifier
+      extends TableMapper<ImmutableBytesWritable, Put> {
+
+    public static enum Counters {GOODROWS, BADROWS}
+
+    // Scanner on the peer cluster's table; stays null until the first map()
+    // call opens it.
+    private ResultScanner replicatedScanner;
+
+    /**
+     * Map method that compares every scanned row with the equivalent from
+     * a distant cluster.
+     * @param row The current table row key.
+     * @param value The columns.
+     * @param context The current context.
+     * @throws IOException When something is broken with the data.
+     */
+    @Override
+    public void map(ImmutableBytesWritable row, Result value,
+        Context context)
+        throws IOException {
+      if (replicatedScanner == null) {
+        // First row seen by this task: build a scan on the peer table using
+        // the restrictions stored in the job configuration.
+        Configuration conf = context.getConfiguration();
+        Scan scan = new Scan();
+        scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
+        long startTime = conf.getLong(NAME + ".startTime", 0);
+        long endTime = conf.getLong(NAME + ".endTime", 0);
+        String families = conf.get(NAME + ".families", null);
+        if(families != null) {
+          String[] fams = families.split(",");
+          for(String fam : fams) {
+            scan.addFamily(Bytes.toBytes(fam));
+          }
+        }
+        // NOTE(review): the time range is only applied when a start time is
+        // set, so an end time given on its own has no effect here.
+        if (startTime != 0) {
+          scan.setTimeRange(startTime,
+              endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
+        }
+        try {
+          ReplicationZookeeper zk = new ReplicationZookeeper(conf,
+              HConnectionManager.getConnection(conf).
+                  getZooKeeperWatcher());
+          ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId"));
+          HTable replicatedTable = new HTable(peer.getConfiguration(),
+              conf.get(NAME+".tableName"));
+          // Start the peer scan at the first row this task was given
+          scan.setStartRow(value.getRow());
+          replicatedScanner = replicatedTable.getScanner(scan);
+        } catch (KeeperException e) {
+          throw new IOException("Got a ZK exception", e);
+        }
+      }
+      // Rows are paired purely by position: one peer row is consumed for
+      // every local row.
+      Result res = replicatedScanner.next();
+      try {
+        Result.compareResults(value, res);
+        context.getCounter(Counters.GOODROWS).increment(1);
+      } catch (Exception e) {
+        LOG.warn("Bad row", e);
+        context.getCounter(Counters.BADROWS).increment(1);
+      }
+    }
+
+    /**
+     * Closes the scanner on the replicated table, if one was opened.
+     * @param context The current context.
+     */
+    @Override
+    protected void cleanup(Context context) {
+      // The scanner is only created inside map(); when map() was never
+      // invoked for this task there is nothing to close.
+      if (replicatedScanner != null) {
+        replicatedScanner.close();
+      }
+    }
+  }
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf The current configuration.
+   * @param args The command line parameters.
+   * @return The newly created job, or null when the command line was
+   * rejected or only the usage was requested.
+   * @throws java.io.IOException When setting up the job fails.
+   */
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+      throws IOException {
+    if (!doCommandLine(args)) {
+      return null;
+    }
+    if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
+      throw new IOException("Replication needs to be enabled to verify it.");
+    }
+    try {
+      ReplicationZookeeper zk = new ReplicationZookeeper(conf,
+          HConnectionManager.getConnection(conf).getZooKeeperWatcher());
+      // Just verifying that we can connect
+      ReplicationPeer peer = zk.getPeer(peerId);
+      if (peer == null) {
+        throw new IOException("Couldn't get access to the slave cluster, " +
+            "please see the log");
+      }
+    } catch (KeeperException ex) {
+      throw new IOException("Couldn't get access to the slave cluster" +
+          " because: ", ex);
+    }
+    // Hand the parsed settings to the map tasks through the configuration
+    conf.set(NAME+".peerId", peerId);
+    conf.set(NAME+".tableName", tableName);
+    conf.setLong(NAME+".startTime", startTime);
+    conf.setLong(NAME+".endTime", endTime);
+    if (families != null) {
+      conf.set(NAME+".families", families);
+    }
+    Job job = new Job(conf, NAME + "_" + tableName);
+    job.setJarByClass(VerifyReplication.class);
+
+    // Scan of the local table, restricted the same way Verifier.map
+    // restricts its scan of the peer table.
+    Scan scan = new Scan();
+    // NOTE(review): the time range is only applied when a start time is set,
+    // so an end time given on its own has no effect here.
+    if (startTime != 0) {
+      scan.setTimeRange(startTime,
+          endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
+    }
+    if(families != null) {
+      String[] fams = families.split(",");
+      for(String fam : fams) {
+        scan.addFamily(Bytes.toBytes(fam));
+      }
+    }
+    TableMapReduceUtil.initTableMapperJob(tableName, scan,
+        Verifier.class, null, null, job);
+    // Map-only job with no output: results are reported through counters
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setNumReduceTasks(0);
+    return job;
+  }
+
+  /**
+   * Parses the command line into the static settings of this class
+   * (startTime, endTime, families, peerId, tableName).
+   * Arguments starting with a known option key are consumed as options; the
+   * second-to-last argument is otherwise taken as the peer id and the last
+   * one as the table name.
+   *
+   * @param args The command line parameters.
+   * @return false when the usage was printed (too few arguments, help
+   * requested or a parsing error), true otherwise.
+   */
+  private static boolean doCommandLine(final String[] args) {
+    if (args.length < 2) {
+      printUsage(null);
+      return false;
+    }
+    try {
+      for (int i = 0; i < args.length; i++) {
+        String cmd = args[i];
+        if (cmd.equals("-h") || cmd.startsWith("--h")) {
+          printUsage(null);
+          return false;
+        }
+
+        final String startTimeArgKey = "--starttime=";
+        if (cmd.startsWith(startTimeArgKey)) {
+          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
+          continue;
+        }
+
+        final String endTimeArgKey = "--endtime=";
+        if (cmd.startsWith(endTimeArgKey)) {
+          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
+          continue;
+        }
+
+        // Accept --stoptime as an alias of --endtime so that a command line
+        // using that spelling is honored instead of being silently skipped.
+        final String stopTimeArgKey = "--stoptime=";
+        if (cmd.startsWith(stopTimeArgKey)) {
+          endTime = Long.parseLong(cmd.substring(stopTimeArgKey.length()));
+          continue;
+        }
+
+        final String familiesArgKey = "--families=";
+        if (cmd.startsWith(familiesArgKey)) {
+          families = cmd.substring(familiesArgKey.length());
+          continue;
+        }
+
+        if (i == args.length-2) {
+          peerId = cmd;
+        }
+
+        if (i == args.length-1) {
+          tableName = cmd;
+        }
+      }
+    } catch (Exception e) {
+      // Typically a NumberFormatException from a malformed timestamp
+      e.printStackTrace();
+      printUsage("Can't start because " + e.getMessage());
+      return false;
+    }
+    return true;
+  }
+
+  /*
+   * Prints the command-line help to stderr, preceded by an error line when
+   * a non-empty message is supplied.
+   * @param errorMsg Error message. Can be null.
+   */
+  private static void printUsage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    // The end of the range is spelled --endtime, the key doCommandLine parses
+    System.err.println("Usage: verifyrep [--starttime=X]" +
+        " [--endtime=Y] [--families=A] <peerid> <tablename>");
+    System.err.println();
+    System.err.println("Options:");
+    System.err.println(" starttime beginning of the time range");
+    System.err.println(" without endtime means from starttime to" +
+        " forever");
+    System.err.println(" endtime end of the time range");
+    System.err.println(" families comma-separated list of families to" +
+        " verify");
+    System.err.println();
+    System.err.println("Args:");
+    System.err.println(" peerid Id of the peer used for verification," +
+        " must match the one given for replication");
+    System.err.println(" tablename Name of the table to verify");
+    System.err.println();
+    System.err.println("Examples:");
+    System.err.println(" To verify the data replicated from TestTable for a 1" +
+        " hour window with peer #5 ");
+    System.err.println(" $ bin/hbase " +
+        "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
+        " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
+  }
+
+ /**
+ * Main entry point.
+ *
+ * @param args The command line parameters.
+ * @throws Exception When running the job fails.
+ */
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ Job job = createSubmittableJob(conf, args);
+ if (job != null) {
+ System.exit(job.waitForCompletion(true) ? 0 : 1);
+ }
+ }
+}
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1028558&r1=1028557&r2=1028558&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
Fri Oct 29 00:09:56 2010
@@ -265,7 +265,7 @@ public class ReplicationZookeeper {
* @throws IOException
* @throws KeeperException
*/
- private ReplicationPeer getPeer(String peerId) throws IOException,
KeeperException{
+ public ReplicationPeer getPeer(String peerId) throws IOException,
KeeperException{
String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
byte [] data = ZKUtil.getData(this.zookeeper, znode);
String otherClusterKey = Bytes.toString(data);
Modified:
hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html?rev=1028558&r1=1028557&r2=1028558&view=diff
==============================================================================
---
hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html
(original)
+++
hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html
Fri Oct 29 00:09:56 2010
@@ -31,6 +31,7 @@ This package provides replication betwee
<li><a href="#status">Status</a></li>
<li><a href="#requirements">Requirements</a></li>
<li><a href="#deployment">Deployment</a></li>
+ <li><a href="#verify">Verifying Replicated Data</a></li>
</ol>
<p>
@@ -49,6 +50,7 @@ features:
<li>Supports clusters of different sizes.</li>
<li>Handling of partitions longer than 10 minutes.</li>
<li>Ability to add/remove slave clusters at runtime.</li>
+ <li>MapReduce job to compare tables on two clusters</li>
</ol>
Please report bugs on the project's Jira when found.
<p>
@@ -122,5 +124,19 @@ issued that command but new entries won'
<p>
+<a name="verify">
+<h2>Verifying Replicated Data</h2>
+</a>
+<p>
+Verifying the replicated data on two clusters is easy to do in the shell when
+looking only at a few rows, but doing a systematic comparison requires more
+computing power. This is why the VerifyReplication MR job was created; it has
+to be run on the master cluster and needs to be provided with a peer id (the
+one provided when establishing a replication stream) and a table name. Other
+options let you specify a time range and specific families. This job's short
+name is "verifyrep" and needs to be provided when pointing "hadoop jar" to the
+hbase jar.
+</p>
+
</body>
</html>
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java?rev=1028558&r1=1028557&r2=1028558&view=diff
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
(original)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
Fri Oct 29 00:09:56 2010
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HBaseTest
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@@ -41,10 +42,11 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.mapreduce.Job;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -455,6 +457,56 @@ public class TestReplication {
}
/**
+   * Do a small loading into a table, make sure the data is really the same,
+   * then run the VerifyReplication job to check the results. Do a second
+   * comparison where all the cells are different.
+   * @throws Exception
+   */
+  @Test
+  public void testVerifyRepJob() throws Exception {
+    // Populate the tables, at the same time it guarantees that the tables are
+    // identical since it does the check
+    testSmallBatch();
+
+    String[] args = new String[] {"2", Bytes.toString(tableName)};
+    Job job = VerifyReplication.createSubmittableJob(conf1, args);
+    if (job == null) {
+      fail("Job wasn't created, see the log");
+    }
+    if (!job.waitForCompletion(true)) {
+      fail("Job failed, see the log");
+    }
+    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
+        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+    assertEquals(0, job.getCounters().
+        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+
+    // Overwrite the first cell of every row on the second cluster so that
+    // the two tables no longer hold the same data.
+    Scan scan = new Scan();
+    ResultScanner rs = htable2.getScanner(scan);
+    Put put = null;
+    try {
+      for (Result result : rs) {
+        put = new Put(result.getRow());
+        KeyValue firstVal = result.raw()[0];
+        put.add(firstVal.getFamily(),
+            firstVal.getQualifier(), Bytes.toBytes("diff data"));
+        htable2.put(put);
+      }
+    } finally {
+      // Release the scanner even if one of the puts fails
+      rs.close();
+    }
+    // Also drop the last row that was rewritten, leaving the second table
+    // one row short.
+    Delete delete = new Delete(put.getRow());
+    htable2.delete(delete);
+    job = VerifyReplication.createSubmittableJob(conf1, args);
+    if (job == null) {
+      fail("Job wasn't created, see the log");
+    }
+    if (!job.waitForCompletion(true)) {
+      fail("Job failed, see the log");
+    }
+    assertEquals(0, job.getCounters().
+        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
+        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+  }
+
+ /**
* Load up multiple tables over 2 region servers and kill a source during
* the upload. The failover happens internally.
* @throws Exception