Repository: hbase
Updated Branches:
  refs/heads/master 191afc8eb -> 946c1ed8f


HBASE-16423 Add re-compare option to VerifyReplication to avoid reporting 
rows that are only transiently inconsistent (Jianwei Cui)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/946c1ed8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/946c1ed8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/946c1ed8

Branch: refs/heads/master
Commit: 946c1ed8f89967b1f036ee3b0dcc296082eee487
Parents: 191afc8
Author: tedyu <yuzhih...@gmail.com>
Authored: Thu Sep 22 21:01:22 2016 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Thu Sep 22 21:01:22 2016 -0700

----------------------------------------------------------------------
 .../replication/VerifyReplication.java          | 62 +++++++++++++++++---
 1 file changed, 54 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/946c1ed8/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index c4dd3ad..04ae18f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -51,6 +52,7 @@ import 
org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
@@ -83,6 +85,7 @@ public class VerifyReplication extends Configured implements 
Tool {
   static String delimiter = "";
   static String peerId = null;
   static String rowPrefixes = null;
+  static int sleepMsBeforeReCompare = 0;
 
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
 
@@ -97,10 +100,13 @@ public class VerifyReplication extends Configured 
implements Tool {
     public static enum Counters {
       GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, 
CONTENT_DIFFERENT_ROWS}
 
-    private Connection connection;
+    private Connection sourceConnection;
+    private Table sourceTable;
+    private Connection replicatedConnection;
     private Table replicatedTable;
     private ResultScanner replicatedScanner;
     private Result currentCompareRowInPeerTable;
+    private int sleepMsBeforeReCompare;
 
     /**
      * Map method that compares every scanned row with the equivalent from
@@ -116,6 +122,7 @@ public class VerifyReplication extends Configured 
implements Tool {
         throws IOException {
       if (replicatedScanner == null) {
         Configuration conf = context.getConfiguration();
+        sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 
0);
         final Scan scan = new Scan();
         scan.setBatch(batch);
         scan.setCacheBlocks(false);
@@ -137,6 +144,9 @@ public class VerifyReplication extends Configured 
implements Tool {
         if (versions >= 0) {
           scan.setMaxVersions(versions);
         }
+        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
+        sourceConnection = ConnectionFactory.createConnection(conf);
+        sourceTable = sourceConnection.getTable(tableName);
 
         final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
 
@@ -144,9 +154,8 @@ public class VerifyReplication extends Configured 
implements Tool {
         Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
             zkClusterKey, PEER_CONFIG_PREFIX);
 
-        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
-        connection = ConnectionFactory.createConnection(peerConf);
-        replicatedTable = connection.getTable(tableName);
+        replicatedConnection = ConnectionFactory.createConnection(peerConf);
+        replicatedTable = replicatedConnection.getTable(tableName);
         scan.setStartRow(value.getRow());
         scan.setStopRow(tableSplit.getEndRow());
         replicatedScanner = replicatedTable.getScanner(scan);
@@ -184,6 +193,18 @@ public class VerifyReplication extends Configured 
implements Tool {
     }
 
     private void logFailRowAndIncreaseCounter(Context context, Counters 
counter, Result row) {
+      if (sleepMsBeforeReCompare > 0) {
+        Threads.sleep(sleepMsBeforeReCompare);
+        try {
+          Result sourceResult = sourceTable.get(new Get(row.getRow()));
+          Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
+          Result.compareResults(sourceResult, replicatedResult);
+          return;
+        } catch (Exception e) {
+          LOG.error("recompare fail after sleep, rowkey=" + delimiter +
+              Bytes.toString(row.getRow()) + delimiter);
+        }
+      }
       context.getCounter(counter).increment(1);
       context.getCounter(Counters.BADROWS).increment(1);
       LOG.error(counter.toString() + ", rowkey=" + delimiter + 
Bytes.toString(row.getRow()) +
@@ -206,18 +227,34 @@ public class VerifyReplication extends Configured 
implements Tool {
           replicatedScanner = null;
         }
       }
+
+      if (sourceTable != null) {
+        try {
+          sourceTable.close();
+        } catch (IOException e) {
+          LOG.error("fail to close source table in cleanup", e);
+        }
+      }
+      if(sourceConnection != null){
+        try {
+          sourceConnection.close();
+        } catch (Exception e) {
+          LOG.error("fail to close source connection in cleanup", e);
+        }
+      }
+
       if(replicatedTable != null){
         try{
           replicatedTable.close();
         } catch (Exception e) {
-          LOG.error("fail to close table in cleanup", e);
+          LOG.error("fail to close replicated table in cleanup", e);
         }
       }
-      if(connection != null){
+      if(replicatedConnection != null){
         try {
-          connection.close();
+          replicatedConnection.close();
         } catch (Exception e) {
-          LOG.error("fail to close connection in cleanup", e);
+          LOG.error("fail to close replicated connection in cleanup", e);
         }
       }
     }
@@ -273,6 +310,7 @@ public class VerifyReplication extends Configured 
implements Tool {
     conf.set(NAME+".tableName", tableName);
     conf.setLong(NAME+".startTime", startTime);
     conf.setLong(NAME+".endTime", endTime);
+    conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
     if (families != null) {
       conf.set(NAME+".families", families);
     }
@@ -408,6 +446,12 @@ public class VerifyReplication extends Configured 
implements Tool {
           continue;
         }
 
+        final String sleepToReCompareKey = "--recomparesleep=";
+        if (cmd.startsWith(sleepToReCompareKey)) {
+          sleepMsBeforeReCompare = 
Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
+          continue;
+        }
+        
         if (i == args.length-2) {
           peerId = cmd;
         }
@@ -453,6 +497,8 @@ public class VerifyReplication extends Configured 
implements Tool {
     System.err.println(" families     comma-separated list of families to 
copy");
     System.err.println(" row-prefixes comma-separated list of row key prefixes 
to filter on ");
     System.err.println(" delimiter    the delimiter used in display around 
rowkey");
+    System.err.println(" recomparesleep   milliseconds to sleep before 
recompare row, " +
+        "default value is 0 which disables the recompare.");
     System.err.println();
     System.err.println("Args:");
     System.err.println(" peerid       Id of the peer used for verification, 
must match the one given for replication");

Reply via email to