This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 2cc49a1c137 HBASE-28636 Add UTs for testing copy/sync table between 
clusters (#5963) (#5971)
2cc49a1c137 is described below

commit 2cc49a1c137f0682c80fbb8a1730b5d215f77ea0
Author: Duo Zhang <[email protected]>
AuthorDate: Fri Jun 7 15:58:44 2024 +0800

    HBASE-28636 Add UTs for testing copy/sync table between clusters (#5963) 
(#5971)
    
    Signed-off-by: Xin Sun <[email protected]>
    (cherry picked from commit a94b78666f529f6731cc79c09b628fd099d177ad)
---
 .../apache/hadoop/hbase/mapreduce/SyncTable.java   |  21 +-
 .../hadoop/hbase/mapreduce/CopyTableTestBase.java  | 279 +++++++++
 .../hadoop/hbase/mapreduce/TestCopyTable.java      | 288 ++-------
 .../mapreduce/TestCopyTableToPeerCluster.java      | 148 +++++
 .../hadoop/hbase/mapreduce/TestSyncTable.java      | 688 ++++++++++-----------
 .../org/apache/hadoop/hbase/mob/MobTestUtil.java   |  11 +-
 6 files changed, 816 insertions(+), 619 deletions(-)

diff --git 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
index 89fca4fad4d..b34e2a1daac 100644
--- 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
+++ 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
@@ -92,9 +92,7 @@ public class SyncTable extends Configured implements Tool {
   private void initCredentialsForHBase(String zookeeper, Job job) throws 
IOException {
     Configuration peerConf =
       HBaseConfiguration.createClusterConf(job.getConfiguration(), zookeeper);
-    if 
("kerberos".equalsIgnoreCase(peerConf.get("hbase.security.authentication"))) {
-      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
-    }
+    TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
   }
 
   public Job createSubmittableJob(String[] args) throws IOException {
@@ -172,12 +170,6 @@ public class SyncTable extends Configured implements Tool {
       // would be nice to add an option for bulk load instead
     }
 
-    // Obtain an authentication token, for the specified cluster, on behalf of 
the current user
-    if (sourceZkCluster != null) {
-      Configuration peerConf =
-        HBaseConfiguration.createClusterConf(job.getConfiguration(), 
sourceZkCluster);
-      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
-    }
     return job;
   }
 
@@ -220,7 +212,6 @@ public class SyncTable extends Configured implements Tool {
 
     @Override
     protected void setup(Context context) throws IOException {
-
       Configuration conf = context.getConfiguration();
       sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
       sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, 
null);
@@ -292,9 +283,7 @@ public class SyncTable extends Configured implements Tool {
         }
       } catch (Throwable t) {
         mapperException = t;
-        Throwables.propagateIfInstanceOf(t, IOException.class);
-        Throwables.propagateIfInstanceOf(t, InterruptedException.class);
-        Throwables.propagate(t);
+        throw t;
       }
     }
 
@@ -693,9 +682,9 @@ public class SyncTable extends Configured implements Tool {
 
       // propagate first exception
       if (mapperException != null) {
-        Throwables.propagateIfInstanceOf(mapperException, IOException.class);
-        Throwables.propagateIfInstanceOf(mapperException, 
InterruptedException.class);
-        Throwables.propagate(mapperException);
+        Throwables.throwIfInstanceOf(mapperException, IOException.class);
+        Throwables.throwIfInstanceOf(mapperException, 
InterruptedException.class);
+        Throwables.throwIfUnchecked(mapperException);
       }
     }
 
diff --git 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/CopyTableTestBase.java
 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/CopyTableTestBase.java
new file mode 100644
index 00000000000..d7648c26406
--- /dev/null
+++ 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/CopyTableTestBase.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.mob.MobTestUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+/**
+ * Base class for testing CopyTable MR tool.
+ */
+public abstract class CopyTableTestBase {
+
+  protected static final byte[] ROW1 = Bytes.toBytes("row1");
+  protected static final byte[] ROW2 = Bytes.toBytes("row2");
+  protected static final String FAMILY_A_STRING = "a";
+  protected static final String FAMILY_B_STRING = "b";
+  protected static final byte[] FAMILY_A = Bytes.toBytes(FAMILY_A_STRING);
+  protected static final byte[] FAMILY_B = Bytes.toBytes(FAMILY_B_STRING);
+  protected static final byte[] QUALIFIER = Bytes.toBytes("q");
+
+  @Rule
+  public TestName name = new TestName();
+
+  protected abstract Table createSourceTable(TableDescriptor desc) throws 
Exception;
+
+  protected abstract Table createTargetTable(TableDescriptor desc) throws 
Exception;
+
+  protected abstract void dropSourceTable(TableName tableName) throws 
Exception;
+
+  protected abstract void dropTargetTable(TableName tableName) throws 
Exception;
+
+  protected abstract String[] getPeerClusterOptions() throws Exception;
+
+  protected final void loadData(Table t, byte[] family, byte[] column) throws 
IOException {
+    for (int i = 0; i < 10; i++) {
+      byte[] row = Bytes.toBytes("row" + i);
+      Put p = new Put(row);
+      p.addColumn(family, column, row);
+      t.put(p);
+    }
+  }
+
+  protected final void verifyRows(Table t, byte[] family, byte[] column) 
throws IOException {
+    for (int i = 0; i < 10; i++) {
+      byte[] row = Bytes.toBytes("row" + i);
+      Get g = new Get(row).addFamily(family);
+      Result r = t.get(g);
+      assertNotNull(r);
+      assertEquals(1, r.size());
+      Cell cell = r.rawCells()[0];
+      assertTrue(CellUtil.matchingQualifier(cell, column));
+      assertEquals(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(),
+        cell.getValueLength(), row, 0, row.length), 0);
+    }
+  }
+
+  protected final void doCopyTableTest(Configuration conf, boolean bulkload) 
throws Exception {
+    TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
+    TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
+    byte[] family = Bytes.toBytes("family");
+    byte[] column = Bytes.toBytes("c1");
+    TableDescriptor desc1 = TableDescriptorBuilder.newBuilder(tableName1)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
+    TableDescriptor desc2 = TableDescriptorBuilder.newBuilder(tableName2)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
+
+    try (Table t1 = createSourceTable(desc1); Table t2 = 
createTargetTable(desc2)) {
+      // put rows into the first table
+      loadData(t1, family, column);
+
+      String[] peerClusterOptions = getPeerClusterOptions();
+      if (bulkload) {
+        assertTrue(runCopy(conf,
+          ArrayUtils.addAll(peerClusterOptions, "--new.name=" + 
tableName2.getNameAsString(),
+            "--bulkload", tableName1.getNameAsString())));
+      } else {
+        assertTrue(runCopy(conf, ArrayUtils.addAll(peerClusterOptions,
+          "--new.name=" + tableName2.getNameAsString(), 
tableName1.getNameAsString())));
+      }
+
+      // verify the data was copied into table 2
+      verifyRows(t2, family, column);
+    } finally {
+      dropSourceTable(tableName1);
+      dropTargetTable(tableName2);
+    }
+  }
+
+  protected final void doCopyTableTestWithMob(Configuration conf, boolean 
bulkload)
+    throws Exception {
+    TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
+    TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
+    byte[] family = Bytes.toBytes("mob");
+    byte[] column = Bytes.toBytes("c1");
+
+    ColumnFamilyDescriptorBuilder cfd = 
ColumnFamilyDescriptorBuilder.newBuilder(family);
+
+    cfd.setMobEnabled(true);
+    cfd.setMobThreshold(5);
+    TableDescriptor desc1 =
+      
TableDescriptorBuilder.newBuilder(tableName1).setColumnFamily(cfd.build()).build();
+    TableDescriptor desc2 =
+      
TableDescriptorBuilder.newBuilder(tableName2).setColumnFamily(cfd.build()).build();
+
+    try (Table t1 = createSourceTable(desc1); Table t2 = 
createTargetTable(desc2)) {
+      // put rows into the first table
+      for (int i = 0; i < 10; i++) {
+        Put p = new Put(Bytes.toBytes("row" + i));
+        p.addColumn(family, column, column);
+        t1.put(p);
+      }
+
+      String[] peerClusterOptions = getPeerClusterOptions();
+      if (bulkload) {
+        assertTrue(runCopy(conf,
+          ArrayUtils.addAll(peerClusterOptions, "--new.name=" + 
tableName2.getNameAsString(),
+            "--bulkload", tableName1.getNameAsString())));
+      } else {
+        assertTrue(runCopy(conf, ArrayUtils.addAll(peerClusterOptions,
+          "--new.name=" + tableName2.getNameAsString(), 
tableName1.getNameAsString())));
+      }
+
+      // verify the data was copied into table 2
+      for (int i = 0; i < 10; i++) {
+        Get g = new Get(Bytes.toBytes("row" + i));
+        Result r = t2.get(g);
+        assertEquals(1, r.size());
+        assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], column));
+        assertEquals("compare row values between two tables",
+          t1.getDescriptor().getValue("row" + i), 
t2.getDescriptor().getValue("row" + i));
+      }
+
+      assertEquals("compare count of mob rows after table copy", 
MobTestUtil.countMobRows(t1),
+        MobTestUtil.countMobRows(t2));
+      assertEquals("compare count of mob row values between two tables",
+        t1.getDescriptor().getValues().size(), 
t2.getDescriptor().getValues().size());
+      assertTrue("The mob row count is 0 but should be > 0", 
MobTestUtil.countMobRows(t2) > 0);
+    } finally {
+      dropSourceTable(tableName1);
+      dropTargetTable(tableName2);
+    }
+  }
+
+  protected final boolean runCopy(Configuration conf, String[] args) throws 
Exception {
+    int status = ToolRunner.run(conf, new CopyTable(), args);
+    return status == 0;
+  }
+
+  protected final void testStartStopRow(Configuration conf) throws Exception {
+    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
+    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
+    final byte[] family = Bytes.toBytes("family");
+    final byte[] column = Bytes.toBytes("c1");
+    final byte[] row0 = Bytes.toBytesBinary("\\x01row0");
+    final byte[] row1 = Bytes.toBytesBinary("\\x01row1");
+    final byte[] row2 = Bytes.toBytesBinary("\\x01row2");
+    TableDescriptor desc1 = TableDescriptorBuilder.newBuilder(tableName1)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
+    TableDescriptor desc2 = TableDescriptorBuilder.newBuilder(tableName2)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
+    try (Table t1 = createSourceTable(desc1); Table t2 = 
createTargetTable(desc2)) {
+      // put rows into the first table
+      Put p = new Put(row0);
+      p.addColumn(family, column, column);
+      t1.put(p);
+      p = new Put(row1);
+      p.addColumn(family, column, column);
+      t1.put(p);
+      p = new Put(row2);
+      p.addColumn(family, column, column);
+      t1.put(p);
+
+      String[] peerClusterOptions = getPeerClusterOptions();
+      assertTrue(runCopy(conf, ArrayUtils.addAll(peerClusterOptions, 
"--new.name=" + tableName2,
+        "--startrow=\\x01row1", "--stoprow=\\x01row2", 
tableName1.getNameAsString())));
+
+      // verify the data was copied into table 2
+      // row1 exist, row0, row2 do not exist
+      Get g = new Get(row1);
+      Result r = t2.get(g);
+      assertEquals(1, r.size());
+      assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], column));
+
+      g = new Get(row0);
+      r = t2.get(g);
+      assertEquals(0, r.size());
+
+      g = new Get(row2);
+      r = t2.get(g);
+      assertEquals(0, r.size());
+    } finally {
+      dropSourceTable(tableName1);
+      dropTargetTable(tableName2);
+    }
+  }
+
+  protected final void testRenameFamily(Configuration conf) throws Exception {
+    TableName sourceTable = TableName.valueOf(name.getMethodName() + 
"-source");
+    TableName targetTable = TableName.valueOf(name.getMethodName() + 
"-target");
+
+    TableDescriptor desc1 = TableDescriptorBuilder.newBuilder(sourceTable)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_A))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_B)).build();
+    TableDescriptor desc2 = TableDescriptorBuilder.newBuilder(targetTable)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_A))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_B)).build();
+
+    try (Table t = createSourceTable(desc1); Table t2 = 
createTargetTable(desc2)) {
+      Put p = new Put(ROW1);
+      p.addColumn(FAMILY_A, QUALIFIER, Bytes.toBytes("Data11"));
+      p.addColumn(FAMILY_B, QUALIFIER, Bytes.toBytes("Data12"));
+      p.addColumn(FAMILY_A, QUALIFIER, Bytes.toBytes("Data13"));
+      t.put(p);
+      p = new Put(ROW2);
+      p.addColumn(FAMILY_B, QUALIFIER, Bytes.toBytes("Dat21"));
+      p.addColumn(FAMILY_A, QUALIFIER, Bytes.toBytes("Data22"));
+      p.addColumn(FAMILY_B, QUALIFIER, Bytes.toBytes("Data23"));
+      t.put(p);
+
+      long currentTime = EnvironmentEdgeManager.currentTime();
+      String[] args = ArrayUtils.addAll(getPeerClusterOptions(), "--new.name=" 
+ targetTable,
+        "--families=a:b", "--all.cells", "--starttime=" + (currentTime - 
100000),
+        "--endtime=" + (currentTime + 100000), "--versions=1", 
sourceTable.getNameAsString());
+      assertNull(t2.get(new Get(ROW1)).getRow());
+
+      assertTrue(runCopy(conf, args));
+
+      assertNotNull(t2.get(new Get(ROW1)).getRow());
+      Result res = t2.get(new Get(ROW1));
+      byte[] b1 = res.getValue(FAMILY_B, QUALIFIER);
+      assertEquals("Data13", Bytes.toString(b1));
+      assertNotNull(t2.get(new Get(ROW2)).getRow());
+      res = t2.get(new Get(ROW2));
+      b1 = res.getValue(FAMILY_A, QUALIFIER);
+      // Data from the family of B is not copied
+      assertNull(b1);
+    } finally {
+      dropSourceTable(sourceTable);
+      dropTargetTable(targetTable);
+    }
+  }
+}
diff --git 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
index 8a811e6d654..da420cfe7a7 100644
--- 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
+++ 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
@@ -18,37 +18,26 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.mob.MobTestUtil;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MapReduceTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.LauncherSecurityManager;
-import org.apache.hadoop.util.ToolRunner;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -60,20 +49,13 @@ import org.junit.rules.TestName;
  * Basic test for the CopyTable M/R tool
  */
 @Category({ MapReduceTests.class, LargeTests.class })
