Repository: hbase
Updated Branches:
  refs/heads/0.98 93871c670 -> a3cfd5233
HBASE-10153 improve VerifyReplication to compute BADROWS more accurately (Jianwei)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a3cfd523
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a3cfd523
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a3cfd523

Branch: refs/heads/0.98
Commit: a3cfd5233dfbfdd57ac445acd0886df2f8bae895
Parents: 93871c6
Author: Ted Yu <te...@apache.org>
Authored: Fri Oct 3 03:27:24 2014 +0000
Committer: Ted Yu <te...@apache.org>
Committed: Fri Oct 3 03:27:24 2014 +0000

----------------------------------------------------------------------
 .../replication/VerifyReplication.java         | 67 ++++++++++++++++----
 1 file changed, 55 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a3cfd523/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index bb17e64..1088688 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
@@ -78,9 +79,11 @@ public class VerifyReplication extends Configured implements Tool {
 
   public static class Verifier
       extends TableMapper<ImmutableBytesWritable, Put> {
-    public static enum Counters {GOODROWS, BADROWS}
+    public static enum Counters {
+      GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
 
     private ResultScanner replicatedScanner;
+    private Result currentCompareRowInPeerTable;
 
     /**
      * Map method that compares every scanned row with the equivalent from
@@ -111,6 +114,8 @@ public class VerifyReplication extends Configured implements Tool {
         if (versions >= 0) {
           scan.setMaxVersions(versions);
         }
+
+        final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
         HConnectionManager.execute(new HConnectable<Void>(conf) {
           @Override
           public Void connect(HConnection conn) throws IOException {
@@ -119,26 +124,64 @@ public class VerifyReplication extends Configured implements Tool {
             ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
             HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
-            scan.setStartRow(value.getRow());
+            scan.setStartRow(tableSplit.getStartRow());
+            scan.setStopRow(tableSplit.getEndRow());
             replicatedScanner = replicatedTable.getScanner(scan);
             return null;
           }
         });
+        currentCompareRowInPeerTable = replicatedScanner.next();
       }
-      Result res = replicatedScanner.next();
-      try {
-        Result.compareResults(value, res);
-        context.getCounter(Counters.GOODROWS).increment(1);
-      } catch (Exception e) {
-        LOG.warn("Bad row", e);
-        context.getCounter(Counters.BADROWS).increment(1);
+      while (true) {
+        if (currentCompareRowInPeerTable == null) {
+          // reach the region end of peer table, row only in source table
+          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
+          break;
+        }
+        int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
+        if (rowCmpRet == 0) {
+          // rowkey is same, need to compare the content of the row
+          try {
+            Result.compareResults(value, currentCompareRowInPeerTable);
+            context.getCounter(Counters.GOODROWS).increment(1);
+          } catch (Exception e) {
+            logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
+          }
+          currentCompareRowInPeerTable = replicatedScanner.next();
+          break;
+        } else if (rowCmpRet < 0) {
+          // row only exists in source table
+          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
+          break;
+        } else {
+          // row only exists in peer table
+          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
+            currentCompareRowInPeerTable);
+          currentCompareRowInPeerTable = replicatedScanner.next();
+        }
       }
     }
-
+
+    private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
+      context.getCounter(counter).increment(1);
+      context.getCounter(Counters.BADROWS).increment(1);
+      LOG.error(counter.toString() + ", rowkey=" + Bytes.toString(row.getRow()));
+    }
+
     protected void cleanup(Context context) {
       if (replicatedScanner != null) {
-        replicatedScanner.close();
-        replicatedScanner = null;
+        try {
+          while (currentCompareRowInPeerTable != null) {
+            logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
+              currentCompareRowInPeerTable);
+            currentCompareRowInPeerTable = replicatedScanner.next();
+          }
+        } catch (Exception e) {
+          LOG.error("fail to scan peer table in cleanup", e);
+        } finally {
+          replicatedScanner.close();
+          replicatedScanner = null;
+        }
       }
     }
   }
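----------------------------------------------------------------------

For context on the change itself: the old Verifier opened the peer scanner at
the first source row it saw and then compared the two streams row by row, so
a single row missing on either side threw the streams out of step and every
later row was counted as bad. The patch instead scans the peer table over the
whole split range (tableSplit.getStartRow() to tableSplit.getEndRow()) and
walks the two sorted scanners in lockstep, classifying rows present on only
one side. The standalone sketch below distills that merge logic outside of
MapReduce and HBase; it is a minimal sketch, not part of the commit, assuming
both inputs are sorted by row key (as HBase scanners are), and all names in
it (SortedMergeVerifierSketch, Row, verify, advance) are illustrative.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Standalone sketch of the sorted-merge comparison technique from HBASE-10153.
public class SortedMergeVerifierSketch {

  // Stand-in for an HBase Result: a row key plus opaque row content.
  static final class Row {
    final String key;
    final String content;
    Row(String key, String content) { this.key = key; this.content = content; }
  }

  // Returns the next element, or null when the stream is exhausted,
  // the way ResultScanner.next() returns null at the end of a scan.
  static Row advance(Iterator<Row> it) { return it.hasNext() ? it.next() : null; }

  // Walks both sorted streams once, classifying every row the way the
  // Verifier's new counters do.
  static void verify(Iterator<Row> source, Iterator<Row> peer) {
    Row peerRow = advance(peer); // plays the role of currentCompareRowInPeerTable
    for (Row srcRow = advance(source); srcRow != null; srcRow = advance(source)) {
      // Peer rows sorting before the current source row exist only in the peer table.
      while (peerRow != null && peerRow.key.compareTo(srcRow.key) < 0) {
        System.out.println("ONLY_IN_PEER_TABLE_ROWS, rowkey=" + peerRow.key);
        peerRow = advance(peer);
      }
      if (peerRow == null || peerRow.key.compareTo(srcRow.key) > 0) {
        // Peer scan exhausted or already past this key: row exists only in source.
        System.out.println("ONLY_IN_SOURCE_TABLE_ROWS, rowkey=" + srcRow.key);
      } else {
        // Same row key on both sides: compare content, then advance the peer side.
        System.out.println((srcRow.content.equals(peerRow.content)
            ? "GOODROWS" : "CONTENT_DIFFERENT_ROWS") + ", rowkey=" + srcRow.key);
        peerRow = advance(peer);
      }
    }
    // Mirrors the patched cleanup(): anything left on the peer side is peer-only.
    while (peerRow != null) {
      System.out.println("ONLY_IN_PEER_TABLE_ROWS, rowkey=" + peerRow.key);
      peerRow = advance(peer);
    }
  }

  public static void main(String[] args) {
    List<Row> source = Arrays.asList(new Row("a", "1"), new Row("b", "2"), new Row("d", "4"));
    List<Row> peer = Arrays.asList(new Row("a", "1"), new Row("c", "3"), new Row("d", "9"));
    verify(source.iterator(), peer.iterator());
    // Prints: GOODROWS for a, ONLY_IN_SOURCE_TABLE_ROWS for b,
    // ONLY_IN_PEER_TABLE_ROWS for c, CONTENT_DIFFERENT_ROWS for d.
  }
}

Note that in the patched job, logFailRowAndIncreaseCounter bumps BADROWS along
with the specific failure counter, so BADROWS ends up equal to the sum of
ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS and CONTENT_DIFFERENT_ROWS,
while GOODROWS counts rows whose contents match on both clusters.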