Repository: hbase
Updated Branches:
  refs/heads/master d89682ea9 -> 600fdee84


http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
index 2de61cb..8f09479 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -30,14 +30,14 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 
@@ -114,7 +114,7 @@ public class ReplicationPeerZKImpl extends 
ReplicationStateZKBase
     try {
       byte[] data = peerConfigTracker.getData(false);
       if (data != null) {
-        this.peerConfig = ReplicationSerDeHelper.parsePeerFrom(data);
+        this.peerConfig = ReplicationPeerConfigUtil.parsePeerFrom(data);
       }
     } catch (DeserializationException e) {
       LOG.error("", e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 0f39b2a..cc84c1d 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -35,8 +35,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
@@ -46,6 +45,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 
 /**
@@ -131,7 +131,7 @@ public class ReplicationPeersZKImpl extends 
ReplicationStateZKBase implements Re
 
       List<ZKUtilOp> listOfOps = new ArrayList<>(2);
       ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id),
-        ReplicationSerDeHelper.toByteArray(peerConfig));
+        ReplicationPeerConfigUtil.toByteArray(peerConfig));
       // b/w PeerWatcher and ReplicationZookeeper#add method to create the
       // peer-state znode. This happens while adding a peer
       // The peer state data is set as "ENABLED" by default.
@@ -206,9 +206,9 @@ public class ReplicationPeersZKImpl extends 
ReplicationStateZKBase implements Re
       }
       rpc.setTableCFsMap(tableCFs);
       ZKUtil.setData(this.zookeeper, getPeerNode(id),
-          ReplicationSerDeHelper.toByteArray(rpc));
+          ReplicationPeerConfigUtil.toByteArray(rpc));
       LOG.info("Peer tableCFs with id= " + id + " is now " +
-        ReplicationSerDeHelper.convertToString(tableCFs));
+        ReplicationPeerConfigUtil.convertToString(tableCFs));
     } catch (KeeperException e) {
       throw new ReplicationException("Unable to change tableCFs of the peer 
with id=" + id, e);
     }
@@ -303,7 +303,7 @@ public class ReplicationPeersZKImpl extends 
ReplicationStateZKBase implements Re
     }
 
     try {
-      return ReplicationSerDeHelper.parsePeerFrom(data);
+      return ReplicationPeerConfigUtil.parsePeerFrom(data);
     } catch (DeserializationException e) {
       LOG.warn("Failed to parse cluster key from peerId=" + peerId
           + ", specifically the content from the following znode: " + znode);
@@ -372,7 +372,7 @@ public class ReplicationPeersZKImpl extends 
ReplicationStateZKBase implements Re
 
     try {
       ZKUtil.setData(this.zookeeper, getPeerNode(id),
-          ReplicationSerDeHelper.toByteArray(existingConfig));
+          ReplicationPeerConfigUtil.toByteArray(existingConfig));
     }
     catch(KeeperException ke){
       throw new ReplicationException("There was a problem trying to save 
changes to the " +

http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 3c751f7..2e3df2d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.client.VersionInfoUtil;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
@@ -1809,7 +1809,7 @@ public class MasterRpcServices extends RSRpcServices
       AddReplicationPeerRequest request) throws ServiceException {
     try {
       master.addReplicationPeer(request.getPeerId(),
-        ReplicationSerDeHelper.convert(request.getPeerConfig()));
+        ReplicationPeerConfigUtil.convert(request.getPeerConfig()));
       return AddReplicationPeerResponse.newBuilder().build();
     } catch (ReplicationException | IOException e) {
       throw new ServiceException(e);
@@ -1858,7 +1858,7 @@ public class MasterRpcServices extends RSRpcServices
       String peerId = request.getPeerId();
       ReplicationPeerConfig peerConfig = 
master.getReplicationPeerConfig(peerId);
       response.setPeerId(peerId);
-      response.setPeerConfig(ReplicationSerDeHelper.convert(peerConfig));
+      response.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
     } catch (ReplicationException | IOException e) {
       throw new ServiceException(e);
     }
@@ -1870,7 +1870,7 @@ public class MasterRpcServices extends RSRpcServices
       UpdateReplicationPeerConfigRequest request) throws ServiceException {
     try {
       master.updateReplicationPeerConfig(request.getPeerId(),
-        ReplicationSerDeHelper.convert(request.getPeerConfig()));
+        ReplicationPeerConfigUtil.convert(request.getPeerConfig()));
       return UpdateReplicationPeerConfigResponse.newBuilder().build();
     } catch (ReplicationException | IOException e) {
       throw new ServiceException(e);
@@ -1885,7 +1885,7 @@ public class MasterRpcServices extends RSRpcServices
       List<ReplicationPeerDescription> peers = master
           .listReplicationPeers(request.hasRegex() ? request.getRegex() : 
null);
       for (ReplicationPeerDescription peer : peers) {
-        
response.addPeerDesc(ReplicationSerDeHelper.toProtoReplicationPeerDescription(peer));
+        
response.addPeerDesc(ReplicationPeerConfigUtil.toProtoReplicationPeerDescription(peer));
       }
     } catch (ReplicationException | IOException e) {
       throw new ServiceException(e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java
index 0585c97..d094d1c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java
@@ -23,15 +23,15 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
 import org.apache.zookeeper.KeeperException;
 
 import java.io.IOException;
@@ -79,12 +79,12 @@ public class TableCFsUpdater extends ReplicationStateZKBase 
{
           // we copy TableCFs node into PeerNode
           LOG.info("copy tableCFs into peerNode:" + peerId);
           ReplicationProtos.TableCF[] tableCFs =
-                  ReplicationSerDeHelper.parseTableCFs(
+                  ReplicationPeerConfigUtil.parseTableCFs(
                           ZKUtil.getData(this.zookeeper, tableCFsNode));
           if (tableCFs != null && tableCFs.length > 0) {
-            rpc.setTableCFsMap(ReplicationSerDeHelper.convert2Map(tableCFs));
+            
rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs));
             ZKUtil.setData(this.zookeeper, peerNode,
-              ReplicationSerDeHelper.toByteArray(rpc));
+              ReplicationPeerConfigUtil.toByteArray(rpc));
           }
         } else {
           LOG.info("No tableCFs in peerNode:" + peerId);
@@ -113,7 +113,7 @@ public class TableCFsUpdater extends ReplicationStateZKBase 
{
       return null;
     }
     try {
-      return ReplicationSerDeHelper.parsePeerFrom(data);
+      return ReplicationPeerConfigUtil.parsePeerFrom(data);
     } catch (DeserializationException e) {
       LOG.warn("Failed to parse cluster key from peer=" + peerNode);
       return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
index 3e577bc..e489078 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
@@ -41,10 +41,8 @@ import 
org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.BeforeClass;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
new file mode 100644
index 0000000..bf60053
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static 
org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Class to test asynchronous replication admin operations when there is more
than 1 cluster
+ */
+@RunWith(Parameterized.class)
+@Category({LargeTests.class, ClientTests.class})
+public class TestAsyncReplicationAdminApiWithClusters extends 
TestAsyncAdminBase {
+
+  private final static String ID_SECOND = "2";
+
+  private static HBaseTestingUtility TEST_UTIL2;
+  private static Configuration conf2;
+  private static AsyncAdmin admin2;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 
60000);
+    
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 
120000);
+    
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
+    TEST_UTIL.startMiniCluster();
+    ASYNC_CONN = 
ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+
+    conf2 = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+    TEST_UTIL2 = new HBaseTestingUtility(conf2);
+    TEST_UTIL2.startMiniCluster();
+    admin2 =
+        
ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get().getAdmin();
+
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(TEST_UTIL2.getClusterKey());
+    ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    Pattern pattern = Pattern.compile(tableName.getNameAsString() + ".*");
+    cleanupTables(admin, pattern);
+    cleanupTables(admin2, pattern);
+  }
+
+  private void cleanupTables(AsyncAdmin admin, Pattern pattern) {
+    admin.listTableNames(pattern, false).whenCompleteAsync((tables, err) -> {
+      if (tables != null) {
+        tables.forEach(table -> {
+          try {
+            admin.disableTable(table).join();
+          } catch (Exception e) {
+            LOG.debug("Table: " + tableName + " already disabled, so just 
deleting it.");
+          }
+          admin.deleteTable(table).join();
+        });
+      }
+    }, ForkJoinPool.commonPool()).join();
+  }
+
+  private void createTableWithDefaultConf(AsyncAdmin admin, TableName 
tableName) {
+    TableDescriptorBuilder builder = 
TableDescriptorBuilder.newBuilder(tableName);
+    builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
+    admin.createTable(builder.build()).join();
+  }
+
+  @Test
+  public void testEnableAndDisableTableReplication() throws Exception {
+    // default replication scope is local
+    createTableWithDefaultConf(tableName);
+    admin.enableTableReplication(tableName).join();
+    TableDescriptor tableDesc = admin.getTableDescriptor(tableName).get();
+    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
+      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
+    }
+
+    admin.disableTableReplication(tableName).join();
+    tableDesc = admin.getTableDescriptor(tableName).get();
+    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
+      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
+    }
+  }
+
+  @Test
+  public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws 
Exception {
+    // Only create table in source cluster
+    createTableWithDefaultConf(tableName);
+    assertFalse(admin2.tableExists(tableName).get());
+    admin.enableTableReplication(tableName).join();
+    assertTrue(admin2.tableExists(tableName).get());
+  }
+
+  @Test
+  public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() 
throws Exception {
+    createTableWithDefaultConf(admin, tableName);
+    createTableWithDefaultConf(admin2, tableName);
+    TableDescriptorBuilder builder =
+        
TableDescriptorBuilder.newBuilder(admin.getTableDescriptor(tableName).get());
+    
builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("newFamily"))
+        .build());
+    admin2.disableTable(tableName).join();
+    admin2.modifyTable(builder.build()).join();
+    admin2.enableTable(tableName).join();
+
+    try {
+      admin.enableTableReplication(tableName).join();
+      fail("Exception should be thrown if table descriptors in the clusters 
are not same.");
+    } catch (Exception ignored) {
+      // ok
+    }
+
+    admin.disableTable(tableName).join();
+    admin.modifyTable(builder.build()).join();
+    admin.enableTable(tableName).join();
+    admin.enableTableReplication(tableName).join();
+    TableDescriptor tableDesc = admin.getTableDescriptor(tableName).get();
+    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
+      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
+    }
+  }
+
+  @Test
+  public void testDisableReplicationForNonExistingTable() throws Exception {
+    try {
+      admin.disableTableReplication(tableName).join();
+    } catch (CompletionException e) {
+      assertTrue(e.getCause() instanceof TableNotFoundException);
+    }
+  }
+
+  @Test
+  public void testEnableReplicationForNonExistingTable() throws Exception {
+    try {
+      admin.enableTableReplication(tableName).join();
+    } catch (CompletionException e) {
+      assertTrue(e.getCause() instanceof TableNotFoundException);
+    }
+  }
+
+  @Test
+  public void testDisableReplicationWhenTableNameAsNull() throws Exception {
+    try {
+      admin.disableTableReplication(null).join();
+    } catch (CompletionException e) {
+      assertTrue(e.getCause() instanceof IllegalArgumentException);
+    }
+  }
+
+  @Test
+  public void testEnableReplicationWhenTableNameAsNull() throws Exception {
+    try {
+      admin.enableTableReplication(null).join();
+    } catch (CompletionException e) {
+      assertTrue(e.getCause() instanceof IllegalArgumentException);
+    }
+  }
+
+  /*
+   * Test enable table replication should create table only in user explicitly
specified table-cfs.
+   * HBASE-14717
+   */
+  @Test
+  public void testEnableReplicationForExplicitSetTableCfs() throws Exception {
+    TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + 
"2");
+    // Only create table in source cluster
+    createTableWithDefaultConf(tableName);
+    createTableWithDefaultConf(tableName2);
+    assertFalse("Table should not exists in the peer cluster",
+      admin2.tableExists(tableName).get());
+    assertFalse("Table should not exists in the peer cluster",
+      admin2.tableExists(tableName2).get());
+
+    Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
+    tableCfs.put(tableName, null);
+    ReplicationPeerConfig rpc = 
admin.getReplicationPeerConfig(ID_SECOND).get();
+    rpc.setTableCFsMap(tableCfs);
+    try {
+      // Only add tableName to replication peer config
+      admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
+      admin.enableTableReplication(tableName2).join();
+      assertFalse("Table should not be created if user has set table cfs 
explicitly for the "
+          + "peer and this is not part of that collection", 
admin2.tableExists(tableName2).get());
+
+      // Add tableName2 to replication peer config, too
+      tableCfs.put(tableName2, null);
+      rpc.setTableCFsMap(tableCfs);
+      admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
+      admin.enableTableReplication(tableName2).join();
+      assertTrue(
+        "Table should be created if user has explicitly added table into table 
cfs collection",
+        admin2.tableExists(tableName2).get());
+    } finally {
+      rpc.setTableCFsMap(null);
+      admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index a23b76a..62951ef 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -241,7 +241,7 @@ public class TestReplicationAdmin {
     tableCFs.put(tableName1, null);
     admin.appendPeerTableCFs(ID_ONE, tableCFs);
     Map<TableName, List<String>> result =
-      
ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
+      
ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
     assertEquals(1, result.size());
     assertEquals(true, result.containsKey(tableName1));
     assertNull(result.get(tableName1));
@@ -250,7 +250,7 @@ public class TestReplicationAdmin {
     tableCFs.clear();
     tableCFs.put(tableName2, null);
     admin.appendPeerTableCFs(ID_ONE, tableCFs);
-    result = 
ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
+    result = 
ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
     assertEquals(2, result.size());
     assertTrue("Should contain t1", result.containsKey(tableName1));
     assertTrue("Should contain t2", result.containsKey(tableName2));
@@ -262,7 +262,7 @@ public class TestReplicationAdmin {
     tableCFs.put(tableName3, new ArrayList<>());
     tableCFs.get(tableName3).add("f1");
     admin.appendPeerTableCFs(ID_ONE, tableCFs);
-    result = 
ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
+    result = 
ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
     assertEquals(3, result.size());
     assertTrue("Should contain t1", result.containsKey(tableName1));
     assertTrue("Should contain t2", result.containsKey(tableName2));
@@ -277,7 +277,7 @@ public class TestReplicationAdmin {
     tableCFs.get(tableName4).add("f1");
     tableCFs.get(tableName4).add("f2");
     admin.appendPeerTableCFs(ID_ONE, tableCFs);
-    result = 
ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
+    result = 
ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
     assertEquals(4, result.size());
     assertTrue("Should contain t1", result.containsKey(tableName1));
     assertTrue("Should contain t2", result.containsKey(tableName2));
@@ -299,7 +299,7 @@ public class TestReplicationAdmin {
     tableCFs.put(tableName5, new ArrayList<>());
     tableCFs.get(tableName5).add("f1");
     admin.appendPeerTableCFs(ID_ONE, tableCFs);
-    result = 
ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
+    result = 
ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
     assertEquals(5, result.size());
     assertTrue("Should contain t5", result.containsKey(tableName5));
     // null means replication all cfs of tab5
@@ -313,7 +313,7 @@ public class TestReplicationAdmin {
     tableCFs.clear();
     tableCFs.put(tableName6, new ArrayList<>());
     admin.appendPeerTableCFs(ID_ONE, tableCFs);
-    result = 
ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
+    result = 
ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
     assertEquals(6, result.size());
     assertTrue("Should contain t6", result.containsKey(tableName6));
     // null means replication all cfs of tab6
@@ -354,7 +354,7 @@ public class TestReplicationAdmin {
     } catch (ReplicationException e) {
     }
     Map<TableName, List<String>> result =
-      
ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
+      
ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
     assertEquals(2, result.size());
     assertTrue("Should contain t1", result.containsKey(tableName1));
     assertTrue("Should contain t2", result.containsKey(tableName2));
@@ -373,7 +373,7 @@ public class TestReplicationAdmin {
     tableCFs.clear();
     tableCFs.put(tableName1, null);
     admin.removePeerTableCFs(ID_ONE, tableCFs);
-    result = 
ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
+    result = 
ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
     assertEquals(1, result.size());
     assertEquals(1, result.get(tableName2).size());
     assertEquals("cf1", result.get(tableName2).get(0));

http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 3e499b2..6b7d36b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -58,7 +58,7 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
@@ -528,7 +528,7 @@ public class TestMasterReplication {
         .getAdmin()) {
       admin.addReplicationPeer(id,
         new 
ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())
-            
.setTableCFsMap(ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs)));
+            
.setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
index abf2db3..6572404 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
@@ -187,13 +187,13 @@ public class TestPerTableCFReplication {
     Map<TableName, List<String>> tabCFsMap = null;
 
     // 1. null or empty string, result should be null
-    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(null);
+    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(null);
     assertEquals(null, tabCFsMap);
 
-    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("");
+    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig("");
     assertEquals(null, tabCFsMap);
 
-    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("   ");
+    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig("   ");
     assertEquals(null, tabCFsMap);
 
     final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
@@ -201,20 +201,20 @@ public class TestPerTableCFReplication {
     final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");
 
     // 2. single table: "tableName1" / "tableName2:cf1" / "tableName3:cf1,cf3"
-    tabCFsMap = 
ReplicationSerDeHelper.parseTableCFsFromConfig(tableName1.getNameAsString());
+    tabCFsMap = 
ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1.getNameAsString());
     assertEquals(1, tabCFsMap.size()); // only one table
     assertTrue(tabCFsMap.containsKey(tableName1));   // its table name is 
"tableName1"
     assertFalse(tabCFsMap.containsKey(tableName2));  // not other table
     assertEquals(null, tabCFsMap.get(tableName1));   // null cf-list,
 
-    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(tableName2 + 
":cf1");
+    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName2 + 
":cf1");
     assertEquals(1, tabCFsMap.size()); // only one table
     assertTrue(tabCFsMap.containsKey(tableName2));   // its table name is 
"tableName2"
     assertFalse(tabCFsMap.containsKey(tableName1));  // not other table
     assertEquals(1, tabCFsMap.get(tableName2).size());   // cf-list contains 
only 1 cf
     assertEquals("cf1", tabCFsMap.get(tableName2).get(0));// the only cf is 
"cf1"
 
-    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(tableName3 + " 
: cf1 , cf3");
+    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName3 + 
" : cf1 , cf3");
     assertEquals(1, tabCFsMap.size()); // only one table
     assertTrue(tabCFsMap.containsKey(tableName3));   // its table name is 
"tableName3"
     assertFalse(tabCFsMap.containsKey(tableName1));  // not other table
@@ -223,7 +223,7 @@ public class TestPerTableCFReplication {
     assertTrue(tabCFsMap.get(tableName3).contains("cf3"));// contains "cf3"
 
     // 3. multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
-    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(tableName1 + " 
; " + tableName2
+    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1 + 
" ; " + tableName2
             + ":cf1 ; " + tableName3 + ":cf1,cf3");
     // 3.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
     assertEquals(3, tabCFsMap.size());
@@ -242,7 +242,7 @@ public class TestPerTableCFReplication {
 
     // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) 
can be tolerated
     // still use the example of multiple tables: "tableName1 ; tableName2:cf1 
; tableName3:cf1,cf3"
-    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(
+    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
       tableName1 + " ; ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,,cf3 
;");
     // 4.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
     assertEquals(3, tabCFsMap.size());
@@ -261,7 +261,7 @@ public class TestPerTableCFReplication {
 
     // 5. invalid format "tableName1:tt:cf1 ; tableName2::cf1 ; 
tableName3:cf1,cf3"
     //    "tableName1:tt:cf1" and "tableName2::cf1" are invalid and will be 
ignored totally
-    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(
+    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
       tableName1 + ":tt:cf1 ; " + tableName2 + "::cf1 ; " + tableName3 + 
":cf1,cf3");
     // 5.1 no "tableName1" and "tableName2", only "tableName3"
     assertEquals(1, tabCFsMap.size()); // only one table
@@ -281,10 +281,10 @@ public class TestPerTableCFReplication {
     Map<TableName, List<String>> tabCFsMap = null;
 
     // 1. null or empty string, result should be null
-    assertNull(ReplicationSerDeHelper.convert(tabCFsMap));
+    assertNull(ReplicationPeerConfigUtil.convert(tabCFsMap));
 
     tabCFsMap = new HashMap<>();
-    tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
     assertEquals(0, tableCFs.length);
 
     final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
@@ -294,7 +294,7 @@ public class TestPerTableCFReplication {
     // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
     tabCFsMap.clear();
     tabCFsMap.put(tableName1, null);
-    tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
     assertEquals(1, tableCFs.length); // only one table
     assertEquals(tableName1.toString(),
         tableCFs[0].getTableName().getQualifier().toStringUtf8());
@@ -303,7 +303,7 @@ public class TestPerTableCFReplication {
     tabCFsMap.clear();
     tabCFsMap.put(tableName2, new ArrayList<>());
     tabCFsMap.get(tableName2).add("cf1");
-    tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
     assertEquals(1, tableCFs.length); // only one table
     assertEquals(tableName2.toString(),
         tableCFs[0].getTableName().getQualifier().toStringUtf8());
@@ -314,7 +314,7 @@ public class TestPerTableCFReplication {
     tabCFsMap.put(tableName3, new ArrayList<>());
     tabCFsMap.get(tableName3).add("cf1");
     tabCFsMap.get(tableName3).add("cf3");
-    tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
     assertEquals(1, tableCFs.length);
     assertEquals(tableName3.toString(),
         tableCFs[0].getTableName().getQualifier().toStringUtf8());
@@ -330,28 +330,28 @@ public class TestPerTableCFReplication {
     tabCFsMap.get(tableName3).add("cf1");
     tabCFsMap.get(tableName3).add("cf3");
 
-    tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
     assertEquals(3, tableCFs.length);
-    assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, 
tableName1.toString()));
-    assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, 
tableName2.toString()));
-    assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, 
tableName3.toString()));
+    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, 
tableName1.toString()));
+    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, 
tableName2.toString()));
+    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, 
tableName3.toString()));
 
     assertEquals(0,
-        ReplicationSerDeHelper.getTableCF(tableCFs, 
tableName1.toString()).getFamiliesCount());
+        ReplicationPeerConfigUtil.getTableCF(tableCFs, 
tableName1.toString()).getFamiliesCount());
 
-    assertEquals(1,
-        ReplicationSerDeHelper.getTableCF(tableCFs, 
tableName2.toString()).getFamiliesCount());
-    assertEquals("cf1",
-        ReplicationSerDeHelper.getTableCF(tableCFs, 
tableName2.toString()).getFamilies(0).toStringUtf8());
+    assertEquals(1, ReplicationPeerConfigUtil.getTableCF(tableCFs, 
tableName2.toString())
+        .getFamiliesCount());
+    assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, 
tableName2.toString())
+        .getFamilies(0).toStringUtf8());
 
-    assertEquals(2,
-        ReplicationSerDeHelper.getTableCF(tableCFs, 
tableName3.toString()).getFamiliesCount());
-    assertEquals("cf1",
-        ReplicationSerDeHelper.getTableCF(tableCFs, 
tableName3.toString()).getFamilies(0).toStringUtf8());
-    assertEquals("cf3",
-        ReplicationSerDeHelper.getTableCF(tableCFs, 
tableName3.toString()).getFamilies(1).toStringUtf8());
+    assertEquals(2, ReplicationPeerConfigUtil.getTableCF(tableCFs, 
tableName3.toString())
+        .getFamiliesCount());
+    assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, 
tableName3.toString())
+        .getFamilies(0).toStringUtf8());
+    assertEquals("cf3", ReplicationPeerConfigUtil.getTableCF(tableCFs, 
tableName3.toString())
+        .getFamilies(1).toStringUtf8());
 