-public class TestCopyTable {
+public class TestCopyTable extends CopyTableTestBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestCopyTable.class);
 
   private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
-  private static final byte[] ROW1 = Bytes.toBytes("row1");
-  private static final byte[] ROW2 = Bytes.toBytes("row2");
-  private static final String FAMILY_A_STRING = "a";
-  private static final String FAMILY_B_STRING = "b";
-  private static final byte[] FAMILY_A = Bytes.toBytes(FAMILY_A_STRING);
-  private static final byte[] FAMILY_B = Bytes.toBytes(FAMILY_B_STRING);
-  private static final byte[] QUALIFIER = Bytes.toBytes("q");
 
   @Rule
   public TestName name = new TestName();
@@ -88,95 +70,29 @@ public class TestCopyTable {
     TEST_UTIL.shutdownMiniCluster();
   }
 
-  private void doCopyTableTest(boolean bulkload) throws Exception {
-    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
-    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
-    final byte[] FAMILY = Bytes.toBytes("family");
-    final byte[] COLUMN1 = Bytes.toBytes("c1");
-
-    try (Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
-      Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);) {
-      // put rows into the first table
-      loadData(t1, FAMILY, COLUMN1);
-
-      CopyTable copy = new CopyTable();
-      int code;
-      if (bulkload) {
-        code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()), 
copy,
-          new String[] { "--new.name=" + tableName2.getNameAsString(), 
"--bulkload",
-            tableName1.getNameAsString() });
-      } else {
-        code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()), 
copy, new String[] {
-          "--new.name=" + tableName2.getNameAsString(), 
tableName1.getNameAsString() });
-      }
-      assertEquals("copy job failed", 0, code);
-
-      // verify the data was copied into table 2
-      verifyRows(t2, FAMILY, COLUMN1);
-    } finally {
-      TEST_UTIL.deleteTable(tableName1);
-      TEST_UTIL.deleteTable(tableName2);
-    }
+  @Override
+  protected Table createSourceTable(TableDescriptor desc) throws Exception {
+    return TEST_UTIL.createTable(desc, null);
   }
 
-  private void doCopyTableTestWithMob(boolean bulkload) throws Exception {
-    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
-    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
-    final byte[] FAMILY = Bytes.toBytes("mob");
-    final byte[] COLUMN1 = Bytes.toBytes("c1");
-
-    ColumnFamilyDescriptorBuilder cfd = 
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
-
-    cfd.setMobEnabled(true);
-    cfd.setMobThreshold(5);
-    TableDescriptor desc1 =
-      
TableDescriptorBuilder.newBuilder(tableName1).setColumnFamily(cfd.build()).build();
-    TableDescriptor desc2 =
-      
TableDescriptorBuilder.newBuilder(tableName2).setColumnFamily(cfd.build()).build();
-
-    try (Table t1 = TEST_UTIL.createTable(desc1, null);
-      Table t2 = TEST_UTIL.createTable(desc2, null);) {
-
-      // put rows into the first table
-      for (int i = 0; i < 10; i++) {
-        Put p = new Put(Bytes.toBytes("row" + i));
-        p.addColumn(FAMILY, COLUMN1, COLUMN1);
-        t1.put(p);
-      }
-
-      CopyTable copy = new CopyTable();
+  @Override
+  protected Table createTargetTable(TableDescriptor desc) throws Exception {
+    return TEST_UTIL.createTable(desc, null);
+  }
 
-      int code;
-      if (bulkload) {
-        code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()), 
copy,
-          new String[] { "--new.name=" + tableName2.getNameAsString(), 
"--bulkload",
-            tableName1.getNameAsString() });
-      } else {
-        code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()), 
copy, new String[] {
-          "--new.name=" + tableName2.getNameAsString(), 
tableName1.getNameAsString() });
-      }
-      assertEquals("copy job failed", 0, code);
+  @Override
+  protected void dropSourceTable(TableName tableName) throws Exception {
+    TEST_UTIL.deleteTable(tableName);
+  }
 
