HBASE-19009 implement modifyTable and enable/disableTableReplication for AsyncAdmin


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d885e223
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d885e223
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d885e223

Branch: refs/heads/branch-2
Commit: d885e2232df6ac4c65b3a87eb45780b8fff60b91
Parents: fb79e9d
Author: Guanghao Zhang <zg...@apache.org>
Authored: Sun Nov 12 20:16:20 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Thu Nov 16 07:19:34 2017 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/client/AsyncAdmin.java  |  18 +
 .../hadoop/hbase/client/AsyncHBaseAdmin.java    |  17 +-
 .../hbase/client/ColumnFamilyDescriptor.java    |  27 ++
 .../apache/hadoop/hbase/client/HBaseAdmin.java  | 220 ++-------
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 313 ++++++++++++-
 .../hadoop/hbase/client/TableDescriptor.java    |  51 +-
 .../hbase/client/TableDescriptorBuilder.java    |  21 +-
 .../client/replication/ReplicationAdmin.java    |   8 +-
 .../replication/ReplicationPeerConfigUtil.java  | 468 +++++++++++++++++++
 .../replication/ReplicationSerDeHelper.java     | 437 -----------------
 .../replication/ReplicationPeerConfig.java      |  20 +
 .../hbase/shaded/protobuf/RequestConverter.java |   6 +-
 .../replication/ReplicationPeerZKImpl.java      |   6 +-
 .../replication/ReplicationPeersZKImpl.java     |  14 +-
 .../hadoop/hbase/master/MasterRpcServices.java  |  10 +-
 .../replication/master/TableCFsUpdater.java     |  14 +-
 .../client/TestAsyncReplicationAdminApi.java    |   2 -
 ...estAsyncReplicationAdminApiWithClusters.java | 242 ++++++++++
 .../replication/TestReplicationAdmin.java       |  16 +-
 .../replication/TestMasterReplication.java      |   4 +-
 .../replication/TestPerTableCFReplication.java  |  62 +--
 .../replication/master/TestTableCFsUpdater.java |  27 +-
 22 files changed, 1261 insertions(+), 742 deletions(-)
----------------------------------------------------------------------
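
A minimal usage sketch of the new API surface, for reviewers (assumes a running
cluster reachable via 'conf' and an existing 'tableName'; exception handling is
elided and the setup code is boilerplate, not part of this patch):

    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
      AsyncAdmin admin = conn.getAdmin();
      // Modify an existing table without blocking the caller.
      TableDescriptor desc = admin.getTableDescriptor(tableName).get();
      admin.modifyTable(TableDescriptorBuilder.newBuilder(desc)
          .setMaxFileSize(10L * 1024 * 1024 * 1024).build()).get();
      // Flip the replication scope of every column family in one call.
      admin.enableTableReplication(tableName).get();
      admin.disableTableReplication(tableName).get();
    }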


http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index f251a8f..722e8b5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -141,6 +141,12 @@ public interface AsyncAdmin {
    */
   CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys);
 
+  /**
+   * Modify an existing table, more IRB friendly version.
+   * @param desc modified description of the table
+   */
+  CompletableFuture<Void> modifyTable(TableDescriptor desc);
+
   /**
    * Deletes a table.
    * @param tableName name of table to delete
@@ -553,6 +559,18 @@ public interface AsyncAdmin {
   CompletableFuture<List<TableCFs>> listReplicatedTableCFs();
 
   /**
+   * Enable a table's replication switch.
+   * @param tableName name of the table
+   */
+  CompletableFuture<Void> enableTableReplication(TableName tableName);
+
+  /**
+   * Disable a table's replication switch.
+   * @param tableName name of the table
+   */
+  CompletableFuture<Void> disableTableReplication(TableName tableName);
+
+  /**
   * Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be
   * taken. If the table is disabled, an offline snapshot is taken. Snapshots are considered unique
   * based on <b>the name of the snapshot</b>. Attempts to take a snapshot with the same name (even

http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 250a38c..5a20291 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -128,6 +128,11 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
+  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
+    return wrap(rawAdmin.modifyTable(desc));
+  }
+
+  @Override
   public CompletableFuture<Void> deleteTable(TableName tableName) {
     return wrap(rawAdmin.deleteTable(tableName));
   }
@@ -420,6 +425,16 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
+  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
+    return wrap(rawAdmin.enableTableReplication(tableName));
+  }
+
+  @Override
+  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
+    return wrap(rawAdmin.disableTableReplication(tableName));
+  }
+
+  @Override
   public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
     return wrap(rawAdmin.snapshot(snapshot));
   }
@@ -709,4 +724,4 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
     return wrap(rawAdmin.clearDeadServers(servers));
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java
index c232271..03f4582 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java
@@ -18,7 +18,9 @@
 package org.apache.hadoop.hbase.client;
 
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.MemoryCompactionPolicy;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -54,6 +56,31 @@ public interface ColumnFamilyDescriptor {
    return lhs.getConfiguration().hashCode() - rhs.getConfiguration().hashCode();
   };
 
+  static final Bytes REPLICATION_SCOPE_BYTES = new Bytes(
+      Bytes.toBytes(ColumnFamilyDescriptorBuilder.REPLICATION_SCOPE));
+
+  @InterfaceAudience.Private
+  static final Comparator<ColumnFamilyDescriptor> COMPARATOR_IGNORE_REPLICATION = (
+      ColumnFamilyDescriptor lcf, ColumnFamilyDescriptor rcf) -> {
+    int result = Bytes.compareTo(lcf.getName(), rcf.getName());
+    if (result != 0) {
+      return result;
+    }
+    // ColumnFamilyDescriptor.getValues returns an immutable map, so copy it and remove
+    // REPLICATION_SCOPE_BYTES
+    Map<Bytes, Bytes> lValues = new HashMap<>();
+    lValues.putAll(lcf.getValues());
+    lValues.remove(REPLICATION_SCOPE_BYTES);
+    Map<Bytes, Bytes> rValues = new HashMap<>();
+    rValues.putAll(rcf.getValues());
+    rValues.remove(REPLICATION_SCOPE_BYTES);
+    result = lValues.hashCode() - rValues.hashCode();
+    if (result != 0) {
+      return result;
+    }
+    return lcf.getConfiguration().hashCode() - rcf.getConfiguration().hashCode();
+  };
+
   /**
    * @return The storefile/hfile blocksize for this column family.
    */
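
The new comparator makes two families that differ only in REPLICATION_SCOPE compare
as equal; a small illustration (hypothetical family name "cf", not part of the patch):

    ColumnFamilyDescriptor local = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"))
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
    ColumnFamilyDescriptor peer = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"))
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).build();
    // The scope difference is visible to the full comparator (via the values map)...
    assert ColumnFamilyDescriptor.COMPARATOR.compare(local, peer) != 0;
    // ...but ignored by the replication-aware comparator added here.
    assert ColumnFamilyDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(local, peer) == 0;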

http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 80f9d16..e153381 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -30,7 +30,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -45,6 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -54,10 +54,8 @@ import org.apache.hadoop.hbase.CacheEvictionStats;
 import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.ClusterStatus.Option;
-import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -76,7 +74,7 @@ import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.client.replication.TableCFs;
 import org.apache.hadoop.hbase.client.security.SecurityCapability;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
@@ -3893,7 +3891,7 @@ public class HBaseAdmin implements Admin {
       protected ReplicationPeerConfig rpcCall() throws Exception {
        GetReplicationPeerConfigResponse response = master.getReplicationPeerConfig(
          getRpcController(), RequestConverter.buildGetReplicationPeerConfigRequest(peerId));
-        return ReplicationSerDeHelper.convert(response.getPeerConfig());
+        return ReplicationPeerConfigUtil.convert(response.getPeerConfig());
       }
     });
   }
@@ -3919,7 +3917,7 @@ public class HBaseAdmin implements Admin {
       throw new ReplicationException("tableCfs is null");
     }
     ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
-    ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
+    ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
     updateReplicationPeerConfig(id, peerConfig);
   }
 
@@ -3931,7 +3929,7 @@ public class HBaseAdmin implements Admin {
       throw new ReplicationException("tableCfs is null");
     }
     ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
-    ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
+    ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
     updateReplicationPeerConfig(id, peerConfig);
   }
 
@@ -3957,7 +3955,7 @@ public class HBaseAdmin implements Admin {
             .getPeerDescList();
        List<ReplicationPeerDescription> result = new ArrayList<>(peersList.size());
         for (ReplicationProtos.ReplicationPeerDescription peer : peersList) {
-          result.add(ReplicationSerDeHelper.toReplicationPeerDescription(peer));
+          result.add(ReplicationPeerConfigUtil.toReplicationPeerDescription(peer));
         }
         return result;
       }
@@ -4010,19 +4008,18 @@ public class HBaseAdmin implements Admin {
   @Override
   public List<TableCFs> listReplicatedTableCFs() throws IOException {
     List<TableCFs> replicatedTableCFs = new ArrayList<>();
-    HTableDescriptor[] tables = listTables();
-    for (HTableDescriptor table : tables) {
-      HColumnDescriptor[] columns = table.getColumnFamilies();
+    List<TableDescriptor> tables = listTableDescriptors();
+    tables.forEach(table -> {
       Map<String, Integer> cfs = new HashMap<>();
-      for (HColumnDescriptor column : columns) {
-        if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
-          cfs.put(column.getNameAsString(), column.getScope());
-        }
-      }
+      Stream.of(table.getColumnFamilies())
+          .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
+          .forEach(column -> {
+            cfs.put(column.getNameAsString(), column.getScope());
+          });
       if (!cfs.isEmpty()) {
         replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
       }
-    }
+    });
     return replicatedTableCFs;
   }
 
@@ -4046,84 +4043,13 @@ public class HBaseAdmin implements Admin {
       throw new IllegalArgumentException("Table name is null");
     }
     if (!tableExists(tableName)) {
-      throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
+      throw new TableNotFoundException("Table '" + tableName.getNameAsString()
           + "' does not exists.");
     }
     setTableRep(tableName, false);
   }
 
   /**
-   * Copies the REPLICATION_SCOPE of table descriptor passed as an argument. Before copy, the method
-   * ensures that the name of table and column-families should match.
-   * @param peerHtd descriptor on peer cluster
-   * @param localHtd - The HTableDescriptor of table from source cluster.
-   * @return true If the name of table and column families match and REPLICATION_SCOPE copied
-   *         successfully. false If there is any mismatch in the names.
-   */
-  private boolean copyReplicationScope(final HTableDescriptor peerHtd,
-      final HTableDescriptor localHtd) {
-    // Copy the REPLICATION_SCOPE only when table names and the names of
-    // Column-Families are same.
-    int result = peerHtd.getTableName().compareTo(localHtd.getTableName());
-
-    if (result == 0) {
-      Iterator<HColumnDescriptor> remoteHCDIter = peerHtd.getFamilies().iterator();
-      Iterator<HColumnDescriptor> localHCDIter = localHtd.getFamilies().iterator();
-
-      while (remoteHCDIter.hasNext() && localHCDIter.hasNext()) {
-        HColumnDescriptor remoteHCD = remoteHCDIter.next();
-        HColumnDescriptor localHCD = localHCDIter.next();
-
-        String remoteHCDName = remoteHCD.getNameAsString();
-        String localHCDName = localHCD.getNameAsString();
-
-        if (remoteHCDName.equals(localHCDName)) {
-          remoteHCD.setScope(localHCD.getScope());
-        } else {
-          result = -1;
-          break;
-        }
-      }
-      if (remoteHCDIter.hasNext() || localHCDIter.hasNext()) {
-        return false;
-      }
-    }
-
-    return result == 0;
-  }
-
-  /**
-   * Compare the contents of the descriptor with another one passed as a parameter for replication
-   * purpose. The REPLICATION_SCOPE field is ignored during comparison.
-   * @param peerHtd descriptor on peer cluster
-   * @param localHtd descriptor on source cluster which needs to be replicated.
-   * @return true if the contents of the two descriptors match (ignoring just REPLICATION_SCOPE).
-   * @see java.lang.Object#equals(java.lang.Object)
-   */
-  private boolean compareForReplication(HTableDescriptor peerHtd, HTableDescriptor localHtd) {
-    if (peerHtd == localHtd) {
-      return true;
-    }
-    if (peerHtd == null) {
-      return false;
-    }
-    boolean result = false;
-
-    // Create a copy of peer HTD as we need to change its replication
-    // scope to match with the local HTD.
-    HTableDescriptor peerHtdCopy = new HTableDescriptor(peerHtd);
-
-    result = copyReplicationScope(peerHtdCopy, localHtd);
-
-    // If copy was successful, compare the two tables now.
-    if (result) {
-      result = (peerHtdCopy.compareTo(localHtd) == 0);
-    }
-
-    return result;
-  }
-
-  /**
    * Connect to peer and check the table descriptor on peer:
    * <ol>
    * <li>Create the same table on peer when not exist.</li>
@@ -4143,21 +4069,23 @@ public class HBaseAdmin implements Admin {
     }
 
     for (ReplicationPeerDescription peerDesc : peers) {
-      if (needToReplicate(tableName, peerDesc)) {
-        Configuration peerConf = getPeerClusterConfiguration(peerDesc);
+      if (peerDesc.getPeerConfig().needToReplicate(tableName)) {
+        Configuration peerConf =
+            ReplicationPeerConfigUtil.getPeerClusterConfiguration(this.conf, peerDesc);
         try (Connection conn = ConnectionFactory.createConnection(peerConf);
             Admin repHBaseAdmin = conn.getAdmin()) {
-          HTableDescriptor localHtd = getTableDescriptor(tableName);
-          HTableDescriptor peerHtd = null;
+          TableDescriptor tableDesc = getDescriptor(tableName);
+          TableDescriptor peerTableDesc = null;
           if (!repHBaseAdmin.tableExists(tableName)) {
-            repHBaseAdmin.createTable(localHtd, splits);
+            repHBaseAdmin.createTable(tableDesc, splits);
           } else {
-            peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
-            if (peerHtd == null) {
+            peerTableDesc = repHBaseAdmin.getDescriptor(tableName);
+            if (peerTableDesc == null) {
               throw new IllegalArgumentException("Failed to get table 
descriptor for table "
                   + tableName.getNameAsString() + " from peer cluster " + 
peerDesc.getPeerId());
             }
-            if (!compareForReplication(peerHtd, localHtd)) {
+            if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc,
+              tableDesc) != 0) {
               throw new IllegalArgumentException("Table " + 
tableName.getNameAsString()
                   + " exists in peer cluster " + peerDesc.getPeerId()
                   + ", but the table descriptors are not same when compared 
with source cluster."
@@ -4170,108 +4098,20 @@ public class HBaseAdmin implements Admin {
   }
 
   /**
-   * Decide whether the table need replicate to the peer cluster according to the peer config
-   * @param table name of the table
-   * @param peer config for the peer
-   * @return true if the table need replicate to the peer cluster
-   */
-  private boolean needToReplicate(TableName table, ReplicationPeerDescription peer) {
-    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
-    Set<String> namespaces = peerConfig.getNamespaces();
-    Map<TableName, List<String>> tableCFsMap = peerConfig.getTableCFsMap();
-    // If null means user has explicitly not configured any namespaces and table CFs
-    // so all the tables data are applicable for replication
-    if (namespaces == null && tableCFsMap == null) {
-      return true;
-    }
-    if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) {
-      return true;
-    }
-    if (tableCFsMap != null && tableCFsMap.containsKey(table)) {
-      return true;
-    }
-    LOG.debug("Table " + table.getNameAsString()
-        + " doesn't need replicate to peer cluster, peerId=" + peer.getPeerId() + ", clusterKey="
-        + peerConfig.getClusterKey());
-    return false;
-  }
-
-  /**
   * Set the table's replication switch if the table's replication switch is not already set.
    * @param tableName name of the table
    * @param enableRep is replication switch enable or disable
    * @throws IOException if a remote or network exception occurs
    */
   private void setTableRep(final TableName tableName, boolean enableRep) throws IOException {
-    HTableDescriptor htd = new HTableDescriptor(getTableDescriptor(tableName));
-    ReplicationState currentReplicationState = getTableReplicationState(htd);
-    if (enableRep && currentReplicationState != ReplicationState.ENABLED
-        || !enableRep && currentReplicationState != ReplicationState.DISABLED) {
-      for (HColumnDescriptor hcd : htd.getFamilies()) {
-        hcd.setScope(enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL
-            : HConstants.REPLICATION_SCOPE_LOCAL);
-      }
-      modifyTable(tableName, htd);
+    TableDescriptor tableDesc = getDescriptor(tableName);
+    if (!tableDesc.matchReplicationScope(enableRep)) {
+      int scope =
+          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
+      modifyTable(TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build());
     }
   }
 
-  /**
-   * This enum indicates the current state of the replication for a given table.
-   */
-  private enum ReplicationState {
-    ENABLED, // all column families enabled
-    MIXED, // some column families enabled, some disabled
-    DISABLED // all column families disabled
-  }
-
-  /**
-   * @param htd table descriptor details for the table to check
-   * @return ReplicationState the current state of the table.
-   */
-  private ReplicationState getTableReplicationState(HTableDescriptor htd) {
-    boolean hasEnabled = false;
-    boolean hasDisabled = false;
-
-    for (HColumnDescriptor hcd : htd.getFamilies()) {
-      if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
-          && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
-        hasDisabled = true;
-      } else {
-        hasEnabled = true;
-      }
-    }
-
-    if (hasEnabled && hasDisabled) return ReplicationState.MIXED;
-    if (hasEnabled) return ReplicationState.ENABLED;
-    return ReplicationState.DISABLED;
-  }
-
-  /**
-   * Returns the configuration needed to talk to the remote slave cluster.
-   * @param peer the description of replication peer
-   * @return the configuration for the peer cluster, null if it was unable to get the configuration
-   * @throws IOException
-   */
-  private Configuration getPeerClusterConfiguration(ReplicationPeerDescription peer)
-      throws IOException {
-    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
-    Configuration otherConf;
-    try {
-      otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
-    } catch (IOException e) {
-      throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
-    }
-    }
-
-    if (!peerConfig.getConfiguration().isEmpty()) {
-      CompoundConfiguration compound = new CompoundConfiguration();
-      compound.add(otherConf);
-      compound.addStringMap(peerConfig.getConfiguration());
-      return compound;
-    }
-
-    return otherConf;
-  }
-
   @Override
   public void clearCompactionQueues(final ServerName sn, final Set<String> queues)
     throws IOException, InterruptedException {
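
With the helpers factored out (needToReplicate now lives on ReplicationPeerConfig,
getPeerClusterConfiguration in ReplicationPeerConfigUtil), the per-peer sync loop in
enableTableReplication reduces to roughly the following sketch (error handling elided):

    for (ReplicationPeerDescription peer : listReplicationPeers()) {
      if (!peer.getPeerConfig().needToReplicate(tableName)) {
        continue; // this peer does not subscribe to the table
      }
      Configuration peerConf =
          ReplicationPeerConfigUtil.getPeerClusterConfiguration(this.conf, peer);
      try (Connection conn = ConnectionFactory.createConnection(peerConf);
          Admin peerAdmin = conn.getAdmin()) {
        // create the table on the peer, or compare descriptors, as in the hunk above
      }
    }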

http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index d77cd15..bcf581b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -42,6 +42,7 @@ import java.util.stream.Stream;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.ClusterStatus.Option;
@@ -64,7 +65,7 @@ import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterReques
 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
 import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
 import org.apache.hadoop.hbase.client.Scan.ReadType;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.client.replication.TableCFs;
 import org.apache.hadoop.hbase.client.security.SecurityCapability;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -188,6 +189,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColu
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
@@ -506,6 +509,14 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
+  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
+    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(
+      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
+        ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
+      (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
+  }
+
+  @Override
   public CompletableFuture<Void> deleteTable(TableName tableName) {
     return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter
         .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
@@ -1515,7 +1526,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
              .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
                controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), (
                    s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
-                (resp) -> ReplicationSerDeHelper.convert(resp.getPeerConfig()))).call();
+                (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call();
   }
 
   @Override
@@ -1541,7 +1552,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
     CompletableFuture<Void> future = new CompletableFuture<Void>();
     getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
       if (!completeExceptionally(future, error)) {
-        ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
+        ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
         updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
           if (!completeExceptionally(future, error)) {
             future.complete(result);
@@ -1560,21 +1571,23 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
 
     CompletableFuture<Void> future = new CompletableFuture<Void>();
-    getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
-      if (!completeExceptionally(future, error)) {
-        try {
-          ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
-        } catch (ReplicationException e) {
-          future.completeExceptionally(e);
-          return;
-        }
-        updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
-          if (!completeExceptionally(future, error)) {
-            future.complete(result);
+    getReplicationPeerConfig(id).whenComplete(
+      (peerConfig, error) -> {
+        if (!completeExceptionally(future, error)) {
+          try {
+            ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig,
+              id);
+          } catch (ReplicationException e) {
+            future.completeExceptionally(e);
+            return;
           }
-        });
-      }
-    });
+          updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
+            if (!completeExceptionally(future, error)) {
+              future.complete(result);
+            }
+          });
+        }
+      });
     return future;
   }
 
@@ -1602,7 +1615,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
                 request,
                 (s, c, req, done) -> s.listReplicationPeers(c, req, done),
                 (resp) -> resp.getPeerDescList().stream()
-                    .map(ReplicationSerDeHelper::toReplicationPeerDescription)
+                    .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
                     .collect(Collectors.toList()))).call();
   }
 
@@ -2168,9 +2181,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
           returnedFuture.completeExceptionally(err);
           return;
         }
-        LOG.info("location is " + location);
         if (!location.isPresent() || location.get().getRegion() == null) {
-          LOG.info("unknown location is " + location);
           returnedFuture.completeExceptionally(new UnknownRegionException(
               "Invalid region name or encoded region name: "
                   + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
@@ -2323,6 +2334,18 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
+  private class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
+
+    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
+      super(tableName);
+    }
+
+    @Override
+    String getOperationType() {
+      return "ENABLE";
+    }
+  }
+
   private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
 
     DeleteTableProcedureBiConsumer(TableName tableName) {
@@ -3031,4 +3054,254 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
         .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
         .startLogErrorsCnt(startLogErrorsCnt);
   }
+
+  @Override
+  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
+    if (tableName == null) {
+      return failedFuture(new IllegalArgumentException("Table name is null"));
+    }
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    tableExists(tableName).whenComplete(
+      (exist, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        if (!exist) {
+          future.completeExceptionally(new TableNotFoundException("Table '"
+              + tableName.getNameAsString() + "' does not exist."));
+          return;
+        }
+        getTableSplits(tableName).whenComplete((splits, err1) -> {
+          if (err1 != null) {
+            future.completeExceptionally(err1);
+          } else {
+            checkAndSyncTableToPeerClusters(tableName, splits).whenComplete((result, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                setTableReplication(tableName, true).whenComplete((result3, err3) -> {
+                  if (err3 != null) {
+                    future.completeExceptionally(err3);
+                  } else {
+                    future.complete(result3);
+                  }
+                });
+              }
+            });
+          }
+        });
+      });
+    return future;
+  }
+
+  @Override
+  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
+    if (tableName == null) {
+      return failedFuture(new IllegalArgumentException("Table name is null"));
+    }
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    tableExists(tableName).whenComplete(
+      (exist, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        if (!exist) {
+          future.completeExceptionally(new TableNotFoundException("Table '"
+              + tableName.getNameAsString() + "' does not exist."));
+          return;
+        }
+        setTableReplication(tableName, false).whenComplete((result, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else {
+            future.complete(result);
+          }
+        });
+      });
+    return future;
+  }
+
+  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
+    CompletableFuture<byte[][]> future = new CompletableFuture<>();
+    getTableRegions(tableName).whenComplete((regions, err2) -> {
+      if (err2 != null) {
+        future.completeExceptionally(err2);
+        return;
+      }
+      if (regions.size() == 1) {
+        future.complete(null);
+      } else {
+        byte[][] splits = new byte[regions.size() - 1][];
+        for (int i = 1; i < regions.size(); i++) {
+          splits[i - 1] = regions.get(i).getStartKey();
+        }
+        future.complete(splits);
+      }
+    });
+    return future;
+  }
+
+  /**
+   * Connect to peer and check the table descriptor on peer:
+   * <ol>
+   * <li>Create the same table on peer when not exist.</li>
+   * <li>Throw an exception if the table already has replication enabled on any of the column
+   * families.</li>
+   * <li>Throw an exception if the table exists on the peer cluster but the descriptors are not
+   * the same.</li>
+   * </ol>
+   * @param tableName name of the table to sync to the peer
+   * @param splits table split keys
+   */
+  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
+      byte[][] splits) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    listReplicationPeers().whenComplete(
+      (peers, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        if (peers == null || peers.size() <= 0) {
+          future.completeExceptionally(new IllegalArgumentException(
+              "Found no peer cluster for replication."));
+          return;
+        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
+            .forEach(peer -> {
+              futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
+            });
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
+            .whenComplete((result, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                future.complete(result);
+              }
+            });
+      });
+    return future;
+  }
+
+  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
+      ReplicationPeerDescription peer) {
+    Configuration peerConf = null;
+    try {
+      peerConf =
+          ReplicationPeerConfigUtil
+              .getPeerClusterConfiguration(connection.getConfiguration(), peer);
+    } catch (IOException e) {
+      return failedFuture(e);
+    }
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    ConnectionFactory.createAsyncConnection(peerConf).whenComplete(
+      (conn, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        getTableDescriptor(tableName).whenComplete(
+          (tableDesc, err1) -> {
+            if (err1 != null) {
+              future.completeExceptionally(err1);
+              return;
+            }
+            AsyncAdmin peerAdmin = conn.getAdmin();
+            peerAdmin.tableExists(tableName).whenComplete(
+              (exist, err2) -> {
+                if (err2 != null) {
+                  future.completeExceptionally(err2);
+                  return;
+                }
+                if (!exist) {
+                  CompletableFuture<Void> createTableFuture = null;
+                  if (splits == null) {
+                    createTableFuture = peerAdmin.createTable(tableDesc);
+                  } else {
+                    createTableFuture = peerAdmin.createTable(tableDesc, splits);
+                  }
+                  createTableFuture.whenComplete(
+                    (result, err3) -> {
+                      if (err3 != null) {
+                        future.completeExceptionally(err3);
+                      } else {
+                        future.complete(result);
+                      }
+                    });
+                } else {
+                  compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin).whenComplete(
+                    (result, err4) -> {
+                      if (err4 != null) {
+                        future.completeExceptionally(err4);
+                      } else {
+                        future.complete(result);
+                      }
+                    });
+                }
+              });
+          });
+      });
+    return future;
+  }
+
+  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
+      TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    peerAdmin.getTableDescriptor(tableName).whenComplete(
+      (peerTableDesc, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        if (peerTableDesc == null) {
+          future.completeExceptionally(new IllegalArgumentException(
+              "Failed to get table descriptor for table " + 
tableName.getNameAsString()
+                  + " from peer cluster " + peer.getPeerId()));
+          return;
+        }
+        if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
+          future.completeExceptionally(new IllegalArgumentException("Table "
+              + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId()
+              + ", but the table descriptors are not the same when compared with source cluster."
+              + " Thus cannot enable the table's replication switch."));
+          return;
+        }
+        future.complete(null);
+      });
+    return future;
+  }
+
+  /**
+   * Set the table's replication switch if the table's replication switch is not already set.
+   * @param tableName name of the table
+   * @param enableRep whether to enable or disable the replication switch
+   */
+  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    getTableDescriptor(tableName).whenComplete(
+      (tableDesc, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        if (!tableDesc.matchReplicationScope(enableRep)) {
+          int scope =
+              enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
+          TableDescriptor newTableDesc =
+              TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
+          modifyTable(newTableDesc).whenComplete((result, err2) -> {
+            if (err2 != null) {
+              future.completeExceptionally(err2);
+            } else {
+              future.complete(result);
+            }
+          });
+        } else {
+          future.complete(null);
+        }
+      });
+    return future;
+  }
 }
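
The nested whenComplete callbacks keep the new methods consistent with the rest of this
class; equivalently, the enableTableReplication chain could be written with thenCompose,
roughly as below (a hypothetical refactor, not what this patch does):

    return tableExists(tableName).thenCompose(exists -> {
      if (!exists) {
        return failedFuture(new TableNotFoundException(
            "Table '" + tableName.getNameAsString() + "' does not exist."));
      }
      return getTableSplits(tableName)
          .thenCompose(splits -> checkAndSyncTableToPeerClusters(tableName, splits))
          .thenCompose(v -> setTableReplication(tableName, true));
    });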

http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
index 4e2deed..f485c4e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
@@ -24,10 +24,11 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
-
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * TableDescriptor contains the details about an HBase table such as the descriptors of
@@ -39,8 +40,15 @@ import org.apache.hadoop.hbase.util.Bytes;
 public interface TableDescriptor {
 
   @InterfaceAudience.Private
-  static final Comparator<TableDescriptor> COMPARATOR
-    = (TableDescriptor lhs, TableDescriptor rhs) -> {
+  Comparator<TableDescriptor> COMPARATOR = getComparator(ColumnFamilyDescriptor.COMPARATOR);
+
+  @InterfaceAudience.Private
+  Comparator<TableDescriptor> COMPARATOR_IGNORE_REPLICATION =
+      getComparator(ColumnFamilyDescriptor.COMPARATOR_IGNORE_REPLICATION);
+
+  static Comparator<TableDescriptor>
+      getComparator(Comparator<ColumnFamilyDescriptor> cfComparator) {
+    return (TableDescriptor lhs, TableDescriptor rhs) -> {
       int result = lhs.getTableName().compareTo(rhs.getTableName());
       if (result != 0) {
         return result;
@@ -52,16 +60,17 @@ public interface TableDescriptor {
         return result;
       }
 
-      for (Iterator<ColumnFamilyDescriptor> it = lhsFamilies.iterator(),
-              it2 = rhsFamilies.iterator(); it.hasNext();) {
-        result = ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next());
+      for (Iterator<ColumnFamilyDescriptor> it = lhsFamilies.iterator(), it2 =
+          rhsFamilies.iterator(); it.hasNext();) {
+        result = cfComparator.compare(it.next(), it2.next());
         if (result != 0) {
           return result;
         }
       }
       // punt on comparison for ordering, just calculate difference
       return Integer.compare(lhs.getValues().hashCode(), rhs.getValues().hashCode());
-  };
+    };
+  }
 
   /**
    * Returns the count of the column families of the table.
@@ -266,4 +275,30 @@ public interface TableDescriptor {
    */
   boolean isReadOnly();
 
+  /**
+   * Check if the table's cfs' replication scope matches the given replication state.
+   * @param enabled replication state
+   * @return true if matched, otherwise false
+   */
+  default boolean matchReplicationScope(boolean enabled) {
+    boolean hasEnabled = false;
+    boolean hasDisabled = false;
+
+    for (ColumnFamilyDescriptor cf : getColumnFamilies()) {
+      if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
+          && cf.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
+        hasDisabled = true;
+      } else {
+        hasEnabled = true;
+      }
+    }
+
+    if (hasEnabled && hasDisabled) {
+      return false;
+    }
+    if (hasEnabled) {
+      return enabled;
+    }
+    return !enabled;
+  }
 }
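
matchReplicationScope collapses the three-state ReplicationState enum removed from
HBaseAdmin into a boolean check: a table with mixed scopes matches neither state, so
callers fall through to modifyTable. A quick illustration (hypothetical families,
not part of the patch):

    TableDescriptor mixed = TableDescriptorBuilder.newBuilder(TableName.valueOf("t"))
        .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("a"))
            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("b"))
            .setScope(HConstants.REPLICATION_SCOPE_LOCAL).build())
        .build();
    assert !mixed.matchReplicationScope(true);   // not uniformly enabled
    assert !mixed.matchReplicationScope(false);  // not uniformly disabled either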

http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
index 7bde1c1..ef59311 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
@@ -33,18 +33,19 @@ import java.util.TreeSet;
 import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.stream.Stream;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * @since 2.0.0
@@ -409,6 +410,24 @@ public class TableDescriptorBuilder {
     return this;
   }
 
+  /**
+   * Sets the replication scope for all and only the columns already in the builder. Columns added
+   * later won't be backfilled with replication scope.
+   * @param scope replication scope
+   * @return a TableDescriptorBuilder
+   */
+  public TableDescriptorBuilder setReplicationScope(int scope) {
+    Map<byte[], ColumnFamilyDescriptor> newFamilies = new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
+    newFamilies.putAll(desc.families);
+    newFamilies
+        .forEach((cf, cfDesc) -> {
+          desc.removeColumnFamily(cf);
+          desc.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cfDesc).setScope(scope)
+              .build());
+        });
+        });
+    return this;
+  }
+
   public TableDescriptor build() {
     return new ModifyableTableDescriptor(desc);
   }
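
Usage sketch for the new builder method (assumes an existing descriptor 'desc'); note
the caveat in the javadoc above: only families present at the time of the call are
rewritten, families added afterwards keep their own scope:

    TableDescriptor replicated = TableDescriptorBuilder.newBuilder(desc)
        .setReplicationScope(HConstants.REPLICATION_SCOPE_GLOBAL)
        .build();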

http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 39f2045..5a5913c 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -141,7 +141,7 @@ public class ReplicationAdmin implements Closeable {
    * */
   @Deprecated
   public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
-    return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig);
+    return ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCFsConfig);
   }
 
   /**
@@ -228,7 +228,7 @@ public class ReplicationAdmin implements Closeable {
   @Deprecated
   public String getPeerTableCFs(String id) throws IOException {
     ReplicationPeerConfig peerConfig = admin.getReplicationPeerConfig(id);
-    return ReplicationSerDeHelper.convertToString(peerConfig.getTableCFsMap());
+    return ReplicationPeerConfigUtil.convertToString(peerConfig.getTableCFsMap());
   }
 
   /**
@@ -243,7 +243,7 @@ public class ReplicationAdmin implements Closeable {
   @Deprecated
   public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException,
       IOException {
-    appendPeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs));
+    appendPeerTableCFs(id, ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs));
   }
 
   /**
@@ -300,7 +300,7 @@ public class ReplicationAdmin implements Closeable {
   @Deprecated
   public void removePeerTableCFs(String id, String tableCf) throws ReplicationException,
       IOException {
-    removePeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCf));
+    removePeerTableCFs(id, ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCf));
   }
 
   /**
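
The renamed util (new file below) keeps the table-CFs string format documented in its
javadoc; for reference, a parsing sketch:

    Map<TableName, List<String>> tableCfs =
        ReplicationPeerConfigUtil.parseTableCFsFromConfig("ns1.table1:cf1,cf2;ns2.table2");
    // ns1:table1 -> [cf1, cf2]
    // ns2:table2 -> null (a null family list means "replicate all column families")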

http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
new file mode 100644
index 0000000..be468ae
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
@@ -0,0 +1,468 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.replication;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CompoundConfiguration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Set;
+
+/**
+ * Helper for TableCFs Operations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public final class ReplicationPeerConfigUtil {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationPeerConfigUtil.class);
+
+  private ReplicationPeerConfigUtil() {}
+
+  public static String convertToString(Set<String> namespaces) {
+    if (namespaces == null) {
+      return null;
+    }
+    return StringUtils.join(namespaces, ';');
+  }
+
+  /** convert map to TableCFs Object */
+  public static ReplicationProtos.TableCF[] convert(
+      Map<TableName, ? extends Collection<String>> tableCfs) {
+    if (tableCfs == null) {
+      return null;
+    }
+    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.entrySet().size());
+    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
+    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
+      tableCFBuilder.clear();
+      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
+      Collection<String> v = entry.getValue();
+      if (v != null && !v.isEmpty()) {
+        for (String value : entry.getValue()) {
+          tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value));
+        }
+      }
+      tableCFList.add(tableCFBuilder.build());
+    }
+    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
+  }
+
+  public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
+    if (tableCfs == null) {
+      return null;
+    }
+    return convert(convert(tableCfs));
+  }
+
+  /**
+   *  Convert string to TableCFs Object.
+   *  This is only for read TableCFs information from TableCF node.
+   *  Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3.
+   * */
+  public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) {
+    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
+      return null;
+    }
+
+    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
+    String[] tables = tableCFsConfig.split(";");
+    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.length);
+
+    for (String tab : tables) {
+      // 1 ignore empty table config
+      tab = tab.trim();
+      if (tab.length() == 0) {
+        continue;
+      }
+      // 2 split to "table" and "cf1,cf2"
+      //   for each table: "table#cf1,cf2" or "table"
+      String[] pair = tab.split(":");
+      String tabName = pair[0].trim();
+      if (pair.length > 2 || tabName.length() == 0) {
+        LOG.info("incorrect format:" + tableCFsConfig);
+        continue;
+      }
+
+      tableCFBuilder.clear();
+      // split namespace from tableName
+      String ns = "default";
+      String tName = tabName;
+      String[] dbs = tabName.split("\\.");
+      if (dbs != null && dbs.length == 2) {
+        ns = dbs[0];
+        tName = dbs[1];
+      }
+      tableCFBuilder.setTableName(
+        ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));
+
+      // 3 parse "cf1,cf2" part to List<cf>
+      if (pair.length == 2) {
+        String[] cfsList = pair[1].split(",");
+        for (String cf : cfsList) {
+          String cfName = cf.trim();
+          if (cfName.length() > 0) {
+            tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
+          }
+        }
+      }
+      tableCFList.add(tableCFBuilder.build());
+    }
+    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
+  }
+
+  /**
+   *  Convert TableCFs Object to String.
+   *  Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
+   * */
+  public static String convert(ReplicationProtos.TableCF[] tableCFs) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0, n = tableCFs.length; i < n; i++) {
+      ReplicationProtos.TableCF tableCF = tableCFs[i];
+      String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
+      if (!Strings.isEmpty(namespace)) {
+        sb.append(namespace).append(".").
+            append(tableCF.getTableName().getQualifier().toStringUtf8())
+            .append(":");
+      } else {
+        sb.append(tableCF.getTableName().toString()).append(":");
+      }
+      for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
+        sb.append(tableCF.getFamilies(j).toStringUtf8()).append(",");
+      }
+      sb.deleteCharAt(sb.length() - 1).append(";");
+    }
+    if (sb.length() > 0) {
+      sb.deleteCharAt(sb.length() - 1);
+    }
+    return sb.toString();
+  }
+
+  /**
+   *  Get TableCF in TableCFs, if not exist, return null.
+   * */
+  public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs,
+                                           String table) {
+    for (int i = 0, n = tableCFs.length; i < n; i++) {
+      ReplicationProtos.TableCF tableCF = tableCFs[i];
+      if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
+        return tableCF;
+      }
+    }
+    return null;
+  }
+
+  /**
+   *  Parse bytes into TableCFs.
+   *  It is used for backward compatibility.
+   *  Old format bytes have no PB_MAGIC Header
+   * */
+  public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
+    if (bytes == null) {
+      return null;
+    }
+    return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes));
+  }
+
+  /**
+   *  Convert tableCFs string into Map.
+   * */
+  public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
+    ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig);
+    return convert2Map(tableCFs);
+  }
+
+  /**
+   *  Convert tableCFs Object to Map.
+   * */
+  public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) {
+    if (tableCFs == null || tableCFs.length == 0) {
+      return null;
+    }
+    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
+    for (int i = 0, n = tableCFs.length; i < n; i++) {
+      ReplicationProtos.TableCF tableCF = tableCFs[i];
+      List<String> families = new ArrayList<>();
+      for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
+        families.add(tableCF.getFamilies(j).toStringUtf8());
+      }
+      if (families.size() > 0) {
+        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families);
+      } else {
+        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null);
+      }
+    }
+
+    return tableCFsMap;
+  }
+
+  /**
+   * @param bytes Content of a peer znode.
+   * @return ReplicationPeerConfig parsed from the passed bytes.
+   * @throws DeserializationException
+   */
+  public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
+      throws DeserializationException {
+    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
+      int pblen = ProtobufUtil.lengthOfPBMagic();
+      ReplicationProtos.ReplicationPeer.Builder builder =
+          ReplicationProtos.ReplicationPeer.newBuilder();
+      ReplicationProtos.ReplicationPeer peer;
+      try {
+        ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
+        peer = builder.build();
+      } catch (IOException e) {
+        throw new DeserializationException(e);
+      }
+      return convert(peer);
+    } else {
+      if (bytes.length > 0) {
+        return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
+      }
+      return new ReplicationPeerConfig().setClusterKey("");
+    }
+  }
+
+  public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) {
+    ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
+    if (peer.hasClusterkey()) {
+      peerConfig.setClusterKey(peer.getClusterkey());
+    }
+    if (peer.hasReplicationEndpointImpl()) {
+      peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
+    }
+
+    for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
+      peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
+    }
+
+    for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
+      peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
+    }
+
+    Map<TableName, ? extends Collection<String>> tableCFsMap = convert2Map(
+      peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
+    if (tableCFsMap != null) {
+      peerConfig.setTableCFsMap(tableCFsMap);
+    }
+    List<ByteString> namespacesList = peer.getNamespacesList();
+    if (namespacesList != null && namespacesList.size() != 0) {
+      Set<String> namespaces = new HashSet<>();
+      for (ByteString namespace : namespacesList) {
+        namespaces.add(namespace.toStringUtf8());
+      }
+      peerConfig.setNamespaces(namespaces);
+    }
+    if (peer.hasBandwidth()) {
+      peerConfig.setBandwidth(peer.getBandwidth());
+    }
+    return peerConfig;
+  }
+
+  public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
+    ReplicationProtos.ReplicationPeer.Builder builder =
+        ReplicationProtos.ReplicationPeer.newBuilder();
+    if (peerConfig.getClusterKey() != null) {
+      builder.setClusterkey(peerConfig.getClusterKey());
+    }
+    if (peerConfig.getReplicationEndpointImpl() != null) {
+      builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
+    }
+
+    for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
+      builder.addData(HBaseProtos.BytesBytesPair.newBuilder()
+          .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey()))
+          .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue()))
+          .build());
+    }
+
+    for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
+      builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder()
+          .setName(entry.getKey())
+          .setValue(entry.getValue())
+          .build());
+    }
+
+    ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
+    if (tableCFs != null) {
+      for (int i = 0; i < tableCFs.length; i++) {
+        builder.addTableCfs(tableCFs[i]);
+      }
+    }
+    Set<String> namespaces = peerConfig.getNamespaces();
+    if (namespaces != null) {
+      for (String namespace : namespaces) {
+        builder.addNamespaces(ByteString.copyFromUtf8(namespace));
+      }
+    }
+
+    builder.setBandwidth(peerConfig.getBandwidth());
+    return builder.build();
+  }
+
+  /**
+   * @param peerConfig the peer config to serialize
+   * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended,
+   *         suitable for use as the content of a peer znode; i.e. the content of the PEER_ID
+   *         znode under /hbase/replication/peers/PEER_ID
+   */
+  public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
+    byte[] bytes = convert(peerConfig).toByteArray();
+    return ProtobufUtil.prependPBMagic(bytes);
+  }
+
+  public static ReplicationPeerDescription toReplicationPeerDescription(
+      ReplicationProtos.ReplicationPeerDescription desc) {
+    boolean enabled = ReplicationProtos.ReplicationState.State.ENABLED == desc.getState()
+        .getState();
+    ReplicationPeerConfig config = convert(desc.getConfig());
+    return new ReplicationPeerDescription(desc.getId(), enabled, config);
+  }
+
+  public static ReplicationProtos.ReplicationPeerDescription toProtoReplicationPeerDescription(
+      ReplicationPeerDescription desc) {
+    ReplicationProtos.ReplicationPeerDescription.Builder builder =
+        ReplicationProtos.ReplicationPeerDescription.newBuilder();
+    builder.setId(desc.getPeerId());
+    ReplicationProtos.ReplicationState.Builder stateBuilder = ReplicationProtos.ReplicationState
+        .newBuilder();
+    stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED
+        : ReplicationProtos.ReplicationState.State.DISABLED);
+    builder.setState(stateBuilder.build());
+    builder.setConfig(convert(desc.getPeerConfig()));
+    return builder.build();
+  }
+
+  public static void appendTableCFsToReplicationPeerConfig(
+      Map<TableName, ? extends Collection<String>> tableCfs, ReplicationPeerConfig peerConfig) {
+    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
+    if (preTableCfs == null) {
+      peerConfig.setTableCFsMap(tableCfs);
+    } else {
+      for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
+        TableName table = entry.getKey();
+        Collection<String> appendCfs = entry.getValue();
+        if (preTableCfs.containsKey(table)) {
+          List<String> cfs = preTableCfs.get(table);
+          if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
+            preTableCfs.put(table, null);
+          } else {
+            Set<String> cfSet = new HashSet<String>(cfs);
+            cfSet.addAll(appendCfs);
+            preTableCfs.put(table, Lists.newArrayList(cfSet));
+          }
+        } else {
+          if (appendCfs == null || appendCfs.isEmpty()) {
+            preTableCfs.put(table, null);
+          } else {
+            preTableCfs.put(table, Lists.newArrayList(appendCfs));
+          }
+        }
+      }
+    }
+  }
+
+  public static void removeTableCFsFromReplicationPeerConfig(
+      Map<TableName, ? extends Collection<String>> tableCfs, ReplicationPeerConfig peerConfig,
+      String id) throws ReplicationException {
+    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
+    if (preTableCfs == null) {
+      throw new ReplicationException("Table-Cfs for peer: " + id + " is null");
+    }
+    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
+      TableName table = entry.getKey();
+      Collection<String> removeCfs = entry.getValue();
+      if (preTableCfs.containsKey(table)) {
+        List<String> cfs = preTableCfs.get(table);
+        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
+          preTableCfs.remove(table);
+        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
+          Set<String> cfSet = new HashSet<String>(cfs);
+          cfSet.removeAll(removeCfs);
+          if (cfSet.isEmpty()) {
+            preTableCfs.remove(table);
+          } else {
+            preTableCfs.put(table, Lists.newArrayList(cfSet));
+          }
+        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
+          throw new ReplicationException("Cannot remove cf of table: " + table
+              + " which doesn't specify cfs from table-cfs config in peer: " + 
id);
+        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
+          throw new ReplicationException("Cannot remove table: " + table
+              + " which has specified cfs from table-cfs config in peer: " + 
id);
+        }
+      } else {
+        throw new ReplicationException("No table: "
+            + table + " in table-cfs config of peer: " + id);
+      }
+    }
+  }
+
+  /**
+   * Returns the configuration needed to talk to the remote slave cluster.
+   * @param conf the base configuration
+   * @param peer the description of the replication peer
+   * @return the configuration for the peer cluster, null if it was unable to get the configuration
+   * @throws IOException when creating the peer cluster configuration fails
+   */
+  public static Configuration getPeerClusterConfiguration(Configuration conf,
+      ReplicationPeerDescription peer) throws IOException {
+    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
+    Configuration otherConf;
+    try {
+      otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey());
+    } catch (IOException e) {
+      throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
+    }
+
+    if (!peerConfig.getConfiguration().isEmpty()) {
+      CompoundConfiguration compound = new CompoundConfiguration();
+      compound.add(otherConf);
+      compound.addStringMap(peerConfig.getConfiguration());
+      return compound;
+    }
+
+    return otherConf;
+  }
+}
\ No newline at end of file
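
For reviewers, a minimal sketch of how the relocated helpers above fit together; the table-CFs string, the cluster key "zk1:2181:/hbase", and the class name are illustrative values only, not part of this change:

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
    import org.apache.hadoop.hbase.exceptions.DeserializationException;
    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

    public class PeerConfigUtilSketch {
      public static void main(String[] args) throws DeserializationException {
        // Parse the table-CFs string format handled above; "t2" without
        // families maps to null, i.e. replicate all of its column families.
        Map<TableName, List<String>> tableCFs =
            ReplicationPeerConfigUtil.parseTableCFsFromConfig("ns1.t1:cf1,cf2;t2");
        System.out.println(tableCFs);

        // Round-trip a peer config through the znode byte format: toByteArray
        // prepends the PB magic prefix that parsePeerFrom checks for.
        ReplicationPeerConfig peerConfig =
            new ReplicationPeerConfig().setClusterKey("zk1:2181:/hbase");
        byte[] znodeContent = ReplicationPeerConfigUtil.toByteArray(peerConfig);
        ReplicationPeerConfig parsed =
            ReplicationPeerConfigUtil.parsePeerFrom(znodeContent);
        assert parsed.getClusterKey().equals(peerConfig.getClusterKey());
      }
    }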

http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
deleted file mode 100644
index 986a09f..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client.replication;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Strings;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.ArrayList;
-import java.util.Set;
-
-/**
- * Helper for TableCFs Operations.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Stable
-public final class ReplicationSerDeHelper {
-
-  private static final Log LOG = LogFactory.getLog(ReplicationSerDeHelper.class);
-
-  private ReplicationSerDeHelper() {}
-
-  public static String convertToString(Set<String> namespaces) {
-    if (namespaces == null) {
-      return null;
-    }
-    return StringUtils.join(namespaces, ';');
-  }
-
-  /** convert map to TableCFs Object */
-  public static ReplicationProtos.TableCF[] convert(
-      Map<TableName, ? extends Collection<String>> tableCfs) {
-    if (tableCfs == null) {
-      return null;
-    }
-    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.entrySet().size());
-    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
-    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
-      tableCFBuilder.clear();
-      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
-      Collection<String> v = entry.getValue();
-      if (v != null && !v.isEmpty()) {
-        for (String value : entry.getValue()) {
-          tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value));
-        }
-      }
-      tableCFList.add(tableCFBuilder.build());
-    }
-    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
-  }
-
-  public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
-    if (tableCfs == null) {
-      return null;
-    }
-    return convert(convert(tableCfs));
-  }
-
-  /**
-   *  Convert string to TableCFs Object.
-   *  This is only for read TableCFs information from TableCF node.
-   *  Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3.
-   * */
-  public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) {
-    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
-      return null;
-    }
-
-    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
-    String[] tables = tableCFsConfig.split(";");
-    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.length);
-
-    for (String tab : tables) {
-      // 1 ignore empty table config
-      tab = tab.trim();
-      if (tab.length() == 0) {
-        continue;
-      }
-      // 2 split to "table" and "cf1,cf2"
-      //   for each table: "table#cf1,cf2" or "table"
-      String[] pair = tab.split(":");
-      String tabName = pair[0].trim();
-      if (pair.length > 2 || tabName.length() == 0) {
-        LOG.info("incorrect format:" + tableCFsConfig);
-        continue;
-      }
-
-      tableCFBuilder.clear();
-      // split namespace from tableName
-      String ns = "default";
-      String tName = tabName;
-      String[] dbs = tabName.split("\\.");
-      if (dbs != null && dbs.length == 2) {
-        ns = dbs[0];
-        tName = dbs[1];
-      }
-      tableCFBuilder.setTableName(
-        ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));
-
-      // 3 parse "cf1,cf2" part to List<cf>
-      if (pair.length == 2) {
-        String[] cfsList = pair[1].split(",");
-        for (String cf : cfsList) {
-          String cfName = cf.trim();
-          if (cfName.length() > 0) {
-            tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
-          }
-        }
-      }
-      tableCFList.add(tableCFBuilder.build());
-    }
-    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
-  }
-
-  /**
-   *  Convert TableCFs Object to String.
-   *  Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
-   * */
-  public static String convert(ReplicationProtos.TableCF[] tableCFs) {
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0, n = tableCFs.length; i < n; i++) {
-      ReplicationProtos.TableCF tableCF = tableCFs[i];
-      String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
-      if (!Strings.isEmpty(namespace)) {
-        sb.append(namespace).append(".").
-            append(tableCF.getTableName().getQualifier().toStringUtf8())
-            .append(":");
-      } else {
-        sb.append(tableCF.getTableName().toString()).append(":");
-      }
-      for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
-        sb.append(tableCF.getFamilies(j).toStringUtf8()).append(",");
-      }
-      sb.deleteCharAt(sb.length() - 1).append(";");
-    }
-    if (sb.length() > 0) {
-      sb.deleteCharAt(sb.length() - 1);
-    }
-    return sb.toString();
-  }
-
-  /**
-   *  Get TableCF in TableCFs, if not exist, return null.
-   * */
-  public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs,
-                                           String table) {
-    for (int i = 0, n = tableCFs.length; i < n; i++) {
-      ReplicationProtos.TableCF tableCF = tableCFs[i];
-      if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
-        return tableCF;
-      }
-    }
-    return null;
-  }
-
-  /**
-   *  Parse bytes into TableCFs.
-   *  It is used for backward compatibility.
-   *  Old format bytes have no PB_MAGIC Header
-   * */
-  public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
-    if (bytes == null) {
-      return null;
-    }
-    return ReplicationSerDeHelper.convert(Bytes.toString(bytes));
-  }
-
-  /**
-   *  Convert tableCFs string into Map.
-   * */
-  public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
-    ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig);
-    return convert2Map(tableCFs);
-  }
-
-  /**
-   *  Convert tableCFs Object to Map.
-   * */
-  public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) {
-    if (tableCFs == null || tableCFs.length == 0) {
-      return null;
-    }
-    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
-    for (int i = 0, n = tableCFs.length; i < n; i++) {
-      ReplicationProtos.TableCF tableCF = tableCFs[i];
-      List<String> families = new ArrayList<>();
-      for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
-        families.add(tableCF.getFamilies(j).toStringUtf8());
-      }
-      if (families.size() > 0) {
-        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families);
-      } else {
-        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null);
-      }
-    }
-
-    return tableCFsMap;
-  }
-
-  /**
-   * @param bytes Content of a peer znode.
-   * @return ClusterKey parsed from the passed bytes.
-   * @throws DeserializationException
-   */
-  public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
-      throws DeserializationException {
-    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
-      int pblen = ProtobufUtil.lengthOfPBMagic();
-      ReplicationProtos.ReplicationPeer.Builder builder =
-          ReplicationProtos.ReplicationPeer.newBuilder();
-      ReplicationProtos.ReplicationPeer peer;
-      try {
-        ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
-        peer = builder.build();
-      } catch (IOException e) {
-        throw new DeserializationException(e);
-      }
-      return convert(peer);
-    } else {
-      if (bytes.length > 0) {
-        return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
-      }
-      return new ReplicationPeerConfig().setClusterKey("");
-    }
-  }
-
-  public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) {
-    ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
-    if (peer.hasClusterkey()) {
-      peerConfig.setClusterKey(peer.getClusterkey());
-    }
-    if (peer.hasReplicationEndpointImpl()) {
-      peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
-    }
-
-    for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
-      peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
-    }
-
-    for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
-      peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
-    }
-
-    Map<TableName, ? extends Collection<String>> tableCFsMap = convert2Map(
-      peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
-    if (tableCFsMap != null) {
-      peerConfig.setTableCFsMap(tableCFsMap);
-    }
-    List<ByteString> namespacesList = peer.getNamespacesList();
-    if (namespacesList != null && namespacesList.size() != 0) {
-      Set<String> namespaces = new HashSet<>();
-      for (ByteString namespace : namespacesList) {
-        namespaces.add(namespace.toStringUtf8());
-      }
-      peerConfig.setNamespaces(namespaces);
-    }
-    if (peer.hasBandwidth()) {
-      peerConfig.setBandwidth(peer.getBandwidth());
-    }
-    return peerConfig;
-  }
-
-  public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
-    ReplicationProtos.ReplicationPeer.Builder builder = ReplicationProtos.ReplicationPeer.newBuilder();
-    if (peerConfig.getClusterKey() != null) {
-      builder.setClusterkey(peerConfig.getClusterKey());
-    }
-    if (peerConfig.getReplicationEndpointImpl() != null) {
-      builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
-    }
-
-    for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
-      builder.addData(HBaseProtos.BytesBytesPair.newBuilder()
-          .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey()))
-          .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue()))
-          .build());
-    }
-
-    for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
-      builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder()
-          .setName(entry.getKey())
-          .setValue(entry.getValue())
-          .build());
-    }
-
-    ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
-    if (tableCFs != null) {
-      for (int i = 0; i < tableCFs.length; i++) {
-        builder.addTableCfs(tableCFs[i]);
-      }
-    }
-    Set<String> namespaces = peerConfig.getNamespaces();
-    if (namespaces != null) {
-      for (String namespace : namespaces) {
-        builder.addNamespaces(ByteString.copyFromUtf8(namespace));
-      }
-    }
-
-    builder.setBandwidth(peerConfig.getBandwidth());
-    return builder.build();
-  }
-
-  /**
-   * @param peerConfig
-   * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
-   *         for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
-   *         /hbase/replication/peers/PEER_ID
-   */
-  public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
-    byte[] bytes = convert(peerConfig).toByteArray();
-    return ProtobufUtil.prependPBMagic(bytes);
-  }
-
-  public static ReplicationPeerDescription toReplicationPeerDescription(
-      ReplicationProtos.ReplicationPeerDescription desc) {
-    boolean enabled = ReplicationProtos.ReplicationState.State.ENABLED == desc.getState()
-        .getState();
-    ReplicationPeerConfig config = convert(desc.getConfig());
-    return new ReplicationPeerDescription(desc.getId(), enabled, config);
-  }
-
-  public static ReplicationProtos.ReplicationPeerDescription toProtoReplicationPeerDescription(
-      ReplicationPeerDescription desc) {
-    ReplicationProtos.ReplicationPeerDescription.Builder builder = ReplicationProtos.ReplicationPeerDescription
-        .newBuilder();
-    builder.setId(desc.getPeerId());
-    ReplicationProtos.ReplicationState.Builder stateBuilder = ReplicationProtos.ReplicationState
-        .newBuilder();
-    stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED
-        : ReplicationProtos.ReplicationState.State.DISABLED);
-    builder.setState(stateBuilder.build());
-    builder.setConfig(convert(desc.getPeerConfig()));
-    return builder.build();
-  }
-
-  public static void appendTableCFsToReplicationPeerConfig(
-      Map<TableName, ? extends Collection<String>> tableCfs, ReplicationPeerConfig peerConfig) {
-    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
-    if (preTableCfs == null) {
-      peerConfig.setTableCFsMap(tableCfs);
-    } else {
-      for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
-        TableName table = entry.getKey();
-        Collection<String> appendCfs = entry.getValue();
-        if (preTableCfs.containsKey(table)) {
-          List<String> cfs = preTableCfs.get(table);
-          if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
-            preTableCfs.put(table, null);
-          } else {
-            Set<String> cfSet = new HashSet<String>(cfs);
-            cfSet.addAll(appendCfs);
-            preTableCfs.put(table, Lists.newArrayList(cfSet));
-          }
-        } else {
-          if (appendCfs == null || appendCfs.isEmpty()) {
-            preTableCfs.put(table, null);
-          } else {
-            preTableCfs.put(table, Lists.newArrayList(appendCfs));
-          }
-        }
-      }
-    }
-  }
-
-  public static void removeTableCFsFromReplicationPeerConfig(
-      Map<TableName, ? extends Collection<String>> tableCfs, ReplicationPeerConfig peerConfig,
-      String id) throws ReplicationException {
-    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
-    if (preTableCfs == null) {
-      throw new ReplicationException("Table-Cfs for peer: " + id + " is null");
-    }
-    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
-      TableName table = entry.getKey();
-      Collection<String> removeCfs = entry.getValue();
-      if (preTableCfs.containsKey(table)) {
-        List<String> cfs = preTableCfs.get(table);
-        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
-          preTableCfs.remove(table);
-        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
-          Set<String> cfSet = new HashSet<String>(cfs);
-          cfSet.removeAll(removeCfs);
-          if (cfSet.isEmpty()) {
-            preTableCfs.remove(table);
-          } else {
-            preTableCfs.put(table, Lists.newArrayList(cfSet));
-          }
-        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
-          throw new ReplicationException("Cannot remove cf of table: " + table
-              + " which doesn't specify cfs from table-cfs config in peer: " + 
id);
-        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
-          throw new ReplicationException("Cannot remove table: " + table
-              + " which has specified cfs from table-cfs config in peer: " + 
id);
-        }
-      } else {
-        throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index bdd6e74..4d429c9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -123,4 +123,24 @@ public class ReplicationPeerConfig {
     builder.append("bandwidth=").append(bandwidth);
     return builder.toString();
   }
+
+  /**
+   * Decide whether the table needs to be replicated to the peer cluster.
+   * @param table name of the table
+   * @return true if the table needs to be replicated to the peer cluster
+   */
+  public boolean needToReplicate(TableName table) {
+    // Both null means the user has configured neither namespaces nor table CFs,
+    // so data from all tables is eligible for replication
+    if (namespaces == null && tableCFsMap == null) {
+      return true;
+    }
+    if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) {
+      return true;
+    }
+    if (tableCFsMap != null && tableCFsMap.containsKey(table)) {
+      return true;
+    }
+    return false;
+  }
 }
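
A short sketch of the new needToReplicate predicate's semantics; the peer setups and table names below are illustrative only:

    import java.util.Collections;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

    public class NeedToReplicateSketch {
      public static void main(String[] args) {
        // Neither namespaces nor table-CFs configured: every table replicates.
        ReplicationPeerConfig allTables = new ReplicationPeerConfig();
        assert allTables.needToReplicate(TableName.valueOf("ns1", "t1"));

        // Only namespace ns1 configured: ns1 tables replicate, others do not.
        ReplicationPeerConfig nsOnly = new ReplicationPeerConfig();
        nsOnly.setNamespaces(Collections.singleton("ns1"));
        assert nsOnly.needToReplicate(TableName.valueOf("ns1", "t1"));
        assert !nsOnly.needToReplicate(TableName.valueOf("ns2", "t2"));
      }
    }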

http://git-wip-us.apache.org/repos/asf/hbase/blob/d885e223/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 4558deb..9eff114 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -1621,7 +1621,7 @@ public final class RequestConverter {
       String peerId, ReplicationPeerConfig peerConfig) {
     AddReplicationPeerRequest.Builder builder = AddReplicationPeerRequest.newBuilder();
     builder.setPeerId(peerId);
-    builder.setPeerConfig(ReplicationSerDeHelper.convert(peerConfig));
+    builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
     return builder.build();
   }
 
@@ -1658,7 +1658,7 @@ public final class RequestConverter {
     UpdateReplicationPeerConfigRequest.Builder builder = UpdateReplicationPeerConfigRequest
         .newBuilder();
     builder.setPeerId(peerId);
-    builder.setPeerConfig(ReplicationSerDeHelper.convert(peerConfig));
+    builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
     return builder.build();
   }
 

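The RequestConverter hunks are a mechanical rename from ReplicationSerDeHelper to ReplicationPeerConfigUtil. The call path they serve looks roughly like the sketch below; the buildAddReplicationPeerRequest name is inferred from the request type in the hunk, and the peer id and cluster key are illustrative:

    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
    import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;

    public class AddPeerRequestSketch {
      public static void main(String[] args) {
        ReplicationPeerConfig peerConfig =
            new ReplicationPeerConfig().setClusterKey("zk1:2181:/hbase");
        // The peer config is serialized to its protobuf form via
        // ReplicationPeerConfigUtil.convert(peerConfig) inside the builder.
        AddReplicationPeerRequest request =
            RequestConverter.buildAddReplicationPeerRequest("1", peerConfig);
        assert request.getPeerId().equals("1");
      }
    }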