-    tabCFsMap = ReplicationSerDeHelper.convert2Map(tableCFs);
+    tabCFsMap = ReplicationPeerConfigUtil.convert2Map(tableCFs);
     assertEquals(3, tabCFsMap.size());
     assertTrue(tabCFsMap.containsKey(tableName1));
     assertTrue(tabCFsMap.containsKey(tableName2));

http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
index 8c604f4..1a02317 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -99,14 +99,15 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(zkw.getQuorum());
     String peerNode = getPeerNode(peerId);
-    ZKUtil.createWithParents(zkw, peerNode, 
ReplicationSerDeHelper.toByteArray(rpc));
+    ZKUtil.createWithParents(zkw, peerNode, 
ReplicationPeerConfigUtil.toByteArray(rpc));
 
     String tableCFs = tableName1 + ":cf1,cf2;" + tableName2 + ":cf3;" + 
tableName3;
     String tableCFsNode = getTableCFsNode(peerId);
     LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
     ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
 
-    ReplicationPeerConfig actualRpc = 
ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
+    ReplicationPeerConfig actualRpc =
+        ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
     String actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
 
     assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
@@ -117,14 +118,14 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
     rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(zkw.getQuorum());
     peerNode = getPeerNode(peerId);
-    ZKUtil.createWithParents(zkw, peerNode, 
ReplicationSerDeHelper.toByteArray(rpc));
+    ZKUtil.createWithParents(zkw, peerNode, 
ReplicationPeerConfigUtil.toByteArray(rpc));
 
     tableCFs = tableName1 + ":cf1,cf3;" + tableName2 + ":cf2";
     tableCFsNode = getTableCFsNode(peerId);
     LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
     ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
 
