This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 86f193e3d3b0f00592fc22e5e9b0bb3a376bc663
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Nov 21 10:40:23 2024 +0800

    Revert "VerifyReplication recompare async (#5051)"
    
    This reverts commit bd3484e12361ee05b1aea47c5da686b67026684a.
---
 .../mapreduce/replication/VerifyReplication.java   | 186 ++++-----------------
 .../VerifyReplicationRecompareRunnable.java        | 162 ------------------
 .../TestVerifyReplicationRecompareRunnable.java    | 154 -----------------
 .../replication/VerifyReplicationTestBase.java     | 125 +-------------
 4 files changed, 36 insertions(+), 591 deletions(-)

diff --git 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 36422b6e9f4..eb32998d8fb 100644
--- 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -20,12 +20,7 @@ package org.apache.hadoop.hbase.mapreduce.replication;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Arrays;
-import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
-import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +32,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.ConnectionRegistryFactory;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -53,7 +49,6 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import 
org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication.Verifier.Counters;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
@@ -63,12 +58,12 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -92,11 +87,6 @@ public class VerifyReplication extends Configured implements 
Tool {
 
   public final static String NAME = "verifyrep";
   private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
-  private static ThreadPoolExecutor reCompareExecutor = null;
-  int reCompareTries = 0;
-  int reCompareBackoffExponent = 0;
-  int reCompareThreads = 0;
-  int sleepMsBeforeReCompare = 0;
   long startTime = 0;
   long endTime = Long.MAX_VALUE;
   int batch = -1;
@@ -107,6 +97,7 @@ public class VerifyReplication extends Configured implements 
Tool {
   String peerId = null;
   String peerQuorumAddress = null;
   String rowPrefixes = null;
+  int sleepMsBeforeReCompare = 0;
   boolean verbose = false;
   boolean includeDeletedCells = false;
   // Source table snapshot name
@@ -136,12 +127,7 @@ public class VerifyReplication extends Configured 
implements Tool {
       BADROWS,
       ONLY_IN_SOURCE_TABLE_ROWS,
       ONLY_IN_PEER_TABLE_ROWS,
-      CONTENT_DIFFERENT_ROWS,
-      RECOMPARES,
-      MAIN_THREAD_RECOMPARES,
-      SOURCE_ROW_CHANGED,
-      PEER_ROW_CHANGED,
-      FAILED_RECOMPARE
+      CONTENT_DIFFERENT_ROWS
     }
 
     private Connection sourceConnection;
@@ -150,9 +136,6 @@ public class VerifyReplication extends Configured 
implements Tool {
     private Table replicatedTable;
     private ResultScanner replicatedScanner;
     private Result currentCompareRowInPeerTable;
-    private Scan tableScan;
-    private int reCompareTries;
-    private int reCompareBackoffExponent;
     private int sleepMsBeforeReCompare;
     private String delimiter = "";
     private boolean verbose = false;
@@ -170,12 +153,7 @@ public class VerifyReplication extends Configured 
implements Tool {
       throws IOException {
       if (replicatedScanner == null) {
         Configuration conf = context.getConfiguration();
-        reCompareTries = conf.getInt(NAME + ".recompareTries", 0);
-        reCompareBackoffExponent = conf.getInt(NAME + 
".recompareBackoffExponent", 1);
         sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 
0);
-        if (sleepMsBeforeReCompare > 0) {
-          reCompareTries = Math.max(reCompareTries, 1);
-        }
         delimiter = conf.get(NAME + ".delimiter", "");
         verbose = conf.getBoolean(NAME + ".verbose", false);
         batch = conf.getInt(NAME + ".batch", -1);
@@ -204,12 +182,9 @@ public class VerifyReplication extends Configured 
implements Tool {
         if (versions >= 0) {
           scan.readVersions(versions);
         }
-        int reCompareThreads = conf.getInt(NAME + ".recompareThreads", 0);
-        reCompareExecutor = buildReCompareExecutor(reCompareThreads, context);
         TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
         sourceConnection = ConnectionFactory.createConnection(conf);
         sourceTable = sourceConnection.getTable(tableName);
-        tableScan = scan;
 
         final InputSplit tableSplit = context.getInputSplit();
 
@@ -259,7 +234,7 @@ public class VerifyReplication extends Configured 
implements Tool {
       while (true) {
         if (currentCompareRowInPeerTable == null) {
           // reach the region end of peer table, row only in source table
-          logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
+          logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
           break;
         }
         int rowCmpRet = Bytes.compareTo(value.getRow(), 
currentCompareRowInPeerTable.getRow());
@@ -273,77 +248,55 @@ public class VerifyReplication extends Configured 
implements Tool {
                 "Good row key: " + delimiter + 
Bytes.toStringBinary(value.getRow()) + delimiter);
             }
           } catch (Exception e) {
-            logFailRowAndIncreaseCounter(context, 
Counters.CONTENT_DIFFERENT_ROWS, value,
-              currentCompareRowInPeerTable);
+            logFailRowAndIncreaseCounter(context, 
Counters.CONTENT_DIFFERENT_ROWS, value);
           }
           currentCompareRowInPeerTable = replicatedScanner.next();
           break;
         } else if (rowCmpRet < 0) {
           // row only exists in source table
-          logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
+          logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
           break;
         } else {
           // row only exists in peer table
-          logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_PEER_TABLE_ROWS, null,
+          logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_PEER_TABLE_ROWS,
             currentCompareRowInPeerTable);
           currentCompareRowInPeerTable = replicatedScanner.next();
         }
       }
     }
 
-    @SuppressWarnings("FutureReturnValueIgnored")
-    private void logFailRowAndIncreaseCounter(Context context, Counters 
counter, Result row,
-      Result replicatedRow) {
-      byte[] rowKey = getRow(row, replicatedRow);
-      if (reCompareTries == 0) {
-        context.getCounter(counter).increment(1);
-        context.getCounter(Counters.BADROWS).increment(1);
-        LOG.error("{}, rowkey={}{}{}", counter, delimiter, 
Bytes.toStringBinary(rowKey), delimiter);
-        return;
-      }
-
-      VerifyReplicationRecompareRunnable runnable = new 
VerifyReplicationRecompareRunnable(context,
-        row, replicatedRow, counter, delimiter, tableScan, sourceTable, 
replicatedTable,
-        reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, 
verbose);
-
-      if (reCompareExecutor == null) {
-        runnable.run();
-        return;
-      }
-
-      reCompareExecutor.submit(runnable);
-    }
-
-    @Override
-    protected void cleanup(Context context) {
-      if (reCompareExecutor != null && !reCompareExecutor.isShutdown()) {
-        reCompareExecutor.shutdown();
+    private void logFailRowAndIncreaseCounter(Context context, Counters 
counter, Result row) {
+      if (sleepMsBeforeReCompare > 0) {
+        Threads.sleep(sleepMsBeforeReCompare);
         try {
-          boolean terminated = reCompareExecutor.awaitTermination(1, 
TimeUnit.MINUTES);
-          if (!terminated) {
-            List<Runnable> queue = reCompareExecutor.shutdownNow();
-            for (Runnable runnable : queue) {
-              ((VerifyReplicationRecompareRunnable) runnable).fail();
-            }
-
-            terminated = reCompareExecutor.awaitTermination(1, 
TimeUnit.MINUTES);
-
-            if (!terminated) {
-              int activeCount = Math.max(1, 
reCompareExecutor.getActiveCount());
-              LOG.warn("Found {} possible recompares still running in the 
executable"
-                + " incrementing BADROWS and FAILED_RECOMPARE", activeCount);
-              context.getCounter(Counters.BADROWS).increment(activeCount);
-              
context.getCounter(Counters.FAILED_RECOMPARE).increment(activeCount);
+          Result sourceResult = sourceTable.get(new Get(row.getRow()));
+          Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
+          Result.compareResults(sourceResult, replicatedResult, false);
+          if (!sourceResult.isEmpty()) {
+            context.getCounter(Counters.GOODROWS).increment(1);
+            if (verbose) {
+              LOG.info("Good row key (with recompare): " + delimiter
+                + Bytes.toStringBinary(row.getRow()) + delimiter);
             }
           }
-        } catch (InterruptedException e) {
-          throw new RuntimeException("Failed to await executor termination in 
cleanup", e);
+          return;
+        } catch (Exception e) {
+          LOG.error("recompare fail after sleep, rowkey=" + delimiter
+            + Bytes.toStringBinary(row.getRow()) + delimiter);
         }
       }
+      context.getCounter(counter).increment(1);
+      context.getCounter(Counters.BADROWS).increment(1);
+      LOG.error(counter.toString() + ", rowkey=" + delimiter + 
Bytes.toStringBinary(row.getRow())
+        + delimiter);
+    }
+
+    @Override
+    protected void cleanup(Context context) {
       if (replicatedScanner != null) {
         try {
           while (currentCompareRowInPeerTable != null) {
-            logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_PEER_TABLE_ROWS, null,
+            logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_PEER_TABLE_ROWS,
               currentCompareRowInPeerTable);
             currentCompareRowInPeerTable = replicatedScanner.next();
           }
@@ -491,10 +444,6 @@ public class VerifyReplication extends Configured 
implements Tool {
     conf.setInt(NAME + ".versions", versions);
     LOG.info("Number of version: " + versions);
 
-    conf.setInt(NAME + ".recompareTries", reCompareTries);
-    conf.setInt(NAME + ".recompareBackoffExponent", reCompareBackoffExponent);
-    conf.setInt(NAME + ".recompareThreads", reCompareThreads);
-
     // Set Snapshot specific parameters
     if (peerSnapshotName != null) {
       conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
@@ -570,15 +519,6 @@ public class VerifyReplication extends Configured 
implements Tool {
     return job;
   }
 
-  protected static byte[] getRow(Result sourceResult, Result replicatedResult) 
{
-    if (sourceResult != null) {
-      return sourceResult.getRow();
-    } else if (replicatedResult != null) {
-      return replicatedResult.getRow();
-    }
-    throw new RuntimeException("Both sourceResult and replicatedResult are 
null!");
-  }
-
   private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
     if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
       String[] rowPrefixArray = rowPrefixes.split(",");
@@ -663,20 +603,11 @@ public class VerifyReplication extends Configured 
implements Tool {
           continue;
         }
 
-        final String deprecatedSleepToReCompareKey = "--recomparesleep=";
-        final String sleepToReCompareKey = "--recompareSleep=";
-        if (cmd.startsWith(deprecatedSleepToReCompareKey)) {
-          LOG.warn("--recomparesleep is deprecated and will be removed in 
4.0.0."
-            + " Use --recompareSleep instead.");
-          sleepMsBeforeReCompare =
-            
Integer.parseInt(cmd.substring(deprecatedSleepToReCompareKey.length()));
-          continue;
-        }
+        final String sleepToReCompareKey = "--recomparesleep=";
         if (cmd.startsWith(sleepToReCompareKey)) {
           sleepMsBeforeReCompare = 
Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
           continue;
         }
-
         final String verboseKey = "--verbose";
         if (cmd.startsWith(verboseKey)) {
           verbose = true;
@@ -725,25 +656,6 @@ public class VerifyReplication extends Configured 
implements Tool {
           continue;
         }
 
-        final String reCompareThreadArgs = "--recompareThreads=";
-        if (cmd.startsWith(reCompareThreadArgs)) {
-          reCompareThreads = 
Integer.parseInt(cmd.substring(reCompareThreadArgs.length()));
-          continue;
-        }
-
-        final String reCompareTriesKey = "--recompareTries=";
-        if (cmd.startsWith(reCompareTriesKey)) {
-          reCompareTries = 
Integer.parseInt(cmd.substring(reCompareTriesKey.length()));
-          continue;
-        }
-
-        final String reCompareBackoffExponentKey = 
"--recompareBackoffExponent=";
-        if (cmd.startsWith(reCompareBackoffExponentKey)) {
-          reCompareBackoffExponent =
-            
Integer.parseInt(cmd.substring(reCompareBackoffExponentKey.length()));
-          continue;
-        }
-
         if (cmd.startsWith("--")) {
           printUsage("Invalid argument '" + cmd + "'");
           return false;
@@ -823,8 +735,7 @@ public class VerifyReplication extends Configured 
implements Tool {
       System.err.println("ERROR: " + errorMsg);
     }
     System.err.println("Usage: verifyrep [--starttime=X]"
-      + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] 
[--recompareSleep=] "
-      + "[--recompareThreads=] [--recompareTries=] 
[--recompareBackoffExponent=]"
+      + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] 
[--recomparesleep=] "
       + "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] "
       + "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] 
[--peerSnapshotTmpDir=S] "
       + "[--peerFSAddress=T] [--peerHBaseRootAddress=U] 
<peerid|peerQuorumAddress> <tablename>");
@@ -840,14 +751,8 @@ public class VerifyReplication extends Configured 
implements Tool {
     System.err.println(" families     comma-separated list of families to 
copy");
     System.err.println(" row-prefixes comma-separated list of row key prefixes 
to filter on ");
     System.err.println(" delimiter    the delimiter used in display around 
rowkey");
-    System.err.println(" recompareSleep   milliseconds to sleep before 
recompare row, "
+    System.err.println(" recomparesleep   milliseconds to sleep before 
recompare row, "
       + "default value is 0 which disables the recompare.");
-    System.err.println(" recompareThreads number of threads to run recompares 
in");
-    System.err.println(" recompareTries   number of recompare attempts before 
incrementing "
-      + "the BADROWS counter. Defaults to 1 recompare");
-    System.out.println(" recompareBackoffExponent exponential multiplier to 
increase "
-      + "recompareSleep after each recompare attempt, "
-      + "default value is 0 which results in a constant sleep time");
     System.err.println(" verbose      logs row keys of good rows");
     System.err.println(" peerTableName  Peer Table Name");
     System.err.println(" sourceSnapshotName  Source Snapshot Name");
@@ -914,27 +819,6 @@ public class VerifyReplication extends Configured 
implements Tool {
         + "2181:/cluster-b \\\n" + "     TestTable");
   }
 
-  private static ThreadPoolExecutor buildReCompareExecutor(int maxThreads, 
Mapper.Context context) {
-    if (maxThreads == 0) {
-      return null;
-    }
-
-    return new ThreadPoolExecutor(0, maxThreads, 1L, TimeUnit.SECONDS, new 
SynchronousQueue<>(),
-      buildRejectedReComparePolicy(context));
-  }
-
-  private static CallerRunsPolicy buildRejectedReComparePolicy(Mapper.Context 
context) {
-    return new CallerRunsPolicy() {
-      @Override
-      public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
-        LOG.debug("Re-comparison execution rejected. Running in main thread.");
-        context.getCounter(Counters.MAIN_THREAD_RECOMPARES).increment(1);
-        // will run in the current thread
-        super.rejectedExecution(runnable, e);
-      }
-    };
-  }
-
   @Override
   public int run(String[] args) throws Exception {
     Configuration conf = this.getConf();
diff --git 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplicationRecompareRunnable.java
 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplicationRecompareRunnable.java
deleted file mode 100644
index 47f5e606b84..00000000000
--- 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplicationRecompareRunnable.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce.replication;
-
-import java.io.IOException;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@InterfaceAudience.Private
-public class VerifyReplicationRecompareRunnable implements Runnable {
-
-  private static final Logger LOG =
-    LoggerFactory.getLogger(VerifyReplicationRecompareRunnable.class);
-
-  private final Mapper.Context context;
-  private final VerifyReplication.Verifier.Counters originalCounter;
-  private final String delimiter;
-  private final byte[] row;
-  private final Scan tableScan;
-  private final Table sourceTable;
-  private final Table replicatedTable;
-
-  private final int reCompareTries;
-  private final int sleepMsBeforeReCompare;
-  private final int reCompareBackoffExponent;
-  private final boolean verbose;
-
-  private Result sourceResult;
-  private Result replicatedResult;
-
-  public VerifyReplicationRecompareRunnable(Mapper.Context context, Result 
sourceResult,
-    Result replicatedResult, VerifyReplication.Verifier.Counters 
originalCounter, String delimiter,
-    Scan tableScan, Table sourceTable, Table replicatedTable, int 
reCompareTries,
-    int sleepMsBeforeReCompare, int reCompareBackoffExponent, boolean verbose) 
{
-    this.context = context;
-    this.sourceResult = sourceResult;
-    this.replicatedResult = replicatedResult;
-    this.originalCounter = originalCounter;
-    this.delimiter = delimiter;
-    this.tableScan = tableScan;
-    this.sourceTable = sourceTable;
-    this.replicatedTable = replicatedTable;
-    this.reCompareTries = reCompareTries;
-    this.sleepMsBeforeReCompare = sleepMsBeforeReCompare;
-    this.reCompareBackoffExponent = reCompareBackoffExponent;
-    this.verbose = verbose;
-    this.row = VerifyReplication.getRow(sourceResult, replicatedResult);
-  }
-
-  @Override
-  public void run() {
-    Get get = new Get(row);
-    get.setCacheBlocks(tableScan.getCacheBlocks());
-    get.setFilter(tableScan.getFilter());
-
-    int sleepMs = sleepMsBeforeReCompare;
-    int tries = 0;
-
-    while (++tries <= reCompareTries) {
-      
context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).increment(1);
-
-      try {
-        Thread.sleep(sleepMs);
-      } catch (InterruptedException e) {
-        LOG.warn("Sleeping interrupted, incrementing bad rows and aborting");
-        incrementOriginalAndBadCounter();
-        
context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1);
-        Thread.currentThread().interrupt();
-        return;
-      }
-
-      try {
-        if (fetchLatestRows(get) && matches(sourceResult, replicatedResult, 
null)) {
-          if (verbose) {
-            LOG.info("Good row key (with recompare): {}{}{}", delimiter, 
Bytes.toStringBinary(row),
-              delimiter);
-          }
-          
context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).increment(1);
-          return;
-        } else {
-          
context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1);
-        }
-      } catch (IOException e) {
-        
context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1);
-        if (verbose) {
-          LOG.info("Got an exception during recompare for rowkey={}", 
Bytes.toStringBinary(row), e);
-        }
-      }
-
-      sleepMs = sleepMs * (2 ^ reCompareBackoffExponent);
-    }
-
-    LOG.error("{}, rowkey={}{}{}", originalCounter, delimiter, 
Bytes.toStringBinary(row),
-      delimiter);
-    incrementOriginalAndBadCounter();
-  }
-
-  public void fail() {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Called fail on row={}", Bytes.toStringBinary(row));
-    }
-    incrementOriginalAndBadCounter();
-    
context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1);
-  }
-
-  private boolean fetchLatestRows(Get get) throws IOException {
-    Result sourceResult = sourceTable.get(get);
-    Result replicatedResult = replicatedTable.get(get);
-
-    boolean sourceMatches = matches(sourceResult, this.sourceResult,
-      VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED);
-    boolean replicatedMatches = matches(replicatedResult, 
this.replicatedResult,
-      VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED);
-
-    this.sourceResult = sourceResult;
-    this.replicatedResult = replicatedResult;
-    return sourceMatches && replicatedMatches;
-  }
-
-  private boolean matches(Result original, Result updated,
-    VerifyReplication.Verifier.Counters failCounter) {
-    try {
-      Result.compareResults(original, updated);
-      return true;
-    } catch (Exception e) {
-      if (failCounter != null) {
-        context.getCounter(failCounter).increment(1);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("{} for rowkey={}", failCounter, 
Bytes.toStringBinary(row));
-        }
-      }
-      return false;
-    }
-  }
-
-  private void incrementOriginalAndBadCounter() {
-    context.getCounter(originalCounter).increment(1);
-    
context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).increment(1);
-  }
-}
diff --git 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRecompareRunnable.java
 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRecompareRunnable.java
deleted file mode 100644
index 49c52fbcc3b..00000000000
--- 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRecompareRunnable.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
-import 
org.apache.hadoop.hbase.mapreduce.replication.VerifyReplicationRecompareRunnable;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.counters.GenericCounter;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@Category({ ReplicationTests.class, SmallTests.class })
-@RunWith(MockitoJUnitRunner.class)
-public class TestVerifyReplicationRecompareRunnable {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestVerifyReplicationRecompareRunnable.class);
-
-  @Mock
-  private Table sourceTable;
-
-  @Mock
-  private Table replicatedTable;
-
-  @Mock
-  private Mapper.Context context;
-
-  static Result genResult(int cols) {
-    KeyValue[] kvs = new KeyValue[cols];
-
-    for (int i = 0; i < cols; ++i) {
-      kvs[i] =
-        new KeyValue(genBytes(), genBytes(), genBytes(), 
System.currentTimeMillis(), genBytes());
-    }
-
-    return Result.create(kvs);
-  }
-
-  static byte[] genBytes() {
-    return Bytes.toBytes(ThreadLocalRandom.current().nextInt());
-  }
-
-  @Before
-  public void setUp() {
-    for (VerifyReplication.Verifier.Counters counter : 
VerifyReplication.Verifier.Counters
-      .values()) {
-      Counter emptyCounter = new GenericCounter(counter.name(), 
counter.name());
-      when(context.getCounter(counter)).thenReturn(emptyCounter);
-    }
-  }
-
-  @Test
-  public void itRecomparesGoodRow() throws IOException {
-    Result result = genResult(2);
-
-    when(sourceTable.get(any(Get.class))).thenReturn(result);
-    when(replicatedTable.get(any(Get.class))).thenReturn(result);
-
-    VerifyReplicationRecompareRunnable runnable = new 
VerifyReplicationRecompareRunnable(context,
-      genResult(5), null, 
VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, "",
-      new Scan(), sourceTable, replicatedTable, 3, 1, 0, true);
-
-    runnable.run();
-
-    assertEquals(0, 
context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
-    assertEquals(0,
-      
context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
-    assertEquals(1, 
context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(1,
-      
context.getCounter(VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED).getValue());
-    assertEquals(1,
-      
context.getCounter(VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED).getValue());
-    assertEquals(2, 
context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
-  }
-
-  @Test
-  public void itRecomparesBadRow() throws IOException {
-    Result replicatedResult = genResult(1);
-    when(sourceTable.get(any(Get.class))).thenReturn(genResult(5));
-    when(replicatedTable.get(any(Get.class))).thenReturn(replicatedResult);
-
-    VerifyReplicationRecompareRunnable runnable = new 
VerifyReplicationRecompareRunnable(context,
-      genResult(5), replicatedResult, 
VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS,
-      "", new Scan(), sourceTable, replicatedTable, 1, 1, 0, true);
-
-    runnable.run();
-
-    assertEquals(1, 
context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
-    assertEquals(1,
-      
context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
-    assertEquals(0, 
context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(1,
-      
context.getCounter(VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED).getValue());
-    assertEquals(0,
-      
context.getCounter(VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED).getValue());
-    assertEquals(1, 
context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
-  }
-
-  @Test
-  public void itHandlesExceptionOnRecompare() throws IOException {
-    when(sourceTable.get(any(Get.class))).thenThrow(new IOException("Error!"));
-    when(replicatedTable.get(any(Get.class))).thenReturn(genResult(5));
-
-    VerifyReplicationRecompareRunnable runnable = new 
VerifyReplicationRecompareRunnable(context,
-      genResult(5), null, 
VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, "",
-      new Scan(), sourceTable, replicatedTable, 1, 1, 0, true);
-
-    runnable.run();
-
-    assertEquals(1, 
context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
-    assertEquals(1,
-      
context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
-    assertEquals(1,
-      
context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue());
-    assertEquals(1, 
context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
-  }
-}
diff --git 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/VerifyReplicationTestBase.java
 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/VerifyReplicationTestBase.java
index e263076677a..26649226d9c 100644
--- 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/VerifyReplicationTestBase.java
+++ 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/VerifyReplicationTestBase.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -100,7 +99,7 @@ public abstract class VerifyReplicationTestBase extends 
TestReplicationBase {
     htable3 = connection2.getTable(peerTableName);
   }
 
-  static Counters runVerifyReplication(String[] args, int expectedGoodRows, 
int expectedBadRows)
+  static void runVerifyReplication(String[] args, int expectedGoodRows, int 
expectedBadRows)
     throws IOException, InterruptedException, ClassNotFoundException {
     Job job = new VerifyReplication().createSubmittableJob(new 
Configuration(CONF1), args);
     if (job == null) {
@@ -113,7 +112,6 @@ public abstract class VerifyReplicationTestBase extends 
TestReplicationBase {
       
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
     assertEquals(expectedBadRows,
       
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
-    return job.getCounters();
   }
 
   /**
@@ -440,127 +438,6 @@ public abstract class VerifyReplicationTestBase extends 
TestReplicationBase {
     checkRestoreTmpDir(CONF2, tmpPath2, 2);
   }
 
-  @Test
-  public void testVerifyReplicationThreadedRecompares() throws Exception {
-    // Populate the tables with same data
-    runBatchCopyTest();
-
-    // ONLY_IN_PEER_TABLE_ROWS
-    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
-    put.addColumn(noRepfamName, row, row);
-    htable3.put(put);
-
-    // CONTENT_DIFFERENT_ROWS
-    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
-    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
-    htable3.put(put);
-
-    // ONLY_IN_SOURCE_TABLE_ROWS
-    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
-    put.addColumn(noRepfamName, row, row);
-    htable1.put(put);
-
-    String[] args = new String[] { "--recompareThreads=10", 
"--recompareTries=3",
-      "--recompareSleep=1", "--peerTableName=" + 
peerTableName.getNameAsString(),
-      getClusterKey(UTIL2), tableName.getNameAsString() };
-    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
-    assertEquals(
-      
counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(),
 9);
-    
assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(),
-      9);
-    assertEquals(
-      
counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(),
-      1);
-    assertEquals(
-      
counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(),
-      1);
-    
assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS)
-      .getValue(), 1);
-  }
-
-  @Test
-  public void testFailsRemainingComparesAfterShutdown() throws Exception {
-    // Populate the tables with same data
-    runBatchCopyTest();
-
-    // ONLY_IN_PEER_TABLE_ROWS
-    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
-    put.addColumn(noRepfamName, row, row);
-    htable3.put(put);
-
-    // CONTENT_DIFFERENT_ROWS
-    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
-    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
-    htable3.put(put);
-
-    // ONLY_IN_SOURCE_TABLE_ROWS
-    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
-    put.addColumn(noRepfamName, row, row);
-    htable1.put(put);
-
-    /**
-     * recompareSleep is set to exceed how long we wait on
-     * {@link VerifyReplication#reCompareExecutor} termination when doing 
cleanup. this allows us to
-     * test the counter-incrementing logic if the executor still hasn't 
terminated after the call to
-     * shutdown and awaitTermination
-     */
-    String[] args = new String[] { "--recompareThreads=1", 
"--recompareTries=1",
-      "--recompareSleep=121000", "--peerTableName=" + 
peerTableName.getNameAsString(),
-      getClusterKey(UTIL2), tableName.getNameAsString() };
-
-    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
-    assertEquals(
-      
counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(),
 3);
-    
assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(),
-      3);
-    assertEquals(
-      
counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(),
-      1);
-    assertEquals(
-      
counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(),
-      1);
-    
assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS)
-      .getValue(), 1);
-  }
-
-  @Test
-  public void testVerifyReplicationSynchronousRecompares() throws Exception {
-    // Populate the tables with same data
-    runBatchCopyTest();
-
-    // ONLY_IN_PEER_TABLE_ROWS
-    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
-    put.addColumn(noRepfamName, row, row);
-    htable3.put(put);
-
-    // CONTENT_DIFFERENT_ROWS
-    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
-    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
-    htable3.put(put);
-
-    // ONLY_IN_SOURCE_TABLE_ROWS
-    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
-    put.addColumn(noRepfamName, row, row);
-    htable1.put(put);
-
-    String[] args = new String[] { "--recompareTries=3", "--recompareSleep=1",
-      "--peerTableName=" + peerTableName.getNameAsString(), 
getClusterKey(UTIL2),
-      tableName.getNameAsString() };
-    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
-    assertEquals(
-      
counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(),
 9);
-    
assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(),
-      9);
-    assertEquals(
-      
counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(),
-      1);
-    assertEquals(
-      
counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(),
-      1);
-    
assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS)
-      .getValue(), 1);
-  }
-
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     htable3.close();


Reply via email to