This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 748e369  HBASE-26029 It is not reliable to use nodeDeleted event to track region server's death (#3430)
748e369 is described below

commit 748e369629bca0f68cc259d5269aa2adfe62ff22
Author: Duo Zhang <[email protected]>
AuthorDate: Wed Jun 30 08:44:19 2021 +0800

    HBASE-26029 It is not reliable to use nodeDeleted event to track region server's death (#3430)
    
    Signed-off-by: Xin Sun <[email protected]>
---
 .../src/main/protobuf/MasterProcedure.proto        |  25 +-
 .../replication/MasterReplicationTracker.java      | 103 -------
 .../hbase/replication/ReplicationFactory.java      |   7 -
 .../hbase/replication/ReplicationTracker.java      |  61 ----
 .../hbase/replication/ReplicationTrackerBase.java  |  72 -----
 .../replication/ReplicationTrackerParams.java      |  98 -------
 .../hbase/replication/ZKReplicationTracker.java    | 175 ------------
 .../replication/ReplicationTrackerTestBase.java    | 110 --------
 .../replication/TestMasterReplicationTracker.java  |  87 ------
 .../replication/TestZKReplicationTracker.java      |  84 ------
 .../apache/hadoop/hbase/executor/EventType.java    |   9 +-
 .../apache/hadoop/hbase/executor/ExecutorType.java |   3 +-
 .../master/procedure/ServerCrashProcedure.java     |   7 +-
 .../master/procedure/ServerProcedureInterface.java |  16 +-
 .../hadoop/hbase/master/procedure/ServerQueue.java |   2 +
 .../ClaimReplicationQueueRemoteProcedure.java      | 127 +++++++++
 .../ClaimReplicationQueuesProcedure.java           | 147 ++++++++++
 .../BaseRSProcedureCallable.java}                  |  49 ++--
 .../hadoop/hbase/regionserver/HRegionServer.java   |   4 +
 .../hbase/regionserver/SplitWALCallable.java       |  31 +--
 ...ble.java => ClaimReplicationQueueCallable.java} |  47 ++--
 .../regionserver/DumpReplicationQueues.java        |  28 +-
 .../regionserver/PeerProcedureHandler.java         |   5 +-
 .../regionserver/PeerProcedureHandlerImpl.java     |   7 +
 .../regionserver/RefreshPeerCallable.java          |  29 +-
 .../replication/regionserver/Replication.java      |  12 +-
 .../regionserver/ReplicationSourceManager.java     | 307 +++++++--------------
 .../regionserver/ReplicationSyncUp.java            |  35 ++-
 .../SwitchRpcThrottleRemoteCallable.java           |  27 +-
 .../replication/TestClaimReplicationQueue.java     | 165 +++++++++++
 .../hbase/replication/TestReplicationBase.java     |  78 +++---
 .../regionserver/TestReplicationSourceManager.java |   8 +-
 32 files changed, 771 insertions(+), 1194 deletions(-)

diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto 
b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index e060077..9511ee4 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -312,8 +312,9 @@ enum ServerCrashState {
   SERVER_CRASH_WAIT_ON_ASSIGN = 9;
   SERVER_CRASH_SPLIT_META_LOGS = 10;
   SERVER_CRASH_ASSIGN_META = 11;
-  SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR=12;
-  SERVER_CRASH_DELETE_SPLIT_WALS_DIR=13;
+  SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR = 12;
+  SERVER_CRASH_DELETE_SPLIT_WALS_DIR = 13;
+  SERVER_CRASH_CLAIM_REPLICATION_QUEUES = 14;
   SERVER_CRASH_HANDLE_RIT2 = 20[deprecated=true];
   SERVER_CRASH_FINISH = 100;
 }
@@ -561,3 +562,23 @@ enum SplitWALState {
   DISPATCH_WAL_TO_WORKER = 2;
   RELEASE_SPLIT_WORKER = 3;
 }
