This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 26c4e86  HBASE-25976 Implement a master based ReplicationTracker 
(#3390)
26c4e86 is described below

commit 26c4e86ebdc42deff7cb22424fc58b22f3614815
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Jun 17 18:24:49 2021 +0800

    HBASE-25976 Implement a master based ReplicationTracker (#3390)
    
    Signed-off-by: Bharath Vissapragada <[email protected]>
---
 hbase-replication/pom.xml                          |   5 +
 .../replication/MasterReplicationTracker.java      | 103 ++++++++
 .../hbase/replication/ReplicationFactory.java      |  17 +-
 .../hbase/replication/ReplicationListener.java     |   3 +-
 .../hbase/replication/ReplicationTracker.java      |  33 ++-
 .../hbase/replication/ReplicationTrackerBase.java  |  72 ++++++
 .../replication/ReplicationTrackerParams.java      |  98 ++++++++
 ...rackerZKImpl.java => ZKReplicationTracker.java} | 102 +++-----
 .../replication/ReplicationTrackerTestBase.java    | 110 +++++++++
 .../replication/TestMasterReplicationTracker.java  |  87 +++++++
 .../replication/TestZKReplicationPeerStorage.java  |  30 ++-
 .../replication/TestZKReplicationTracker.java      |  84 +++++++
 .../regionserver/DumpReplicationQueues.java        |   9 +-
 .../replication/regionserver/Replication.java      |   7 +-
 .../regionserver/ReplicationSourceManager.java     |  21 +-
 .../replication/TestReplicationTrackerZKImpl.java  | 270 ---------------------
 16 files changed, 685 insertions(+), 366 deletions(-)

diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml
index 7295617..60bb16d 100644
--- a/hbase-replication/pom.xml
+++ b/hbase-replication/pom.xml
@@ -121,6 +121,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>jcl-over-slf4j</artifactId>
       <scope>test</scope>
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/MasterReplicationTracker.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/MasterReplicationTracker.java
new file mode 100644
index 0000000..c55a82e
--- /dev/null
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/MasterReplicationTracker.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ReplicationTracker} implementation which periodically polls the region server list
+ * from the master.
+ */
[email protected]
+class MasterReplicationTracker extends ReplicationTrackerBase {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MasterReplicationTracker.class);
+
+  static final String REFRESH_INTERVAL_SECONDS =
+    "hbase.replication.tracker.master.refresh.interval.secs";
+
+  // default to refresh every 5 seconds
+  static final int REFRESH_INTERVAL_SECONDS_DEFAULT = 5;
+
+  private final ChoreService choreService;
+
+  private final ScheduledChore chore;
+
+  private final Admin admin;
+
+  private volatile Set<ServerName> regionServers;
+
+  MasterReplicationTracker(ReplicationTrackerParams params) {
+    try {
+      this.admin = params.connection().getAdmin();
+    } catch (IOException e) {
+      // should not happen
+      throw new AssertionError(e);
+    }
+    this.choreService = params.choreService();
+    int refreshIntervalSecs =
+      params.conf().getInt(REFRESH_INTERVAL_SECONDS, 
REFRESH_INTERVAL_SECONDS_DEFAULT);
+    this.chore = new ScheduledChore(getClass().getSimpleName(), 
params.stopptable(),
+      refreshIntervalSecs, 0, TimeUnit.SECONDS) {
+
+      @Override
+      protected void chore() {
+        try {
+          refresh();
+        } catch (IOException e) {
+          LOG.warn("failed to refresh region server list for replication", e);
+        }
+      }
+    };
+  }
+
+  private Set<ServerName> reload() throws IOException {
+    return Collections.unmodifiableSet(new 
HashSet<>(admin.getRegionServers()));
+
+  }
+
+  private void refresh() throws IOException {
+    Set<ServerName> newRegionServers = reload();
+    for (ServerName oldRs : regionServers) {
+      if (!newRegionServers.contains(oldRs)) {
+        notifyListeners(oldRs);
+      }
+    }
+    this.regionServers = newRegionServers;
+  }
+
+  @Override
+  protected Set<ServerName> 
internalLoadLiveRegionServersAndInitializeListeners()
+    throws IOException {
+    Set<ServerName> newRegionServers = reload();
+    this.regionServers = newRegionServers;
+    choreService.scheduleChore(chore);
+    return newRegionServers;
+  }
+}
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index 2a970ba..b4d33d6 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -18,8 +18,7 @@
 package org.apache.hadoop.hbase.replication;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -27,14 +26,20 @@ import org.apache.yetus.audience.InterfaceAudience;
  * A factory class for instantiating replication objects that deal with 