-      // verify the data was copied into table 2
-      for (int i = 0; i < 10; i++) {
-        Get g = new Get(Bytes.toBytes("row" + i));
-        Result r = t2.get(g);
-        assertEquals(1, r.size());
-        assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN1));
-        assertEquals("compare row values between two tables",
-          t1.getDescriptor().getValue("row" + i), 
t2.getDescriptor().getValue("row" + i));
-      }
+  @Override
+  protected void dropTargetTable(TableName tableName) throws Exception {
+    TEST_UTIL.deleteTable(tableName);
+  }
 
-      assertEquals("compare count of mob rows after table copy",
-        MobTestUtil.countMobRows(TEST_UTIL, t1), 
MobTestUtil.countMobRows(TEST_UTIL, t2));
-      assertEquals("compare count of mob row values between two tables",
-        t1.getDescriptor().getValues().size(), 
t2.getDescriptor().getValues().size());
-      assertTrue("The mob row count is 0 but should be > 0",
-        MobTestUtil.countMobRows(TEST_UTIL, t2) > 0);
-    } finally {
-      TEST_UTIL.deleteTable(tableName1);
-      TEST_UTIL.deleteTable(tableName2);
-    }
+  @Override
+  protected String[] getPeerClusterOptions() throws Exception {
+    return new String[0];
   }
 
   /**
@@ -184,7 +100,7 @@ public class TestCopyTable {
    */
   @Test
   public void testCopyTable() throws Exception {
-    doCopyTableTest(false);
+    doCopyTableTest(TEST_UTIL.getConfiguration(), false);
   }
 
   /**
@@ -192,7 +108,7 @@ public class TestCopyTable {
    */
   @Test
   public void testCopyTableWithBulkload() throws Exception {
-    doCopyTableTest(true);
+    doCopyTableTest(TEST_UTIL.getConfiguration(), true);
   }
 
   /**
@@ -200,7 +116,7 @@ public class TestCopyTable {
    */
   @Test
   public void testCopyTableWithMob() throws Exception {
-    doCopyTableTestWithMob(false);
+    doCopyTableTestWithMob(TEST_UTIL.getConfiguration(), false);
   }
 
   /**
@@ -208,58 +124,12 @@ public class TestCopyTable {
    */
   @Test
   public void testCopyTableWithBulkloadWithMob() throws Exception {
-    doCopyTableTestWithMob(true);
+    doCopyTableTestWithMob(TEST_UTIL.getConfiguration(), true);
   }
 
   @Test
   public void testStartStopRow() throws Exception {
-    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
-    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
-    final byte[] FAMILY = Bytes.toBytes("family");
-    final byte[] COLUMN1 = Bytes.toBytes("c1");
-    final byte[] ROW0 = Bytes.toBytesBinary("\\x01row0");
-    final byte[] ROW1 = Bytes.toBytesBinary("\\x01row1");
-    final byte[] ROW2 = Bytes.toBytesBinary("\\x01row2");
-
-    Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
-    Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);
-
-    // put rows into the first table
-    Put p = new Put(ROW0);
-    p.addColumn(FAMILY, COLUMN1, COLUMN1);
-    t1.put(p);
-    p = new Put(ROW1);
-    p.addColumn(FAMILY, COLUMN1, COLUMN1);
-    t1.put(p);
-    p = new Put(ROW2);
-    p.addColumn(FAMILY, COLUMN1, COLUMN1);
-    t1.put(p);
-
-    CopyTable copy = new CopyTable();
-    assertEquals(0,
-      ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()), copy,
-        new String[] { "--new.name=" + tableName2, "--startrow=\\x01row1", 
"--stoprow=\\x01row2",
-          tableName1.getNameAsString() }));
-
-    // verify the data was copied into table 2
-    // row1 exist, row0, row2 do not exist
-    Get g = new Get(ROW1);
-    Result r = t2.get(g);
-    assertEquals(1, r.size());
-    assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN1));
-
-    g = new Get(ROW0);
-    r = t2.get(g);
-    assertEquals(0, r.size());
-
-    g = new Get(ROW2);
-    r = t2.get(g);
-    assertEquals(0, r.size());
-
-    t1.close();
-    t2.close();
-    TEST_UTIL.deleteTable(tableName1);
-    TEST_UTIL.deleteTable(tableName2);
+    testStartStopRow(TEST_UTIL.getConfiguration());
   }
 
   /**
@@ -267,42 +137,7 @@ public class TestCopyTable {
    */
   @Test
   public void testRenameFamily() throws Exception {
-    final TableName sourceTable = TableName.valueOf(name.getMethodName() + 
"source");
-    final TableName targetTable = TableName.valueOf(name.getMethodName() + 
"-target");
-
-    byte[][] families = { FAMILY_A, FAMILY_B };
-
-    Table t = TEST_UTIL.createTable(sourceTable, families);
-    Table t2 = TEST_UTIL.createTable(targetTable, families);
-    Put p = new Put(ROW1);
-    p.addColumn(FAMILY_A, QUALIFIER, Bytes.toBytes("Data11"));
-    p.addColumn(FAMILY_B, QUALIFIER, Bytes.toBytes("Data12"));
-    p.addColumn(FAMILY_A, QUALIFIER, Bytes.toBytes("Data13"));
-    t.put(p);
-    p = new Put(ROW2);
-    p.addColumn(FAMILY_B, QUALIFIER, Bytes.toBytes("Dat21"));
-    p.addColumn(FAMILY_A, QUALIFIER, Bytes.toBytes("Data22"));
-    p.addColumn(FAMILY_B, QUALIFIER, Bytes.toBytes("Data23"));
-    t.put(p);
-
-    long currentTime = EnvironmentEdgeManager.currentTime();
-    String[] args = new String[] { "--new.name=" + targetTable, 
"--families=a:b", "--all.cells",
-      "--starttime=" + (currentTime - 100000), "--endtime=" + (currentTime + 
100000),
-      "--versions=1", sourceTable.getNameAsString() };
-    assertNull(t2.get(new Get(ROW1)).getRow());
-
-    assertTrue(runCopy(args));
-
-    assertNotNull(t2.get(new Get(ROW1)).getRow());
-    Result res = t2.get(new Get(ROW1));
-    byte[] b1 = res.getValue(FAMILY_B, QUALIFIER);
-    assertEquals("Data13", new String(b1));
-    assertNotNull(t2.get(new Get(ROW2)).getRow());
-    res = t2.get(new Get(ROW2));
-    b1 = res.getValue(FAMILY_A, QUALIFIER);
-    // Data from the family of B is not copied
-    assertNull(b1);
-
+    testRenameFamily(TEST_UTIL.getConfiguration());
   }
 
   /**
@@ -332,35 +167,6 @@ public class TestCopyTable {
     assertTrue(data.toString().contains("Usage:"));
   }
 
-  private boolean runCopy(String[] args) throws Exception {
-    int status =
-      ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()), new 
CopyTable(), args);
-    return status == 0;
-  }
-
-  private void loadData(Table t, byte[] family, byte[] column) throws 
IOException {
-    for (int i = 0; i < 10; i++) {
-      byte[] row = Bytes.toBytes("row" + i);
-      Put p = new Put(row);
-      p.addColumn(family, column, row);
-      t.put(p);
-    }
-  }
-
-  private void verifyRows(Table t, byte[] family, byte[] column) throws 
IOException {
-    for (int i = 0; i < 10; i++) {
-      byte[] row = Bytes.toBytes("row" + i);
-      Get g = new Get(row).addFamily(family);
-      Result r = t.get(g);
-      Assert.assertNotNull(r);
-      Assert.assertEquals(1, r.size());
-      Cell cell = r.rawCells()[0];
-      Assert.assertTrue(CellUtil.matchingQualifier(cell, column));
-      Assert.assertEquals(Bytes.compareTo(cell.getValueArray(), 
cell.getValueOffset(),
-        cell.getValueLength(), row, 0, row.length), 0);
-    }
-  }
-
   private Table createTable(TableName tableName, byte[] family, boolean isMob) 
throws IOException {
     if (isMob) {
       ColumnFamilyDescriptor cfd = 
ColumnFamilyDescriptorBuilder.newBuilder(family)
@@ -377,20 +183,26 @@ public class TestCopyTable {
     throws Exception {
     TableName table1 = TableName.valueOf(tablePrefix + 1);
     TableName table2 = TableName.valueOf(tablePrefix + 2);
-    Table t1 = createTable(table1, FAMILY_A, isMob);
-    Table t2 = createTable(table2, FAMILY_A, isMob);
-    loadData(t1, FAMILY_A, Bytes.toBytes("qualifier"));
     String snapshot = tablePrefix + "_snapshot";
-    TEST_UTIL.getAdmin().snapshot(snapshot, table1);
-    boolean success;
-    if (bulkLoad) {
-      success =
-        runCopy(new String[] { "--snapshot", "--new.name=" + table2, 
"--bulkload", snapshot });
-    } else {
-      success = runCopy(new String[] { "--snapshot", "--new.name=" + table2, 
snapshot });
+    try (Table t1 = createTable(table1, FAMILY_A, isMob);
+      Table t2 = createTable(table2, FAMILY_A, isMob)) {
+      loadData(t1, FAMILY_A, Bytes.toBytes("qualifier"));
+      TEST_UTIL.getAdmin().snapshot(snapshot, table1);
+      boolean success;
+      if (bulkLoad) {
+        success = runCopy(TEST_UTIL.getConfiguration(),
+          new String[] { "--snapshot", "--new.name=" + table2, "--bulkload", 
snapshot });
+      } else {
+        success = runCopy(TEST_UTIL.getConfiguration(),
+          new String[] { "--snapshot", "--new.name=" + table2, snapshot });
+      }
+      assertTrue(success);
+      verifyRows(t2, FAMILY_A, Bytes.toBytes("qualifier"));
+    } finally {
+      TEST_UTIL.getAdmin().deleteSnapshot(snapshot);
+      TEST_UTIL.deleteTable(table1);
+      TEST_UTIL.deleteTable(table2);
     }
-    Assert.assertTrue(success);
-    verifyRows(t2, FAMILY_A, Bytes.toBytes("qualifier"));
   }
 
   @Test
@@ -413,19 +225,15 @@ public class TestCopyTable {
     testCopyTableBySnapshot("testLoadingSnapshotAndBulkLoadToMobTable", true, 
true);
   }
 
-  @Test
-  public void testLoadingSnapshotToRemoteCluster() throws Exception {
-    Assert.assertFalse(runCopy(
-      new String[] { "--snapshot", "--peerAdr=hbase://remoteHBase", 
"sourceSnapshotName" }));
-  }
-
   @Test
   public void testLoadingSnapshotWithoutSnapshotName() throws Exception {
-    Assert.assertFalse(runCopy(new String[] { "--snapshot", 
"--peerAdr=hbase://remoteHBase" }));
+    assertFalse(runCopy(TEST_UTIL.getConfiguration(), new String[] { 
"--snapshot" }));
   }
 
   @Test
   public void testLoadingSnapshotWithoutDestTable() throws Exception {
-    Assert.assertFalse(runCopy(new String[] { "--snapshot", 
"sourceSnapshotName" }));
+    assertFalse(
+      runCopy(TEST_UTIL.getConfiguration(), new String[] { "--snapshot", 
"sourceSnapshotName" }));
   }
+
 }
diff --git 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTableToPeerCluster.java
 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTableToPeerCluster.java
new file mode 100644
index 00000000000..8e8ef0913d2
--- /dev/null
+++ 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTableToPeerCluster.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertFalse;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test CopyTable between clusters
+ */
+@Category({ MapReduceTests.class, LargeTests.class })
+public class TestCopyTableToPeerCluster extends CopyTableTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestCopyTableToPeerCluster.class);
+
+  private static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
+
+  private static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    UTIL1.startMiniCluster(3);
+    UTIL2.startMiniCluster(3);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    UTIL1.shutdownMiniCluster();
+    UTIL2.shutdownMiniCluster();
+  }
+
+  @Override
+  protected Table createSourceTable(TableDescriptor desc) throws Exception {
+    return UTIL1.createTable(desc, null);
+  }
+
+  @Override
+  protected Table createTargetTable(TableDescriptor desc) throws Exception {
+    return UTIL2.createTable(desc, null);
+  }
+
+  @Override
+  protected void dropSourceTable(TableName tableName) throws Exception {
+    UTIL1.deleteTable(tableName);
+  }
+
+  @Override
+  protected void dropTargetTable(TableName tableName) throws Exception {
+    UTIL2.deleteTable(tableName);
+  }
+
+  @Override
+  protected String[] getPeerClusterOptions() throws Exception {
+    return new String[] { "--peer.adr=" + UTIL2.getClusterKey() };
+  }
+
+  /**
+   * Simple end-to-end test
+   */
+  @Test
+  public void testCopyTable() throws Exception {
+    doCopyTableTest(UTIL1.getConfiguration(), false);
+  }
+
+  /**
+   * Simple end-to-end test on table with MOB
+   */
+  @Test
+  public void testCopyTableWithMob() throws Exception {
+    doCopyTableTestWithMob(UTIL1.getConfiguration(), false);
+  }
+
+  @Test
+  public void testStartStopRow() throws Exception {
+    testStartStopRow(UTIL1.getConfiguration());
+  }
+
+  /**
+   * Test copy of table from sourceTable to targetTable all rows from family a
+   */
+  @Test
+  public void testRenameFamily() throws Exception {
+    testRenameFamily(UTIL1.getConfiguration());
+  }
+
+  @Test
+  public void testBulkLoadNotSupported() throws Exception {
+    TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
+    TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
+    try (Table t1 = UTIL1.createTable(tableName1, FAMILY_A);
+      Table t2 = UTIL2.createTable(tableName2, FAMILY_A)) {
+      assertFalse(runCopy(UTIL1.getConfiguration(),
+        new String[] { "--new.name=" + tableName2.getNameAsString(), 
"--bulkload",
+          "--peer.adr=" + UTIL2.getClusterKey(), tableName1.getNameAsString() 
}));
+    } finally {
+      UTIL1.deleteTable(tableName1);
+      UTIL2.deleteTable(tableName2);
+    }
+  }
+
+  @Test
+  public void testSnapshotNotSupported() throws Exception {
+    TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
+    TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
+    String snapshot = tableName1.getNameAsString() + "_snapshot";
+    try (Table t1 = UTIL1.createTable(tableName1, FAMILY_A);
+      Table t2 = UTIL2.createTable(tableName2, FAMILY_A)) {
+      UTIL1.getAdmin().snapshot(snapshot, tableName1);
+      assertFalse(runCopy(UTIL1.getConfiguration(),
+        new String[] { "--new.name=" + tableName2.getNameAsString(), 
"--snapshot",
+          "--peer.adr=" + UTIL2.getClusterKey(), snapshot }));
+    } finally {
+      UTIL1.getAdmin().deleteSnapshot(snapshot);
+      UTIL1.deleteTable(tableName1);
+      UTIL2.deleteTable(tableName2);
+    }
+
+  }
+}
diff --git 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
index 4070da2e0e7..732d9d8ff88 100644
--- 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
+++ 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
@@ -18,9 +18,13 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.Arrays;
+import java.util.function.BooleanSupplier;
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -35,11 +39,11 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.mapreduce.Counters;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -49,12 +53,10 @@ import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
-
 /**
  * Basic test for the SyncTable M/R tool
  */
