Repository: hbase
Updated Branches:
  refs/heads/hbase-12439 3c35a722d -> 05ab41d1b


http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/hbase-protocol-shaded/src/main/protobuf/Master.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto 
b/hbase-protocol-shaded/src/main/protobuf/Master.proto
index b283ed9..384ac67 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto
@@ -32,6 +32,7 @@ import "ClusterStatus.proto";
 import "ErrorHandling.proto";
 import "Procedure.proto";
 import "Quota.proto";
+import "Replication.proto";
 
 /* Column-level protobufs */
 
@@ -846,4 +847,12 @@ service MasterService {
   /** returns a list of procedures */
   rpc ListProcedures(ListProceduresRequest)
     returns(ListProceduresResponse);
+
+  /** Add a replication peer */
+  rpc AddReplicationPeer(AddReplicationPeerRequest)
+    returns(AddReplicationPeerResponse);
+
+  /** Remove a replication peer */
+  rpc RemoveReplicationPeer(RemoveReplicationPeerRequest)
+    returns(RemoveReplicationPeerResponse);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/hbase-protocol-shaded/src/main/protobuf/Replication.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto 
b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
new file mode 100644
index 0000000..0bdf2c0
--- /dev/null
+++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
@@ -0,0 +1,42 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "ReplicationProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "ZooKeeper.proto";
+
+message AddReplicationPeerRequest {
+  required string peer_id = 1;
+  required ReplicationPeer peer_config = 2;
+}
+
+message AddReplicationPeerResponse {
+}
+
+message RemoveReplicationPeerRequest {
+  required string peer_id = 1;
+}
+
+message RemoveReplicationPeerResponse {
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
index 9abcd52..5067b3b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
 
@@ -1827,4 +1828,45 @@ public interface MasterObserver extends Coprocessor {
   void postBalanceRSGroup(final ObserverContext<MasterCoprocessorEnvironment> 
ctx,
                           String groupName, boolean balancerRan) throws 
IOException;
 
+  /**
+   * Called before adding a replication peer
+   * @param ctx the environment to interact with the framework and master
+   * @param peerId a short name that identifies the peer
+   * @param peerConfig configuration for the replication peer
+   * @throws IOException on failure
+   */
+  default void preAddReplicationPeer(final 
ObserverContext<MasterCoprocessorEnvironment> ctx,
+      String peerId, ReplicationPeerConfig peerConfig) throws IOException {
+  }
+
+  /**
+   * Called after adding a replication peer
+   * @param ctx the environment to interact with the framework and master
+   * @param peerId a short name that identifies the peer
+   * @param peerConfig configuration for the replication peer
+   * @throws IOException on failure
+   */
+  default void postAddReplicationPeer(final 
ObserverContext<MasterCoprocessorEnvironment> ctx,
+      String peerId, ReplicationPeerConfig peerConfig) throws IOException {
+  }
+
+  /**
+   * Called before removing a replication peer
+   * @param ctx the environment to interact with the framework and master
+   * @param peerId a short name that identifies the peer
+   * @throws IOException on failure
+   */
+  default void preRemoveReplicationPeer(final 
ObserverContext<MasterCoprocessorEnvironment> ctx,
+      String peerId) throws IOException {
+  }
+
+  /**
+   * Called after removing a replication peer
+   * @param ctx the environment to interact with the framework and master
+   * @param peerId a short name that identifies the peer
+   * @throws IOException on failure
+   */
+  default void postRemoveReplicationPeer(final 
ObserverContext<MasterCoprocessorEnvironment> ctx,
+      String peerId) throws IOException {
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index c5c246b..da35da1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -119,6 +119,7 @@ import 
org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.master.procedure.SplitTableRegionProcedure;
 import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
+import org.apache.hadoop.hbase.master.replication.ReplicationManager;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
@@ -138,7 +139,12 @@ import 
org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
 import 
org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
 import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
@@ -291,6 +297,9 @@ public class HMaster extends HRegionServer implements 
MasterServices {
   // manager of assignment nodes in zookeeper
   private AssignmentManager assignmentManager;
 
+  // manager of replication
+  private ReplicationManager replicationManager;
+
   // buffer for "fatal error" notices from region servers
   // in the cluster. This is only used for assisting
   // operations/debugging.
@@ -640,6 +649,8 @@ public class HMaster extends HRegionServer implements 
MasterServices {
       this.balancer, this.service, this.metricsMaster,
       this.tableLockManager, tableStateManager);
 
+    this.replicationManager = new ReplicationManager(conf, zooKeeper, this);
+
     this.regionServerTracker = new RegionServerTracker(zooKeeper, this, 
this.serverManager);
     this.regionServerTracker.start();
 
@@ -3135,4 +3146,30 @@ public class HMaster extends HRegionServer implements 
MasterServices {
   public FavoredNodesManager getFavoredNodesManager() {
     return favoredNodesManager;
   }
+
+  @Override
+  public void addReplicationPeer(String peerId, ReplicationPeerConfig 
peerConfig)
+      throws ReplicationException, IOException {
+    if (cpHost != null) {
+      cpHost.preAddReplicationPeer(peerId, peerConfig);
+    }
+    LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + 
peerId + ", config="
+        + peerConfig);
+    this.replicationManager.addReplicationPeer(peerId, peerConfig);
+    if (cpHost != null) {
+      cpHost.postAddReplicationPeer(peerId, peerConfig);
+    }
+  }
+
+  @Override
+  public void removeReplicationPeer(String peerId) throws 
ReplicationException, IOException {
+    if (cpHost != null) {
+      cpHost.preRemoveReplicationPeer(peerId);
+    }
+    LOG.info(getClientIdAuditPrefix() + " removing replication peer, id=" + 
peerId);
+    this.replicationManager.removeReplicationPeer(peerId);
+    if (cpHost != null) {
+      cpHost.postRemoveReplicationPeer(peerId);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index a18068d..97fbe67 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
 import org.apache.hadoop.hbase.security.User;
@@ -1645,4 +1646,45 @@ public class MasterCoprocessorHost
     });
   }
 
+  public void preAddReplicationPeer(final String peerId, final 
ReplicationPeerConfig peerConfig)
+      throws IOException {
+    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+      @Override
+      public void call(MasterObserver observer, 
ObserverContext<MasterCoprocessorEnvironment> ctx)
+          throws IOException {
+        observer.preAddReplicationPeer(ctx, peerId, peerConfig);
+      }
+    });
+  }
+
+  public void postAddReplicationPeer(final String peerId, final 
ReplicationPeerConfig peerConfig)
+      throws IOException {
+    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+      @Override
+      public void call(MasterObserver observer, 
ObserverContext<MasterCoprocessorEnvironment> ctx)
+          throws IOException {
+        observer.postAddReplicationPeer(ctx, peerId, peerConfig);
+      }
+    });
+  }
+
+  public void preRemoveReplicationPeer(final String peerId) throws IOException 
{
+    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+      @Override
+      public void call(MasterObserver observer, 
ObserverContext<MasterCoprocessorEnvironment> ctx)
+          throws IOException {
+        observer.preRemoveReplicationPeer(ctx, peerId);
+      }
+    });
+  }
+
+  public void postRemoveReplicationPeer(final String peerId) throws 
IOException {
+    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+      @Override
+      public void call(MasterObserver observer, 
ObserverContext<MasterCoprocessorEnvironment> ctx)
+          throws IOException {
+        observer.postRemoveReplicationPeer(ctx, peerId);
+      }
+    });
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 2990076..afd807c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
@@ -86,7 +87,12 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.AccessController;
 import org.apache.hadoop.hbase.security.visibility.VisibilityController;
@@ -1638,4 +1644,27 @@ public class MasterRpcServices extends RSRpcServices
     }
     return null;
   }
+
+  @Override
+  public AddReplicationPeerResponse addReplicationPeer(RpcController 
controller,
+      AddReplicationPeerRequest request) throws ServiceException {
+    try {
+      master.addReplicationPeer(request.getPeerId(),
+        ReplicationSerDeHelper.convert(request.getPeerConfig()));
+      return AddReplicationPeerResponse.newBuilder().build();
+    } catch (ReplicationException | IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public RemoveReplicationPeerResponse removeReplicationPeer(RpcController 
controller,
+      RemoveReplicationPeerRequest request) throws ServiceException {
+    try {
+      master.removeReplicationPeer(request.getPeerId());
+      return RemoveReplicationPeerResponse.newBuilder().build();
+    } catch (ReplicationException | IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 7845101..5fc9d16 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -39,6 +39,8 @@ import 
org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 
 import com.google.protobuf.Service;
 
@@ -415,4 +417,18 @@ public interface MasterServices extends Server {
    * @return Favored Nodes Manager
    */
   public FavoredNodesManager getFavoredNodesManager();
+
+  /**
+   * Add a new replication peer for replicating data to slave cluster
+   * @param peerId a short name that identifies the peer
+   * @param peerConfig configuration for the replication slave cluster
+   */
+  void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
+      throws ReplicationException, IOException;
+
+  /**
+   * Removes a peer and stops the replication
+   * @param peerId a short name that identifies the peer
+   */
+  void removeReplicationPeer(String peerId) throws ReplicationException, 
IOException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
new file mode 100644
index 0000000..748f7af
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * Manages and performs all replication admin operations.
+ * Used to add/remove a replication peer.
+ */
+@InterfaceAudience.Private
+public class ReplicationManager {
+
+  private final Configuration conf;
+  private final ZooKeeperWatcher zkw;
+  private final ReplicationQueuesClient replicationQueuesClient;
+  private final ReplicationPeers replicationPeers;
+
+  public ReplicationManager(Configuration conf, ZooKeeperWatcher zkw, 
Abortable abortable)
+      throws IOException {
+    this.conf = conf;
+    this.zkw = zkw;
+    try {
+      this.replicationQueuesClient = ReplicationFactory
+          .getReplicationQueuesClient(new 
ReplicationQueuesClientArguments(conf, abortable, zkw));
+      this.replicationQueuesClient.init();
+      this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
+        this.replicationQueuesClient, abortable);
+      this.replicationPeers.init();
+    } catch (Exception e) {
+      throw new IOException("Failed to construct ReplicationManager", e);
+    }
+  }
+
+  public void addReplicationPeer(String peerId, ReplicationPeerConfig 
peerConfig)
+      throws ReplicationException {
+    checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
+      peerConfig.getTableCFsMap());
+    this.replicationPeers.registerPeer(peerId, peerConfig);
+  }
+
+  public void removeReplicationPeer(String peerId) throws ReplicationException 
{
+    this.replicationPeers.unregisterPeer(peerId);
+  }
+
+  /**
+   * Setting a namespace in the peer config means that all tables in this namespace
+   * will be replicated to the peer cluster.
+   *
+   * 1. If you already have set a namespace in the peer config, then you can't 
set any table
+   *    of this namespace to the peer config.
+   * 2. If you already have set a table in the peer config, then you can't set 
this table's
+   *    namespace to the peer config.
+   *
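+   * @param namespaces the namespaces set in the peer config; may be null or empty
+   * @param tableCfs the table to column-families map set in the peer config; may be null or empty
+   * @throws ReplicationException if any table in tableCfs belongs to one of the namespaces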
+   */
+  private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
+      Map<TableName, ? extends Collection<String>> tableCfs) throws 
ReplicationException {
+    if (namespaces == null || namespaces.isEmpty()) {
+      return;
+    }
+    if (tableCfs == null || tableCfs.isEmpty()) {
+      return;
+    }
+    for (Map.Entry<TableName, ? extends Collection<String>> entry : 
tableCfs.entrySet()) {
+      TableName table = entry.getKey();
+      if (namespaces.contains(table.getNamespaceAsString())) {
+        throw new ReplicationException(
+            "Table-cfs config conflict with namespaces config in peer");
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index d9afbc8..0452883 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -105,6 +105,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
@@ -2695,4 +2696,16 @@ public class AccessController extends 
BaseMasterAndRegionObserver
                                 String groupName) throws IOException {
     requirePermission(getActiveUser(ctx), "balanceRSGroup", Action.ADMIN);
   }
+
+  @Override
+  public void preAddReplicationPeer(final 
ObserverContext<MasterCoprocessorEnvironment> ctx,
+      String peerId, ReplicationPeerConfig peerConfig) throws IOException {
+    requirePermission(getActiveUser(ctx), "addReplicationPeer", Action.ADMIN);
+  }
+
+  @Override
+  public void preRemoveReplicationPeer(final 
ObserverContext<MasterCoprocessorEnvironment> ctx,
+      String peerId) throws IOException {
+    requirePermission(getActiveUser(ctx), "removeReplicationPeer", 
Action.ADMIN);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index 7363fb9..10c73a6 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
@@ -76,8 +77,9 @@ public class TestReplicationAdmin {
    */
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL.startMiniZKCluster();
+    TEST_UTIL.startMiniCluster();
     Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
     admin = new ReplicationAdmin(conf);
   }
 
@@ -86,7 +88,7 @@ public class TestReplicationAdmin {
     if (admin != null) {
       admin.close();
     }
-    TEST_UTIL.shutdownMiniZKCluster();
+    TEST_UTIL.shutdownMiniCluster();
   }
 
   /**
@@ -105,7 +107,7 @@ public class TestReplicationAdmin {
     // try adding the same (fails)
     try {
       admin.addPeer(ID_ONE, rpc1, null);
-    } catch (IllegalArgumentException iae) {
+    } catch (Exception e) {
       // OK!
     }
     assertEquals(1, admin.getPeersCount());
@@ -113,14 +115,14 @@ public class TestReplicationAdmin {
     try {
       admin.removePeer(ID_SECOND);
       fail();
-    } catch (IllegalArgumentException iae) {
+    } catch (Exception iae) {
       // OK!
     }
     assertEquals(1, admin.getPeersCount());
     // Add a second since multi-slave is supported
     try {
       admin.addPeer(ID_SECOND, rpc2, null);
-    } catch (IllegalStateException iae) {
+    } catch (Exception iae) {
       fail();
     }
     assertEquals(2, admin.getPeersCount());
@@ -170,7 +172,7 @@ public class TestReplicationAdmin {
     try {
       admin.addPeer(ID_ONE, rpc1, null);
       fail();
-    } catch (ReplicationException e) {
+    } catch (Exception e) {
       // OK!
     }
     repQueues.removeQueue(ID_ONE);
@@ -181,7 +183,7 @@ public class TestReplicationAdmin {
     try {
       admin.addPeer(ID_ONE, rpc2, null);
       fail();
-    } catch (ReplicationException e) {
+    } catch (Exception e) {
       // OK!
     }
     repQueues.removeAllQueues();
@@ -422,7 +424,7 @@ public class TestReplicationAdmin {
   }
 
   @Test
-  public void testNamespacesAndTableCfsConfigConflict() throws 
ReplicationException {
+  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
     String ns1 = "ns1";
     String ns2 = "ns2";
     TableName tab1 = TableName.valueOf("ns1:tabl");
@@ -471,7 +473,7 @@ public class TestReplicationAdmin {
   }
 
   @Test
-  public void testPeerBandwidth() throws ReplicationException {
+  public void testPeerBandwidth() throws Exception {
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(KEY_ONE);
     admin.addPeer(ID_ONE, rpc);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index aec4057..55138a0 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -40,6 +40,8 @@ import 
org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -380,4 +382,13 @@ public class MockNoopMasterServices implements 
MasterServices, Server {
   public MasterProcedureManagerHost getMasterProcedureManagerHost() {
     return null;
   }
+
+  @Override
+  public void addReplicationPeer(String peerId, ReplicationPeerConfig 
peerConfig)
+      throws ReplicationException {
+  }
+
+  @Override
+  public void removeReplicationPeer(String peerId) throws ReplicationException 
{
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
index 7f2fb08..2e83c56 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -275,8 +276,8 @@ public class TestMasterNoCluster {
       void initClusterSchemaService() throws IOException, InterruptedException 
{}
 
       @Override
-      void initializeZKBasedSystemTrackers() throws IOException,
-      InterruptedException, KeeperException, CoordinatedStateException {
+      void initializeZKBasedSystemTrackers() throws IOException, 
InterruptedException,
+          KeeperException, CoordinatedStateException {
         super.initializeZKBasedSystemTrackers();
         // Record a newer server in server manager at first
         getServerManager().recordNewServerWithLock(newServer, 
ServerLoad.EMPTY_SERVERLOAD);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index b3739fb..474039b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -123,18 +123,18 @@ public class TestReplicationBase {
     utility2 = new HBaseTestingUtility(conf2);
     utility2.setZkCluster(miniZK);
     zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true);
-
-    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
-    rpc.setClusterKey(utility2.getClusterKey());
-    admin.addPeer("2", rpc, null);
-
     LOG.info("Setup second Zk");
+
     CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
     utility1.startMiniCluster(2);
     // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
     // as a component in deciding maximum number of parallel batches to send to the peer cluster.
     utility2.startMiniCluster(4);
 
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(utility2.getClusterKey());
+    admin.addPeer("2", rpc, null);
+
     HTableDescriptor table = new HTableDescriptor(tableName);
     HColumnDescriptor fam = new HColumnDescriptor(famName);
     fam.setMaxVersions(100);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java
index af0e357..a680f70 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java
@@ -130,14 +130,14 @@ public class TestReplicationWithTags {
     utility2 = new HBaseTestingUtility(conf2);
     utility2.setZkCluster(miniZK);
 
-    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
-    rpc.setClusterKey(utility2.getClusterKey());
-    replicationAdmin.addPeer("2", rpc, null);
-
     LOG.info("Setup second Zk");
     utility1.startMiniCluster(2);
     utility2.startMiniCluster(2);
 
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(utility2.getClusterKey());
+    replicationAdmin.addPeer("2", rpc, null);
+
     HTableDescriptor table = new HTableDescriptor(TABLE_NAME);
     HColumnDescriptor fam = new HColumnDescriptor(FAMILY);
     fam.setMaxVersions(3);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
index 6fcccaf..c9f4319 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
@@ -106,14 +106,14 @@ public class TestSerialReplication {
     utility2.setZkCluster(miniZK);
     new ZooKeeperWatcher(conf2, "cluster2", null, true);
 
+    utility1.startMiniCluster(1, 10);
+    utility2.startMiniCluster(1, 1);
+
     ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(utility2.getClusterKey());
     admin1.addPeer("1", rpc, null);
 
-    utility1.startMiniCluster(1, 10);
-    utility2.startMiniCluster(1, 1);
-
     utility1.getHBaseAdmin().setBalancerRunning(false, true);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
index 33ff094..a0f6f29 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
@@ -2870,4 +2870,34 @@ public class TestAccessController extends SecureTestUtil {
     verifyAllowed(action1, SUPERUSER, USER_ADMIN);
     verifyDenied(action1, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER);
   }
+
+  @Test
+  public void testAddReplicationPeer() throws Exception {
+    AccessTestAction action = new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        ACCESS_CONTROLLER.preAddReplicationPeer(ObserverContext.createAndPrepare(CP_ENV, null),
+          "test", null);
+        return null;
+      }
+    };
+
+    verifyAllowed(action, SUPERUSER, USER_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER);
+  }
+
+  @Test
+  public void testRemoveReplicationPeer() throws Exception {
+    AccessTestAction action = new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        ACCESS_CONTROLLER.preRemoveReplicationPeer(ObserverContext.createAndPrepare(CP_ENV, null),
+          "test");
+        return null;
+      }
+    };
+
+    verifyAllowed(action, SUPERUSER, USER_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java
index fafa500..56a7260 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java
@@ -128,14 +128,16 @@ public class TestVisibilityLabelReplicationWithExpAsString extends TestVisibilit
     TEST_UTIL1 = new HBaseTestingUtility(conf1);
     TEST_UTIL1.setZkCluster(miniZK);
     zkw2 = new ZooKeeperWatcher(conf1, "cluster2", null, true);
-    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
-    rpc.setClusterKey(TEST_UTIL1.getClusterKey());
-    replicationAdmin.addPeer("2", rpc, null);
 
     TEST_UTIL.startMiniCluster(1);
     // Wait for the labels table to become available
     TEST_UTIL.waitTableEnabled(LABELS_TABLE_NAME.getName(), 50000);
     TEST_UTIL1.startMiniCluster(1);
+
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(TEST_UTIL1.getClusterKey());
+    replicationAdmin.addPeer("2", rpc, null);
+
     HTableDescriptor table = new HTableDescriptor(TABLE_NAME);
     HColumnDescriptor desc = new HColumnDescriptor(fam);
     desc.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
index a62a281..31b74fb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
@@ -177,14 +177,16 @@ public class TestVisibilityLabelsReplication {
     TEST_UTIL1 = new HBaseTestingUtility(conf1);
     TEST_UTIL1.setZkCluster(miniZK);
     zkw2 = new ZooKeeperWatcher(conf1, "cluster2", null, true);
-    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
-    rpc.setClusterKey(TEST_UTIL1.getClusterKey());
-    replicationAdmin.addPeer("2", rpc, null);
 
     TEST_UTIL.startMiniCluster(1);
     // Wait for the labels table to become available
     TEST_UTIL.waitTableEnabled(LABELS_TABLE_NAME.getName(), 50000);
     TEST_UTIL1.startMiniCluster(1);
+
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(TEST_UTIL1.getClusterKey());
+    replicationAdmin.addPeer("2", rpc, null);
+
     Admin hBaseAdmin = TEST_UTIL.getHBaseAdmin();
     HTableDescriptor table = new HTableDescriptor(TABLE_NAME);
     HColumnDescriptor desc = new HColumnDescriptor(fam);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1f4aaea/src/main/asciidoc/_chapters/appendix_acl_matrix.adoc
----------------------------------------------------------------------
diff --git a/src/main/asciidoc/_chapters/appendix_acl_matrix.adoc b/src/main/asciidoc/_chapters/appendix_acl_matrix.adoc
index e222875..a71d916 100644
--- a/src/main/asciidoc/_chapters/appendix_acl_matrix.adoc
+++ b/src/main/asciidoc/_chapters/appendix_acl_matrix.adoc
@@ -116,6 +116,8 @@ In case the table goes out of date, the unit tests which check for accuracy of p
 |        | setUserQuota(Table level) | superuser\|global(A)\|NS(A)\|TableOwner\|table(A)
 |        | setTableQuota | superuser\|global(A)\|NS(A)\|TableOwner\|table(A)
 |        | setNamespaceQuota | superuser\|global(A)
+|        | addReplicationPeer | superuser\|global(A)
+|        | removeReplicationPeer | superuser\|global(A)
 | Region | openRegion | superuser\|global(A)
 |        | closeRegion | superuser\|global(A)
 |        | flush | superuser\|global(A)\|global\(C)\|TableOwner\|table(A)\|table\(C)

Reply via email to