Repository: hbase
Updated Branches:
  refs/heads/0.98 93871c670 -> a3cfd5233


HBASE-10153 improve VerifyReplication to compute BADROWS more accurately 
(Jianwei)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a3cfd523
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a3cfd523
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a3cfd523

Branch: refs/heads/0.98
Commit: a3cfd5233dfbfdd57ac445acd0886df2f8bae895
Parents: 93871c6
Author: Ted Yu <te...@apache.org>
Authored: Fri Oct 3 03:27:24 2014 +0000
Committer: Ted Yu <te...@apache.org>
Committed: Fri Oct 3 03:27:24 2014 +0000

----------------------------------------------------------------------
 .../replication/VerifyReplication.java          | 67 ++++++++++++++++----
 1 file changed, 55 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a3cfd523/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index bb17e64..1088688 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
@@ -78,9 +79,11 @@ public class VerifyReplication extends Configured implements Tool {
   public static class Verifier
       extends TableMapper<ImmutableBytesWritable, Put> {
 
-    public static enum Counters {GOODROWS, BADROWS}
+    public static enum Counters {
+      GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
 
     private ResultScanner replicatedScanner;
+    private Result currentCompareRowInPeerTable;
 
     /**
      * Map method that compares every scanned row with the equivalent from
@@ -111,6 +114,8 @@ public class VerifyReplication extends Configured implements Tool {
         if (versions >= 0) {
           scan.setMaxVersions(versions);
         }
+
+        final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
         HConnectionManager.execute(new HConnectable<Void>(conf) {
           @Override
           public Void connect(HConnection conn) throws IOException {
@@ -119,26 +124,64 @@ public class VerifyReplication extends Configured implements Tool {
             ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
 
             HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
-            scan.setStartRow(value.getRow());
+            scan.setStartRow(tableSplit.getStartRow());
+            scan.setStopRow(tableSplit.getEndRow());
             replicatedScanner = replicatedTable.getScanner(scan);
             return null;
           }
         });
+        currentCompareRowInPeerTable = replicatedScanner.next();
       }
-      Result res = replicatedScanner.next();
-      try {
-        Result.compareResults(value, res);
-        context.getCounter(Counters.GOODROWS).increment(1);
-      } catch (Exception e) {
-        LOG.warn("Bad row", e);
-        context.getCounter(Counters.BADROWS).increment(1);
+      while (true) {
+        if (currentCompareRowInPeerTable == null) {
+          // reach the region end of peer table, row only in source table
+          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
+          break;
+        }
+        int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
+        if (rowCmpRet == 0) {
+          // rowkey is same, need to compare the content of the row
+          try {
+            Result.compareResults(value, currentCompareRowInPeerTable);
+            context.getCounter(Counters.GOODROWS).increment(1);
+          } catch (Exception e) {
+            logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
+          }
+          currentCompareRowInPeerTable = replicatedScanner.next();
+          break;
+        } else if (rowCmpRet < 0) {
+          // row only exists in source table
+          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
+          break;
+        } else {
+          // row only exists in peer table
+          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
+            currentCompareRowInPeerTable);
+          currentCompareRowInPeerTable = replicatedScanner.next();
+        }
       }
     }
-
+    
+    private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
+      context.getCounter(counter).increment(1);
+      context.getCounter(Counters.BADROWS).increment(1);
+      LOG.error(counter.toString() + ", rowkey=" + 
Bytes.toString(row.getRow()));
+    }
+    
     protected void cleanup(Context context) {
       if (replicatedScanner != null) {
-        replicatedScanner.close();
-        replicatedScanner = null;
+        try {
+          while (currentCompareRowInPeerTable != null) {
+            logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
+              currentCompareRowInPeerTable);
+            currentCompareRowInPeerTable = replicatedScanner.next();
+          }
+        } catch (Exception e) {
+          LOG.error("fail to scan peer table in cleanup", e);
+        } finally {
+          replicatedScanner.close();
+          replicatedScanner = null;
+        }
       }
     }
   }

Reply via email to