This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 7bf67e902b88d8a13541254926681b9d9d55ba8f
Author: Hernan Romer <[email protected]>
AuthorDate: Mon Jul 31 17:50:57 2023 -0400

    HBASE-26874 VerifyReplication recompare async (#5051)
    
    Signed-off-by: Bryan Beaudreault <[email protected]>
---
 .../mapreduce/replication/VerifyReplication.java   | 186 +++++++++++++++++----
 .../VerifyReplicationRecompareRunnable.java        | 162 ++++++++++++++++++
 .../TestVerifyReplicationRecompareRunnable.java    | 154 +++++++++++++++++
 .../replication/VerifyReplicationTestBase.java     | 125 +++++++++++++-
 4 files changed, 591 insertions(+), 36 deletions(-)

diff --git 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index eb32998d8fb..36422b6e9f4 100644
--- 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -20,7 +20,12 @@ package org.apache.hadoop.hbase.mapreduce.replication;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Arrays;
+import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -32,7 +37,6 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.ConnectionRegistryFactory;
-import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -49,6 +53,7 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import 
org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication.Verifier.Counters;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
@@ -58,12 +63,12 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Strings;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -87,6 +92,11 @@ public class VerifyReplication extends Configured implements 
Tool {
 
   public final static String NAME = "verifyrep";
   private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
+  private static ThreadPoolExecutor reCompareExecutor = null;
+  int reCompareTries = 0;
+  int reCompareBackoffExponent = 0;
+  int reCompareThreads = 0;
+  int sleepMsBeforeReCompare = 0;
   long startTime = 0;
   long endTime = Long.MAX_VALUE;
   int batch = -1;
@@ -97,7 +107,6 @@ public class VerifyReplication extends Configured implements 
Tool {
   String peerId = null;
   String peerQuorumAddress = null;
   String rowPrefixes = null;
-  int sleepMsBeforeReCompare = 0;
   boolean verbose = false;
   boolean includeDeletedCells = false;
   // Source table snapshot name
@@ -127,7 +136,12 @@ public class VerifyReplication extends Configured 
implements Tool {
       BADROWS,
       ONLY_IN_SOURCE_TABLE_ROWS,
       ONLY_IN_PEER_TABLE_ROWS,
-      CONTENT_DIFFERENT_ROWS
+      CONTENT_DIFFERENT_ROWS,
+      RECOMPARES,
+      MAIN_THREAD_RECOMPARES,
+      SOURCE_ROW_CHANGED,
+      PEER_ROW_CHANGED,
+      FAILED_RECOMPARE
     }
 
     private Connection sourceConnection;
@@ -136,6 +150,9 @@ public class VerifyReplication extends Configured 
implements Tool {
     private Table replicatedTable;
     private ResultScanner replicatedScanner;
     private Result currentCompareRowInPeerTable;
+    private Scan tableScan;
+    private int reCompareTries;
+    private int reCompareBackoffExponent;
     private int sleepMsBeforeReCompare;
     private String delimiter = "";
     private boolean verbose = false;
@@ -153,7 +170,12 @@ public class VerifyReplication extends Configured 
implements Tool {
       throws IOException {
       if (replicatedScanner == null) {
         Configuration conf = context.getConfiguration();
+        reCompareTries = conf.getInt(NAME + ".recompareTries", 0);
+        reCompareBackoffExponent = conf.getInt(NAME + 
".recompareBackoffExponent", 1);
         sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 
0);
+        if (sleepMsBeforeReCompare > 0) {
+          reCompareTries = Math.max(reCompareTries, 1);
+        }
         delimiter = conf.get(NAME + ".delimiter", "");
         verbose = conf.getBoolean(NAME + ".verbose", false);
         batch = conf.getInt(NAME + ".batch", -1);
@@ -182,9 +204,12 @@ public class VerifyReplication extends Configured 
implements Tool {
         if (versions >= 0) {
           scan.readVersions(versions);
         }
+        int reCompareThreads = conf.getInt(NAME + ".recompareThreads", 0);
+        reCompareExecutor = buildReCompareExecutor(reCompareThreads, context);
         TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
         sourceConnection = ConnectionFactory.createConnection(conf);
         sourceTable = sourceConnection.getTable(tableName);
+        tableScan = scan;
 
         final InputSplit tableSplit = context.getInputSplit();
 
@@ -234,7 +259,7 @@ public class VerifyReplication extends Configured 
implements Tool {
       while (true) {
         if (currentCompareRowInPeerTable == null) {
           // reach the region end of peer table, row only in source table
-          logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
+          logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
           break;
         }
         int rowCmpRet = Bytes.compareTo(value.getRow(), 
currentCompareRowInPeerTable.getRow());
@@ -248,55 +273,77 @@ public class VerifyReplication extends Configured 
implements Tool {
                 "Good row key: " + delimiter + 
Bytes.toStringBinary(value.getRow()) + delimiter);
             }
           } catch (Exception e) {
-            logFailRowAndIncreaseCounter(context, 
Counters.CONTENT_DIFFERENT_ROWS, value);
+            logFailRowAndIncreaseCounter(context, 
Counters.CONTENT_DIFFERENT_ROWS, value,
+              currentCompareRowInPeerTable);
           }
           currentCompareRowInPeerTable = replicatedScanner.next();
           break;
         } else if (rowCmpRet < 0) {
           // row only exists in source table
-          logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
+          logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
           break;
         } else {
           // row only exists in peer table
-          logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_PEER_TABLE_ROWS,
+          logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_PEER_TABLE_ROWS, null,
             currentCompareRowInPeerTable);
           currentCompareRowInPeerTable = replicatedScanner.next();
         }
       }
     }
 
-    private void logFailRowAndIncreaseCounter(Context context, Counters 
counter, Result row) {
-      if (sleepMsBeforeReCompare > 0) {
-        Threads.sleep(sleepMsBeforeReCompare);
-        try {
-          Result sourceResult = sourceTable.get(new Get(row.getRow()));
-          Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
-          Result.compareResults(sourceResult, replicatedResult, false);
-          if (!sourceResult.isEmpty()) {
-            context.getCounter(Counters.GOODROWS).increment(1);
-            if (verbose) {
-              LOG.info("Good row key (with recompare): " + delimiter
-                + Bytes.toStringBinary(row.getRow()) + delimiter);
-            }
-          }
-          return;
-        } catch (Exception e) {
-          LOG.error("recompare fail after sleep, rowkey=" + delimiter
-            + Bytes.toStringBinary(row.getRow()) + delimiter);
-        }
+    /**
+     * Records a row that was found to differ between the source and peer tables.
+     * <p>
+     * When recompares are disabled ({@code reCompareTries == 0}) the given counter and
+     * {@code BADROWS} are incremented immediately. Otherwise a
+     * {@link VerifyReplicationRecompareRunnable} is created for the row and either run inline (no
+     * recompare executor configured) or handed to {@code reCompareExecutor}.
+     * @param context       mapper context whose counters are updated
+     * @param counter       the counter describing why the row failed
+     * @param row           the source-side result, or {@code null} if the row is only in the peer
+     * @param replicatedRow the peer-side result, or {@code null} if the row is only in the source
+     */
+    private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row,
+      Result replicatedRow) {
+      byte[] rowKey = getRow(row, replicatedRow);
+      if (reCompareTries == 0) {
+        context.getCounter(counter).increment(1);
+        context.getCounter(Counters.BADROWS).increment(1);
+        LOG.error("{}, rowkey={}{}{}", counter, delimiter, Bytes.toStringBinary(rowKey), delimiter);
+        return;
+      }
+
+      VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
+        row, replicatedRow, counter, delimiter, tableScan, sourceTable, replicatedTable,
+        reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, verbose);
+
+      if (reCompareExecutor == null) {
+        runnable.run();
+        return;
       }
-      context.getCounter(counter).increment(1);
-      context.getCounter(Counters.BADROWS).increment(1);
-      LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow())
-        + delimiter);
+
+      // execute() rather than submit(): submit() wraps the task in a FutureTask whose result is
+      // never read, so an exception thrown by the recompare would be silently swallowed, and
+      // shutdownNow() in cleanup() would return FutureTasks instead of the
+      // VerifyReplicationRecompareRunnable instances it casts to.
+      reCompareExecutor.execute(runnable);
     }
 
     @Override
     protected void cleanup(Context context) {
+      if (reCompareExecutor != null && !reCompareExecutor.isShutdown()) {
+        reCompareExecutor.shutdown();
+        try {
+          boolean terminated = reCompareExecutor.awaitTermination(1, 
TimeUnit.MINUTES);
+          if (!terminated) {
+            List<Runnable> queue = reCompareExecutor.shutdownNow();
+            for (Runnable runnable : queue) {
+              ((VerifyReplicationRecompareRunnable) runnable).fail();
+            }
+
+            terminated = reCompareExecutor.awaitTermination(1, 
TimeUnit.MINUTES);
+
+            if (!terminated) {
+              int activeCount = Math.max(1, 
reCompareExecutor.getActiveCount());
+              LOG.warn("Found {} possible recompares still running in the 
executable"
+                + " incrementing BADROWS and FAILED_RECOMPARE", activeCount);
+              context.getCounter(Counters.BADROWS).increment(activeCount);
+              
context.getCounter(Counters.FAILED_RECOMPARE).increment(activeCount);
+            }
+          }
+        } catch (InterruptedException e) {
+          throw new RuntimeException("Failed to await executor termination in 
cleanup", e);
+        }
+      }
       if (replicatedScanner != null) {
         try {
           while (currentCompareRowInPeerTable != null) {
-            logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_PEER_TABLE_ROWS,
+            logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_PEER_TABLE_ROWS, null,
               currentCompareRowInPeerTable);
             currentCompareRowInPeerTable = replicatedScanner.next();
           }
@@ -444,6 +491,10 @@ public class VerifyReplication extends Configured 
implements Tool {
     conf.setInt(NAME + ".versions", versions);
     LOG.info("Number of version: " + versions);
 
+    conf.setInt(NAME + ".recompareTries", reCompareTries);
+    conf.setInt(NAME + ".recompareBackoffExponent", reCompareBackoffExponent);
+    conf.setInt(NAME + ".recompareThreads", reCompareThreads);
+
     // Set Snapshot specific parameters
     if (peerSnapshotName != null) {
       conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
@@ -519,6 +570,15 @@ public class VerifyReplication extends Configured 
implements Tool {
     return job;
   }
 
+  /**
+   * Picks the row key to report for a mismatch, preferring the source-side result and falling
+   * back to the peer-side result when the row only exists in the peer table.
+   * @param sourceResult     result read from the source table, may be {@code null}
+   * @param replicatedResult result read from the peer table, may be {@code null}
+   * @return the row key of the first non-null result
+   * @throws RuntimeException if both results are {@code null}
+   */
+  protected static byte[] getRow(Result sourceResult, Result replicatedResult) {
+    Result present = (sourceResult != null) ? sourceResult : replicatedResult;
+    if (present == null) {
+      throw new RuntimeException("Both sourceResult and replicatedResult are null!");
+    }
+    return present.getRow();
+  }
+
   private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
     if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
       String[] rowPrefixArray = rowPrefixes.split(",");
@@ -603,11 +663,20 @@ public class VerifyReplication extends Configured 
implements Tool {
           continue;
         }
 
-        final String sleepToReCompareKey = "--recomparesleep=";
+        final String deprecatedSleepToReCompareKey = "--recomparesleep=";
+        final String sleepToReCompareKey = "--recompareSleep=";
+        if (cmd.startsWith(deprecatedSleepToReCompareKey)) {
+          LOG.warn("--recomparesleep is deprecated and will be removed in 
4.0.0."
+            + " Use --recompareSleep instead.");
+          sleepMsBeforeReCompare =
+            
Integer.parseInt(cmd.substring(deprecatedSleepToReCompareKey.length()));
+          continue;
+        }
         if (cmd.startsWith(sleepToReCompareKey)) {
           sleepMsBeforeReCompare = 
Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
           continue;
         }
+
         final String verboseKey = "--verbose";
         if (cmd.startsWith(verboseKey)) {
           verbose = true;
@@ -656,6 +725,25 @@ public class VerifyReplication extends Configured 
implements Tool {
           continue;
         }
 
+        final String reCompareThreadArgs = "--recompareThreads=";
+        if (cmd.startsWith(reCompareThreadArgs)) {
+          reCompareThreads = 
Integer.parseInt(cmd.substring(reCompareThreadArgs.length()));
+          continue;
+        }
+
+        final String reCompareTriesKey = "--recompareTries=";
+        if (cmd.startsWith(reCompareTriesKey)) {
+          reCompareTries = 
Integer.parseInt(cmd.substring(reCompareTriesKey.length()));
+          continue;
+        }
+
+        final String reCompareBackoffExponentKey = 
"--recompareBackoffExponent=";
+        if (cmd.startsWith(reCompareBackoffExponentKey)) {
+          reCompareBackoffExponent =
+            
Integer.parseInt(cmd.substring(reCompareBackoffExponentKey.length()));
+          continue;
+        }
+
         if (cmd.startsWith("--")) {
           printUsage("Invalid argument '" + cmd + "'");
           return false;
@@ -735,7 +823,8 @@ public class VerifyReplication extends Configured implements Tool {
       System.err.println("ERROR: " + errorMsg);
     }
     System.err.println("Usage: verifyrep [--starttime=X]"
-      + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] "
+      + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recompareSleep=] "
+      + "[--recompareThreads=] [--recompareTries=] [--recompareBackoffExponent=] "
       + "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] "
       + "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] "
       + "[--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>");
@@ -751,8 +840,14 @@ public class VerifyReplication extends Configured implements Tool {
     System.err.println(" families     comma-separated list of families to copy");
     System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
     System.err.println(" delimiter    the delimiter used in display around rowkey");
-    System.err.println(" recomparesleep   milliseconds to sleep before recompare row, "
+    System.err.println(" recompareSleep   milliseconds to sleep before recompare row, "
       + "default value is 0 which disables the recompare.");
+    System.err.println(" recompareThreads number of threads to run recompares in");
+    System.err.println(" recompareTries   number of recompare attempts before incrementing "
+      + "the BADROWS counter. Defaults to 0 (no recompare), or 1 when recompareSleep is set");
+    System.err.println(" recompareBackoffExponent exponential multiplier to increase "
+      + "recompareSleep after each recompare attempt, "
+      + "default value is 0 which results in a constant sleep time");
     System.err.println(" verbose      logs row keys of good rows");
     System.err.println(" peerTableName  Peer Table Name");
     System.err.println(" sourceSnapshotName  Source Snapshot Name");
@@ -819,6 +914,27 @@ public class VerifyReplication extends Configured 
implements Tool {
         + "2181:/cluster-b \\\n" + "     TestTable");
   }
 
+  /**
+   * Builds the pool used to run recompares off the mapper thread.
+   * <p>
+   * The pool has no core threads and uses a {@link SynchronousQueue}, so it never buffers work. A
+   * recompare submitted while all {@code maxThreads} workers are busy is rejected and handled by
+   * the policy from {@code buildRejectedReComparePolicy}. Idle workers are released after one
+   * second.
+   * @param maxThreads maximum number of worker threads; a value {@code <= 0} disables the pool
+   * @param context    mapper context, used by the rejection policy to update counters
+   * @return the executor, or {@code null} when recompares should run inline
+   */
+  private static ThreadPoolExecutor buildReCompareExecutor(int maxThreads, Mapper.Context context) {
+    // A negative value would make ThreadPoolExecutor throw IllegalArgumentException during mapper
+    // setup; treat it like 0, i.e. run recompares inline in the calling thread.
+    if (maxThreads <= 0) {
+      return null;
+    }
+
+    return new ThreadPoolExecutor(0, maxThreads, 1L, TimeUnit.SECONDS, new SynchronousQueue<>(),
+      buildRejectedReComparePolicy(context));
+  }
+
+  /**
+   * Builds the rejection policy for the recompare pool. A rejected recompare is counted in
+   * {@code MAIN_THREAD_RECOMPARES} and run synchronously in the submitting thread.
+   * <p>
+   * Unlike the stock {@link CallerRunsPolicy}, the task is run even after the executor has been
+   * shut down. {@code cleanup()} shuts the executor down before it drains the remaining peer-only
+   * rows through {@code logFailRowAndIncreaseCounter}. The stock policy would silently discard
+   * those recompares, so the rows would be counted neither as good nor as bad.
+   * @param context mapper context used to update counters
+   * @return a policy that always runs rejected tasks in the caller
+   */
+  private static CallerRunsPolicy buildRejectedReComparePolicy(Mapper.Context context) {
+    return new CallerRunsPolicy() {
+      @Override
+      public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
+        LOG.debug("Re-comparison execution rejected. Running in main thread.");
+        context.getCounter(Counters.MAIN_THREAD_RECOMPARES).increment(1);
+        // Deliberately not delegating to super: CallerRunsPolicy drops the task once the
+        // executor is shut down.
+        runnable.run();
+      }
+    };
+  }
+
   @Override
   public int run(String[] args) throws Exception {
     Configuration conf = this.getConf();
diff --git 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplicationRecompareRunnable.java
 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplicationRecompareRunnable.java
new file mode 100644
index 00000000000..47f5e606b84
--- /dev/null
+++ 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplicationRecompareRunnable.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce.replication;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Re-checks a single row that VerifyReplication found to differ between the source and peer
+ * tables. It makes up to {@code reCompareTries} attempts, sleeping before each one; the sleep
+ * grows by a factor of {@code 2^reCompareBackoffExponent} after every failed attempt.
+ * <p>
+ * Each attempt re-reads the row from both tables, reusing the job scan's block-cache and filter
+ * settings. An attempt succeeds only when neither side changed since the previous read (initially
+ * the results captured by the mapper) and the two sides match. The row is then counted in
+ * {@code GOODROWS}. If every attempt fails, the original failure counter and {@code BADROWS} are
+ * incremented.
+ * <p>
+ * NOTE(review): because a change on either side fails the attempt, a row whose source or peer
+ * content changed since the scan needs at least two attempts to be recovered.
+ */
+@InterfaceAudience.Private
+public class VerifyReplicationRecompareRunnable implements Runnable {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(VerifyReplicationRecompareRunnable.class);
+
+  private final Mapper.Context context;
+  private final VerifyReplication.Verifier.Counters originalCounter;
+  private final String delimiter;
+  private final byte[] row;
+  private final Scan tableScan;
+  private final Table sourceTable;
+  private final Table replicatedTable;
+
+  private final int reCompareTries;
+  private final int sleepMsBeforeReCompare;
+  private final int reCompareBackoffExponent;
+  private final boolean verbose;
+
+  // Latest known source/peer results for the row; replaced after each re-read of both tables.
+  private Result sourceResult;
+  private Result replicatedResult;
+
+  /**
+   * @param context                  mapper context whose counters are updated
+   * @param sourceResult             source-side result seen by the mapper, or {@code null}
+   * @param replicatedResult         peer-side result seen by the mapper, or {@code null}
+   * @param originalCounter          counter to increment (with BADROWS) if every attempt fails
+   * @param delimiter                string logged around row keys
+   * @param tableScan                job scan whose block-cache and filter settings are reused
+   * @param sourceTable              source table to re-read from
+   * @param replicatedTable          peer table to re-read from
+   * @param reCompareTries           maximum number of recompare attempts
+   * @param sleepMsBeforeReCompare   sleep before the first attempt, in milliseconds
+   * @param reCompareBackoffExponent sleep is multiplied by 2^exponent after each failed attempt;
+   *                                 a value {@code <= 0} keeps the sleep constant
+   * @param verbose                  whether to log recovered rows and recompare exceptions
+   * @throws RuntimeException if both results are {@code null}
+   */
+  public VerifyReplicationRecompareRunnable(Mapper.Context context, Result sourceResult,
+    Result replicatedResult, VerifyReplication.Verifier.Counters originalCounter, String delimiter,
+    Scan tableScan, Table sourceTable, Table replicatedTable, int reCompareTries,
+    int sleepMsBeforeReCompare, int reCompareBackoffExponent, boolean verbose) {
+    this.context = context;
+    this.sourceResult = sourceResult;
+    this.replicatedResult = replicatedResult;
+    this.originalCounter = originalCounter;
+    this.delimiter = delimiter;
+    this.tableScan = tableScan;
+    this.sourceTable = sourceTable;
+    this.replicatedTable = replicatedTable;
+    this.reCompareTries = reCompareTries;
+    this.sleepMsBeforeReCompare = sleepMsBeforeReCompare;
+    this.reCompareBackoffExponent = reCompareBackoffExponent;
+    this.verbose = verbose;
+    this.row = VerifyReplication.getRow(sourceResult, replicatedResult);
+  }
+
+  @Override
+  public void run() {
+    Get get = new Get(row);
+    get.setCacheBlocks(tableScan.getCacheBlocks());
+    get.setFilter(tableScan.getFilter());
+
+    // A negative sleep would make Thread.sleep throw IllegalArgumentException; treat it as 0.
+    long sleepMs = Math.max(0L, sleepMsBeforeReCompare);
+    int tries = 0;
+
+    while (++tries <= reCompareTries) {
+      context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).increment(1);
+
+      try {
+        Thread.sleep(sleepMs);
+      } catch (InterruptedException e) {
+        LOG.warn("Sleeping interrupted, incrementing bad rows and aborting");
+        incrementOriginalAndBadCounter();
+        context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1);
+        Thread.currentThread().interrupt();
+        return;
+      }
+
+      try {
+        if (fetchLatestRows(get) && matches(sourceResult, replicatedResult, null)) {
+          if (verbose) {
+            LOG.info("Good row key (with recompare): {}{}{}", delimiter, Bytes.toStringBinary(row),
+              delimiter);
+          }
+          context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).increment(1);
+          return;
+        } else {
+          context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1);
+        }
+      } catch (IOException e) {
+        context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1);
+        if (verbose) {
+          LOG.info("Got an exception during recompare for rowkey={}", Bytes.toStringBinary(row), e);
+        }
+      }
+
+      sleepMs = nextSleepMs(sleepMs);
+    }
+
+    LOG.error("{}, rowkey={}{}{}", originalCounter, delimiter, Bytes.toStringBinary(row),
+      delimiter);
+    incrementOriginalAndBadCounter();
+  }
+
+  /**
+   * Computes the sleep before the next attempt as
+   * {@code currentSleepMs * 2^reCompareBackoffExponent}. An exponent {@code <= 0} keeps the sleep
+   * constant. Results that would overflow are capped at {@link Long#MAX_VALUE}.
+   * @param currentSleepMs the sleep used for the attempt that just failed; must be {@code >= 0}
+   * @return the sleep to use for the next attempt, never negative
+   */
+  private long nextSleepMs(long currentSleepMs) {
+    if (reCompareBackoffExponent <= 0 || currentSleepMs == 0) {
+      return currentSleepMs;
+    }
+    // The power of two is built with a shift ('^' is XOR in Java). Shifting by 63 or more would
+    // no longer yield a positive power of two, so saturate instead.
+    if (reCompareBackoffExponent >= Long.SIZE - 1) {
+      return Long.MAX_VALUE;
+    }
+    long multiplier = 1L << reCompareBackoffExponent;
+    return currentSleepMs > Long.MAX_VALUE / multiplier
+      ? Long.MAX_VALUE
+      : currentSleepMs * multiplier;
+  }
+
+  /**
+   * Gives up on this recompare without running it. Increments the original failure counter,
+   * {@code BADROWS} and {@code FAILED_RECOMPARE}.
+   */
+  public void fail() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Called fail on row={}", Bytes.toStringBinary(row));
+    }
+    incrementOriginalAndBadCounter();
+    context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1);
+  }
+
+  /**
+   * Re-reads the row from both tables and stores the new results. A side whose content differs
+   * from the previously stored result increments {@code SOURCE_ROW_CHANGED} or
+   * {@code PEER_ROW_CHANGED}.
+   * @param get the get to issue against both tables
+   * @return {@code true} if neither side changed since the previous read
+   * @throws IOException if either read fails; stored results are left untouched in that case
+   */
+  private boolean fetchLatestRows(Get get) throws IOException {
+    Result sourceResult = sourceTable.get(get);
+    Result replicatedResult = replicatedTable.get(get);
+
+    boolean sourceMatches = matches(sourceResult, this.sourceResult,
+      VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED);
+    boolean replicatedMatches = matches(replicatedResult, this.replicatedResult,
+      VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED);
+
+    this.sourceResult = sourceResult;
+    this.replicatedResult = replicatedResult;
+    return sourceMatches && replicatedMatches;
+  }
+
+  /**
+   * Compares two results with {@link Result#compareResults(Result, Result)}. Any exception from
+   * the comparison is treated as a mismatch.
+   * @param failCounter counter to increment on mismatch, or {@code null} for none
+   * @return {@code true} if the results compare equal
+   */
+  private boolean matches(Result original, Result updated,
+    VerifyReplication.Verifier.Counters failCounter) {
+    try {
+      Result.compareResults(original, updated);
+      return true;
+    } catch (Exception e) {
+      if (failCounter != null) {
+        context.getCounter(failCounter).increment(1);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{} for rowkey={}", failCounter, Bytes.toStringBinary(row));
+        }
+      }
+      return false;
+    }
+  }
+
+  /** Increments the counter the row originally failed with, plus {@code BADROWS}. */
+  private void incrementOriginalAndBadCounter() {
+    context.getCounter(originalCounter).increment(1);
+    context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).increment(1);
+  }
+}
diff --git 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRecompareRunnable.java
 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRecompareRunnable.java