-    actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
+    actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
     actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
 
     assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
@@ -135,14 +136,14 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
     rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(zkw.getQuorum());
     peerNode = getPeerNode(peerId);
-    ZKUtil.createWithParents(zkw, peerNode, 
ReplicationSerDeHelper.toByteArray(rpc));
+    ZKUtil.createWithParents(zkw, peerNode, 
ReplicationPeerConfigUtil.toByteArray(rpc));
 
     tableCFs = "";
     tableCFsNode = getTableCFsNode(peerId);
     LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
     ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
 
-    actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
+    actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
     actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
 
     assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
@@ -153,10 +154,10 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
     rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(zkw.getQuorum());
     peerNode = getPeerNode(peerId);
-    ZKUtil.createWithParents(zkw, peerNode, 
ReplicationSerDeHelper.toByteArray(rpc));
+    ZKUtil.createWithParents(zkw, peerNode, 
ReplicationPeerConfigUtil.toByteArray(rpc));
 
     tableCFsNode = getTableCFsNode(peerId);
-    actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
+    actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
     actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
 
     assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
@@ -167,7 +168,7 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
 
     peerId = "1";
     peerNode = getPeerNode(peerId);
-    actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
+    actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
     assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
     Map<TableName, List<String>> tableNameListMap = actualRpc.getTableCFsMap();
     assertEquals(3, tableNameListMap.size());
@@ -184,7 +185,7 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
 
     peerId = "2";
     peerNode = getPeerNode(peerId);
-    actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
+    actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
     assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
     tableNameListMap = actualRpc.getTableCFsMap();
     assertEquals(2, tableNameListMap.size());
@@ -198,14 +199,14 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
 
     peerId = "3";
     peerNode = getPeerNode(peerId);
-    actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
+    actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
     assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
     tableNameListMap = actualRpc.getTableCFsMap();
     assertNull(tableNameListMap);
 
     peerId = "4";
     peerNode = getPeerNode(peerId);
-    actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
+    actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
     assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
     tableNameListMap = actualRpc.getTableCFsMap();
     assertNull(tableNameListMap);

Reply via email to