-@Category(LargeTests.class)
+@Category({ MapReduceTests.class, LargeTests.class })
 public class TestSyncTable {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
@@ -62,20 +64,23 @@ public class TestSyncTable {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestSyncTable.class);
 
-  private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+  private static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
+
+  private static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
 
   @Rule
   public TestName name = new TestName();
 
   @BeforeClass
   public static void beforeClass() throws Exception {
-    TEST_UTIL.startMiniCluster(3);
+    UTIL1.startMiniCluster(3);
+    UTIL2.startMiniCluster(3);
   }
 
   @AfterClass
   public static void afterClass() throws Exception {
-    TEST_UTIL.cleanupDataTestDirOnTestFS();
-    TEST_UTIL.shutdownMiniCluster();
+    UTIL2.shutdownMiniCluster();
+    UTIL1.shutdownMiniCluster();
   }
 
   private static byte[][] generateSplits(int numRows, int numRegions) {
@@ -86,16 +91,17 @@ public class TestSyncTable {
     return splitRows;
   }
 
-  @Test
-  public void testSyncTable() throws Exception {
+  private void testSyncTable(HBaseTestingUtility source, HBaseTestingUtility 
target,
+    String... options) throws Exception {
     final TableName sourceTableName = TableName.valueOf(name.getMethodName() + 
"_source");
     final TableName targetTableName = TableName.valueOf(name.getMethodName() + 
"_target");
-    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTable");
+    Path testDir = source.getDataTestDirOnTestFS(name.getMethodName());
 
-    writeTestData(sourceTableName, targetTableName);
-    hashSourceTable(sourceTableName, testDir);
-    Counters syncCounters = syncTables(sourceTableName, targetTableName, 
testDir);
-    assertEqualTables(90, sourceTableName, targetTableName, false);
+    writeTestData(source, sourceTableName, target, targetTableName);
+    hashSourceTable(source, sourceTableName, testDir);
+    Counters syncCounters =
+      syncTables(target.getConfiguration(), sourceTableName, targetTableName, 
testDir, options);
+    assertEqualTables(90, source, sourceTableName, target, targetTableName, 
false);
 
     assertEquals(60, 
syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
     assertEquals(10, 
syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
@@ -104,21 +110,37 @@ public class TestSyncTable {
     assertEquals(50, 
syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
     assertEquals(20, 
syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
 
-    TEST_UTIL.deleteTable(sourceTableName);
-    TEST_UTIL.deleteTable(targetTableName);
+    source.deleteTable(sourceTableName);
+    target.deleteTable(targetTableName);
+  }
+
+  @Test
+  public void testSyncTable() throws Exception {
+    testSyncTable(UTIL1, UTIL1);
+  }
+
+  @Test
+  public void testSyncTableToPeerCluster() throws Exception {
+    testSyncTable(UTIL1, UTIL2, "--sourcezkcluster=" + UTIL1.getClusterKey());
+  }
+
+  @Test
+  public void testSyncTableFromSourceToPeerCluster() throws Exception {
+    testSyncTable(UTIL2, UTIL1, "--sourcezkcluster=" + UTIL2.getClusterKey(),
+      "--targetzkcluster=" + UTIL1.getClusterKey());
   }
 
   @Test
   public void testSyncTableDoDeletesFalse() throws Exception {
     final TableName sourceTableName = TableName.valueOf(name.getMethodName() + 
"_source");
     final TableName targetTableName = TableName.valueOf(name.getMethodName() + 
"_target");
-    Path testDir = 
TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoDeletesFalse");
+    Path testDir = UTIL1.getDataTestDirOnTestFS(name.getMethodName());
 
-    writeTestData(sourceTableName, targetTableName);
-    hashSourceTable(sourceTableName, testDir);
-    Counters syncCounters =
-      syncTables(sourceTableName, targetTableName, testDir, 
"--doDeletes=false");
-    assertTargetDoDeletesFalse(100, sourceTableName, targetTableName);
+    writeTestData(UTIL1, sourceTableName, UTIL1, targetTableName);
+    hashSourceTable(UTIL1, sourceTableName, testDir);
+    Counters syncCounters = syncTables(UTIL1.getConfiguration(), 
sourceTableName, targetTableName,
+      testDir, "--doDeletes=false");
+    assertTargetDoDeletesFalse(100, UTIL1, sourceTableName, UTIL1, 
targetTableName);
 
     assertEquals(60, 
syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
     assertEquals(10, 
syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
@@ -127,20 +149,21 @@ public class TestSyncTable {
     assertEquals(50, 
syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
     assertEquals(20, 
syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
 
-    TEST_UTIL.deleteTable(sourceTableName);
-    TEST_UTIL.deleteTable(targetTableName);
+    UTIL1.deleteTable(sourceTableName);
+    UTIL1.deleteTable(targetTableName);
   }
 
   @Test
   public void testSyncTableDoPutsFalse() throws Exception {
     final TableName sourceTableName = TableName.valueOf(name.getMethodName() + 
"_source");
     final TableName targetTableName = TableName.valueOf(name.getMethodName() + 
"_target");
-    Path testDir = 
TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoPutsFalse");
+    Path testDir = UTIL2.getDataTestDirOnTestFS(name.getMethodName());
 
-    writeTestData(sourceTableName, targetTableName);
-    hashSourceTable(sourceTableName, testDir);
-    Counters syncCounters = syncTables(sourceTableName, targetTableName, 
testDir, "--doPuts=false");
-    assertTargetDoPutsFalse(70, sourceTableName, targetTableName);
+    writeTestData(UTIL2, sourceTableName, UTIL2, targetTableName);
+    hashSourceTable(UTIL2, sourceTableName, testDir);
+    Counters syncCounters = syncTables(UTIL2.getConfiguration(), 
sourceTableName, targetTableName,
+      testDir, "--doPuts=false");
+    assertTargetDoPutsFalse(70, UTIL2, sourceTableName, UTIL2, 
targetTableName);
 
     assertEquals(60, 
syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
     assertEquals(10, 
syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
@@ -149,21 +172,21 @@ public class TestSyncTable {
     assertEquals(50, 
syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
     assertEquals(20, 
syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
 
-    TEST_UTIL.deleteTable(sourceTableName);
-    TEST_UTIL.deleteTable(targetTableName);
+    UTIL2.deleteTable(sourceTableName);
+    UTIL2.deleteTable(targetTableName);
   }
 
   @Test
   public void testSyncTableIgnoreTimestampsTrue() throws Exception {
     final TableName sourceTableName = TableName.valueOf(name.getMethodName() + 
"_source");
     final TableName targetTableName = TableName.valueOf(name.getMethodName() + 
"_target");
-    Path testDir = 
TEST_UTIL.getDataTestDirOnTestFS("testSyncTableIgnoreTimestampsTrue");
+    Path testDir = UTIL1.getDataTestDirOnTestFS(name.getMethodName());
     long current = EnvironmentEdgeManager.currentTime();
-    writeTestData(sourceTableName, targetTableName, current - 1000, current);
-    hashSourceTable(sourceTableName, testDir, "--ignoreTimestamps=true");
-    Counters syncCounters =
-      syncTables(sourceTableName, targetTableName, testDir, 
"--ignoreTimestamps=true");
-    assertEqualTables(90, sourceTableName, targetTableName, true);
+    writeTestData(UTIL1, sourceTableName, UTIL2, targetTableName, current - 
1000, current);
+    hashSourceTable(UTIL1, sourceTableName, testDir, 
"--ignoreTimestamps=true");
+    Counters syncCounters = syncTables(UTIL2.getConfiguration(), 
sourceTableName, targetTableName,
+      testDir, "--ignoreTimestamps=true", "--sourcezkcluster=" + 
UTIL1.getClusterKey());
+    assertEqualTables(90, UTIL1, sourceTableName, UTIL2, targetTableName, 
true);
 
     assertEquals(50, 
syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
     assertEquals(10, 
syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
@@ -172,256 +195,202 @@ public class TestSyncTable {
     assertEquals(30, 
syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
     assertEquals(20, 
syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
 
-    TEST_UTIL.deleteTable(sourceTableName);
-    TEST_UTIL.deleteTable(targetTableName);
+    UTIL1.deleteTable(sourceTableName);
+    UTIL2.deleteTable(targetTableName);
   }
 
-  private void assertEqualTables(int expectedRows, TableName sourceTableName,
-    TableName targetTableName, boolean ignoreTimestamps) throws Exception {
-    Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
-    Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
-
-    ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
-    ResultScanner targetScanner = targetTable.getScanner(new Scan());
-
-    for (int i = 0; i < expectedRows; i++) {
-      Result sourceRow = sourceScanner.next();
-      Result targetRow = targetScanner.next();
-
-      LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : 
Bytes.toInt(sourceRow.getRow()))
-        + " cells:" + sourceRow);
-      LOG.debug("TARGET row: " + (targetRow == null ? "null" : 
Bytes.toInt(targetRow.getRow()))
-        + " cells:" + targetRow);
-
-      if (sourceRow == null) {
-        Assert.fail("Expected " + expectedRows + " source rows but only found 
" + i);
-      }
-      if (targetRow == null) {
-        Assert.fail("Expected " + expectedRows + " target rows but only found 
" + i);
-      }
-      Cell[] sourceCells = sourceRow.rawCells();
-      Cell[] targetCells = targetRow.rawCells();
-      if (sourceCells.length != targetCells.length) {
-        LOG.debug("Source cells: " + Arrays.toString(sourceCells));
-        LOG.debug("Target cells: " + Arrays.toString(targetCells));
-        Assert.fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + 
sourceCells.length
-          + " cells in source table but " + targetCells.length + " cells in 
target table");
-      }
-      for (int j = 0; j < sourceCells.length; j++) {
-        Cell sourceCell = sourceCells[j];
-        Cell targetCell = targetCells[j];
-        try {
-          if (!CellUtil.matchingRows(sourceCell, targetCell)) {
-            Assert.fail("Rows don't match");
-          }
-          if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
-            Assert.fail("Families don't match");
-          }
-          if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
-            Assert.fail("Qualifiers don't match");
-          }
-          if (!ignoreTimestamps && !CellUtil.matchingTimestamp(sourceCell, 
targetCell)) {
-            Assert.fail("Timestamps don't match");
-          }
-          if (!CellUtil.matchingValue(sourceCell, targetCell)) {
-            Assert.fail("Values don't match");
-          }
-        } catch (Throwable t) {
-          LOG.debug("Source cell: " + sourceCell + " target cell: " + 
targetCell);
-          Throwables.propagate(t);
-        }
-      }
+  private void assertCellEquals(Cell sourceCell, Cell targetCell, 
BooleanSupplier checkTimestamp) {
+    assertTrue("Rows don't match, source: " + sourceCell + ", target: " + 
targetCell,
+      CellUtil.matchingRows(sourceCell, targetCell));
+    assertTrue("Families don't match, source: " + sourceCell + ", target: " + 
targetCell,
+      CellUtil.matchingFamily(sourceCell, targetCell));
+    assertTrue("Qualifiers don't match, source: " + sourceCell + ", target: " 
+ targetCell,
+      CellUtil.matchingQualifier(sourceCell, targetCell));
+    if (checkTimestamp.getAsBoolean()) {
+      assertTrue("Timestamps don't match, source: " + sourceCell + ", target: 
" + targetCell,
+        CellUtil.matchingTimestamp(sourceCell, targetCell));
     }
-    Result sourceRow = sourceScanner.next();
-    if (sourceRow != null) {
-      Assert.fail("Source table has more than " + expectedRows + " rows.  Next 
row: "
-        + Bytes.toInt(sourceRow.getRow()));
-    }
-    Result targetRow = targetScanner.next();
-    if (targetRow != null) {
-      Assert.fail("Target table has more than " + expectedRows + " rows.  Next 
row: "
-        + Bytes.toInt(targetRow.getRow()));
-    }
-    sourceScanner.close();
-    targetScanner.close();
-    sourceTable.close();
-    targetTable.close();
+    assertTrue("Values don't match, source: " + sourceCell + ", target: " + 
targetCell,
+      CellUtil.matchingValue(sourceCell, targetCell));
   }
 
-  private void assertTargetDoDeletesFalse(int expectedRows, TableName 
sourceTableName,
-    TableName targetTableName) throws Exception {
-    Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
-    Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
-
-    ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
-    ResultScanner targetScanner = targetTable.getScanner(new Scan());
-    Result targetRow = targetScanner.next();
-    Result sourceRow = sourceScanner.next();
-    int rowsCount = 0;
-    while (targetRow != null) {
-      rowsCount++;
-      // only compares values for existing rows, skipping rows existing on
-      // target only that were not deleted given --doDeletes=false
-      if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) {
-        targetRow = targetScanner.next();
-        continue;
-      }
-
-      LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : 
Bytes.toInt(sourceRow.getRow()))
-        + " cells:" + sourceRow);
-      LOG.debug("TARGET row: " + (targetRow == null ? "null" : 
Bytes.toInt(targetRow.getRow()))
-        + " cells:" + targetRow);
-
-      Cell[] sourceCells = sourceRow.rawCells();
-      Cell[] targetCells = targetRow.rawCells();
-      int targetRowKey = Bytes.toInt(targetRow.getRow());
-      if (targetRowKey >= 70 && targetRowKey < 80) {
-        if (sourceCells.length == targetCells.length) {
-          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
-          LOG.debug("Target cells: " + Arrays.toString(targetCells));
-          Assert
-            .fail("Row " + targetRowKey + " should have more cells in " + 
"target than in source");
+  private void assertEqualTables(int expectedRows, HBaseTestingUtility 
sourceCluster,
+    TableName sourceTableName, HBaseTestingUtility targetCluster, TableName 
targetTableName,
+    boolean ignoreTimestamps) throws Exception {
+    try (Table sourceTable = 
sourceCluster.getConnection().getTable(sourceTableName);
+      Table targetTable = 
targetCluster.getConnection().getTable(targetTableName);
+      ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
+      ResultScanner targetScanner = targetTable.getScanner(new Scan())) {
+      for (int i = 0; i < expectedRows; i++) {
+        Result sourceRow = sourceScanner.next();
+        Result targetRow = targetScanner.next();
+
+        LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : 
Bytes.toInt(sourceRow.getRow()))
+          + " cells:" + sourceRow);
+        LOG.debug("TARGET row: " + (targetRow == null ? "null" : 
Bytes.toInt(targetRow.getRow()))
+          + " cells:" + targetRow);
+
+        if (sourceRow == null) {
+          fail("Expected " + expectedRows + " source rows but only found " + 
i);
         }
-
-      } else {
+        if (targetRow == null) {
+          fail("Expected " + expectedRows + " target rows but only found " + 
i);
+        }
+        Cell[] sourceCells = sourceRow.rawCells();
+        Cell[] targetCells = targetRow.rawCells();
         if (sourceCells.length != targetCells.length) {
           LOG.debug("Source cells: " + Arrays.toString(sourceCells));
           LOG.debug("Target cells: " + Arrays.toString(targetCells));
-          Assert.fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + 
sourceCells.length
+          fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + 
sourceCells.length
             + " cells in source table but " + targetCells.length + " cells in 
target table");
         }
-      }
-      for (int j = 0; j < sourceCells.length; j++) {
-        Cell sourceCell = sourceCells[j];
-        Cell targetCell = targetCells[j];
-        try {
-          if (!CellUtil.matchingRow(sourceCell, targetCell)) {
-            Assert.fail("Rows don't match");
-          }
-          if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
-            Assert.fail("Families don't match");
-          }
-          if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
-            Assert.fail("Qualifiers don't match");
-          }
-          if (targetRowKey < 80 && targetRowKey >= 90) {
-            if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
-              Assert.fail("Timestamps don't match");
-            }
-          }
-          if (!CellUtil.matchingValue(sourceCell, targetCell)) {
-            Assert.fail("Values don't match");
-          }
-        } catch (Throwable t) {
-          LOG.debug("Source cell: " + sourceCell + " target cell: " + 
targetCell);
-          Throwables.propagate(t);
+        for (int j = 0; j < sourceCells.length; j++) {
+          Cell sourceCell = sourceCells[j];
+          Cell targetCell = targetCells[j];
+          assertCellEquals(sourceCell, targetCell, () -> !ignoreTimestamps);
         }
       }
-      targetRow = targetScanner.next();
-      sourceRow = sourceScanner.next();
+      Result sourceRow = sourceScanner.next();
+      if (sourceRow != null) {
+        fail("Source table has more than " + expectedRows + " rows.  Next row: 
"
+          + Bytes.toInt(sourceRow.getRow()));
+      }
+      Result targetRow = targetScanner.next();
+      if (targetRow != null) {
+        fail("Target table has more than " + expectedRows + " rows.  Next row: 
"
+          + Bytes.toInt(targetRow.getRow()));
+      }
     }
-    assertEquals("Target expected rows does not match.", expectedRows, 
rowsCount);
-    sourceScanner.close();
-    targetScanner.close();
-    sourceTable.close();
-    targetTable.close();
   }
 
-  private void assertTargetDoPutsFalse(int expectedRows, TableName 
sourceTableName,
-    TableName targetTableName) throws Exception {
-    Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
-    Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
-
-    ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
-    ResultScanner targetScanner = targetTable.getScanner(new Scan());
-    Result targetRow = targetScanner.next();
-    Result sourceRow = sourceScanner.next();
-    int rowsCount = 0;
-
-    while (targetRow != null) {
-      // only compares values for existing rows, skipping rows existing on
-      // source only that were not added to target given --doPuts=false
-      if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) {
-        sourceRow = sourceScanner.next();
-        continue;
-      }
+  private void assertTargetDoDeletesFalse(int expectedRows, 
HBaseTestingUtility sourceCluster,
+    TableName sourceTableName, HBaseTestingUtility targetCluster, TableName 
targetTableName)
+    throws Exception {
+    try (Table sourceTable = 
sourceCluster.getConnection().getTable(sourceTableName);
+      Table targetTable = 
targetCluster.getConnection().getTable(targetTableName);
 
-      LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : 
Bytes.toInt(sourceRow.getRow()))
-        + " cells:" + sourceRow);
-      LOG.debug("TARGET row: " + (targetRow == null ? "null" : 
Bytes.toInt(targetRow.getRow()))
-        + " cells:" + targetRow);
-
-      LOG.debug("rowsCount: " + rowsCount);
-
-      Cell[] sourceCells = sourceRow.rawCells();
-      Cell[] targetCells = targetRow.rawCells();
-      int targetRowKey = Bytes.toInt(targetRow.getRow());
-      if (targetRowKey >= 40 && targetRowKey < 60) {
-        LOG.debug("Source cells: " + Arrays.toString(sourceCells));
-        LOG.debug("Target cells: " + Arrays.toString(targetCells));
-        Assert.fail("There shouldn't exist any rows between 40 and 60, since "
-          + "Puts are disabled and Deletes are enabled.");
-      } else if (targetRowKey >= 60 && targetRowKey < 70) {
-        if (sourceCells.length == targetCells.length) {
-          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
-          LOG.debug("Target cells: " + Arrays.toString(targetCells));
-          Assert.fail(
-            "Row " + Bytes.toInt(sourceRow.getRow()) + " shouldn't have same 
number of cells.");
+      ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
+      ResultScanner targetScanner = targetTable.getScanner(new Scan())) {
+      Result targetRow = targetScanner.next();
+      Result sourceRow = sourceScanner.next();
+      int rowsCount = 0;
+      while (targetRow != null) {
+        rowsCount++;
+        // only compares values for existing rows, skipping rows existing on
+        // target only that were not deleted given --doDeletes=false
+        if (Bytes.toInt(sourceRow.getRow()) != 
Bytes.toInt(targetRow.getRow())) {
+          targetRow = targetScanner.next();
+          continue;
         }
-      } else if (targetRowKey >= 80 && targetRowKey < 90) {
-        LOG.debug("Source cells: " + Arrays.toString(sourceCells));
-        LOG.debug("Target cells: " + Arrays.toString(targetCells));
-        Assert.fail("There should be no rows between 80 and 90 on target, as "
-          + "these had different timestamps and should had been deleted.");
-      } else if (targetRowKey >= 90 && targetRowKey < 100) {
-        for (int j = 0; j < sourceCells.length; j++) {
-          Cell sourceCell = sourceCells[j];
-          Cell targetCell = targetCells[j];
-          if (CellUtil.matchingValue(sourceCell, targetCell)) {
-            Assert.fail("Cells values should not match for rows between "
-              + "90 and 100. Target row id: " + 
(Bytes.toInt(targetRow.getRow())));
+
+        LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : 
Bytes.toInt(sourceRow.getRow()))
+          + " cells:" + sourceRow);
+        LOG.debug("TARGET row: " + (targetRow == null ? "null" : 
Bytes.toInt(targetRow.getRow()))
+          + " cells:" + targetRow);
+
+        Cell[] sourceCells = sourceRow.rawCells();
+        Cell[] targetCells = targetRow.rawCells();
+        int targetRowKey = Bytes.toInt(targetRow.getRow());
+        if (targetRowKey >= 70 && targetRowKey < 80) {
+          if (sourceCells.length == targetCells.length) {
+            LOG.debug("Source cells: " + Arrays.toString(sourceCells));
+            LOG.debug("Target cells: " + Arrays.toString(targetCells));
+            fail("Row " + targetRowKey + " should have more cells in " + 
"target than in source");
+          }
+
+        } else {
+          if (sourceCells.length != targetCells.length) {
+            LOG.debug("Source cells: " + Arrays.toString(sourceCells));
+            LOG.debug("Target cells: " + Arrays.toString(targetCells));
+            fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + 
sourceCells.length
+              + " cells in source table but " + targetCells.length + " cells 
in target table");
           }
         }
-      } else {
         for (int j = 0; j < sourceCells.length; j++) {
           Cell sourceCell = sourceCells[j];
           Cell targetCell = targetCells[j];
-          try {
-            if (!CellUtil.matchingRow(sourceCell, targetCell)) {
-              Assert.fail("Rows don't match");
-            }
-            if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
-              Assert.fail("Families don't match");
-            }
-            if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
-              Assert.fail("Qualifiers don't match");
-            }
-            if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
-              Assert.fail("Timestamps don't match");
-            }
-            if (!CellUtil.matchingValue(sourceCell, targetCell)) {
-              Assert.fail("Values don't match");
+          assertCellEquals(sourceCell, targetCell, () -> targetRowKey < 80 && 
targetRowKey >= 90);
+        }
+        targetRow = targetScanner.next();
+        sourceRow = sourceScanner.next();
+      }
+      assertEquals("Target expected rows does not match.", expectedRows, 
rowsCount);
+    }
+  }
+
+  private void assertTargetDoPutsFalse(int expectedRows, HBaseTestingUtility 
sourceCluster,
+    TableName sourceTableName, HBaseTestingUtility targetCluster, TableName 
targetTableName)
+    throws Exception {
+    try (Table sourceTable = 
sourceCluster.getConnection().getTable(sourceTableName);
+      Table targetTable = 
targetCluster.getConnection().getTable(targetTableName);
+      ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
+      ResultScanner targetScanner = targetTable.getScanner(new Scan())) {
+      Result targetRow = targetScanner.next();
+      Result sourceRow = sourceScanner.next();
+      int rowsCount = 0;
+
+      while (targetRow != null) {
+        // only compares values for existing rows, skipping rows existing on
+        // source only that were not added to target given --doPuts=false
+        if (Bytes.toInt(sourceRow.getRow()) != 
Bytes.toInt(targetRow.getRow())) {
+          sourceRow = sourceScanner.next();
+          continue;
+        }
+
+        LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : 
Bytes.toInt(sourceRow.getRow()))
+          + " cells:" + sourceRow);
+        LOG.debug("TARGET row: " + (targetRow == null ? "null" : 
Bytes.toInt(targetRow.getRow()))
+          + " cells:" + targetRow);
+
+        LOG.debug("rowsCount: " + rowsCount);
+
+        Cell[] sourceCells = sourceRow.rawCells();
+        Cell[] targetCells = targetRow.rawCells();
+        int targetRowKey = Bytes.toInt(targetRow.getRow());
+        if (targetRowKey >= 40 && targetRowKey < 60) {
+          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
+          LOG.debug("Target cells: " + Arrays.toString(targetCells));
+          fail("There shouldn't exist any rows between 40 and 60, since "
+            + "Puts are disabled and Deletes are enabled.");
+        } else if (targetRowKey >= 60 && targetRowKey < 70) {
+          if (sourceCells.length == targetCells.length) {
+            LOG.debug("Source cells: " + Arrays.toString(sourceCells));
+            LOG.debug("Target cells: " + Arrays.toString(targetCells));
+            fail(
+              "Row " + Bytes.toInt(sourceRow.getRow()) + " shouldn't have same 
number of cells.");
+          }
+        } else if (targetRowKey >= 80 && targetRowKey < 90) {
+          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
+          LOG.debug("Target cells: " + Arrays.toString(targetCells));
+          fail("There should be no rows between 80 and 90 on target, as "
+            + "these had different timestamps and should had been deleted.");
+        } else if (targetRowKey >= 90 && targetRowKey < 100) {
+          for (int j = 0; j < sourceCells.length; j++) {
+            Cell sourceCell = sourceCells[j];
+            Cell targetCell = targetCells[j];
+            if (CellUtil.matchingValue(sourceCell, targetCell)) {
+              fail("Cells values should not match for rows between " + "90 and 
100. Target row id: "
+                + Bytes.toInt(targetRow.getRow()));
             }
-          } catch (Throwable t) {
-            LOG.debug("Source cell: " + sourceCell + " target cell: " + 
targetCell);
-            Throwables.propagate(t);
+          }
+        } else {
+          for (int j = 0; j < sourceCells.length; j++) {
+            Cell sourceCell = sourceCells[j];
+            Cell targetCell = targetCells[j];
+            assertCellEquals(sourceCell, targetCell, () -> true);
           }
         }
+        rowsCount++;
+        targetRow = targetScanner.next();
+        sourceRow = sourceScanner.next();
       }
-      rowsCount++;
-      targetRow = targetScanner.next();
-      sourceRow = sourceScanner.next();
+      assertEquals("Target expected rows does not match.", expectedRows, 
rowsCount);
     }
-    assertEquals("Target expected rows does not match.", expectedRows, 
rowsCount);
-    sourceScanner.close();
-    targetScanner.close();
-    sourceTable.close();
-    targetTable.close();
   }
 
-  private Counters syncTables(TableName sourceTableName, TableName 
targetTableName, Path testDir,
-    String... options) throws Exception {
-    SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration());
+  private Counters syncTables(Configuration conf, TableName sourceTableName,
+    TableName targetTableName, Path testDir, String... options) throws 
Exception {
+    SyncTable syncTable = new SyncTable(conf);
     String[] args = Arrays.copyOf(options, options.length + 3);
     args[options.length] = testDir.toString();
     args[options.length + 1] = sourceTableName.getNameAsString();
@@ -433,12 +402,12 @@ public class TestSyncTable {
     return syncTable.counters;
   }
 
-  private void hashSourceTable(TableName sourceTableName, Path testDir, 
String... options)
-    throws Exception {
+  private void hashSourceTable(HBaseTestingUtility sourceCluster, TableName 
sourceTableName,
+    Path testDir, String... options) throws Exception {
     int numHashFiles = 3;
     long batchSize = 100; // should be 2 batches per region
     int scanBatch = 1;
-    HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
+    HashTable hashTable = new HashTable(sourceCluster.getConfiguration());
     String[] args = Arrays.copyOf(options, options.length + 5);
     args[options.length] = "--batchsize=" + batchSize;
     args[options.length + 1] = "--numhashfiles=" + numHashFiles;
@@ -448,7 +417,7 @@ public class TestSyncTable {
     int code = hashTable.run(args);
     assertEquals("hash table job failed", 0, code);
 
-    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    FileSystem fs = sourceCluster.getTestFileSystem();
 
     HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), 
testDir);
     assertEquals(sourceTableName.getNameAsString(), tableHash.tableName);
@@ -459,8 +428,9 @@ public class TestSyncTable {
     LOG.info("Hash table completed");
   }
 
-  private void writeTestData(TableName sourceTableName, TableName 
targetTableName,
-    long... timestamps) throws Exception {
+  private void writeTestData(HBaseTestingUtility sourceCluster, TableName 
sourceTableName,
+    HBaseTestingUtility targetCluster, TableName targetTableName, long... 
timestamps)
+    throws Exception {
     final byte[] family = Bytes.toBytes("family");
     final byte[] column1 = Bytes.toBytes("c1");
     final byte[] column2 = Bytes.toBytes("c2");
@@ -476,102 +446,100 @@ public class TestSyncTable {
       timestamps = new long[] { current, current };
     }
 
-    Table sourceTable =
-      TEST_UTIL.createTable(sourceTableName, family, generateSplits(numRows, 
sourceRegions));
-
-    Table targetTable =
-      TEST_UTIL.createTable(targetTableName, family, generateSplits(numRows, 
targetRegions));
-
-    int rowIndex = 0;
-    // a bunch of identical rows
-    for (; rowIndex < 40; rowIndex++) {
-      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
-      sourcePut.addColumn(family, column1, timestamps[0], value1);
-      sourcePut.addColumn(family, column2, timestamps[0], value2);
-      sourceTable.put(sourcePut);
-
-      Put targetPut = new Put(Bytes.toBytes(rowIndex));
-      targetPut.addColumn(family, column1, timestamps[1], value1);
-      targetPut.addColumn(family, column2, timestamps[1], value2);
-      targetTable.put(targetPut);
-    }
-    // some rows only in the source table
-    // ROWSWITHDIFFS: 10
-    // TARGETMISSINGROWS: 10
-    // TARGETMISSINGCELLS: 20
-    for (; rowIndex < 50; rowIndex++) {
-      Put put = new Put(Bytes.toBytes(rowIndex));
-      put.addColumn(family, column1, timestamps[0], value1);
-      put.addColumn(family, column2, timestamps[0], value2);
-      sourceTable.put(put);
-    }
-    // some rows only in the target table
-    // ROWSWITHDIFFS: 10
-    // SOURCEMISSINGROWS: 10
-    // SOURCEMISSINGCELLS: 20
-    for (; rowIndex < 60; rowIndex++) {
-      Put put = new Put(Bytes.toBytes(rowIndex));
-      put.addColumn(family, column1, timestamps[1], value1);
-      put.addColumn(family, column2, timestamps[1], value2);
-      targetTable.put(put);
-    }
-    // some rows with 1 missing cell in target table
-    // ROWSWITHDIFFS: 10
-    // TARGETMISSINGCELLS: 10
-    for (; rowIndex < 70; rowIndex++) {
-      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
-      sourcePut.addColumn(family, column1, timestamps[0], value1);
-      sourcePut.addColumn(family, column2, timestamps[0], value2);
-      sourceTable.put(sourcePut);
-
-      Put targetPut = new Put(Bytes.toBytes(rowIndex));
-      targetPut.addColumn(family, column1, timestamps[1], value1);
-      targetTable.put(targetPut);
-    }
-    // some rows with 1 missing cell in source table
-    // ROWSWITHDIFFS: 10
-    // SOURCEMISSINGCELLS: 10
-    for (; rowIndex < 80; rowIndex++) {
-      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
-      sourcePut.addColumn(family, column1, timestamps[0], value1);
-      sourceTable.put(sourcePut);
-
-      Put targetPut = new Put(Bytes.toBytes(rowIndex));
-      targetPut.addColumn(family, column1, timestamps[1], value1);
-      targetPut.addColumn(family, column2, timestamps[1], value2);
-      targetTable.put(targetPut);
-    }
-    // some rows differing only in timestamp
-    // ROWSWITHDIFFS: 10
-    // SOURCEMISSINGCELLS: 20
-    // TARGETMISSINGCELLS: 20
-    for (; rowIndex < 90; rowIndex++) {
-      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
-      sourcePut.addColumn(family, column1, timestamps[0], column1);
-      sourcePut.addColumn(family, column2, timestamps[0], value2);
-      sourceTable.put(sourcePut);
-
-      Put targetPut = new Put(Bytes.toBytes(rowIndex));
-      targetPut.addColumn(family, column1, timestamps[1] + 1, column1);
-      targetPut.addColumn(family, column2, timestamps[1] - 1, value2);
-      targetTable.put(targetPut);
-    }
-    // some rows with different values
-    // ROWSWITHDIFFS: 10
-    // DIFFERENTCELLVALUES: 20
-    for (; rowIndex < numRows; rowIndex++) {
-      Put sourcePut = new Put(Bytes.toBytes(rowIndex));
-      sourcePut.addColumn(family, column1, timestamps[0], value1);
-      sourcePut.addColumn(family, column2, timestamps[0], value2);
-      sourceTable.put(sourcePut);
-
-      Put targetPut = new Put(Bytes.toBytes(rowIndex));
-      targetPut.addColumn(family, column1, timestamps[1], value3);
-      targetPut.addColumn(family, column2, timestamps[1], value3);
-      targetTable.put(targetPut);
+    try (
+      Table sourceTable =
+        sourceCluster.createTable(sourceTableName, family, 
generateSplits(numRows, sourceRegions));
+      Table targetTable = targetCluster.createTable(targetTableName, family,
+        generateSplits(numRows, targetRegions))) {
+
+      int rowIndex = 0;
+      // a bunch of identical rows
+      for (; rowIndex < 40; rowIndex++) {
+        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
+        sourcePut.addColumn(family, column1, timestamps[0], value1);
+        sourcePut.addColumn(family, column2, timestamps[0], value2);
+        sourceTable.put(sourcePut);
+
+        Put targetPut = new Put(Bytes.toBytes(rowIndex));
+        targetPut.addColumn(family, column1, timestamps[1], value1);
+        targetPut.addColumn(family, column2, timestamps[1], value2);
+        targetTable.put(targetPut);
+      }
+      // some rows only in the source table
+      // ROWSWITHDIFFS: 10
+      // TARGETMISSINGROWS: 10
+      // TARGETMISSINGCELLS: 20
+      for (; rowIndex < 50; rowIndex++) {
+        Put put = new Put(Bytes.toBytes(rowIndex));
+        put.addColumn(family, column1, timestamps[0], value1);
+        put.addColumn(family, column2, timestamps[0], value2);
+        sourceTable.put(put);
+      }
+      // some rows only in the target table
+      // ROWSWITHDIFFS: 10
+      // SOURCEMISSINGROWS: 10
+      // SOURCEMISSINGCELLS: 20
+      for (; rowIndex < 60; rowIndex++) {
+        Put put = new Put(Bytes.toBytes(rowIndex));
+        put.addColumn(family, column1, timestamps[1], value1);
+        put.addColumn(family, column2, timestamps[1], value2);
+        targetTable.put(put);
+      }
+      // some rows with 1 missing cell in target table
+      // ROWSWITHDIFFS: 10
+      // TARGETMISSINGCELLS: 10
+      for (; rowIndex < 70; rowIndex++) {
+        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
+        sourcePut.addColumn(family, column1, timestamps[0], value1);
+        sourcePut.addColumn(family, column2, timestamps[0], value2);
+        sourceTable.put(sourcePut);
+
+        Put targetPut = new Put(Bytes.toBytes(rowIndex));
+        targetPut.addColumn(family, column1, timestamps[1], value1);
+        targetTable.put(targetPut);
+      }
+      // some rows with 1 missing cell in source table
+      // ROWSWITHDIFFS: 10
+      // SOURCEMISSINGCELLS: 10
+      for (; rowIndex < 80; rowIndex++) {
+        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
+        sourcePut.addColumn(family, column1, timestamps[0], value1);
+        sourceTable.put(sourcePut);
+
+        Put targetPut = new Put(Bytes.toBytes(rowIndex));
+        targetPut.addColumn(family, column1, timestamps[1], value1);
+        targetPut.addColumn(family, column2, timestamps[1], value2);
+        targetTable.put(targetPut);
+      }
+      // some rows differing only in timestamp
+      // ROWSWITHDIFFS: 10
+      // SOURCEMISSINGCELLS: 20
+      // TARGETMISSINGCELLS: 20
+      for (; rowIndex < 90; rowIndex++) {
+        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
+        sourcePut.addColumn(family, column1, timestamps[0], column1);
+        sourcePut.addColumn(family, column2, timestamps[0], value2);
+        sourceTable.put(sourcePut);
+
+        Put targetPut = new Put(Bytes.toBytes(rowIndex));
+        targetPut.addColumn(family, column1, timestamps[1] + 1, column1);
+        targetPut.addColumn(family, column2, timestamps[1] - 1, value2);
+        targetTable.put(targetPut);
+      }
+      // some rows with different values
+      // ROWSWITHDIFFS: 10
+      // DIFFERENTCELLVALUES: 20
+      for (; rowIndex < numRows; rowIndex++) {
+        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
+        sourcePut.addColumn(family, column1, timestamps[0], value1);
+        sourcePut.addColumn(family, column2, timestamps[0], value2);
+        sourceTable.put(sourcePut);
+
+        Put targetPut = new Put(Bytes.toBytes(rowIndex));
+        targetPut.addColumn(family, column1, timestamps[1], value3);
+        targetPut.addColumn(family, column2, timestamps[1], value3);
+        targetTable.put(targetPut);
+      }
     }
-
-    sourceTable.close();
-    targetTable.close();
   }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
index a6de320e13a..1d3d5bf5ad3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -115,11 +114,17 @@ public class MobTestUtil {
    * @param table to get the scanner
    * @return the number of rows
    */
-  public static int countMobRows(HBaseTestingUtility util, Table table) throws 
IOException {
+  public static int countMobRows(Table table) throws IOException {
     Scan scan = new Scan();
     // Do not retrieve the mob data when scanning
     scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
-    return util.countRows(table, scan);
+    try (ResultScanner results = table.getScanner(scan)) {
+      int count = 0;
+      while (results.next() != null) {
+        count++;
+      }
+      return count;
+    }
   }
 
   public static Path generateMOBFileForRegion(Configuration conf, TableName 
tableName,

Reply via email to