+
+message ClaimReplicationQueuesStateData {
+  required ServerName crashed_server = 1;
+}
+
+message ClaimReplicationQueueRemoteStateData {
+  required ServerName crashed_server = 1;
+  required string queue = 2;
+  required ServerName target_server = 3;
+}
+
+message ClaimReplicationQueueRemoteParameter {
+  required ServerName crashed_server = 1;
+  required string queue = 2;
+}
+
+enum ClaimReplicationQueuesState {
+  CLAIM_REPLICATION_QUEUES_DISPATCH = 1;
+  CLAIM_REPLICATION_QUEUES_FINISH = 2;
+}
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/MasterReplicationTracker.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/MasterReplicationTracker.java
deleted file mode 100644
index c55a82e..0000000
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/MasterReplicationTracker.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.hbase.ScheduledChore;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link ReplicationTracker} implementation which polls the region servers 
list periodically from
- * master.
- */
-@InterfaceAudience.Private
-class MasterReplicationTracker extends ReplicationTrackerBase {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(MasterReplicationTracker.class);
-
-  static final String REFRESH_INTERVAL_SECONDS =
-    "hbase.replication.tracker.master.refresh.interval.secs";
-
-  // default to refresh every 5 seconds
-  static final int REFRESH_INTERVAL_SECONDS_DEFAULT = 5;
-
-  private final ChoreService choreService;
-
-  private final ScheduledChore chore;
-
-  private final Admin admin;
-
-  private volatile Set<ServerName> regionServers;
-
-  MasterReplicationTracker(ReplicationTrackerParams params) {
-    try {
-      this.admin = params.connection().getAdmin();
-    } catch (IOException e) {
-      // should not happen
-      throw new AssertionError(e);
-    }
-    this.choreService = params.choreService();
-    int refreshIntervalSecs =
-      params.conf().getInt(REFRESH_INTERVAL_SECONDS, 
REFRESH_INTERVAL_SECONDS_DEFAULT);
-    this.chore = new ScheduledChore(getClass().getSimpleName(), 
params.stopptable(),
-      refreshIntervalSecs, 0, TimeUnit.SECONDS) {
-
-      @Override
-      protected void chore() {
-        try {
-          refresh();
-        } catch (IOException e) {
-          LOG.warn("failed to refresh region server list for replication", e);
-        }
-      }
-    };
-  }
-
-  private Set<ServerName> reload() throws IOException {
-    return Collections.unmodifiableSet(new 
HashSet<>(admin.getRegionServers()));
-
-  }
-
-  private void refresh() throws IOException {
-    Set<ServerName> newRegionServers = reload();
-    for (ServerName oldRs : regionServers) {
-      if (!newRegionServers.contains(oldRs)) {
-        notifyListeners(oldRs);
-      }
-    }
-    this.regionServers = newRegionServers;
-  }
-
-  @Override
-  protected Set<ServerName> 
internalLoadLiveRegionServersAndInitializeListeners()
-    throws IOException {
-    Set<ServerName> newRegionServers = reload();
-    this.regionServers = newRegionServers;
-    choreService.scheduleChore(chore);
-    return newRegionServers;
-  }
-}
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index b4d33d6..8342160 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.replication;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -36,10 +35,4 @@ public final class ReplicationFactory {
   public static ReplicationPeers getReplicationPeers(ZKWatcher zk, 
Configuration conf) {
     return new ReplicationPeers(zk, conf);
   }
-
-  public static ReplicationTracker 
getReplicationTracker(ReplicationTrackerParams params) {
-    Class<? extends ReplicationTracker> clazz = 
params.conf().getClass(REPLICATION_TRACKER_IMPL,
-      ZKReplicationTracker.class, ReplicationTracker.class);
-    return ReflectionUtils.newInstance(clazz, params);
-  }
 }
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
deleted file mode 100644
index 1d59401..0000000
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.IOException;
-import java.util.Set;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * This is the interface for a Replication Tracker.
- * <p/>
- * A replication tracker provides the facility to subscribe and track events 
that reflect a change
- * in replication state. These events are used by the ReplicationSourceManager 
to coordinate
- * replication tasks such as addition/deletion of queues and queue failover. 
These events are
- * defined in the ReplicationListener interface. If a class would like to 
listen to replication
- * events it must implement the ReplicationListener interface and register 
itself with a Replication
- * Tracker.
- */
-@InterfaceAudience.Private
-public interface ReplicationTracker {
-
-  /**
-   * Register a replication listener to receive replication events.
-   * @param listener
-   */
-  void registerListener(ReplicationListener listener);
-
-  /**
-   * Remove a replication listener
-   * @param listener the listener to remove
-   */
-  void removeListener(ReplicationListener listener);
-
-  /**
-   * In this method, you need to load the newest list of region server list 
and return it, and all
-   * later changes to the region server list must be passed to the listeners.
-   * <p/>
-   * This is very important for us to not miss a region server crash.
-   * <p/>
-   * Notice that this method can only be called once.
-   * @return Set of region servers.
-   */
-  Set<ServerName> loadLiveRegionServersAndInitializeListeners() throws 
IOException;
-}
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerBase.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerBase.java
deleted file mode 100644
index 96a3061..0000000
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerBase.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base implementation class for replication tracker.
- */
-@InterfaceAudience.Private
-abstract class ReplicationTrackerBase implements ReplicationTracker {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationTrackerBase.class);
-
-  // listeners to be notified
-  private final List<ReplicationListener> listeners = new 
CopyOnWriteArrayList<>();
-
-  private final AtomicBoolean initialized = new AtomicBoolean(false);
-
-  @Override
-  public void registerListener(ReplicationListener listener) {
-    listeners.add(listener);
-  }
-
-  @Override
-  public void removeListener(ReplicationListener listener) {
-    listeners.remove(listener);
-  }
-
-  protected final void notifyListeners(ServerName regionServer) {
-    LOG.info("{} is dead, triggering replicatorRemoved event", regionServer);
-    for (ReplicationListener listener : listeners) {
-      listener.regionServerRemoved(regionServer);
-    }
-  }
-
-  @Override
-  public final Set<ServerName> loadLiveRegionServersAndInitializeListeners() 
throws IOException {
-    if (!initialized.compareAndSet(false, true)) {
-      throw new IllegalStateException(
-        "loadLiveRegionServersAndInitializeListeners can only be called once");
-    }
-    return internalLoadLiveRegionServersAndInitializeListeners();
-  }
-
-  protected abstract Set<ServerName> 
internalLoadLiveRegionServersAndInitializeListeners()
-    throws IOException;
-
-}
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerParams.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerParams.java
deleted file mode 100644
index 9aeedcf..0000000
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerParams.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Parameters for constructing a {@link ReplicationTracker}.
- */
-@InterfaceAudience.Private
-public final class ReplicationTrackerParams {
-
-  private final Configuration conf;
-
-  private final Stoppable stopptable;
-
-  private ZKWatcher zookeeper;
-
-  private Abortable abortable;
-
-  private Connection conn;
-
-  private ChoreService choreService;
-
-  private ReplicationTrackerParams(Configuration conf, Stoppable stopptable) {
-    this.conf = conf;
-    this.stopptable = stopptable;
-  }
-
-  public ReplicationTrackerParams zookeeper(ZKWatcher zookeeper) {
-    this.zookeeper = zookeeper;
-    return this;
-  }
-
-  public ReplicationTrackerParams abortable(Abortable abortable) {
-    this.abortable = abortable;
-    return this;
-  }
-
-  public ReplicationTrackerParams connection(Connection conn) {
-    this.conn = conn;
-    return this;
-  }
-
-  public ReplicationTrackerParams choreService(ChoreService choreService) {
-    this.choreService = choreService;
-    return this;
-  }
-
-  public Configuration conf() {
-    return conf;
-  }
-
-  public Stoppable stopptable() {
-    return stopptable;
-  }
-
-  public ZKWatcher zookeeper() {
-    return zookeeper;
-  }
-
-  public Abortable abortable() {
-    return abortable;
-  }
-
-  public Connection connection() {
-    return conn;
-  }
-
-  public ChoreService choreService() {
-    return choreService;
-  }
-
-  public static ReplicationTrackerParams create(Configuration conf, Stoppable 
stopptable) {
-    return new ReplicationTrackerParams(conf, stopptable);
-  }
-}
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationTracker.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationTracker.java
deleted file mode 100644
index b74187a..0000000
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationTracker.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.zookeeper.ZKListener;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-
-import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
-
-/**
- * This class is a ZooKeeper implementation of the ReplicationTracker 
interface. This class is
- * responsible for handling replication events that are defined in the 
ReplicationListener
- * interface.
- */
-@InterfaceAudience.Private
-class ZKReplicationTracker extends ReplicationTrackerBase {
-
-  // Zookeeper
-  private final ZKWatcher zookeeper;
-  // Server to abort.
-  private final Abortable abortable;
-  // All about stopping
-  private final Stoppable stopper;
-  // List of all the other region servers in this cluster
-  private final Set<ServerName> regionServers = new HashSet<>();
-
-  ZKReplicationTracker(ReplicationTrackerParams params) {
-    this.zookeeper = params.zookeeper();
-    this.abortable = params.abortable();
-    this.stopper = params.stopptable();
-    this.zookeeper.registerListener(new 
OtherRegionServerWatcher(this.zookeeper));
-  }
-
-  /**
-   * Watcher used to be notified of the other region server's death in the 
local cluster. It
-   * initiates the process to transfer the queues if it is able to grab the 
lock.
-   */
-  public class OtherRegionServerWatcher extends ZKListener {
-
-    /**
-     * Construct a ZooKeeper event listener.
-     */
-    public OtherRegionServerWatcher(ZKWatcher watcher) {
-      super(watcher);
-    }
-
-    /**
-     * Called when a new node has been created.
-     * @param path full path of the new node
-     */
-    @Override
-    public void nodeCreated(String path) {
-      if (stopper.isStopped()) {
-        return;
-      }
-      refreshListIfRightPath(path);
-    }
-
-    /**
-     * Called when a node has been deleted
-     * @param path full path of the deleted node
-     */
-    @Override
-    public void nodeDeleted(String path) {
-      if (stopper.isStopped()) {
-        return;
-      }
-      if (!refreshListIfRightPath(path)) {
-        return;
-      }
-      notifyListeners(ServerName.valueOf(getZNodeName(path)));
-    }
-
-    /**
-     * Called when an existing node has a child node added or removed.
-     * @param path full path of the node whose children have changed
-     */
-    @Override
-    public void nodeChildrenChanged(String path) {
-      if (stopper.isStopped()) {
-        return;
-      }
-      refreshListIfRightPath(path);
-    }
-
-    private boolean refreshListIfRightPath(String path) {
-      if (!path.startsWith(this.watcher.getZNodePaths().rsZNode)) {
-        return false;
-      }
-      return refreshRegionServerList();
-    }
-  }
-
-  /**
-   * Extracts the znode name of a peer cluster from a ZK path
-   * @param fullPath Path to extract the id from
-   * @return the id or an empty string if path is invalid
-   */
-  private String getZNodeName(String fullPath) {
-    List<String> parts = Splitter.on('/').splitToList(fullPath);
-    return parts.size() > 0 ? parts.get(parts.size() - 1) : "";
-  }
-
-  /**
-   * Reads the list of region servers from ZK and atomically clears our local 
view of it and
-   * replaces it with the updated list.
-   * @return true if the local list of the other region servers was updated 
with the ZK data (even
-   *         if it was empty), false if the data was missing in ZK
-   */
-  private boolean refreshRegionServerList() {
-    Set<ServerName> newRsList = getRegisteredRegionServers();
-    if (newRsList == null) {
-      return false;
-    } else {
-      synchronized (regionServers) {
-        regionServers.clear();
-        regionServers.addAll(newRsList);
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Get a list of all the other region servers in this cluster and set a watch
-   * @return a list of server nanes
-   */
-  private Set<ServerName> getRegisteredRegionServers() {
-    List<String> result = null;
-    try {
-      result =
-        ZKUtil.listChildrenAndWatchThem(this.zookeeper, 
this.zookeeper.getZNodePaths().rsZNode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Get list of registered region servers", e);
-    }
-    return result == null ? null :
-      
result.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
-  }
-
-  @Override
-  protected Set<ServerName> 
internalLoadLiveRegionServersAndInitializeListeners()
-    throws IOException {
-    if (!refreshRegionServerList()) {
-      throw new IOException("failed to refresh region server list");
-    }
-    synchronized (regionServers) {
-      return new HashSet<>(regionServers);
-    }
-  }
-}
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationTrackerTestBase.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationTrackerTestBase.java
deleted file mode 100644
index 270e9f6..0000000
--- 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationTrackerTestBase.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for testing {@link ReplicationTracker} and {@link 
ReplicationListener}.
- */
-public abstract class ReplicationTrackerTestBase {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationTrackerTestBase.class);
-
-  private ReplicationTracker rt;
-
-  private AtomicInteger rsRemovedCount;
-
-  private volatile ServerName rsRemovedData;
-
-  @Before
-  public void setUp() {
-    ReplicationTrackerParams params = createParams();
-    rt = ReplicationFactory.getReplicationTracker(params);
-    rsRemovedCount = new AtomicInteger(0);
-    rsRemovedData = null;
-  }
-
-  protected abstract ReplicationTrackerParams createParams();
-
-  protected abstract void addServer(ServerName sn) throws Exception;
-
-  protected abstract void removeServer(ServerName sn) throws Exception;
-
-  @Test
-  public void testWatchRegionServers() throws Exception {
-    ServerName sn =
-      ServerName.valueOf("hostname2.example.org,1234," + 
EnvironmentEdgeManager.currentTime());
-    addServer(sn);
-    rt.registerListener(new DummyReplicationListener());
-    assertEquals(1, rt.loadLiveRegionServersAndInitializeListeners().size());
-    // delete one
-    removeServer(sn);
-    // wait for event
-    Waiter.waitFor(HBaseConfiguration.create(), 15000, () -> 
rsRemovedCount.get() >= 1);
-    assertEquals(sn, rsRemovedData);
-  }
-
-  private class DummyReplicationListener implements ReplicationListener {
-
-    @Override
-    public void regionServerRemoved(ServerName regionServer) {
-      rsRemovedData = regionServer;
-      rsRemovedCount.getAndIncrement();
-      LOG.debug("Received regionServerRemoved event: " + regionServer);
-    }
-  }
-
-  protected static class WarnOnlyStoppable implements Stoppable {
-
-    @Override
-    public void stop(String why) {
-      LOG.warn("TestReplicationTracker received stop, ignoring. Reason: " + 
why);
-    }
-
-    @Override
-    public boolean isStopped() {
-      return false;
-    }
-  }
-
-  protected static class WarnOnlyAbortable implements Abortable {
-
-    @Override
-    public void abort(String why, Throwable e) {
-      LOG.warn("TestReplicationTracker received abort, ignoring. Reason: " + 
why);
-    }
-
-    @Override
-    public boolean isAborted() {
-      return false;
-    }
-  }
-}
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplicationTracker.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplicationTracker.java
deleted file mode 100644
index 357908f..0000000
--- 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplicationTracker.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.experimental.categories.Category;
-
-@Category({ ReplicationTests.class, MediumTests.class })
-public class TestMasterReplicationTracker extends ReplicationTrackerTestBase {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestMasterReplicationTracker.class);
-
-  private static Configuration CONF;
-
-  private static Connection CONN;
-
-  private static ChoreService CHORE_SERVICE;
-
-  private static List<ServerName> SERVERS = new CopyOnWriteArrayList<>();
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws IOException {
-    CONF = HBaseConfiguration.create();
-    CONF.setClass(ReplicationFactory.REPLICATION_TRACKER_IMPL, 
MasterReplicationTracker.class,
-      ReplicationTracker.class);
-    Admin admin = mock(Admin.class);
-    when(admin.getRegionServers()).thenReturn(SERVERS);
-    CONN = mock(Connection.class);
-    when(CONN.getAdmin()).thenReturn(admin);
-    CHORE_SERVICE = new ChoreService("TestMasterReplicationTracker");
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() {
-    CHORE_SERVICE.shutdown();
-  }
-
-  @Override
-  protected ReplicationTrackerParams createParams() {
-    return ReplicationTrackerParams.create(CONF, new WarnOnlyStoppable())
-      .abortable(new 
WarnOnlyAbortable()).connection(CONN).choreService(CHORE_SERVICE);
-  }
-
-  @Override
-  protected void addServer(ServerName sn) throws Exception {
-    SERVERS.add(sn);
-  }
-
-  @Override
-  protected void removeServer(ServerName sn) throws Exception {
-    SERVERS.remove(sn);
-  }
-}
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationTracker.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationTracker.java
deleted file mode 100644
index 4ef42e3..0000000
--- 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationTracker.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseZKTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
-
-@Category({ ReplicationTests.class, MediumTests.class })
-public class TestZKReplicationTracker extends ReplicationTrackerTestBase {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestZKReplicationTracker.class);
-
-  private static Configuration CONF;
-
-  private static HBaseZKTestingUtility UTIL;
-
-  private static ZKWatcher ZKW;
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    UTIL = new HBaseZKTestingUtility();
-    UTIL.startMiniZKCluster();
-    CONF = UTIL.getConfiguration();
-    CONF.setClass(ReplicationFactory.REPLICATION_TRACKER_IMPL, 
ZKReplicationTracker.class,
-      ReplicationTracker.class);
-    ZKWatcher zk = HBaseZKTestingUtility.getZooKeeperWatcher(UTIL);
-    ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode);
-    ZKW = HBaseZKTestingUtility.getZooKeeperWatcher(UTIL);
-  }
-
-  @Override
-  protected ReplicationTrackerParams createParams() {
-    return ReplicationTrackerParams.create(CONF, new WarnOnlyStoppable())
-      .abortable(new WarnOnlyAbortable()).zookeeper(ZKW);
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    Closeables.close(ZKW, true);
-    UTIL.shutdownMiniZKCluster();
-  }
-
-  @Override
-  protected void addServer(ServerName sn) throws Exception {
-    ZKUtil.createAndWatch(ZKW, 
ZNodePaths.joinZNode(ZKW.getZNodePaths().rsZNode, sn.toString()),
-      HConstants.EMPTY_BYTE_ARRAY);
-  }
-
-  @Override
-  protected void removeServer(ServerName sn) throws Exception {
-    ZKUtil.deleteNode(ZKW, ZNodePaths.joinZNode(ZKW.getZNodePaths().rsZNode, 
sn.toString()));
-  }
-}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
index 32993f5..3d1fa39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -287,7 +287,14 @@ public enum EventType {
    *
    * RS_REFRESH_PEER
    */
-  RS_REFRESH_PEER (84, ExecutorType.RS_REFRESH_PEER);
+  RS_REFRESH_PEER(84, ExecutorType.RS_REFRESH_PEER),
+
+  /**
+   * RS claim replication queue.<br>
+   *
+   * RS_CLAIM_REPLICATION_QUEUE
+   */
+  RS_CLAIM_REPLICATION_QUEUE(86, ExecutorType.RS_CLAIM_REPLICATION_QUEUE);
 
   private final int code;
   private final ExecutorType executor;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index ca82d71..ea83122 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -51,7 +51,8 @@ public enum ExecutorType {
   RS_OPEN_PRIORITY_REGION    (30),
   RS_REFRESH_PEER(31),
   RS_SWITCH_RPC_THROTTLE(33),
-  RS_IN_MEMORY_COMPACTION(34);
+  RS_IN_MEMORY_COMPACTION(34),
+  RS_CLAIM_REPLICATION_QUEUE(35);
 
   ExecutorType(int value) {
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index c3e3f53..462a389 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.master.SplitWALManager;
 import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
 import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
+import 
org.apache.hadoop.hbase.master.replication.ClaimReplicationQueuesProcedure;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.procedure2.Procedure;
@@ -235,11 +236,15 @@ public class ServerCrashProcedure
             }
             assignRegions(env, regionsOnCrashedServer);
           }
-          setNextState(ServerCrashState.SERVER_CRASH_FINISH);
+          setNextState(ServerCrashState.SERVER_CRASH_CLAIM_REPLICATION_QUEUES);
           break;
         case SERVER_CRASH_HANDLE_RIT2:
           // Noop. Left in place because we used to call handleRIT here for a 
second time
           // but no longer necessary since HBASE-20634.
+          setNextState(ServerCrashState.SERVER_CRASH_CLAIM_REPLICATION_QUEUES);
+          break;
+        case SERVER_CRASH_CLAIM_REPLICATION_QUEUES:
+          addChildProcedure(new ClaimReplicationQueuesProcedure(serverName));
           setNextState(ServerCrashState.SERVER_CRASH_FINISH);
           break;
         case SERVER_CRASH_FINISH:
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
index 8162269..a7abfdc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
@@ -27,7 +27,9 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public interface ServerProcedureInterface {
   public enum ServerOperationType {
-    CRASH_HANDLER, SWITCH_RPC_THROTTLE,
+    CRASH_HANDLER,
+    SWITCH_RPC_THROTTLE,
+
     /**
      * help find a available region server as worker and release worker after 
task done invoke
      * SPLIT_WAL_REMOTE operation to send real WAL splitting request to worker 
manage the split wal
@@ -38,7 +40,17 @@ public interface ServerProcedureInterface {
     /**
      * send the split WAL request to region server and handle the response
      */
-    SPLIT_WAL_REMOTE
+    SPLIT_WAL_REMOTE,
+
+    /**
+     * Get all the replication queues of a crash server and assign them to 
other region servers
+     */
+    CLAIM_REPLICATION_QUEUES,
+
+    /**
+     * send the claim replication queue request to region server to actually 
assign it
+     */
+    CLAIM_REPLICATION_QUEUE_REMOTE
   }
 
   /**
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
index 1659ab5..726ee14 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
@@ -38,6 +38,8 @@ class ServerQueue extends Queue<ServerName> {
       case SWITCH_RPC_THROTTLE:
       case SPLIT_WAL:
       case SPLIT_WAL_REMOTE:
+      case CLAIM_REPLICATION_QUEUES:
+      case CLAIM_REPLICATION_QUEUE_REMOTE:
         return false;
       default:
         break;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java
new file mode 100644
index 0000000..c8c5704
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import 
org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
+import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
+import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import 
org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import 
org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
+import 
org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteParameter;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteStateData;
+
+/** Asks a region server to claim one replication queue of a crashed region server. */
+@InterfaceAudience.Private
+public class ClaimReplicationQueueRemoteProcedure extends ServerRemoteProcedure
+  implements ServerProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(ClaimReplicationQueueRemoteProcedure.class);
+
+  private ServerName crashedServer;
+
+  private String queue;
+
+  public ClaimReplicationQueueRemoteProcedure() {
+  }
+
+  public ClaimReplicationQueueRemoteProcedure(ServerName crashedServer, String queue,
+    ServerName targetServer) {
+    this.crashedServer = crashedServer;
+    this.queue = queue;
+    this.targetServer = targetServer;
+  }
+
+  @Override
+  public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
+    assert targetServer.equals(remote);
+    return Optional.of(new ServerOperation(this, getProcId(), ClaimReplicationQueueCallable.class,
+      ClaimReplicationQueueRemoteParameter.newBuilder()
+        .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setQueue(queue).build()
+        .toByteArray()));
+  }
+
+  @Override
+  public ServerName getServerName() {
+    // return crashed server here, as we are going to recover its replication queues so we should
+    // use its scheduler queue instead of the one for the target server.
+    return crashedServer;
+  }
+
+  @Override
+  public boolean hasMetaTableRegion() {
+    return false;
+  }
+
+  @Override
+  public ServerOperationType getServerOperationType() {
+    return ServerOperationType.CLAIM_REPLICATION_QUEUE_REMOTE;
+  }
+
+  @Override
+  protected void complete(MasterProcedureEnv env, Throwable error) {
+    if (error != null) {
+      LOG.warn("Failed to claim replication queue {} of crashed server {} on server {}", queue,
+        crashedServer, targetServer, error);
+    }
+    // succ records whether the target region server reported success for this claim request
+    this.succ = error == null;
+  }
+
+  @Override
+  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected boolean abort(MasterProcedureEnv env) {
+    return false;
+  }
+
+  @Override
+  protected boolean waitInitialized(MasterProcedureEnv env) {
+    return env.waitInitialized(this);
+  }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    serializer.serialize(ClaimReplicationQueueRemoteStateData.newBuilder()
+      .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setQueue(queue)
+      .setTargetServer(ProtobufUtil.toServerName(targetServer)).build());
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    ClaimReplicationQueueRemoteStateData data =
+      serializer.deserialize(ClaimReplicationQueueRemoteStateData.class);
+    crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
+    queue = data.getQueue();
+    targetServer = ProtobufUtil.toServerName(data.getTargetServer());
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueuesProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueuesProcedure.java
new file mode 100644
index 0000000..5a35c3f
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueuesProcedure.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueuesStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+
+/**
+ * Used to assign the replication queues of a dead server to other region servers.
+ */
+@InterfaceAudience.Private
+public class ClaimReplicationQueuesProcedure extends Procedure<MasterProcedureEnv>
+  implements ServerProcedureInterface {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ClaimReplicationQueuesProcedure.class);
+
+  private ServerName crashedServer;
+
+  private RetryCounter retryCounter;
+
+  public ClaimReplicationQueuesProcedure() {
+  }
+
+  public ClaimReplicationQueuesProcedure(ServerName crashedServer) {
+    this.crashedServer = crashedServer;
+  }
+
+  @Override
+  public ServerName getServerName() {
+    return crashedServer;
+  }
+
+  @Override
+  public boolean hasMetaTableRegion() {
+    return false;
+  }
+
+  @Override
+  public ServerOperationType getServerOperationType() {
+    return ServerOperationType.CLAIM_REPLICATION_QUEUES;
+  }
+
+  @Override
+  protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+    ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
+    try {
+      List<String> queues = storage.getAllQueues(crashedServer);
+      if (queues.isEmpty()) {
+        LOG.debug("Finish claiming replication queues for {}", crashedServer);
+        storage.removeReplicatorIfQueueIsEmpty(crashedServer);
+        // no queue of the crashed server is left to claim, so we are done
+        return null;
+      }
+      LOG.debug("There are {} replication queues need to be claimed for {}", queues.size(),
+        crashedServer);
+      List<ServerName> targetServers =
+        env.getMasterServices().getServerManager().getOnlineServersList();
+      if (targetServers.isEmpty()) {
+        throw new ReplicationException("no region server available");
+      }
+      Collections.shuffle(targetServers);
+      ClaimReplicationQueueRemoteProcedure[] procs =
+        new ClaimReplicationQueueRemoteProcedure[Math.min(queues.size(), targetServers.size())];
+      for (int i = 0; i < procs.length; i++) {
+        procs[i] = new ClaimReplicationQueueRemoteProcedure(crashedServer, queues.get(i),
+          targetServers.get(i));
+      }
+      return procs;
+    } catch (ReplicationException e) {
+      if (retryCounter == null) {
+        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
+      }
+      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+      LOG.warn("Failed to claim replication queues for {}, suspend {} secs", crashedServer,
+        backoff / 1000, e);
+      setTimeout(Math.toIntExact(backoff));
+      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+      skipPersistence();
+      throw new ProcedureSuspendedException();
+    }
+  }
+
+  @Override
+  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+    setState(ProcedureProtos.ProcedureState.RUNNABLE);
+    env.getProcedureScheduler().addFront(this);
+    return false;
+  }
+
+  @Override
+  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected boolean abort(MasterProcedureEnv env) {
+    return false;
+  }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    serializer.serialize(ClaimReplicationQueuesStateData.newBuilder()
+      .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).build());
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    ClaimReplicationQueuesStateData data =
+      serializer.deserialize(ClaimReplicationQueuesStateData.class);
+    crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java
similarity index 55%
copy from 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
copy to 
hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java
index 65da9af..76b749e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java
@@ -15,27 +15,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.hadoop.hbase.procedure2;
 
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.yetus.audience.InterfaceAudience;
 
