hgromer commented on code in PR #5051:
URL: https://github.com/apache/hbase/pull/5051#discussion_r1119384944


##########
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java:
##########
@@ -240,55 +266,138 @@ public void map(ImmutableBytesWritable row, final Result 
value, Context context)
                 "Good row key: " + delimiter + 
Bytes.toStringBinary(value.getRow()) + delimiter);
             }
           } catch (Exception e) {
-            logFailRowAndIncreaseCounter(context, 
Counters.CONTENT_DIFFERENT_ROWS, value);
+            logFailRowAndIncreaseCounter(context, 
Counters.CONTENT_DIFFERENT_ROWS, value, currentCompareRowInPeerTable);
           }
           currentCompareRowInPeerTable = replicatedScanner.next();
           break;
         } else if (rowCmpRet < 0) {
           // row only exists in source table
-          logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
+          logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
           break;
         } else {
           // row only exists in peer table
-          logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_PEER_TABLE_ROWS,
+          logFailRowAndIncreaseCounter(context, 
Counters.ONLY_IN_PEER_TABLE_ROWS, null,
             currentCompareRowInPeerTable);
           currentCompareRowInPeerTable = replicatedScanner.next();
         }
       }
     }
 
-    private void logFailRowAndIncreaseCounter(Context context, Counters 
counter, Result row) {
-      if (sleepMsBeforeReCompare > 0) {
-        Threads.sleep(sleepMsBeforeReCompare);
-        try {
-          Result sourceResult = sourceTable.get(new Get(row.getRow()));
-          Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
-          Result.compareResults(sourceResult, replicatedResult, false);
-          if (!sourceResult.isEmpty()) {
-            context.getCounter(Counters.GOODROWS).increment(1);
+    private void logFailRowAndIncreaseCounter(Context context, Counters 
counter, Result row, Result replicatedRow) {
+      if (reCompareTries > 0 && sleepMsBeforeReCompare > 0) {
+        reCompareExecutor.submit(new RecompareRunnable(context, row, 
replicatedRow, counter));
+        return;
+      }
+
+      byte[] rowKey = getRow(row, replicatedRow);
+      context.getCounter(counter).increment(1);
+      context.getCounter(Counters.BADROWS).increment(1);
+      LOG.error(counter.toString() + ", rowkey=" + delimiter + 
Bytes.toStringBinary(rowKey)
+        + delimiter);
+    }
+
+    private class RecompareRunnable implements Runnable {
+      private final Context context;
+      private Result sourceResult;
+      private Result replicatedResult;
+      private final Counters originalCounter;
+      private final byte[] row;
+
+      public RecompareRunnable(Context context, Result sourceResult, Result 
replicatedResult, Counters originalCounter) {
+        this.context = context;
+        this.sourceResult = sourceResult;
+        this.replicatedResult = replicatedResult;
+        this.originalCounter = originalCounter;
+        this.row = getRow(sourceResult, replicatedResult);
+      }
+
+      @Override
+      public void run() {
+        Get get = new Get(row);
+        get.setCacheBlocks(tableScan.getCacheBlocks());
+        get.setFilter(tableScan.getFilter());
+
+        int sleepMs = sleepMsBeforeReCompare;
+        int tries = 0;
+
+        while (++tries <= reCompareTries) {
+          context.getCounter(Counters.RE_COMPARES).increment(1);
+
+          try {
+            Thread.sleep(sleepMs);
+          } catch (InterruptedException e) {
+            LOG.warn("Sleeping interrupted, incrementing bad rows and 
aborting");
+            incrementOriginalAndBadCounter();
+            Thread.currentThread().interrupt();
+          }
+
+          try {
+            fetchLatestRows(get);
+            if (matches(sourceResult, replicatedResult, null)) {
+              if (verbose) {
+                LOG.info("Good row key (with recompare): " + delimiter + 
Bytes.toStringBinary(row)
+                  + delimiter);
+              }
+              context.getCounter(Counters.GOODROWS).increment(1);
+              return;
+            }
+          } catch (IOException e) {
+            context.getCounter(Counters.FAILED_RECOMPARE).increment(1);
             if (verbose) {
-              LOG.info("Good row key (with recompare): " + delimiter
-                + Bytes.toStringBinary(row.getRow()) + delimiter);
+              LOG.info("Got an exception during recompare for rowkey=" + 
Bytes.toStringBinary(row),
+                e);
             }
           }
-          return;
+
+          sleepMs = sleepMs * (2 ^ reCompareBackoffExponent);
+        }
+
+        LOG.error(originalCounter.toString() + ", rowkey=" + delimiter + 
Bytes.toStringBinary(row) + delimiter);
+        incrementOriginalAndBadCounter();
+      }
+
+      private void fetchLatestRows(Get get) throws IOException {
+        int tries = 0;
+        while (++tries <= FETCH_LATEST_ROWS_TRIES) {
+          Result sourceResult = sourceTable.get(get);
+          Result replicatedResult = replicatedTable.get(get);
+
+          if (matches(sourceResult, this.sourceResult, 
Counters.SOURCE_ROW_CHANGED) &&
+            matches(replicatedResult, this.replicatedResult, 
Counters.PEER_ROW_CHANGED)) {
+            return;
+          }
+
+          this.sourceResult = sourceResult;
+          this.replicatedResult = replicatedResult;
+          Threads.sleep(FETCH_LATEST_ROWS_BACKOFF_MILLIS);
+        }
+      }
+
+      private boolean matches(Result original, Result updated, Counters 
failCounter) {
+        try {
+          Result.compareResults(original, updated);
+          return true;
         } catch (Exception e) {
-          LOG.error("recompare fail after sleep, rowkey=" + delimiter
-            + Bytes.toStringBinary(row.getRow()) + delimiter);
+          if (failCounter != null) {
+            context.getCounter(failCounter).increment(1);
+            LOG.debug(failCounter + " for rowkey=" + 
Bytes.toStringBinary(row));

Review Comment:
   I opted for checking `if (verbose)` just for consistency's sake, let me know 
if that's alright.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to