replication state.
  */
 @InterfaceAudience.Private
-public class ReplicationFactory {
+public final class ReplicationFactory {
+
+  public static final String REPLICATION_TRACKER_IMPL = 
"hbase.replication.tracker.impl";
+
+  private ReplicationFactory() {
+  }
 
   public static ReplicationPeers getReplicationPeers(ZKWatcher zk, 
Configuration conf) {
     return new ReplicationPeers(zk, conf);
   }
 
-  public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper, 
Abortable abortable,
-      Stoppable stopper) {
-    return new ReplicationTrackerZKImpl(zookeeper, abortable, stopper);
+  public static ReplicationTracker 
getReplicationTracker(ReplicationTrackerParams params) {
+    Class<? extends ReplicationTracker> clazz = 
params.conf().getClass(REPLICATION_TRACKER_IMPL,
+      ZKReplicationTracker.class, ReplicationTracker.class);
+    return ReflectionUtils.newInstance(clazz, params);
   }
 }
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
index f040bf9..5c21e1e 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -33,5 +34,5 @@ public interface ReplicationListener {
    * A region server has been removed from the local cluster
    * @param regionServer the removed region server
    */
-  public void regionServerRemoved(String regionServer);
+  public void regionServerRemoved(ServerName regionServer);
 }
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
index 9370226..1d59401 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
@@ -18,18 +18,20 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import java.util.List;
-
+import java.io.IOException;
+import java.util.Set;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * This is the interface for a Replication Tracker. A replication tracker 
provides the facility to
- * subscribe and track events that reflect a change in replication state. 
These events are used by
- * the ReplicationSourceManager to coordinate replication tasks such as 
addition/deletion of queues
- * and queue failover. These events are defined in the ReplicationListener 
interface. If a class
- * would like to listen to replication events it must implement the 
ReplicationListener interface
- * and register itself with a Replication Tracker.
+ * This is the interface for a Replication Tracker.
+ * <p/>
+ * A replication tracker provides the facility to subscribe and track events 
that reflect a change
+ * in replication state. These events are used by the ReplicationSourceManager 
to coordinate
+ * replication tasks such as addition/deletion of queues and queue failover. 
These events are
+ * defined in the ReplicationListener interface. If a class would like to 
listen to replication
+ * events it must implement the ReplicationListener interface and register 
itself with a Replication
+ * Tracker.
  */
 @InterfaceAudience.Private
 public interface ReplicationTracker {
@@ -40,11 +42,20 @@ public interface ReplicationTracker {
    */
   void registerListener(ReplicationListener listener);
 
+  /**
+   * Remove a replication listener
+   * @param listener the listener to remove
+   */
   void removeListener(ReplicationListener listener);
 
   /**
-   * Returns a list of other live region servers in the cluster.
-   * @return List of region servers.
+   * In this method, you need to load the newest list of region servers and return it, and all
+   * later changes to the region server list must be passed to the listeners.
+   * <p/>
+   * This is very important for us to not miss a region server crash.
+   * <p/>
+   * Notice that this method can only be called once.
+   * @return Set of region servers.
    */
-  List<ServerName> getListOfRegionServers();
+  Set<ServerName> loadLiveRegionServersAndInitializeListeners() throws 
IOException;
 }
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerBase.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerBase.java
new file mode 100644
index 0000000..96a3061
--- /dev/null
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerBase.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base implementation class for replication tracker.
+ */
[email protected]
+abstract class ReplicationTrackerBase implements ReplicationTracker {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationTrackerBase.class);
+
+  // listeners to be notified
+  private final List<ReplicationListener> listeners = new 
CopyOnWriteArrayList<>();
+
+  private final AtomicBoolean initialized = new AtomicBoolean(false);
+
+  @Override
+  public void registerListener(ReplicationListener listener) {
+    listeners.add(listener);
+  }
+
+  @Override
+  public void removeListener(ReplicationListener listener) {
+    listeners.remove(listener);
+  }
+
+  protected final void notifyListeners(ServerName regionServer) {
+    LOG.info("{} is dead, triggering replicatorRemoved event", regionServer);
+    for (ReplicationListener listener : listeners) {
+      listener.regionServerRemoved(regionServer);
+    }
+  }
+
+  @Override
+  public final Set<ServerName> loadLiveRegionServersAndInitializeListeners() 
throws IOException {
+    if (!initialized.compareAndSet(false, true)) {
+      throw new IllegalStateException(
+        "loadLiveRegionServersAndInitializeListeners can only be called once");
+    }
+    return internalLoadLiveRegionServersAndInitializeListeners();
+  }
+
+  protected abstract Set<ServerName> 
internalLoadLiveRegionServersAndInitializeListeners()
+    throws IOException;
+
+}
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerParams.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerParams.java
new file mode 100644
index 0000000..9aeedcf
--- /dev/null
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerParams.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Parameters for constructing a {@link ReplicationTracker}.
+ */
[email protected]
+public final class ReplicationTrackerParams {
+
+  private final Configuration conf;
+
+  private final Stoppable stopptable;
+
+  private ZKWatcher zookeeper;
+
+  private Abortable abortable;
+
+  private Connection conn;
+
+  private ChoreService choreService;
+
+  private ReplicationTrackerParams(Configuration conf, Stoppable stopptable) {
+    this.conf = conf;
+    this.stopptable = stopptable;
+  }
+
+  public ReplicationTrackerParams zookeeper(ZKWatcher zookeeper) {
+    this.zookeeper = zookeeper;
+    return this;
+  }
+
+  public ReplicationTrackerParams abortable(Abortable abortable) {
+    this.abortable = abortable;
+    return this;
+  }
+
+  public ReplicationTrackerParams connection(Connection conn) {
+    this.conn = conn;
+    return this;
+  }
+
+  public ReplicationTrackerParams choreService(ChoreService choreService) {
+    this.choreService = choreService;
+    return this;
+  }
+
+  public Configuration conf() {
+    return conf;
+  }
+
+  public Stoppable stopptable() {
+    return stopptable;
+  }
+
+  public ZKWatcher zookeeper() {
+    return zookeeper;
+  }
+
+  public Abortable abortable() {
+    return abortable;
+  }
+
+  public Connection connection() {
+    return conn;
+  }
+
+  public ChoreService choreService() {
+    return choreService;
+  }
+
+  public static ReplicationTrackerParams create(Configuration conf, Stoppable 
stopptable) {
+    return new ReplicationTrackerParams(conf, stopptable);
+  }
+}
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationTracker.java
similarity index 65%
rename from 
hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
rename to 
hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationTracker.java
index 6fc3c45..b74187a 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationTracker.java
@@ -17,11 +17,11 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import java.util.ArrayList;
+import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.Set;
 import java.util.stream.Collectors;
-
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
@@ -30,8 +30,8 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
 
 /**
  * This class is a ZooKeeper implementation of the ReplicationTracker 
interface. This class is
@@ -39,9 +39,7 @@ import org.slf4j.LoggerFactory;
  * interface.
  */
 @InterfaceAudience.Private
-public class ReplicationTrackerZKImpl implements ReplicationTracker {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationTrackerZKImpl.class);
+class ZKReplicationTracker extends ReplicationTrackerBase {
 
   // Zookeeper
   private final ZKWatcher zookeeper;
@@ -49,42 +47,14 @@ public class ReplicationTrackerZKImpl implements 
ReplicationTracker {
   private final Abortable abortable;
   // All about stopping
   private final Stoppable stopper;
-  // listeners to be notified
-  private final List<ReplicationListener> listeners = new 
CopyOnWriteArrayList<>();
   // List of all the other region servers in this cluster
-  private final List<ServerName> otherRegionServers = new ArrayList<>();
+  private final Set<ServerName> regionServers = new HashSet<>();
 
-  public ReplicationTrackerZKImpl(ZKWatcher zookeeper, Abortable abortable, 
Stoppable stopper) {
-    this.zookeeper = zookeeper;
-    this.abortable = abortable;
-    this.stopper = stopper;
+  ZKReplicationTracker(ReplicationTrackerParams params) {
+    this.zookeeper = params.zookeeper();
+    this.abortable = params.abortable();
+    this.stopper = params.stopptable();
     this.zookeeper.registerListener(new 
OtherRegionServerWatcher(this.zookeeper));
-    // watch the changes
-    refreshOtherRegionServersList(true);
-  }
-
-  @Override
-  public void registerListener(ReplicationListener listener) {
-    listeners.add(listener);
-  }
-
-  @Override
-  public void removeListener(ReplicationListener listener) {
-    listeners.remove(listener);
-  }
-
-  /**
-   * Return a snapshot of the current region servers.
-   */
-  @Override
-  public List<ServerName> getListOfRegionServers() {
-    refreshOtherRegionServersList(false);
-
-    List<ServerName> list = null;
-    synchronized (otherRegionServers) {
-      list = new ArrayList<>(otherRegionServers);
-    }
-    return list;
   }
 
   /**
@@ -106,6 +76,9 @@ public class ReplicationTrackerZKImpl implements 
ReplicationTracker {
      */
     @Override
     public void nodeCreated(String path) {
+      if (stopper.isStopped()) {
+        return;
+      }
       refreshListIfRightPath(path);
     }
 
@@ -118,14 +91,10 @@ public class ReplicationTrackerZKImpl implements 
ReplicationTracker {
       if (stopper.isStopped()) {
         return;
       }
-      boolean cont = refreshListIfRightPath(path);
-      if (!cont) {
+      if (!refreshListIfRightPath(path)) {
         return;
       }
-      LOG.info(path + " znode expired, triggering replicatorRemoved event");
-      for (ReplicationListener rl : listeners) {
-        rl.regionServerRemoved(getZNodeName(path));
-      }
+      notifyListeners(ServerName.valueOf(getZNodeName(path)));
     }
 
     /**
@@ -144,7 +113,7 @@ public class ReplicationTrackerZKImpl implements 
ReplicationTracker {
       if (!path.startsWith(this.watcher.getZNodePaths().rsZNode)) {
         return false;
       }
-      return refreshOtherRegionServersList(true);
+      return refreshRegionServerList();
     }
   }
 
@@ -154,8 +123,8 @@ public class ReplicationTrackerZKImpl implements 
ReplicationTracker {
    * @return the id or an empty string if path is invalid
    */
   private String getZNodeName(String fullPath) {
-    String[] parts = fullPath.split("/");
-    return parts.length > 0 ? parts[parts.length - 1] : "";
+    List<String> parts = Splitter.on('/').splitToList(fullPath);
+    return parts.size() > 0 ? parts.get(parts.size() - 1) : "";
   }
 
   /**
@@ -164,14 +133,14 @@ public class ReplicationTrackerZKImpl implements 
ReplicationTracker {
    * @return true if the local list of the other region servers was updated 
with the ZK data (even
    *         if it was empty), false if the data was missing in ZK
    */
-  private boolean refreshOtherRegionServersList(boolean watch) {
-    List<ServerName> newRsList = getRegisteredRegionServers(watch);
+  private boolean refreshRegionServerList() {
+    Set<ServerName> newRsList = getRegisteredRegionServers();
     if (newRsList == null) {
       return false;
     } else {
-      synchronized (otherRegionServers) {
-        otherRegionServers.clear();
-        otherRegionServers.addAll(newRsList);
+      synchronized (regionServers) {
+        regionServers.clear();
+        regionServers.addAll(newRsList);
       }
     }
     return true;
@@ -181,19 +150,26 @@ public class ReplicationTrackerZKImpl implements 
ReplicationTracker {
    * Get a list of all the other region servers in this cluster and set a watch
    * @return a list of server nanes
    */
-  private List<ServerName> getRegisteredRegionServers(boolean watch) {
+  private Set<ServerName> getRegisteredRegionServers() {
     List<String> result = null;
     try {
-      if (watch) {
-        result = ZKUtil.listChildrenAndWatchThem(this.zookeeper,
-                this.zookeeper.getZNodePaths().rsZNode);
-      } else {
-        result = ZKUtil.listChildrenNoWatch(this.zookeeper, 
this.zookeeper.getZNodePaths().rsZNode);
-      }
+      result =
+        ZKUtil.listChildrenAndWatchThem(this.zookeeper, 
this.zookeeper.getZNodePaths().rsZNode);
     } catch (KeeperException e) {
       this.abortable.abort("Get list of registered region servers", e);
     }
     return result == null ? null :
-      
result.stream().map(ServerName::parseServerName).collect(Collectors.toList());
+      
result.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
+  }
+
+  @Override
+  protected Set<ServerName> 
internalLoadLiveRegionServersAndInitializeListeners()
+    throws IOException {
+    if (!refreshRegionServerList()) {
+      throw new IOException("failed to refresh region server list");
+    }
+    synchronized (regionServers) {
+      return new HashSet<>(regionServers);
+    }
   }
 }
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationTrackerTestBase.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationTrackerTestBase.java
new file mode 100644
index 0000000..270e9f6
--- /dev/null
+++ 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationTrackerTestBase.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for testing {@link ReplicationTracker} and {@link 
ReplicationListener}.
+ */
+public abstract class ReplicationTrackerTestBase {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationTrackerTestBase.class);
+
+  private ReplicationTracker rt;
+
+  private AtomicInteger rsRemovedCount;
+
+  private volatile ServerName rsRemovedData;
+
+  @Before
+  public void setUp() {
+    ReplicationTrackerParams params = createParams();
+    rt = ReplicationFactory.getReplicationTracker(params);
+    rsRemovedCount = new AtomicInteger(0);
+    rsRemovedData = null;
+  }
+
+  protected abstract ReplicationTrackerParams createParams();
+
+  protected abstract void addServer(ServerName sn) throws Exception;
+
+  protected abstract void removeServer(ServerName sn) throws Exception;
+
+  @Test
+  public void testWatchRegionServers() throws Exception {
+    ServerName sn =
+      ServerName.valueOf("hostname2.example.org,1234," + 
EnvironmentEdgeManager.currentTime());
+    addServer(sn);
+    rt.registerListener(new DummyReplicationListener());
+    assertEquals(1, rt.loadLiveRegionServersAndInitializeListeners().size());
+    // delete one
+    removeServer(sn);
+    // wait for event
+    Waiter.waitFor(HBaseConfiguration.create(), 15000, () -> 
rsRemovedCount.get() >= 1);
+    assertEquals(sn, rsRemovedData);
+  }
+
+  private class DummyReplicationListener implements ReplicationListener {
+
+    @Override
+    public void regionServerRemoved(ServerName regionServer) {
+      rsRemovedData = regionServer;
+      rsRemovedCount.getAndIncrement();
+      LOG.debug("Received regionServerRemoved event: " + regionServer);
+    }
+  }
+
+  protected static class WarnOnlyStoppable implements Stoppable {
+
+    @Override
+    public void stop(String why) {
+      LOG.warn("TestReplicationTracker received stop, ignoring. Reason: " + 
why);
+    }
+
+    @Override
+    public boolean isStopped() {
+      return false;
+    }
+  }
+
+  protected static class WarnOnlyAbortable implements Abortable {
+
+    @Override
+    public void abort(String why, Throwable e) {
+      LOG.warn("TestReplicationTracker received abort, ignoring. Reason: " + 
why);
+    }
+
+    @Override
+    public boolean isAborted() {
+      return false;
+    }
+  }
+}
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplicationTracker.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplicationTracker.java
new file mode 100644
index 0000000..357908f
--- /dev/null
+++ 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplicationTracker.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestMasterReplicationTracker extends ReplicationTrackerTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestMasterReplicationTracker.class);
+
+  private static Configuration CONF;
+
+  private static Connection CONN;
+
+  private static ChoreService CHORE_SERVICE;
+
+  private static List<ServerName> SERVERS = new CopyOnWriteArrayList<>();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws IOException {
+    CONF = HBaseConfiguration.create();
+    CONF.setClass(ReplicationFactory.REPLICATION_TRACKER_IMPL, 
MasterReplicationTracker.class,
+      ReplicationTracker.class);
+    Admin admin = mock(Admin.class);
+    when(admin.getRegionServers()).thenReturn(SERVERS);
+    CONN = mock(Connection.class);
+    when(CONN.getAdmin()).thenReturn(admin);
+    CHORE_SERVICE = new ChoreService("TestMasterReplicationTracker");
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() {
+    CHORE_SERVICE.shutdown();
+  }
+
+  @Override
+  protected ReplicationTrackerParams createParams() {
+    return ReplicationTrackerParams.create(CONF, new WarnOnlyStoppable())
+      .abortable(new 
WarnOnlyAbortable()).connection(CONN).choreService(CHORE_SERVICE);
+  }
+
+  @Override
+  protected void addServer(ServerName sn) throws Exception {
+    SERVERS.add(sn);
+  }
+
+  @Override
+  protected void removeServer(ServerName sn) throws Exception {
+    SERVERS.remove(sn);
+  }
+}
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
index 77b71ad..a5a4566 100644
--- 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
+++ 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
@@ -19,12 +19,15 @@ package org.apache.hadoop.hbase.replication;
 
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -40,6 +43,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -184,7 +188,8 @@ public class TestZKReplicationPeerStorage {
     }
   }
 
-  @Test public void testBaseReplicationPeerConfig() throws 
ReplicationException{
+  @Test
+  public void testBaseReplicationPeerConfig() throws ReplicationException {
     String customPeerConfigKey = "hbase.xxx.custom_config";
     String customPeerConfigValue = "test";
     String customPeerConfigUpdatedValue = "testUpdated";
@@ -227,7 +232,8 @@ public class TestZKReplicationPeerStorage {
       getConfiguration().get(customPeerConfigSecondKey));
   }
 
-  @Test public void testBaseReplicationRemovePeerConfig() throws 
ReplicationException {
+  @Test
+  public void testBaseReplicationRemovePeerConfig() throws 
ReplicationException {
     String customPeerConfigKey = "hbase.xxx.custom_config";
     String customPeerConfigValue = "test";
     ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
@@ -256,7 +262,8 @@ public class TestZKReplicationPeerStorage {
     
assertNull(replicationPeerConfigRemoved.getConfiguration().get(customPeerConfigKey));
   }
 
-  @Test public void testBaseReplicationRemovePeerConfigWithNoExistingConfig()
+  @Test
+  public void testBaseReplicationRemovePeerConfigWithNoExistingConfig()
     throws ReplicationException {
     String customPeerConfigKey = "hbase.xxx.custom_config";
     ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
@@ -271,4 +278,21 @@ public class TestZKReplicationPeerStorage {
       updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
     
assertNull(updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
   }
+
+  /**
+   * Adding a peer under an id that is already stored must fail with a
+   * {@link ReplicationException} whose cause is {@link KeeperException.NodeExistsException}.
+   */
+  @Test
+  public void testPeerNameControl() throws Exception {
+    final String peerId = "6";
+    final ReplicationPeerConfig peerConfig =
+      ReplicationPeerConfig.newBuilder().setClusterKey("key").build();
+    STORAGE.addPeer(peerId, peerConfig, true);
+
+    try {
+      // second add with the same id is expected to be rejected
+      STORAGE.addPeer(peerId, peerConfig, true);
+      fail();
+    } catch (ReplicationException expected) {
+      assertThat(expected.getCause(), instanceOf(KeeperException.NodeExistsException.class));
+    } finally {
+      // clean up
+      STORAGE.removePeer(peerId);
+    }
+  }
 }
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationTracker.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationTracker.java
new file mode 100644
index 0000000..4ef42e3
--- /dev/null
+++ 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationTracker.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseZKTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+/**
+ * Concrete {@link ReplicationTrackerTestBase} for {@link ZKReplicationTracker}, run against a
+ * mini ZooKeeper cluster that is started once for the whole class.
+ * <p/>
+ * Region servers are simulated by creating and deleting child znodes under the rs znode.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestZKReplicationTracker extends ReplicationTrackerTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestZKReplicationTracker.class);
+
+  private static Configuration CONF;
+
+  private static HBaseZKTestingUtility UTIL;
+
+  private static ZKWatcher ZKW;
+
+  /**
+   * Starts the mini ZK cluster, selects {@link ZKReplicationTracker} as the tracker
+   * implementation and makes sure the rs znode exists.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL = new HBaseZKTestingUtility();
+    UTIL.startMiniZKCluster();
+    CONF = UTIL.getConfiguration();
+    CONF.setClass(ReplicationFactory.REPLICATION_TRACKER_IMPL, ZKReplicationTracker.class,
+      ReplicationTracker.class);
+    // Reuse the one class-level watcher for creating the rs znode as well, so that no extra
+    // watcher is opened here and then left unclosed; ZKW is closed in tearDownAfterClass().
+    ZKW = HBaseZKTestingUtility.getZooKeeperWatcher(UTIL);
+    ZKUtil.createWithParents(ZKW, ZKW.getZNodePaths().rsZNode);
+  }
+
+  /**
+   * @return tracker params built from the mini cluster configuration and the shared watcher,
+   *         with warn-only stoppable/abortable instances
+   */
+  @Override
+  protected ReplicationTrackerParams createParams() {
+    return ReplicationTrackerParams.create(CONF, new WarnOnlyStoppable())
+      .abortable(new WarnOnlyAbortable()).zookeeper(ZKW);
+  }
+
+  /** Closes the shared watcher (swallowing any IOException) and stops the mini ZK cluster. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    Closeables.close(ZKW, true);
+    UTIL.shutdownMiniZKCluster();
+  }
+
+  /** Creates (and watches) an empty child znode named after {@code sn} under the rs znode. */
+  @Override
+  protected void addServer(ServerName sn) throws Exception {
+    ZKUtil.createAndWatch(ZKW, ZNodePaths.joinZNode(ZKW.getZNodePaths().rsZNode, sn.toString()),
+      HConstants.EMPTY_BYTE_ARRAY);
+  }
+
+  /** Deletes the child znode named after {@code sn} from under the rs znode. */
+  @Override
+  protected void removeServer(ServerName sn) throws Exception {
+    ZKUtil.deleteNode(ZKW, ZNodePaths.joinZNode(ZKW.getZNodePaths().rsZNode, sn.toString()));
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 8e1969f..5561384 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -51,6 +51,7 @@ import 
org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.replication.ReplicationTrackerParams;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -311,9 +312,11 @@ public class DumpReplicationQueues extends Configured 
implements Tool {
     StringBuilder sb = new StringBuilder();
 
     queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, 
getConf());
-    replicationTracker = ReplicationFactory.getReplicationTracker(zkw, new 
WarnOnlyAbortable(),
-      new WarnOnlyStoppable());
-    Set<ServerName> liveRegionServers = new 
HashSet<>(replicationTracker.getListOfRegionServers());
+    replicationTracker = ReplicationFactory
+      .getReplicationTracker(ReplicationTrackerParams.create(getConf(), new 
WarnOnlyStoppable())
+        .abortable(new WarnOnlyAbortable()).zookeeper(zkw));
+    Set<ServerName> liveRegionServers =
+      new 
HashSet<>(replicationTracker.loadLiveRegionServersAndInitializeListeners());
 
     // Loops each peer on each RS and dumps the queues
     List<ServerName> regionservers = queueStorage.getListOfReplicators();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 4ca2c56..12f8ed8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.replication.ReplicationTrackerParams;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -110,8 +111,10 @@ public class Replication implements 
ReplicationSourceService, ReplicationSinkSer
       this.replicationPeers =
           ReplicationFactory.getReplicationPeers(server.getZooKeeper(), 
this.conf);
       this.replicationPeers.init();
-      this.replicationTracker =
-          ReplicationFactory.getReplicationTracker(server.getZooKeeper(), 
this.server, this.server);
+      this.replicationTracker = ReplicationFactory
+        
.getReplicationTracker(ReplicationTrackerParams.create(server.getConfiguration(),
 server)
+          
.abortable(server).zookeeper(server.getZooKeeper()).choreService(server.getChoreService())
+          .connection(server.getConnection()));
     } catch (Exception e) {
       throw new IOException("Failed replication handler create", e);
     }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 9926251..68f6f8b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -41,7 +41,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -262,16 +261,24 @@ public class ReplicationSourceManager implements 
ReplicationListener {
       server.abort("Failed to get all replicators", e);
       return;
     }
+    Set<ServerName> liveRegionServers;
+    try {
+      // must call this method to load the first snapshot of live region 
servers and initialize
+      // listeners
+      liveRegionServers = 
replicationTracker.loadLiveRegionServersAndInitializeListeners();
+    } catch (IOException e) {
+      server.abort("Failed load live region server list for replication", e);
+      return;
+    }
+    LOG.info("Current list of replicators: {}, live RSes: {}", 
currentReplicators,
+      liveRegionServers);
     if (currentReplicators == null || currentReplicators.isEmpty()) {
       return;
     }
-    List<ServerName> otherRegionServers = 
replicationTracker.getListOfRegionServers();
-    LOG.info(
-      "Current list of replicators: " + currentReplicators + " other RSs: " + 
otherRegionServers);
 
     // Look if there's anything to process after a restart
     for (ServerName rs : currentReplicators) {
-      if (!otherRegionServers.contains(rs)) {
+      if (!liveRegionServers.contains(rs)) {
         transferQueues(rs);
       }
     }
@@ -659,8 +666,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   }
 
   @Override
-  public void regionServerRemoved(String regionserver) {
-    transferQueues(ServerName.valueOf(regionserver));
+  public void regionServerRemoved(ServerName regionserver) {
+    transferQueues(regionserver);
   }
 
   /**
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
deleted file mode 100644
index 86743fe..0000000
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.hbase.ClusterId;
-import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.zookeeper.KeeperException;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class tests the ReplicationTrackerZKImpl class and ReplicationListener 
interface. One
- * MiniZKCluster is used throughout the entire class. The cluster is 
initialized with the creation
- * of the rsZNode. All other znode creation/initialization is handled by the 
replication state
- * interfaces (i.e. ReplicationPeers, etc.). Each test case in this class 
should ensure that the
- * MiniZKCluster is cleaned and returned to it's initial state (i.e. nothing 
but the rsZNode).
- */
-@Category({ReplicationTests.class, MediumTests.class})
-public class TestReplicationTrackerZKImpl {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestReplicationTrackerZKImpl.class);
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestReplicationTrackerZKImpl.class);
-
-  private static Configuration conf;
-  private static HBaseTestingUtility utility;
-
-  // Each one of the below variables are reinitialized before every test case
-  private ZKWatcher zkw;
-  private ReplicationPeers rp;
-  private ReplicationTracker rt;
-  private AtomicInteger rsRemovedCount;
-  private String rsRemovedData;
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    utility = new HBaseTestingUtility();
-    utility.startMiniZKCluster();
-    conf = utility.getConfiguration();
-    ZKWatcher zk = HBaseTestingUtility.getZooKeeperWatcher(utility);
-    ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode);
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
-    String fakeRs1 = ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
-            "hostname1.example.org:1234");
-    try {
-      ZKClusterId.setClusterId(zkw, new ClusterId());
-      rp = ReplicationFactory.getReplicationPeers(zkw, conf);
-      rp.init();
-      rt = ReplicationFactory.getReplicationTracker(zkw, new 
DummyServer(fakeRs1),
-        new DummyServer(fakeRs1));
-    } catch (Exception e) {
-      fail("Exception during test setup: " + e);
-    }
-    rsRemovedCount = new AtomicInteger(0);
-    rsRemovedData = "";
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    utility.shutdownMiniZKCluster();
-  }
-
-  @Test
-  public void testGetListOfRegionServers() throws Exception {
-    // 0 region servers
-    assertEquals(0, rt.getListOfRegionServers().size());
-
-    // 1 region server
-    ZKUtil.createWithParents(zkw, 
ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
-      "hostname1.example.org,1234,1611218678009"));
-    List<ServerName> rss = rt.getListOfRegionServers();
-    assertEquals(rss.toString(), 1, rss.size());
-
-    // 2 region servers
-    ZKUtil.createWithParents(zkw, 
ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
-      "hostname2.example.org,1234,1611218678009"));
-    rss = rt.getListOfRegionServers();
-    assertEquals(rss.toString(), 2, rss.size());
-
-    // 1 region server
-    ZKUtil.deleteNode(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
-      "hostname2.example.org,1234,1611218678009"));
-    rss = rt.getListOfRegionServers();
-    assertEquals(1, rss.size());
-
-    // 0 region server
-    ZKUtil.deleteNode(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
-      "hostname1.example.org,1234,1611218678009"));
-    rss = rt.getListOfRegionServers();
-    assertEquals(rss.toString(), 0, rss.size());
-  }
-
-  @Test
-  public void testRegionServerRemovedEvent() throws Exception {
-    ZKUtil.createAndWatch(zkw,
-      ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, 
"hostname2.example.org:1234"),
-      HConstants.EMPTY_BYTE_ARRAY);
-    rt.registerListener(new DummyReplicationListener());
-    // delete one
-    ZKUtil.deleteNode(zkw,
-      ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, 
"hostname2.example.org:1234"));
-    // wait for event
-    while (rsRemovedCount.get() < 1) {
-      Thread.sleep(5);
-    }
-    assertEquals("hostname2.example.org:1234", rsRemovedData);
-  }
-
-  @Test
-  public void testPeerNameControl() throws Exception {
-    int exists = 0;
-    rp.getPeerStorage().addPeer("6",
-      
ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(),
 true);
-
-    try {
-      rp.getPeerStorage().addPeer("6",
-        
ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(),
 true);
-    } catch (ReplicationException e) {
-      if (e.getCause() instanceof KeeperException.NodeExistsException) {
-        exists++;
-      }
-    }
-
-    assertEquals(1, exists);
-
-    // clean up
-    rp.getPeerStorage().removePeer("6");
-  }
-
-  private class DummyReplicationListener implements ReplicationListener {
-
-    @Override
-    public void regionServerRemoved(String regionServer) {
-      rsRemovedData = regionServer;
-      rsRemovedCount.getAndIncrement();
-      LOG.debug("Received regionServerRemoved event: " + regionServer);
-    }
-  }
-
-  private class DummyServer implements Server {
-    private String serverName;
-    private boolean isAborted = false;
-    private boolean isStopped = false;
-
-    public DummyServer(String serverName) {
-      this.serverName = serverName;
-    }
-
-    @Override
-    public Configuration getConfiguration() {
-      return conf;
-    }
-
-    @Override
-    public ZKWatcher getZooKeeper() {
-      return zkw;
-    }
-
-    @Override
-    public CoordinatedStateManager getCoordinatedStateManager() {
-      return null;
-    }
-
-    @Override
-    public ClusterConnection getConnection() {
-      return null;
-    }
-
-    @Override
-    public ServerName getServerName() {
-      return ServerName.valueOf(this.serverName);
-    }
-
-    @Override
-    public void abort(String why, Throwable e) {
-      LOG.info("Aborting " + serverName);
-      this.isAborted = true;
-    }
-
-    @Override
-    public boolean isAborted() {
-      return this.isAborted;
-    }
-
-    @Override
-    public void stop(String why) {
-      this.isStopped = true;
-    }
-
-    @Override
-    public boolean isStopped() {
-      return this.isStopped;
-    }
-
-    @Override
-    public ChoreService getChoreService() {
-      return null;
-    }
-
-    @Override
-    public ClusterConnection getClusterConnection() {
-      // TODO Auto-generated method stub
-      return null;
-    }
-
-    @Override
-    public FileSystem getFileSystem() {
-      return null;
-    }
-
-    @Override
-    public boolean isStopping() {
-      return false;
-    }
-
-    @Override
-    public Connection createConnection(Configuration conf) throws IOException {
-      return null;
-    }
-  }
-}

Reply via email to