-/**
- * A handler for modifying replication peer in peer procedures.
- */
 @InterfaceAudience.Private
-public interface PeerProcedureHandler {
-
-  public void addPeer(String peerId) throws ReplicationException, IOException;
-
-  public void removePeer(String peerId) throws ReplicationException, 
IOException;
-
-  public void disablePeer(String peerId) throws ReplicationException, 
IOException;
-
-  public void enablePeer(String peerId) throws ReplicationException, 
IOException;
-
-  public void updatePeerConfig(String peerId) throws ReplicationException, 
IOException;
+public abstract class BaseRSProcedureCallable implements RSProcedureCallable {
+
+  protected HRegionServer rs;
+
+  private Exception initError;
+
+  @Override
+  public final Void call() throws Exception {
+    if (initError != null) {
+      throw initError;
+    }
+    doCall();
+    return null;
+  }
+
+  @Override
+  public final void init(byte[] parameter, HRegionServer rs) {
+    this.rs = rs;
+    try {
+      initParameter(parameter);
+    } catch (Exception e) {
+      initError = e;
+    }
+  }
+
+  protected abstract void doCall() throws Exception;
+
+  protected abstract void initParameter(byte[] parameter) throws Exception;
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 6a78c9d..fb8e042 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2092,6 +2092,10 @@ public class HRegionServer extends Thread implements
         conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 
1);
     executorService.startExecutorService(executorService.new 
ExecutorConfig().setExecutorType(
         
ExecutorType.RS_SWITCH_RPC_THROTTLE).setCorePoolSize(switchRpcThrottleThreads));
+    final int claimReplicationQueueThreads =
+      
conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
+    executorService.startExecutorService(executorService.new 
ExecutorConfig().setExecutorType(
+        
ExecutorType.RS_CLAIM_REPLICATION_QUEUE).setCorePoolSize(claimReplicationQueueThreads));
 
     Threads
       .setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", 
uncaughtExceptionHandler);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
index 03f2061..319ecca 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
@@ -19,12 +19,12 @@ package org.apache.hadoop.hbase.regionserver;
 import java.util.concurrent.locks.Lock;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