new file mode 100644
index 00000000000..49c52fbcc3b
--- /dev/null
+++ 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRecompareRunnable.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
+import 
org.apache.hadoop.hbase.mapreduce.replication.VerifyReplicationRecompareRunnable;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * Unit tests for {@link VerifyReplicationRecompareRunnable}. The source/peer tables and the
+ * mapper context are Mockito mocks. Every Verifier counter is backed by a real GenericCounter so
+ * the values incremented by the runnable can be asserted.
+ */
+@Category({ ReplicationTests.class, SmallTests.class })
+@RunWith(MockitoJUnitRunner.class)
+public class TestVerifyReplicationRecompareRunnable {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestVerifyReplicationRecompareRunnable.class);
+
+  @Mock
+  private Table sourceTable;
+
+  @Mock
+  private Table replicatedTable;
+
+  @Mock
+  private Mapper.Context context;
+
+  /** Builds a Result of {@code cols} KeyValues with random row/family/qualifier/value bytes. */
+  static Result genResult(int cols) {
+    KeyValue[] kvs = new KeyValue[cols];
+
+    for (int i = 0; i < cols; ++i) {
+      kvs[i] =
+        new KeyValue(genBytes(), genBytes(), genBytes(), System.currentTimeMillis(), genBytes());
+    }
+
+    return Result.create(kvs);
+  }
+
+  /** Returns the bytes of a random int. */
+  static byte[] genBytes() {
+    return Bytes.toBytes(ThreadLocalRandom.current().nextInt());
+  }
+
+  /** Backs every Verifier counter with a fresh GenericCounter so tests can read its value. */
+  @Before
+  public void setUp() {
+    for (VerifyReplication.Verifier.Counters counter : VerifyReplication.Verifier.Counters
+      .values()) {
+      Counter emptyCounter = new GenericCounter(counter.name(), counter.name());
+      when(context.getCounter(counter)).thenReturn(emptyCounter);
+    }
+  }
+
+  /**
+   * Both tables return the same result, which differs from the scan-time results (random source,
+   * null peer). The first attempt therefore sees both sides changed and fails. The second attempt
+   * sees no change and a match, so the row is counted good after two recompares.
+   */
+  @Test
+  public void itRecomparesGoodRow() throws IOException {
+    Result result = genResult(2);
+
+    when(sourceTable.get(any(Get.class))).thenReturn(result);
+    when(replicatedTable.get(any(Get.class))).thenReturn(result);
+
+    VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
+      genResult(5), null, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, "",
+      new Scan(), sourceTable, replicatedTable, 3, 1, 0, true);
+
+    runnable.run();
+
+    assertEquals(0, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+    assertEquals(0,
+      context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
+    assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+    assertEquals(1,
+      context.getCounter(VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED).getValue());
+    assertEquals(1,
+      context.getCounter(VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED).getValue());
+    assertEquals(2, context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
+  }
+
+  /**
+   * The source read differs from the scan-time source result while the peer read is unchanged.
+   * With a single try, the attempt fails on SOURCE_ROW_CHANGED and the row is counted bad.
+   */
+  @Test
+  public void itRecomparesBadRow() throws IOException {
+    Result replicatedResult = genResult(1);
+    when(sourceTable.get(any(Get.class))).thenReturn(genResult(5));
+    when(replicatedTable.get(any(Get.class))).thenReturn(replicatedResult);
+
+    VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
+      genResult(5), replicatedResult, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS,
+      "", new Scan(), sourceTable, replicatedTable, 1, 1, 0, true);
+
+    runnable.run();
+
+    assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+    assertEquals(1,
+      context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
+    assertEquals(0, context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+    assertEquals(1,
+      context.getCounter(VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED).getValue());
+    assertEquals(0,
+      context.getCounter(VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED).getValue());
+    assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
+  }
+
+  /**
+   * The source read throws an IOException. The attempt counts as FAILED_RECOMPARE and, with a
+   * single try, the row is counted bad.
+   */
+  @Test
+  public void itHandlesExceptionOnRecompare() throws IOException {
+    when(sourceTable.get(any(Get.class))).thenThrow(new IOException("Error!"));
+    when(replicatedTable.get(any(Get.class))).thenReturn(genResult(5));
+
+    VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
+      genResult(5), null, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, "",
+      new Scan(), sourceTable, replicatedTable, 1, 1, 0, true);
+
+    runnable.run();
+
+    assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+    assertEquals(1,
+      context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
+    assertEquals(1,
+      context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue());
+    assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
+  }
+}
diff --git 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/VerifyReplicationTestBase.java
 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/VerifyReplicationTestBase.java
index 26649226d9c..e263076677a 100644
--- 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/VerifyReplicationTestBase.java
+++ 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/VerifyReplicationTestBase.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -99,7 +100,7 @@ public abstract class VerifyReplicationTestBase extends 
TestReplicationBase {
     htable3 = connection2.getTable(peerTableName);
   }
 
-  static void runVerifyReplication(String[] args, int expectedGoodRows, int 
expectedBadRows)
+  static Counters runVerifyReplication(String[] args, int expectedGoodRows, 
int expectedBadRows)
     throws IOException, InterruptedException, ClassNotFoundException {
     Job job = new VerifyReplication().createSubmittableJob(new 
Configuration(CONF1), args);
     if (job == null) {
@@ -112,6 +113,7 @@ public abstract class VerifyReplicationTestBase extends 
TestReplicationBase {
       
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
     assertEquals(expectedBadRows,
       
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+    return job.getCounters();
   }
 
   /**
@@ -438,6 +440,127 @@ public abstract class VerifyReplicationTestBase extends 
TestReplicationBase {
     checkRestoreTmpDir(CONF2, tmpPath2, 2);
   }
 
+  @Test
+  public void testVerifyReplicationThreadedRecompares() throws Exception {
+    // Populate the tables with same data
+    runBatchCopyTest();
+
+    // ONLY_IN_PEER_TABLE_ROWS
+    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
+    put.addColumn(noRepfamName, row, row);
+    htable3.put(put);
+
+    // CONTENT_DIFFERENT_ROWS
+    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
+    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
+    htable3.put(put);
+
+    // ONLY_IN_SOURCE_TABLE_ROWS
+    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
+    put.addColumn(noRepfamName, row, row);
+    htable1.put(put);
+
+    String[] args = new String[] { "--recompareThreads=10", 
"--recompareTries=3",
+      "--recompareSleep=1", "--peerTableName=" + 
peerTableName.getNameAsString(),
+      getClusterKey(UTIL2), tableName.getNameAsString() };
+    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
+    assertEquals(
+      
counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(),
 9);
+    
assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(),
+      9);
+    assertEquals(
+      
counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(),
+      1);
+    assertEquals(
+      
counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(),
+      1);
+    
assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS)
+      .getValue(), 1);
+  }
+
+  @Test
+  public void testFailsRemainingComparesAfterShutdown() throws Exception {
+    // Populate the tables with same data
+    runBatchCopyTest();
+
+    // ONLY_IN_PEER_TABLE_ROWS
+    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
+    put.addColumn(noRepfamName, row, row);
+    htable3.put(put);
+
+    // CONTENT_DIFFERENT_ROWS
+    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
+    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
+    htable3.put(put);
+
+    // ONLY_IN_SOURCE_TABLE_ROWS
+    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
+    put.addColumn(noRepfamName, row, row);
+    htable1.put(put);
+
+    /**
+     * recompareSleep is set to exceed how long we wait on
+     * {@link VerifyReplication#reCompareExecutor} termination when doing 
cleanup. this allows us to
+     * test the counter-incrementing logic if the executor still hasn't 
terminated after the call to
+     * shutdown and awaitTermination
+     */
+    String[] args = new String[] { "--recompareThreads=1", 
"--recompareTries=1",
+      "--recompareSleep=121000", "--peerTableName=" + 
peerTableName.getNameAsString(),
+      getClusterKey(UTIL2), tableName.getNameAsString() };
+
+    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
+    assertEquals(
+      
counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(),
 3);
+    
assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(),
+      3);
+    assertEquals(
+      
counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(),
+      1);
+    assertEquals(
+      
counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(),
+      1);
+    
assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS)
+      .getValue(), 1);
+  }
+
+  @Test
+  public void testVerifyReplicationSynchronousRecompares() throws Exception {
+    // Populate the tables with same data
+    runBatchCopyTest();
+
+    // ONLY_IN_PEER_TABLE_ROWS
+    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
+    put.addColumn(noRepfamName, row, row);
+    htable3.put(put);
+
+    // CONTENT_DIFFERENT_ROWS
+    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
+    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
+    htable3.put(put);
+
+    // ONLY_IN_SOURCE_TABLE_ROWS
+    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
+    put.addColumn(noRepfamName, row, row);
+    htable1.put(put);
+
+    String[] args = new String[] { "--recompareTries=3", "--recompareSleep=1",
+      "--peerTableName=" + peerTableName.getNameAsString(), 
getClusterKey(UTIL2),
+      tableName.getNameAsString() };
+    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
+    assertEquals(
+      
counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(),
 9);
+    
assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(),
+      9);
+    assertEquals(
+      
counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(),
+      1);
+    assertEquals(
+      
counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(),
+      1);
+    
assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS)
+      .getValue(), 1);
+  }
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     htable3.close();


Reply via email to