+import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
 import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 import 
org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
 
 /**
@@ -43,27 +43,18 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
  * we switch to procedure-based WAL splitting.
  */
 @InterfaceAudience.Private
-public class SplitWALCallable implements RSProcedureCallable {
-  private static final Logger LOG = 
LoggerFactory.getLogger(SplitWALCallable.class);
+public class SplitWALCallable extends BaseRSProcedureCallable {
 
   private String walPath;
-  private Exception initError;
-  private HRegionServer rs;
   private final KeyLocker<String> splitWALLocks = new KeyLocker<>();
   private volatile Lock splitWALLock = null;
 
 
   @Override
-  public void init(byte[] parameter, HRegionServer rs) {
-    try {
-      this.rs = rs;
-      MasterProcedureProtos.SplitWALParameter param =
-          MasterProcedureProtos.SplitWALParameter.parseFrom(parameter);
-      this.walPath = param.getWalPath();
-    } catch (InvalidProtocolBufferException e) {
-      LOG.error("Parse proto buffer of split WAL request failed ", e);
-      initError = e;
-    }
+  protected void initParameter(byte[] parameter) throws 
InvalidProtocolBufferException {
+    MasterProcedureProtos.SplitWALParameter param =
+      MasterProcedureProtos.SplitWALParameter.parseFrom(parameter);
+    this.walPath = param.getWalPath();
   }
 
   @Override
@@ -90,10 +81,7 @@ public class SplitWALCallable implements RSProcedureCallable 
{
   }
 
   @Override
-  public Void call() throws Exception {
-    if (initError != null) {
-      throw initError;
-    }
+  protected void doCall() throws Exception {
     //grab a lock
     splitWALLock = splitWALLocks.acquireLock(walPath);
     try {
@@ -110,7 +98,6 @@ public class SplitWALCallable implements RSProcedureCallable 
{
     } finally {
       splitWALLock.unlock();
     }
-    return null;
   }
 
   public String getWalPath() {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java
similarity index 55%
copy from 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
copy to 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java
index b2e698f..ddae731 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java
@@ -17,46 +17,39 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import 
org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleRemoteStateData;
 
-/**
- * The callable executed at RS side to switch rpc throttle state. <br/>
- */
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteParameter;
+
 @InterfaceAudience.Private
-public class SwitchRpcThrottleRemoteCallable implements RSProcedureCallable {
-  private HRegionServer rs;
-  private boolean rpcThrottleEnabled;
-  private Exception initError;
+public class ClaimReplicationQueueCallable extends BaseRSProcedureCallable {
+
+  private ServerName crashedServer;
+
+  private String queue;
 
   @Override
-  public Void call() throws Exception {
-    if (initError != null) {
-      throw initError;
-    }
-    rs.getRegionServerRpcQuotaManager().switchRpcThrottle(rpcThrottleEnabled);
-    return null;
+  public EventType getEventType() {
+    return EventType.RS_CLAIM_REPLICATION_QUEUE;
   }
 
   @Override
-  public void init(byte[] parameter, HRegionServer rs) {
-    this.rs = rs;
-    try {
-      SwitchRpcThrottleRemoteStateData param =
-          SwitchRpcThrottleRemoteStateData.parseFrom(parameter);
-      rpcThrottleEnabled = param.getRpcThrottleEnabled();
-    } catch (InvalidProtocolBufferException e) {
-      initError = e;
-    }
+  protected void doCall() throws Exception {
+    PeerProcedureHandler handler = 
rs.getReplicationSourceService().getPeerProcedureHandler();
+    handler.claimReplicationQueue(crashedServer, queue);
   }
 
   @Override
-  public EventType getEventType() {
-    return EventType.M_RS_SWITCH_RPC_THROTTLE;
+  protected void initParameter(byte[] parameter) throws 
InvalidProtocolBufferException {
+    ClaimReplicationQueueRemoteParameter param =
+      ClaimReplicationQueueRemoteParameter.parseFrom(parameter);
+    crashedServer = ProtobufUtil.toServerName(param.getCrashedServer());
+    queue = param.getQueue();
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 5561384..cc0482d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -36,7 +35,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -44,14 +42,11 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.replication.TableCFs;
 import org.apache.hadoop.hbase.io.WALLink;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.replication.ReplicationTracker;
-import org.apache.hadoop.hbase.replication.ReplicationTrackerParams;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -305,18 +300,13 @@ public class DumpReplicationQueues extends Configured 
implements Tool {
     return sb.toString();
   }
 
-  public String dumpQueues(ZKWatcher zkw, Set<String> peerIds,
-      boolean hdfs) throws Exception {
+  public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) 
throws Exception {
     ReplicationQueueStorage queueStorage;
-    ReplicationTracker replicationTracker;
     StringBuilder sb = new StringBuilder();
 
     queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, 
getConf());
-    replicationTracker = ReplicationFactory
-      .getReplicationTracker(ReplicationTrackerParams.create(getConf(), new 
WarnOnlyStoppable())
-        .abortable(new WarnOnlyAbortable()).zookeeper(zkw));
-    Set<ServerName> liveRegionServers =
-      new 
HashSet<>(replicationTracker.loadLiveRegionServersAndInitializeListeners());
+    Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw, 
zkw.getZNodePaths().rsZNode)
+      .stream().map(ServerName::parseServerName).collect(Collectors.toSet());
 
     // Loops each peer on each RS and dumps the queues
     List<ServerName> regionservers = queueStorage.getListOfReplicators();
@@ -419,16 +409,4 @@ public class DumpReplicationQueues extends Configured 
implements Tool {
       return false;
     }
   }
-
-  private static class WarnOnlyStoppable implements Stoppable {
-    @Override
-    public void stop(String why) {
-      LOG.warn("DumpReplicationQueue received stop, ignoring.  Reason: " + 
why);
-    }
-
-    @Override
-    public boolean isStopped() {
-      return false;
-    }
-  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
index 65da9af..a96e860 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
-
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -38,4 +38,7 @@ public interface PeerProcedureHandler {
   public void enablePeer(String peerId) throws ReplicationException, 
IOException;
 
   public void updatePeerConfig(String peerId) throws ReplicationException, 
IOException;
+
+  void claimReplicationQueue(ServerName crashedServer, String queue)
+    throws ReplicationException, IOException;
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index 78c1977..f12a654 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
 import java.util.concurrent.locks.Lock;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -132,4 +133,10 @@ public class PeerProcedureHandlerImpl implements 
PeerProcedureHandler {
       peerLock.unlock();
     }
   }
+
+  @Override
+  public void claimReplicationQueue(ServerName crashedServer, String queue)
+    throws ReplicationException, IOException {
+    replicationSourceManager.claimQueue(crashedServer, queue);
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
index 91fee44..2beb9f5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
@@ -18,8 +18,7 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,24 +32,16 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R
  * The callable executed at RS side to refresh the peer config/state. <br/>
  */
 @InterfaceAudience.Private
-public class RefreshPeerCallable implements RSProcedureCallable {
+public class RefreshPeerCallable extends BaseRSProcedureCallable {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(RefreshPeerCallable.class);
 
-  private HRegionServer rs;
-
   private String peerId;
 
   private PeerModificationType type;
 
-  private Exception initError;
-
   @Override
-  public Void call() throws Exception {
-    if (initError != null) {
-      throw initError;
-    }
-
+  protected void doCall() throws Exception {
     LOG.info("Received a peer change event, peerId=" + peerId + ", type=" + 
type);
     PeerProcedureHandler handler = 
rs.getReplicationSourceService().getPeerProcedureHandler();
     switch (type) {
@@ -72,19 +63,13 @@ public class RefreshPeerCallable implements 
RSProcedureCallable {
       default:
         throw new IllegalArgumentException("Unknown peer modification type: " 
+ type);
     }
-    return null;
   }
 
   @Override
-  public void init(byte[] parameter, HRegionServer rs) {
-    this.rs = rs;
-    try {
-      RefreshPeerParameter param = RefreshPeerParameter.parseFrom(parameter);
-      this.peerId = param.getPeerId();
-      this.type = param.getType();
-    } catch (InvalidProtocolBufferException e) {
-      initError = e;
-    }
+  protected void initParameter(byte[] parameter) throws 
InvalidProtocolBufferException {
+    RefreshPeerParameter param = RefreshPeerParameter.parseFrom(parameter);
+    this.peerId = param.getPeerId();
+    this.type = param.getType();
   }
 
   @Override
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 12f8ed8..842955e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -38,8 +38,6 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.replication.ReplicationTracker;
-import org.apache.hadoop.hbase.replication.ReplicationTrackerParams;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -64,7 +62,6 @@ public class Replication implements ReplicationSourceService, 
ReplicationSinkSer
   private ReplicationSourceManager replicationManager;
   private ReplicationQueueStorage queueStorage;
   private ReplicationPeers replicationPeers;
-  private ReplicationTracker replicationTracker;
   private Configuration conf;
   private ReplicationSink replicationSink;
   // Hosting server
@@ -111,10 +108,6 @@ public class Replication implements 
ReplicationSourceService, ReplicationSinkSer
       this.replicationPeers =
           ReplicationFactory.getReplicationPeers(server.getZooKeeper(), 
this.conf);
       this.replicationPeers.init();
-      this.replicationTracker = ReplicationFactory
-        
.getReplicationTracker(ReplicationTrackerParams.create(server.getConfiguration(),
 server)
-          
.abortable(server).zookeeper(server.getZooKeeper()).choreService(server.getChoreService())
-          .connection(server.getConnection()));
     } catch (Exception e) {
       throw new IOException("Failed replication handler create", e);
     }
@@ -126,9 +119,8 @@ public class Replication implements 
ReplicationSourceService, ReplicationSinkSer
     }
     this.globalMetricsSource = CompatibilitySingletonFactory
         .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
-    this.replicationManager = new ReplicationSourceManager(queueStorage, 
replicationPeers,
-        replicationTracker, conf, this.server, fs, logDir, oldLogDir, 
clusterId, walFactory,
-      globalMetricsSource);
+    this.replicationManager = new ReplicationSourceManager(queueStorage, 
replicationPeers, conf,
+      this.server, fs, logDir, oldLogDir, clusterId, walFactory, 
globalMetricsSource);
     // Get the user-space WAL provider
     WALProvider walProvider = walFactory != null? walFactory.getWALProvider(): 
null;
     if (walProvider != null) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 68f6f8b..fe83168 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -33,9 +33,7 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -44,7 +42,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
@@ -53,14 +50,12 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationListener;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -103,24 +98,24 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  * {@link #preLogRoll(Path)}.</li>
  * <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
  * modify it, {@link #removePeer(String)} ,
  * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
- * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
+ * {@link ReplicationSourceManager#claimQueue(ServerName, String)}.
  * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
  * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will 
terminate the
  * {@link ReplicationSourceInterface} firstly, then remove the wals from
- * {@link #walsByIdRecoveredQueues}. And {@link 
ReplicationSourceManager.NodeFailoverWorker#run()}
- * will add the wals to {@link #walsByIdRecoveredQueues} firstly, then start 
up a
- * {@link ReplicationSourceInterface}. So there is no race here. For
- * {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link 
#removePeer(String)}, there
- * is already synchronized on {@link #oldsources}. So no need synchronized on
- * {@link #walsByIdRecoveredQueues}.</li>
+ * {@link #walsByIdRecoveredQueues}. And
+ * {@link ReplicationSourceManager#claimQueue(ServerName, String)} will add the wals to
+ * {@link #walsByIdRecoveredQueues} first, then start up a {@link ReplicationSourceInterface}. So
+ * there is no race here. For {@link ReplicationSourceManager#claimQueue(ServerName, String)} and
+ * {@link #removePeer(String)}, both already synchronize on {@link #oldsources}, so there is no
+ * need to synchronize on {@link #walsByIdRecoveredQueues}.</li>
  * <li>Need synchronized on {@link #latestPaths} to avoid the new open source 
miss new log.</li>
  * <li>Need synchronized on {@link #oldsources} to avoid adding recovered 
source for the
  * to-be-removed peer.</li>
  * </ul>
  */
 @InterfaceAudience.Private
-public class ReplicationSourceManager implements ReplicationListener {
+public class ReplicationSourceManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationSourceManager.class);
   // all the sources that read this RS's logs and every peer only has one 
replication source
   private final ConcurrentMap<String, ReplicationSourceInterface> sources;
@@ -135,7 +136,6 @@ public class ReplicationSourceManager implements 
ReplicationListener {
    */
   private final ReplicationQueueStorage queueStorage;
 
-  private final ReplicationTracker replicationTracker;
   private final ReplicationPeers replicationPeers;
   // UUID for this cluster
   private final UUID clusterId;
@@ -193,7 +193,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
    * @param oldLogDir the directory where old logs are archived
    */
   public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
-      ReplicationPeers replicationPeers, ReplicationTracker 
replicationTracker, Configuration conf,
+      ReplicationPeers replicationPeers, Configuration conf,
       Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID 
clusterId,
       WALFactory walFactory,
       MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
@@ -202,7 +202,6 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     this.sources = new ConcurrentHashMap<>();
     this.queueStorage = queueStorage;
     this.replicationPeers = replicationPeers;
-    this.replicationTracker = replicationTracker;
     this.server = server;
     this.walsById = new ConcurrentHashMap<>();
     this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
@@ -215,7 +214,6 @@ public class ReplicationSourceManager implements 
ReplicationListener {
                                                                                
          // seconds
     this.clusterId = clusterId;
     this.walFactory = walFactory;
-    this.replicationTracker.registerListener(this);
     // It's preferable to failover 1 RS at a time, but with good zk servers
     // more could be processed at the same time.
     int nbWorkers = conf.getInt("replication.executor.workers", 1);
@@ -236,12 +234,9 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   }
 
   /**
-   * Adds a normal source per registered peer cluster and tries to process all 
old region server wal
-   * queues
-   * <p>
-   * The returned future is for adoptAbandonedQueues task.
+   * Adds a normal source per registered peer cluster.
    */
-  Future<?> init() throws IOException {
+  void init() throws IOException {
     for (String id : this.replicationPeers.getAllPeerIds()) {
       addSource(id);
       if (replicationForBulkLoadDataEnabled) {
@@ -250,38 +245,6 @@ public class ReplicationSourceManager implements 
ReplicationListener {
         throwIOExceptionWhenFail(() -> 
this.queueStorage.addPeerToHFileRefs(id));
       }
     }
-    return this.executor.submit(this::adoptAbandonedQueues);
-  }
-
-  private void adoptAbandonedQueues() {
-    List<ServerName> currentReplicators = null;
-    try {
-      currentReplicators = queueStorage.getListOfReplicators();
-    } catch (ReplicationException e) {
-      server.abort("Failed to get all replicators", e);
-      return;
-    }
-    Set<ServerName> liveRegionServers;
-    try {
-      // must call this method to load the first snapshot of live region 
servers and initialize
-      // listeners
-      liveRegionServers = 
replicationTracker.loadLiveRegionServersAndInitializeListeners();
-    } catch (IOException e) {
-      server.abort("Failed load live region server list for replication", e);
-      return;
-    }
-    LOG.info("Current list of replicators: {}, live RSes: {}", 
currentReplicators,
-      liveRegionServers);
-    if (currentReplicators == null || currentReplicators.isEmpty()) {
-      return;
-    }
-
-    // Look if there's anything to process after a restart
-    for (ServerName rs : currentReplicators) {
-      if (!liveRegionServers.contains(rs)) {
-        transferQueues(rs);
-      }
-    }
   }
 
   /**
@@ -665,169 +628,101 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     }
   }
 
-  @Override
-  public void regionServerRemoved(ServerName regionserver) {
-    transferQueues(regionserver);
-  }
-
-  /**
-   * Transfer all the queues of the specified to this region server. First it 
tries to grab a lock
-   * and if it works it will move the old queues and finally will delete the 
old queues.
-   * <p>
-   * It creates one old source for any type of source of the old rs.
-   */
-  private void transferQueues(ServerName deadRS) {
-    if (server.getServerName().equals(deadRS)) {
-      // it's just us, give up
+  void claimQueue(ServerName deadRS, String queue) {
+    // Wait a bit before transferring the queues, we may be shutting down.
+    // This sleep may not be enough in some cases.
+    try {
+      Thread.sleep(sleepBeforeFailover +
+        (long) (ThreadLocalRandom.current().nextFloat() * 
sleepBeforeFailover));
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting before transferring a queue.");
+      Thread.currentThread().interrupt();
+    }
+    // Do not claim the queue if this region server is shutting down
+    if (server.isStopped()) {
+      LOG.info("Not transferring queue since we are shutting down");
       return;
     }
-    NodeFailoverWorker transfer = new NodeFailoverWorker(deadRS);
+    // After claiming the queue from the dead region server, we will skip starting the
+    // RecoveredReplicationSource if the peer has been removed. But it is possible that the peer
+    // with peerId = 2 is removed and then a new peer with peerId = 2 is added during failover, so
+    // we take a reference to the replication peer first and use it to decide whether we should
+    // start the RecoveredReplicationSource. If the latest peer is not the old peer, we also skip
+    // starting the RecoveredReplicationSource, otherwise the rs will abort (See HBASE-20475).
+    String peerId = new ReplicationQueueInfo(queue).getPeerId();
+    ReplicationPeerImpl oldPeer = replicationPeers.getPeer(peerId);
+    if (oldPeer == null) {
+      LOG.info("Not transferring queue since the replication peer {} for queue 
{} does not exist",
+        peerId, queue);
+      return;
+    }
+    Pair<String, SortedSet<String>> claimedQueue;
     try {
-      this.executor.execute(transfer);
-    } catch (RejectedExecutionException ex) {
-      
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
-          .getGlobalSource().incrFailedRecoveryQueue();
-      LOG.info("Cancelling the transfer of " + deadRS + " because of " + 
ex.getMessage());
+      claimedQueue = queueStorage.claimQueue(deadRS, queue, 
server.getServerName());
+    } catch (ReplicationException e) {
+      LOG.error(
+        "ReplicationException: cannot claim dead region ({})'s " +
+          "replication queue. Znode : ({})" +
+          " Possible solution: check if znode size exceeds jute.maxBuffer 
value. " +
+          " If so, increase it for both client and server side.",
+        deadRS, queueStorage.getRsNode(deadRS), e);
+      server.abort("Failed to claim queue from dead regionserver.", e);
+      return;
     }
-  }
-
-  /**
-   * Class responsible to setup new ReplicationSources to take care of the 
queues from dead region
-   * servers.
-   */
-  class NodeFailoverWorker extends Thread {
-
-    private final ServerName deadRS;
-    // After claim the queues from dead region server, the NodeFailoverWorker 
will skip to start
-    // the RecoveredReplicationSource if the peer has been removed. but 
there's possible that
-    // remove a peer with peerId = 2 and add a peer with peerId = 2 again 
during the
-    // NodeFailoverWorker. So we need a deep copied <peerId, peer> map to 
decide whether we
-    // should start the RecoveredReplicationSource. If the latest peer is not 
the old peer when
-    // NodeFailoverWorker begin, we should skip to start the 
RecoveredReplicationSource, Otherwise
-    // the rs will abort (See HBASE-20475).
-    private final Map<String, ReplicationPeerImpl> peersSnapshot;
-
-    public NodeFailoverWorker(ServerName deadRS) {
-      super("Failover-for-" + deadRS);
-      this.deadRS = deadRS;
-      peersSnapshot = new HashMap<>(replicationPeers.getPeerCache());
+    if (claimedQueue.getSecond().isEmpty()) {
+      return;
     }
-
-    private boolean isOldPeer(String peerId, ReplicationPeerImpl newPeerRef) {
-      ReplicationPeerImpl oldPeerRef = peersSnapshot.get(peerId);
-      return oldPeerRef != null && oldPeerRef == newPeerRef;
+    String queueId = claimedQueue.getFirst();
+    Set<String> walsSet = claimedQueue.getSecond();
+    ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
+    if (peer == null || peer != oldPeer) {
+      LOG.warn("Skipping failover for peer {} of node {}, peer is null", 
peerId, deadRS);
+      abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), 
queueId));
+      return;
+    }
+    if (server instanceof ReplicationSyncUp.DummyServer &&
+      peer.getPeerState().equals(PeerState.DISABLED)) {
+      LOG.warn(
+        "Peer {} is disabled. ReplicationSyncUp tool will skip " + 
"replicating data to this peer.",
+        peerId);
+      return;
     }
 
-    @Override
-    public void run() {
-      // Wait a bit before transferring the queues, we may be shutting down.
-      // This sleep may not be enough in some cases.
-      try {
-        Thread.sleep(sleepBeforeFailover +
-          (long) (ThreadLocalRandom.current().nextFloat() * 
sleepBeforeFailover));
-      } catch (InterruptedException e) {
-        LOG.warn("Interrupted while waiting before transferring a queue.");
-        Thread.currentThread().interrupt();
-      }
-      // We try to lock that rs' queue directory
-      if (server.isStopped()) {
-        LOG.info("Not transferring queue since we are shutting down");
+    ReplicationSourceInterface src;
+    try {
+      src = createSource(queueId, peer);
+    } catch (IOException e) {
+      LOG.error("Can not create replication source for peer {} and queue {}", 
peerId, queueId, e);
+      server.abort("Failed to create replication source after claiming 
queue.", e);
+      return;
+    }
+    // synchronized on oldsources to avoid adding recovered source for the 
to-be-removed peer
+    synchronized (oldsources) {
+      peer = replicationPeers.getPeer(src.getPeerId());
+      if (peer == null || peer != oldPeer) {
+        src.terminate("Recovered queue doesn't belong to any current peer");
+        deleteQueue(queueId);
         return;
       }
-      Map<String, Set<String>> newQueues = new HashMap<>();
-      try {
-        List<String> queues = queueStorage.getAllQueues(deadRS);
-        while (!queues.isEmpty()) {
-          Pair<String, SortedSet<String>> peer = 
queueStorage.claimQueue(deadRS,
-            queues.get(ThreadLocalRandom.current().nextInt(queues.size())), 
server.getServerName());
-          long sleep = sleepBeforeFailover / 2;
-          if (!peer.getSecond().isEmpty()) {
-            newQueues.put(peer.getFirst(), peer.getSecond());
-            sleep = sleepBeforeFailover;
-          }
-          try {
-            Thread.sleep(sleep);
-          } catch (InterruptedException e) {
-            LOG.warn("Interrupted while waiting before transferring a queue.");
-            Thread.currentThread().interrupt();
-          }
-          queues = queueStorage.getAllQueues(deadRS);
-        }
-        if (queues.isEmpty()) {
-          queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
+      // track sources in walsByIdRecoveredQueues
+      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
+      walsByIdRecoveredQueues.put(queueId, walsByGroup);
+      for (String wal : walsSet) {
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        NavigableSet<String> wals = walsByGroup.get(walPrefix);
+        if (wals == null) {
+          wals = new TreeSet<>();
+          walsByGroup.put(walPrefix, wals);
         }
-      } catch (ReplicationException e) {
-        LOG.error(String.format("ReplicationException: cannot claim dead 
region (%s)'s " +
-            "replication queue. Znode : (%s)" +
-            " Possible solution: check if znode size exceeds jute.maxBuffer 
value. " +
-            " If so, increase it for both client and server side.",
-          deadRS, queueStorage.getRsNode(deadRS)), e);
-        server.abort("Failed to claim queue from dead regionserver.", e);
-        return;
+        wals.add(wal);
       }
-      // Copying over the failed queue is completed.
-      if (newQueues.isEmpty()) {
-        // We either didn't get the lock or the failed region server didn't 
have any outstanding
-        // WALs to replicate, so we are done.
-        return;
-      }
-
-      for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
-        String queueId = entry.getKey();
-        Set<String> walsSet = entry.getValue();
-        try {
-          // there is not an actual peer defined corresponding to peerId for 
the failover.
-          ReplicationQueueInfo replicationQueueInfo = new 
ReplicationQueueInfo(queueId);
-          String actualPeerId = replicationQueueInfo.getPeerId();
-
-          ReplicationPeerImpl peer = replicationPeers.getPeer(actualPeerId);
-          if (peer == null || !isOldPeer(actualPeerId, peer)) {
-            LOG.warn("Skipping failover for peer {} of node {}, peer is null", 
actualPeerId,
-              deadRS);
-            abortWhenFail(() -> 
queueStorage.removeQueue(server.getServerName(), queueId));
-            continue;
-          }
-          if (server instanceof ReplicationSyncUp.DummyServer
-              && peer.getPeerState().equals(PeerState.DISABLED)) {
-            LOG.warn("Peer {} is disabled. ReplicationSyncUp tool will skip "
-                + "replicating data to this peer.",
-              actualPeerId);
-            continue;
-          }
-          // track sources in walsByIdRecoveredQueues
-          Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
-          walsByIdRecoveredQueues.put(queueId, walsByGroup);
-          for (String wal : walsSet) {
-            String walPrefix = 
AbstractFSWALProvider.getWALPrefixFromWALName(wal);
-            NavigableSet<String> wals = walsByGroup.get(walPrefix);
-            if (wals == null) {
-              wals = new TreeSet<>();
-              walsByGroup.put(walPrefix, wals);
-            }
-            wals.add(wal);
-          }
-
-          ReplicationSourceInterface src = createSource(queueId, peer);
-          // synchronized on oldsources to avoid adding recovered source for 
the to-be-removed peer
-          synchronized (oldsources) {
-            peer = replicationPeers.getPeer(src.getPeerId());
-            if (peer == null || !isOldPeer(src.getPeerId(), peer)) {
-              src.terminate("Recovered queue doesn't belong to any current 
peer");
-              removeRecoveredSource(src);
-              continue;
-            }
-            oldsources.add(src);
-            LOG.info("Added recovered source {}", src.getQueueId());
-            for (String wal : walsSet) {
-              src.enqueueLog(new Path(oldLogDir, wal));
-            }
-            src.startup();
-          }
-        } catch (IOException e) {
-          // TODO manage it
-          LOG.error("Failed creating a source", e);
-        }
+      oldsources.add(src);
+      LOG.info("Added source for recovered queue {}", src.getQueueId());
+      for (String wal : walsSet) {
+        LOG.trace("Enqueueing log from recovered queue for source: " + 
src.getQueueId());
+        src.enqueueLog(new Path(oldLogDir, wal));
       }
+      src.startup();
     }
   }
 
@@ -1054,4 +949,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     }
     return crs.startup();
   }
+
+  ReplicationQueueStorage getQueueStorage() {
+    return queueStorage;
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index b6386cc..a2ce6e6 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -18,6 +18,10 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -31,18 +35,21 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * In a scenario of Replication based Disaster/Recovery, when hbase 
Master-Cluster crashes, this
  * tool is used to sync-up the delta from Master to Slave using the info from 
ZooKeeper. The tool
- * will run on Master-Cluser, and assume ZK, Filesystem and NetWork still 
available after hbase
+ * will run on Master-Cluster, and assume ZK, Filesystem and NetWork still 
available after hbase
  * crashes
  *
  * <pre>
@@ -62,6 +69,29 @@ public class ReplicationSyncUp extends Configured implements 
Tool {
     System.exit(ret);
   }
 
+  private Set<ServerName> getLiveRegionServers(ZKWatcher zkw) throws 
KeeperException {
+    List<String> rsZNodes = ZKUtil.listChildrenNoWatch(zkw, 
zkw.getZNodePaths().rsZNode);
+    return rsZNodes == null ? Collections.emptySet() :
+      
rsZNodes.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
+  }
+
+  // When using this tool, usually the source cluster is unhealthy, so we 
should try to claim the
+  // replication queues for the dead region servers first and then replicate 
the data out.
+  private void claimReplicationQueues(ZKWatcher zkw, ReplicationSourceManager 
mgr)
+    throws ReplicationException, KeeperException {
+    List<ServerName> replicators = 
mgr.getQueueStorage().getListOfReplicators();
+    Set<ServerName> liveRegionServers = getLiveRegionServers(zkw);
+    for (ServerName sn : replicators) {
+      if (!liveRegionServers.contains(sn)) {
+        List<String> replicationQueues = 
mgr.getQueueStorage().getAllQueues(sn);
+        System.out.println(sn + " is dead, claim its replication queues: " + 
replicationQueues);
+        for (String queue : replicationQueues) {
+          mgr.claimQueue(sn, queue);
+        }
+      }
+    }
+  }
+
   @Override
   public int run(String[] args) throws Exception {
     Abortable abortable = new Abortable() {
@@ -88,7 +118,8 @@ public class ReplicationSyncUp extends Configured implements 
Tool {
       replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir,
         new WALFactory(conf, "test", null));
       ReplicationSourceManager manager = replication.getReplicationManager();
-      manager.init().get();
+      manager.init();
+      claimReplicationQueues(zkw, manager);
       while (manager.activeFailoverTaskCount() > 0) {
         Thread.sleep(SLEEP_TIME);
       }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
index b2e698f..c78fe40 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
@@ -18,41 +18,30 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import 
org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleRemoteStateData;
 
 /**
  * The callable executed at RS side to switch rpc throttle state. <br/>
  */
 @InterfaceAudience.Private
-public class SwitchRpcThrottleRemoteCallable implements RSProcedureCallable {
-  private HRegionServer rs;
+public class SwitchRpcThrottleRemoteCallable extends BaseRSProcedureCallable {
+
   private boolean rpcThrottleEnabled;
-  private Exception initError;
 
   @Override
-  public Void call() throws Exception {
-    if (initError != null) {
-      throw initError;
-    }
+  protected void doCall() throws Exception {
     rs.getRegionServerRpcQuotaManager().switchRpcThrottle(rpcThrottleEnabled);
-    return null;
   }
 
   @Override
-  public void init(byte[] parameter, HRegionServer rs) {
-    this.rs = rs;
-    try {
-      SwitchRpcThrottleRemoteStateData param =
-          SwitchRpcThrottleRemoteStateData.parseFrom(parameter);
-      rpcThrottleEnabled = param.getRpcThrottleEnabled();
-    } catch (InvalidProtocolBufferException e) {
-      initError = e;
-    }
+  protected void initParameter(byte[] parameter) throws 
InvalidProtocolBufferException {
+    SwitchRpcThrottleRemoteStateData param = 
SwitchRpcThrottleRemoteStateData.parseFrom(parameter);
+    rpcThrottleEnabled = param.getRpcThrottleEnabled();
   }
 
   @Override
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java
new file mode 100644
index 0000000..6c86feb
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
+import 
org.apache.hadoop.hbase.master.replication.ClaimReplicationQueuesProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
+/**
+ * In HBASE-26029, we reimplement the claim queue operation with proc-v2 and 
make it a step in SCP,
+ * this is a UT to make sure the {@link ClaimReplicationQueuesProcedure} works 
correctly.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestClaimReplicationQueue extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestClaimReplicationQueue.class);
+
+  private static final TableName tableName3 = TableName.valueOf("test3");
+
+  private static final String PEER_ID3 = "3";
+
+  private static Table table3;
+
+  private static Table table4;
+
+  private static volatile boolean EMPTY = false;
+
+  public static final class ServerManagerForTest extends ServerManager {
+
+    public ServerManagerForTest(MasterServices master) {
+      super(master);
+    }
+
+    @Override
+    public List<ServerName> getOnlineServersList() {
+      // return no region server to make the procedure hang
+      if (EMPTY) {
+        for (StackTraceElement e : Thread.currentThread().getStackTrace()) {
+          if 
(e.getClassName().equals(ClaimReplicationQueuesProcedure.class.getName())) {
+            return Collections.emptyList();
+          }
+        }
+      }
+      return super.getOnlineServersList();
+    }
+  }
+
+  public static final class HMasterForTest extends HMaster {
+
+    public HMasterForTest(Configuration conf) throws IOException {
+      super(conf);
+    }
+
+    @Override
+    protected ServerManager createServerManager(MasterServices master) throws 
IOException {
+      setupClusterConnection();
+      return new ServerManagerForTest(master);
+    }
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    CONF1.setClass(HConstants.MASTER_IMPL, HMasterForTest.class, 
HMaster.class);
+    TestReplicationBase.setUpBeforeClass();
+    createTable(tableName3);
+    table3 = connection1.getTable(tableName3);
+    table4 = connection2.getTable(tableName3);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    Closeables.close(table3, true);
+    Closeables.close(table4, true);
+    TestReplicationBase.tearDownAfterClass();
+  }
+
+  @Override
+  public void setUpBase() throws Exception {
+    super.setUpBase();
+    // set up two replication peers and only 1 rs to test claim replication 
queue with multiple
+    // round
+    addPeer(PEER_ID3, tableName3);
+  }
+
+  @Override
+  public void tearDownBase() throws Exception {
+    super.tearDownBase();
+    removePeer(PEER_ID3);
+  }
+
+  @Test
+  public void testClaim() throws Exception {
+    // disable the peers
+    hbaseAdmin.disableReplicationPeer(PEER_ID2);
+    hbaseAdmin.disableReplicationPeer(PEER_ID3);
+
+    // put some data
+    int count1 = UTIL1.loadTable(htable1, famName);
+    int count2 = UTIL1.loadTable(table3, famName);
+
+    EMPTY = true;
+    UTIL1.getMiniHBaseCluster().stopRegionServer(0).join();
+    UTIL1.getMiniHBaseCluster().startRegionServer();
+
+    // since there is no active region server to get the replication queue, 
the procedure should be
+    // in WAITING_TIMEOUT state for most time to retry
+    HMaster master = UTIL1.getMiniHBaseCluster().getMaster();
+    UTIL1.waitFor(30000,
+      () -> master.getProcedures().stream()
+        .filter(p -> p instanceof ClaimReplicationQueuesProcedure)
+        .anyMatch(p -> p.getState() == ProcedureState.WAITING_TIMEOUT));
+
+    hbaseAdmin.enableReplicationPeer(PEER_ID2);
+    hbaseAdmin.enableReplicationPeer(PEER_ID3);
+
+    EMPTY = false;
+    // wait until the SCP finished, ClaimReplicationQueuesProcedure is a sub 
procedure of SCP
+    UTIL1.waitFor(30000, () -> master.getProcedures().stream()
+      .filter(p -> p instanceof 
ServerCrashProcedure).allMatch(Procedure::isSuccess));
+
+    // we should get all the data in the target cluster
+    waitForReplication(htable2, count1, NB_RETRIES);
+    waitForReplication(table4, count2, NB_RETRIES);
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 6d40997..cb90eba 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -54,6 +55,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 
@@ -65,8 +67,8 @@ import 
org.apache.hbase.thirdparty.com.google.common.io.Closeables;
  */
 public class TestReplicationBase {
   private static final Logger LOG = 
LoggerFactory.getLogger(TestReplicationBase.class);
-  private static Connection connection1;
-  private static Connection connection2;
+  protected static Connection connection1;
+  protected static Connection connection2;
   protected static Configuration CONF_WITH_LOCALFS;
 
   protected static ReplicationAdmin admin;
@@ -141,19 +143,22 @@ public class TestReplicationBase {
     waitForReplication(htable2, expectedRows, retries);
   }
 
-  protected static void waitForReplication(Table htable2, int expectedRows, 
int retries)
-      throws IOException, InterruptedException {
+  protected static void waitForReplication(Table table, int expectedRows, int 
retries)
+    throws IOException, InterruptedException {
     Scan scan;
     for (int i = 0; i < retries; i++) {
       scan = new Scan();
-      if (i== retries -1) {
+      if (i == retries - 1) {
         fail("Waited too much time for normal batch replication");
       }
-      ResultScanner scanner = htable2.getScanner(scan);
-      Result[] res = scanner.next(expectedRows);
-      scanner.close();
-      if (res.length != expectedRows) {
-        LOG.info("Only got " + res.length + " rows");
+      int count = 0;
+      try (ResultScanner scanner = table.getScanner(scan)) {
+        while (scanner.next() != null) {
+          count++;
+        }
+      }
+      if (count != expectedRows) {
+        LOG.info("Only got " + count + " rows");
         Thread.sleep(SLEEP_TIME);
       } else {
         break;
@@ -229,6 +234,18 @@ public class TestReplicationBase {
     htable2 = UTIL2.getConnection().getTable(tableName);
   }
 
+  protected static void createTable(TableName tableName)
+    throws IOException {
+    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
+      
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
+    UTIL1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+    UTIL2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+    UTIL1.waitUntilAllRegionsAssigned(tableName);
+    UTIL2.waitUntilAllRegionsAssigned(tableName);
+  }
+
   private static void startClusters() throws Exception {
     UTIL1.startMiniZKCluster();
     MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
@@ -248,22 +265,9 @@ public class TestReplicationBase {
     admin = new ReplicationAdmin(CONF1);
     hbaseAdmin = connection1.getAdmin();
 
-    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
-        
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
-            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
-        
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
-
-    try (
-      Admin admin1 = connection1.getAdmin();
-      Admin admin2 = connection2.getAdmin()) {
-      admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
-      admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
-      UTIL1.waitUntilAllRegionsAssigned(tableName);
-      htable1 = connection1.getTable(tableName);
-      UTIL2.waitUntilAllRegionsAssigned(tableName);
-      htable2 = connection2.getTable(tableName);
-    }
-
+    createTable(tableName);
+    htable1 = connection1.getTable(tableName);
+    htable2 = connection2.getTable(tableName);
   }
 
   @BeforeClass
@@ -276,21 +280,29 @@ public class TestReplicationBase {
     return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> 
peerId.equals(p.getPeerId()));
   }
 
-  @Before
-  public void setUpBase() throws Exception {
-    if (!peerExist(PEER_ID2)) {
+  protected final void addPeer(String peerId, TableName tableName) throws 
Exception {
+    if (!peerExist(peerId)) {
       ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
         
.setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).setReplicationEndpointImpl(
           ReplicationEndpointTest.class.getName());
-      hbaseAdmin.addReplicationPeer(PEER_ID2, builder.build());
+      hbaseAdmin.addReplicationPeer(peerId, builder.build());
+    }
+  }
+
+  @Before
+  public void setUpBase() throws Exception {
+    addPeer(PEER_ID2, tableName);
+  }
+
+  protected final void removePeer(String peerId) throws Exception {
+    if (peerExist(peerId)) {
+      hbaseAdmin.removeReplicationPeer(peerId);
     }
   }
 
   @After
   public void tearDownBase() throws Exception {
-    if (peerExist(PEER_ID2)) {
-      hbaseAdmin.removeReplicationPeer(PEER_ID2);
-    }
+    removePeer(PEER_ID2);
   }
 
   protected static void runSimplePutDeleteTest() throws IOException, 
InterruptedException {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 68926d5..8aad2b1 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -72,7 +72,6 @@ import 
org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
-import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -393,9 +392,7 @@ public abstract class TestReplicationSourceManager {
     ReplicationPeers rp1 =
         ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), 
s1.getConfiguration());
     rp1.init();
-    NodeFailoverWorker w1 =
-        manager.new NodeFailoverWorker(server.getServerName());
-    w1.run();
+    manager.claimQueue(server.getServerName(), "1");
     assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
     String id = "1-" + server.getServerName().getServerName();
     assertEquals(files, 
manager.getWalsByIdRecoveredQueues().get(id).get(group));
@@ -415,8 +412,7 @@ public abstract class TestReplicationSourceManager {
     rq.addWAL(server.getServerName(), "2", group + ".log1");
     rq.addWAL(server.getServerName(), "2", group + ".log2");
 
-    NodeFailoverWorker w1 = manager.new 
NodeFailoverWorker(server.getServerName());
-    w1.run();
+    manager.claimQueue(server.getServerName(), "2");
 
     // The log of the unknown peer should be removed from zk
     for (String peer : manager.getAllQueues()) {

Reply via email to