http://git-wip-us.apache.org/repos/asf/hbase/blob/11158715/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index b6cf54d..4b9ed74 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -32,9 +31,10 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Interface that defines a replication source
@@ -47,15 +47,10 @@ public interface ReplicationSourceInterface {
    * @param conf the configuration to use
    * @param fs the file system to use
    * @param manager the manager to use
-   * @param replicationQueues
-   * @param replicationPeers
    * @param server the server for this region server
-   * @param peerClusterZnode
-   * @param clusterId
-   * @throws IOException
    */
   void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
+      ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server,
       String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
       WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
 

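A caller wiring a source against the new contract passes a ReplicationQueueStorage where it previously passed ReplicationQueues. A rough sketch, assuming a hypothetical MyReplicationSource implementation (the ReplicationStorageFactory call mirrors the factory usage in the test changes further down this patch):

    // Hedged sketch, not part of the patch; MyReplicationSource is hypothetical.
    ReplicationQueueStorage queueStorage =
        ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
    ReplicationSourceInterface src = new MyReplicationSource();
    src.init(conf, fs, manager, queueStorage, replicationPeers, server, peerClusterZnode,
        clusterId, replicationEndpoint, walFileLengthProvider, metrics);
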
http://git-wip-us.apache.org/repos/asf/hbase/blob/11158715/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b1d82c8..853bafb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -34,19 +34,21 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
@@ -60,7 +62,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -68,6 +70,7 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -95,7 +98,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final List<ReplicationSourceInterface> sources;
   // List of all the sources we got from died RSs
   private final List<ReplicationSourceInterface> oldsources;
-  private final ReplicationQueues replicationQueues;
+  private final ReplicationQueueStorage queueStorage;
   private final ReplicationTracker replicationTracker;
   private final ReplicationPeers replicationPeers;
   // UUID for this cluster
@@ -130,7 +133,7 @@ public class ReplicationSourceManager implements ReplicationListener {
 
   /**
    * Creates a replication manager and sets the watch on all the other registered region servers
-   * @param replicationQueues the interface for manipulating replication queues
+   * @param queueStorage the interface for manipulating replication queues
    * @param replicationPeers
    * @param replicationTracker
    * @param conf the configuration to use
@@ -140,14 +143,14 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param oldLogDir the directory where old logs are archived
    * @param clusterId
    */
-  public ReplicationSourceManager(ReplicationQueues replicationQueues,
+  public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
       ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
       Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
       WALFileLengthProvider walFileLengthProvider) throws IOException {
     //CopyOnWriteArrayList is thread-safe.
     //Generally, reading is more than modifying.
     this.sources = new CopyOnWriteArrayList<>();
-    this.replicationQueues = replicationQueues;
+    this.queueStorage = queueStorage;
     this.replicationPeers = replicationPeers;
     this.replicationTracker = replicationTracker;
     this.server = server;
@@ -184,6 +187,19 @@ public class ReplicationSourceManager implements ReplicationListener {
     connection = ConnectionFactory.createConnection(conf);
   }
 
+  @FunctionalInterface
+  private interface ReplicationQueueOperation {
+    void exec() throws ReplicationException;
+  }
+
+  private void abortWhenFail(ReplicationQueueOperation op) {
+    try {
+      op.exec();
+    } catch (ReplicationException e) {
+      server.abort("Failed to operate on replication queue", e);
+    }
+  }
+
   /**
    * Provide the id of the peer and a log key and this method will figure which
    * wal it belongs to and will log, for this region server, the current
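
The ReplicationQueueOperation/abortWhenFail pair added above centralizes ReplicationException handling: each queue-storage mutation is wrapped in a lambda, and any failure aborts the region server instead of being swallowed. A self-contained sketch of the same pattern, with a generic Exception and a print standing in for ReplicationException and Server.abort:

    public class AbortWhenFailSketch {
      @FunctionalInterface
      interface ReplicationQueueOperation {
        void exec() throws Exception; // stand-in for ReplicationException
      }

      static void abortWhenFail(ReplicationQueueOperation op) {
        try {
          op.exec();
        } catch (Exception e) {
          // the patch calls server.abort(...) here, failing fast on storage errors
          System.err.println("Failed to operate on replication queue: " + e);
        }
      }

      public static void main(String[] args) {
        abortWhenFail(() -> { throw new Exception("simulated storage failure"); });
      }
    }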
@@ -195,12 +211,13 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param queueRecovered indicates if this queue comes from another region server
    * @param holdLogInZK if true then the log is retained in ZK
    */
-  public void logPositionAndCleanOldLogs(Path log, String id, long position,
-      boolean queueRecovered, boolean holdLogInZK) {
+  public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered,
+      boolean holdLogInZK) {
     String fileName = log.getName();
-    this.replicationQueues.setLogPosition(id, fileName, position);
+    abortWhenFail(
+      () -> this.queueStorage.setWALPosition(server.getServerName(), id, fileName, position));
     if (holdLogInZK) {
-     return;
+      return;
     }
     cleanOldLogs(fileName, id, queueRecovered);
   }
@@ -227,36 +244,59 @@ public class ReplicationSourceManager implements ReplicationListener {
         }
       }
     }
- }
+  }
 
   private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
     SortedSet<String> walSet = wals.headSet(key);
-    LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
+    }
     for (String wal : walSet) {
-      this.replicationQueues.removeLog(id, wal);
+      abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
     }
     walSet.clear();
   }
 
+  private void adoptAbandonedQueues() {
+    List<ServerName> currentReplicators = null;
+    try {
+      currentReplicators = queueStorage.getListOfReplicators();
+    } catch (ReplicationException e) {
+      server.abort("Failed to get all replicators", e);
+      return;
+    }
+    if (currentReplicators == null || currentReplicators.isEmpty()) {
+      return;
+    }
+    List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
+        .map(ServerName::valueOf).collect(Collectors.toList());
+    LOG.info(
+      "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);
+
+    // Look if there's anything to process after a restart
+    for (ServerName rs : currentReplicators) {
+      if (!otherRegionServers.contains(rs)) {
+        transferQueues(rs);
+      }
+    }
+  }
+
   /**
-   * Adds a normal source per registered peer cluster and tries to process all
-   * old region server wal queues
+   * Adds a normal source per registered peer cluster and tries to process all old region server wal
+   * queues
+   * <p>
+   * The returned future is for adoptAbandonedQueues task.
    */
-  void init() throws IOException, ReplicationException {
+  Future<?> init() throws IOException, ReplicationException {
     for (String id : this.replicationPeers.getConnectedPeerIds()) {
       addSource(id);
       if (replicationForBulkLoadDataEnabled) {
         // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
         // when a peer was added before replication for bulk loaded data was enabled.
-        this.replicationQueues.addPeerToHFileRefs(id);
+        this.queueStorage.addPeerToHFileRefs(id);
       }
     }
-    AdoptAbandonedQueuesWorker adoptionWorker = new AdoptAbandonedQueuesWorker();
-    try {
-      this.executor.execute(adoptionWorker);
-    } catch (RejectedExecutionException ex) {
-      LOG.info("Cancelling the adoption of abandoned queues because of " + ex.getMessage());
-    }
+    return this.executor.submit(this::adoptAbandonedQueues);
   }
 
   /**
@@ -264,15 +304,12 @@ public class ReplicationSourceManager implements ReplicationListener {
    * need to enqueue the latest log of each wal group and do replication
    * @param id the id of the peer cluster
    * @return the source that was created
-   * @throws IOException
    */
   @VisibleForTesting
   ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
     ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
     ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
-    ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this,
-      this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer,
-      walFileLengthProvider);
+    ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer);
     synchronized (this.walsById) {
       this.sources.add(src);
       Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
@@ -287,11 +324,10 @@ public class ReplicationSourceManager implements ReplicationListener {
             logs.add(name);
             walsByGroup.put(walPrefix, logs);
             try {
-              this.replicationQueues.addLog(id, name);
+              this.queueStorage.addWAL(server.getServerName(), id, name);
             } catch (ReplicationException e) {
-              String message =
-                  "Cannot add log to queue when creating a new source, queueId=" + id
-                      + ", filename=" + name;
+              String message = "Cannot add log to queue when creating a new source, queueId=" + id +
+                ", filename=" + name;
+                ", filename=" + name;
               server.stop(message);
               throw e;
             }
@@ -316,7 +352,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param peerId Id of the peer cluster queue of wals to delete
    */
   public void deleteSource(String peerId, boolean closeConnection) {
-    this.replicationQueues.removeQueue(peerId);
+    abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId));
     if (closeConnection) {
       this.replicationPeers.peerDisconnected(peerId);
     }
@@ -376,8 +412,8 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   @VisibleForTesting
-  List<String> getAllQueues() {
-    return replicationQueues.getAllQueues();
+  List<String> getAllQueues() throws ReplicationException {
+    return queueStorage.getAllQueues(server.getServerName());
   }
 
   void preLogRoll(Path newLog) throws IOException {
@@ -411,10 +447,10 @@ public class ReplicationSourceManager implements ReplicationListener {
     synchronized (replicationPeers) {
       for (String id : replicationPeers.getConnectedPeerIds()) {
         try {
-          this.replicationQueues.addLog(id, logName);
+          this.queueStorage.addWAL(server.getServerName(), id, logName);
         } catch (ReplicationException e) {
-          throw new IOException("Cannot add log to replication queue"
-              + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
+          throw new IOException("Cannot add log to replication queue" +
+            " when creating a new source, queueId=" + id + ", filename=" + logName, e);
         }
       }
     }
@@ -461,19 +497,11 @@ public class ReplicationSourceManager implements ReplicationListener {
 
   /**
    * Factory method to create a replication source
-   * @param conf the configuration to use
-   * @param fs the file system to use
-   * @param manager the manager to use
-   * @param server the server object for this region server
    * @param peerId the id of the peer cluster
    * @return the created source
-   * @throws IOException
    */
-  private ReplicationSourceInterface getReplicationSource(Configuration conf, FileSystem fs,
-      ReplicationSourceManager manager, ReplicationQueues replicationQueues,
-      ReplicationPeers replicationPeers, Server server, String peerId, UUID clusterId,
-      ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer,
-      WALFileLengthProvider walFileLengthProvider) throws IOException {
+  private ReplicationSourceInterface getReplicationSource(String peerId,
+      ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer) throws IOException {
     RegionServerCoprocessorHost rsServerHost = null;
     TableDescriptors tableDescriptors = null;
     if (server instanceof HRegionServer) {
@@ -490,9 +518,8 @@ public class ReplicationSourceManager implements ReplicationListener {
         // Default to HBase inter-cluster replication endpoint
         replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
       }
-      @SuppressWarnings("rawtypes")
-      Class c = Class.forName(replicationEndpointImpl);
-      replicationEndpoint = (ReplicationEndpoint) c.newInstance();
+      replicationEndpoint = Class.forName(replicationEndpointImpl)
+          .asSubclass(ReplicationEndpoint.class).newInstance();
       if(rsServerHost != null) {
         ReplicationEndpoint newReplicationEndPoint = rsServerHost
             .postCreateReplicationEndPoint(replicationEndpoint);
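
The endpoint instantiation above switches from a raw Class plus cast to Class.asSubclass, which is why the @SuppressWarnings("rawtypes") annotation can go. A minimal sketch of the idiom, with Runnable and java.lang.Thread standing in for ReplicationEndpoint and the configured implementation name:

    // asSubclass checks assignability at lookup time, so a wrong class name
    // fails fast with ClassCastException instead of at an unchecked cast.
    static Runnable loadRunnable(String className) throws Exception {
      return Class.forName(className).asSubclass(Runnable.class).newInstance();
    }
    // loadRunnable("java.lang.Thread") succeeds; a non-Runnable class name
    // throws ClassCastException from asSubclass.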
@@ -509,7 +536,7 @@ public class ReplicationSourceManager implements ReplicationListener {
 
     MetricsSource metrics = new MetricsSource(peerId);
     // init replication source
-    src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId,
+    src.init(conf, fs, this, queueStorage, replicationPeers, server, peerId, clusterId,
       replicationEndpoint, walFileLengthProvider, metrics);
 
     // init replication endpoint
@@ -520,21 +547,21 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
-   * Transfer all the queues of the specified to this region server.
-   * First it tries to grab a lock and if it works it will move the
-   * znodes and finally will delete the old znodes.
-   *
+   * Transfer all the queues of the specified to this region server. First it tries to grab a lock
+   * and if it works it will move the znodes and finally will delete the old znodes.
+   * <p>
    * It creates one old source for any type of source of the old rs.
-   * @param rsZnode
    */
-  private void transferQueues(String rsZnode) {
-    NodeFailoverWorker transfer =
-        new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
-            this.clusterId);
+  private void transferQueues(ServerName deadRS) {
+    if (server.getServerName().equals(deadRS)) {
+      // it's just us, give up
+      return;
+    }
+    NodeFailoverWorker transfer = new NodeFailoverWorker(deadRS);
     try {
       this.executor.execute(transfer);
     } catch (RejectedExecutionException ex) {
-      LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
+      LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage());
     }
   }
 
@@ -571,7 +598,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       LOG.info("Peer " + id + " connected success, trying to start the replication source thread.");
       addSource(id);
       if (replicationForBulkLoadDataEnabled) {
-        this.replicationQueues.addPeerToHFileRefs(id);
+        this.queueStorage.addPeerToHFileRefs(id);
       }
     }
   }
@@ -624,12 +651,12 @@ public class ReplicationSourceManager implements ReplicationListener {
       deleteSource(id, true);
     }
     // Remove HFile Refs znode from zookeeper
-    this.replicationQueues.removePeerFromHFileRefs(id);
+    abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(id));
   }
 
   @Override
   public void regionServerRemoved(String regionserver) {
-    transferQueues(regionserver);
+    transferQueues(ServerName.valueOf(regionserver));
   }
 
   /**
@@ -638,37 +665,21 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   class NodeFailoverWorker extends Thread {
 
-    private String rsZnode;
-    private final ReplicationQueues rq;
-    private final ReplicationPeers rp;
-    private final UUID clusterId;
+    private final ServerName deadRS;
 
-    /**
-     * @param rsZnode
-     */
-    public NodeFailoverWorker(String rsZnode) {
-      this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
-    }
-
-    public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
-        final ReplicationPeers replicationPeers, final UUID clusterId) {
-      super("Failover-for-"+rsZnode);
-      this.rsZnode = rsZnode;
-      this.rq = replicationQueues;
-      this.rp = replicationPeers;
-      this.clusterId = clusterId;
+    @VisibleForTesting
+    public NodeFailoverWorker(ServerName deadRS) {
+      super("Failover-for-" + deadRS);
+      this.deadRS = deadRS;
     }
 
     @Override
     public void run() {
-      if (this.rq.isThisOurRegionServer(rsZnode)) {
-        return;
-      }
       // Wait a bit before transferring the queues, we may be shutting down.
       // This sleep may not be enough in some cases.
       try {
         Thread.sleep(sleepBeforeFailover +
-            (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
+          (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
       } catch (InterruptedException e) {
         LOG.warn("Interrupted while waiting before transferring a queue.");
         Thread.currentThread().interrupt();
@@ -679,25 +690,30 @@ public class ReplicationSourceManager implements ReplicationListener {
         return;
       }
       Map<String, Set<String>> newQueues = new HashMap<>();
-      List<String> peers = rq.getUnClaimedQueueIds(rsZnode);
-      while (peers != null && !peers.isEmpty()) {
-        Pair<String, SortedSet<String>> peer = this.rq.claimQueue(rsZnode,
-          peers.get(ThreadLocalRandom.current().nextInt(peers.size())));
-        long sleep = sleepBeforeFailover/2;
-        if (peer != null) {
-          newQueues.put(peer.getFirst(), peer.getSecond());
-          sleep = sleepBeforeFailover;
+      try {
+        List<String> peers = queueStorage.getAllQueues(deadRS);
+        while (!peers.isEmpty()) {
+          Pair<String, SortedSet<String>> peer = queueStorage.claimQueue(deadRS,
+            peers.get(ThreadLocalRandom.current().nextInt(peers.size())), server.getServerName());
+          long sleep = sleepBeforeFailover / 2;
+          if (!peer.getSecond().isEmpty()) {
+            newQueues.put(peer.getFirst(), peer.getSecond());
+            sleep = sleepBeforeFailover;
+          }
+          try {
+            Thread.sleep(sleep);
+          } catch (InterruptedException e) {
+            LOG.warn("Interrupted while waiting before transferring a queue.");
+            Thread.currentThread().interrupt();
+          }
+          peers = queueStorage.getAllQueues(deadRS);
         }
-        try {
-          Thread.sleep(sleep);
-        } catch (InterruptedException e) {
-          LOG.warn("Interrupted while waiting before transferring a queue.");
-          Thread.currentThread().interrupt();
+        if (!peers.isEmpty()) {
+          queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
         }
-        peers = rq.getUnClaimedQueueIds(rsZnode);
-      }
-      if (peers != null) {
-        rq.removeReplicatorIfQueueIsEmpty(rsZnode);
+      } catch (ReplicationException e) {
+        server.abort("Failed to claim queue from dead regionserver", e);
+        return;
       }
       // Copying over the failed queue is completed.
       if (newQueues.isEmpty()) {
@@ -722,8 +738,8 @@ public class ReplicationSourceManager implements ReplicationListener {
                 + ex);
           }
           if (peer == null || peerConfig == null) {
-            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
-            replicationQueues.removeQueue(peerId);
+            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS);
+            abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
             continue;
           }
           // track sources in walsByIdRecoveredQueues
@@ -740,13 +756,11 @@ public class ReplicationSourceManager implements ReplicationListener {
           }
 
           // enqueue sources
-          ReplicationSourceInterface src =
-              getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
-                server, peerId, this.clusterId, peerConfig, peer, walFileLengthProvider);
+          ReplicationSourceInterface src = getReplicationSource(peerId, peerConfig, peer);
           // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
           // see removePeer
           synchronized (oldsources) {
-            if (!this.rp.getConnectedPeerIds().contains(src.getPeerId())) {
+            if (!replicationPeers.getConnectedPeerIds().contains(src.getPeerId())) {
               src.terminate("Recovered queue doesn't belong to any current peer");
               closeRecoveredQueue(src);
               continue;
@@ -765,29 +779,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
-  class AdoptAbandonedQueuesWorker extends Thread{
-
-    public AdoptAbandonedQueuesWorker() {}
-
-    @Override
-    public void run() {
-      List<String> currentReplicators = replicationQueues.getListOfReplicators();
-      if (currentReplicators == null || currentReplicators.isEmpty()) {
-        return;
-      }
-      List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
-      LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
-        + otherRegionServers);
-
-      // Look if there's anything to process after a restart
-      for (String rs : currentReplicators) {
-        if (!otherRegionServers.contains(rs)) {
-          transferQueues(rs);
-        }
-      }
-    }
-  }
-
   /**
    * Get the directory where wals are archived
    * @return the directory where wals are archived
@@ -846,7 +837,11 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   public void cleanUpHFileRefs(String peerId, List<String> files) {
-    this.replicationQueues.removeHFileRefs(peerId, files);
+    abortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
+  }
+
+  int activeFailoverTaskCount() {
+    return executor.getActiveCount();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/11158715/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index 21b8ac5..9ec244a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,22 +36,19 @@ import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * In a scenario of Replication based Disaster/Recovery, when hbase
- * Master-Cluster crashes, this tool is used to sync-up the delta from Master to
- * Slave using the info from ZooKeeper. The tool will run on Master-Cluser, and
- * assume ZK, Filesystem and NetWork still available after hbase crashes
+ * In a scenario of Replication based Disaster/Recovery, when hbase Master-Cluster crashes, this
+ * tool is used to sync-up the delta from Master to Slave using the info from ZooKeeper. The tool
+ * will run on Master-Cluser, and assume ZK, Filesystem and NetWork still available after hbase
+ * crashes
  *
+ * <pre>
  * hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp
+ * </pre>
  */
-
 public class ReplicationSyncUp extends Configured implements Tool {
 
-  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSyncUp.class.getName());
-
   private static Configuration conf;
 
   private static final long SLEEP_TIME = 10000;
@@ -105,13 +101,14 @@ public class ReplicationSyncUp extends Configured implements Tool {
     System.out.println("Start Replication Server start");
     replication = new Replication(new DummyServer(zkw), fs, logDir, oldLogDir);
     manager = replication.getReplicationManager();
-    manager.init();
+    manager.init().get();
 
     try {
-      int numberOfOldSource = 1; // default wait once
-      while (numberOfOldSource > 0) {
+      while (manager.activeFailoverTaskCount() > 0) {
+        Thread.sleep(SLEEP_TIME);
+      }
+      while (manager.getOldSources().size() > 0) {
         Thread.sleep(SLEEP_TIME);
-        numberOfOldSource = manager.getOldSources().size();
       }
     } catch (InterruptedException e) {
       System.err.println("didn't wait long enough:" + e);
@@ -121,7 +118,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
     manager.join();
     zkw.close();
 
-    return (0);
+    return 0;
   }
 
   static class DummyServer implements Server {
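
Since init() now returns the Future of the adoptAbandonedQueues task, the tool can block until queue adoption has run and then poll the two draining conditions. The new wait logic from the hunk above, annotated:

    manager.init().get();  // wait for adoptAbandonedQueues to finish submitting transfers
    while (manager.activeFailoverTaskCount() > 0) {
      Thread.sleep(SLEEP_TIME);  // NodeFailoverWorkers are still claiming queues
    }
    while (manager.getOldSources().size() > 0) {
      Thread.sleep(SLEEP_TIME);  // recovered sources are still replicating
    }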

http://git-wip-us.apache.org/repos/asf/hbase/blob/11158715/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 1e75959..2de6608 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -44,9 +44,8 @@ import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -111,9 +110,8 @@ public class TestLogsCleaner {
 
     Replication.decorateMasterConfiguration(conf);
     Server server = new DummyServer();
-    ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(
-        new ReplicationQueuesArguments(conf, server, server.getZooKeeper()));
-    repQueues.init(server.getServerName().toString());
+    ReplicationQueueStorage queueStorage =
+        ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
     final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
     final Path oldProcedureWALDir = new Path(oldLogDir, "masterProcedureWALs");
     String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), "UTF8");
@@ -144,7 +142,7 @@ public class TestLogsCleaner {
       // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these
       // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
       if (i % (30 / 3) == 1) {
-        repQueues.addLog(fakeMachineName, fileName.getName());
+        queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName());
         LOG.info("Replication log file: " + fileName);
       }
     }
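
The test now gets queue storage straight from ReplicationStorageFactory and registers WALs against an explicit ServerName, with no init(...) step. The pattern, condensed from the hunks above (queueId and walName are illustrative):

    ReplicationQueueStorage queueStorage =
        ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
    queueStorage.addWAL(server.getServerName(), queueId, walName);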

http://git-wip-us.apache.org/repos/asf/hbase/blob/11158715/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index f83695f..8802e36 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -46,9 +46,8 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -72,19 +71,16 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 @Category({ MasterTests.class, SmallTests.class })
 public class TestReplicationHFileCleaner {
-  private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueuesZKImpl.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationHFileCleaner.class);
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static Server server;
-  private static ReplicationQueues rq;
+  private static ReplicationQueueStorage rq;
   private static ReplicationPeers rp;
   private static final String peerId = "TestReplicationHFileCleaner";
   private static Configuration conf = TEST_UTIL.getConfiguration();
   static FileSystem fs = null;
   Path root;
 
-  /**
-   * @throws java.lang.Exception
-   */
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniZKCluster();
@@ -93,20 +89,10 @@ public class TestReplicationHFileCleaner {
     Replication.decorateMasterConfiguration(conf);
     rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
     rp.init();
-    rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, server.getZooKeeper()));
-    rq.init(server.getServerName().toString());
-    try {
-      fs = FileSystem.get(conf);
-    } finally {
-      if (fs != null) {
-        fs.close();
-      }
-    }
+    rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
+    fs = FileSystem.get(conf);
   }
 
-  /**
-   * @throws java.lang.Exception
-   */
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     TEST_UTIL.shutdownMiniZKCluster();

http://git-wip-us.apache.org/repos/asf/hbase/blob/11158715/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
index 8178266..2ad8bd7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
@@ -26,10 +26,8 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -49,14 +47,12 @@ public class TestReplicationZKNodeCleaner {
 
   private final Configuration conf;
   private final ZKWatcher zkw;
-  private final ReplicationQueues repQueues;
+  private final ReplicationQueueStorage repQueues;
 
   public TestReplicationZKNodeCleaner() throws Exception {
     conf = TEST_UTIL.getConfiguration();
     zkw = new ZKWatcher(conf, "TestReplicationZKNodeCleaner", null);
-    repQueues = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null,
-        zkw));
-    assertTrue(repQueues instanceof ReplicationQueuesZKImpl);
+    repQueues = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
   }
 
   @BeforeClass
@@ -72,9 +68,8 @@ public class TestReplicationZKNodeCleaner {
 
   @Test
   public void testReplicationZKNodeCleaner() throws Exception {
-    repQueues.init(SERVER_ONE.getServerName());
     // add queue for ID_ONE which isn't exist
-    repQueues.addLog(ID_ONE, "file1");
+    repQueues.addWAL(SERVER_ONE, ID_ONE, "file1");
 
     ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
     Map<ServerName, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
@@ -84,7 +79,7 @@ public class TestReplicationZKNodeCleaner {
     assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
 
     // add a recovery queue for ID_TWO which isn't exist
-    repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
+    repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2");
 
     undeletedQueues = cleaner.getUnDeletedQueues();
     assertEquals(1, undeletedQueues.size());
@@ -100,11 +95,10 @@ public class TestReplicationZKNodeCleaner {
 
   @Test
   public void testReplicationZKNodeCleanerChore() throws Exception {
-    repQueues.init(SERVER_ONE.getServerName());
     // add queue for ID_ONE which isn't exist
-    repQueues.addLog(ID_ONE, "file1");
+    repQueues.addWAL(SERVER_ONE, ID_ONE, "file1");
     // add a recovery queue for ID_TWO which isn't exist
-    repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
+    repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2");
 
     // Wait the cleaner chore to run
     Thread.sleep(20000);

http://git-wip-us.apache.org/repos/asf/hbase/blob/11158715/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 7ea79f9..14c5e56 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -46,9 +45,10 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   MetricsSource metrics;
   WALFileLengthProvider walFileLengthProvider;
   AtomicBoolean startup = new AtomicBoolean(false);
+
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
+      ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId,
       UUID clusterId, ReplicationEndpoint replicationEndpoint,
       WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
     this.manager = manager;

http://git-wip-us.apache.org/repos/asf/hbase/blob/11158715/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
index 0a602ad..0313b3b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -64,7 +64,6 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
 
   @Before
   public void setUp() throws Exception {
-
     HColumnDescriptor fam;
 
     t1_syncupSource = new HTableDescriptor(t1_su);
@@ -100,7 +99,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
    * check's gone Also check the puts and deletes are not replicated back to
    * the originating cluster.
    */
-  @Test(timeout = 300000)
+  @Test
   public void testSyncUpTool() throws Exception {
 
     /**
@@ -176,7 +175,6 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
      * verify correctly replicated to Slave
      */
     mimicSyncUpAfterPut();
-
   }
 
   protected void setupReplication() throws Exception {

http://git-wip-us.apache.org/repos/asf/hbase/blob/11158715/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index b5a7461..325012d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -40,7 +40,6 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -67,10 +66,10 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -95,11 +94,13 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
-import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
 /**
 * An abstract class that tests ReplicationSourceManager. Classes that extend this class should
 * set up the proper config for this class and initialize the proper cluster using
@@ -328,18 +329,14 @@ public abstract class TestReplicationSourceManager {
 
   @Test
   public void testClaimQueues() throws Exception {
-    final Server server = new DummyServer("hostname0.example.org");
-
-
-    ReplicationQueues rq =
-        ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
-          server.getZooKeeper()));
-    rq.init(server.getServerName().toString());
+    Server server = new DummyServer("hostname0.example.org");
+    ReplicationQueueStorage rq = ReplicationStorageFactory
+        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
     // populate some znodes in the peer znode
     files.add("log1");
     files.add("log2");
     for (String file : files) {
-      rq.addLog("1", file);
+      rq.addWAL(server.getServerName(), "1", file);
     }
     // create 3 DummyServers
     Server s1 = new DummyServer("dummyserver1.example.org");
@@ -347,12 +344,9 @@ public abstract class TestReplicationSourceManager {
     Server s3 = new DummyServer("dummyserver3.example.org");
 
     // create 3 DummyNodeFailoverWorkers
-    DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(
-        server.getServerName().getServerName(), s1);
-    DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(
-        server.getServerName().getServerName(), s2);
-    DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(
-        server.getServerName().getServerName(), s3);
+    DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1);
+    DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2);
+    DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3);
 
     latch = new CountDownLatch(3);
     // start the threads
@@ -371,11 +365,9 @@ public abstract class TestReplicationSourceManager {
 
   @Test
   public void testCleanupFailoverQueues() throws Exception {
-    final Server server = new DummyServer("hostname1.example.org");
-    ReplicationQueues rq =
-        ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
-          server.getZooKeeper()));
-    rq.init(server.getServerName().toString());
+    Server server = new DummyServer("hostname1.example.org");
+    ReplicationQueueStorage rq = ReplicationStorageFactory
+        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
     // populate some znodes in the peer znode
     SortedSet<String> files = new TreeSet<>();
     String group = "testgroup";
@@ -384,19 +376,14 @@ public abstract class TestReplicationSourceManager {
     files.add(file1);
     files.add(file2);
     for (String file : files) {
-      rq.addLog("1", file);
+      rq.addWAL(server.getServerName(), "1", file);
     }
     Server s1 = new DummyServer("dummyserver1.example.org");
-    ReplicationQueues rq1 =
-        ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
-            s1.getZooKeeper()));
-    rq1.init(s1.getServerName().toString());
     ReplicationPeers rp1 =
         ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
     rp1.init();
     NodeFailoverWorker w1 =
-        manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID(
-            new Long(1), new Long(2)));
+        manager.new NodeFailoverWorker(server.getServerName());
     w1.run();
     assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
     String id = "1-" + server.getServerName().getServerName();
@@ -408,17 +395,16 @@ public abstract class TestReplicationSourceManager {
 
   @Test
   public void testCleanupUnknownPeerZNode() throws Exception {
-    final Server server = new DummyServer("hostname2.example.org");
-    ReplicationQueues rq = ReplicationFactory.getReplicationQueues(
-      new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper()));
-    rq.init(server.getServerName().toString());
+    Server server = new DummyServer("hostname2.example.org");
+    ReplicationQueueStorage rq = ReplicationStorageFactory
+        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
     // populate some znodes in the peer znode
     // add log to an unknown peer
     String group = "testgroup";
-    rq.addLog("2", group + ".log1");
-    rq.addLog("2", group + ".log2");
+    rq.addWAL(server.getServerName(), "2", group + ".log1");
+    rq.addWAL(server.getServerName(), "2", group + ".log2");
 
-    NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName());
+    NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName());
     w1.run();
 
     // The log of the unknown peer should be removed from zk
@@ -481,10 +467,8 @@ public abstract class TestReplicationSourceManager {
         .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
     try {
       DummyServer server = new DummyServer();
-      final ReplicationQueues rq =
-          ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(
-              server.getConfiguration(), server, server.getZooKeeper()));
-      rq.init(server.getServerName().toString());
+      ReplicationQueueStorage rq = ReplicationStorageFactory
+          .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
       // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
       // initialization to throw an exception.
       conf.set("replication.replicationsource.implementation",
@@ -498,11 +482,11 @@ public abstract class TestReplicationSourceManager {
       assertNull(manager.getSource(peerId));
 
       // Create a replication queue for the fake peer
-      rq.addLog(peerId, "FakeFile");
+      rq.addWAL(server.getServerName(), peerId, "FakeFile");
       // Unregister peer, this should remove the peer and clear all queues associated with it
       // Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
       removePeerAndWait(peerId);
-      assertFalse(rq.getAllQueues().contains(peerId));
+      assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId));
     } finally {
       conf.set("replication.replicationsource.implementation", replicationSourceImplName);
       removePeerAndWait(peerId);
@@ -625,11 +609,12 @@ public abstract class TestReplicationSourceManager {
       }
     }
     Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
-      @Override public boolean evaluate() throws Exception {
+      @Override
+      public boolean evaluate() throws Exception {
         List<String> peers = rp.getAllPeerIds();
-        return (!manager.getAllQueues().contains(peerId)) && 
(rp.getConnectedPeer(peerId) == null)
-            && (!peers.contains(peerId))
-            && manager.getSource(peerId) == null;
+        return (!manager.getAllQueues().contains(peerId)) &&
+          (rp.getConnectedPeer(peerId) == null) && (!peers.contains(peerId)) &&
+          manager.getSource(peerId) == null;
       }
     });
   }
@@ -672,25 +657,24 @@ public abstract class TestReplicationSourceManager {
   static class DummyNodeFailoverWorker extends Thread {
     private Map<String, Set<String>> logZnodesMap;
     Server server;
-    private String deadRsZnode;
-    ReplicationQueues rq;
+    private ServerName deadRS;
+    ReplicationQueueStorage rq;
 
-    public DummyNodeFailoverWorker(String znode, Server s) throws Exception {
-      this.deadRsZnode = znode;
+    public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws 
Exception {
+      this.deadRS = deadRS;
       this.server = s;
-      this.rq =
-          ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
-            server.getZooKeeper()));
-      this.rq.init(this.server.getServerName().toString());
+      this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(),
+        server.getConfiguration());
     }
 
     @Override
     public void run() {
       try {
         logZnodesMap = new HashMap<>();
-        List<String> queues = rq.getUnClaimedQueueIds(deadRsZnode);
-        for(String queue:queues){
-          Pair<String, SortedSet<String>> pair = rq.claimQueue(deadRsZnode, queue);
+        List<String> queues = rq.getAllQueues(deadRS);
+        for (String queue : queues) {
+          Pair<String, SortedSet<String>> pair =
+              rq.claimQueue(deadRS, queue, server.getServerName());
           if (pair != null) {
             logZnodesMap.put(pair.getFirst(), pair.getSecond());
           }
@@ -729,7 +713,7 @@ public abstract class TestReplicationSourceManager {
 
     @Override
     public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-        ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
+        ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId,
         UUID clusterId, ReplicationEndpoint replicationEndpoint,
         WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
       throw new IOException("Failing deliberately");

http://git-wip-us.apache.org/repos/asf/hbase/blob/11158715/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
index aeab8b0..c6d9eef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
@@ -25,11 +25,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.junit.BeforeClass;
@@ -41,8 +40,9 @@ import org.junit.experimental.categories.Category;
  * ReplicationQueuesClientZkImpl. Also includes extra tests outside of those in
 * TestReplicationSourceManager that test ReplicationQueueZkImpl-specific behaviors.
  */
-@Category({ReplicationTests.class, MediumTests.class})
+@Category({ ReplicationTests.class, MediumTests.class })
 public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceManager {
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     conf = HBaseConfiguration.create();
@@ -58,16 +58,14 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
   // Tests the naming convention of adopted queues for ReplicationQueuesZkImpl
   @Test
   public void testNodeFailoverDeadServerParsing() throws Exception {
-    final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
-    ReplicationQueues repQueues =
-      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server,
-        server.getZooKeeper()));
-    repQueues.init(server.getServerName().toString());
+    Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
+    ReplicationQueueStorage queueStorage =
+        ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
     // populate some znodes in the peer znode
     files.add("log1");
     files.add("log2");
     for (String file : files) {
-      repQueues.addLog("1", file);
+      queueStorage.addWAL(server.getServerName(), "1", file);
     }
 
     // create 3 DummyServers
@@ -76,30 +74,22 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
     Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
 
     // simulate three servers fail sequentially
-    ReplicationQueues rq1 =
-      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
-        s1.getZooKeeper()));
-    rq1.init(s1.getServerName().toString());
-    String serverName = server.getServerName().getServerName();
-    List<String> unclaimed = rq1.getUnClaimedQueueIds(serverName);
-    rq1.claimQueue(serverName, unclaimed.get(0)).getSecond();
-    rq1.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
-    ReplicationQueues rq2 =
-      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2,
-        s2.getZooKeeper()));
-    rq2.init(s2.getServerName().toString());
-    serverName = s1.getServerName().getServerName();
-    unclaimed = rq2.getUnClaimedQueueIds(serverName);
-    rq2.claimQueue(serverName, unclaimed.get(0)).getSecond();
-    rq2.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
-    ReplicationQueues rq3 =
-      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3,
-        s3.getZooKeeper()));
-    rq3.init(s3.getServerName().toString());
-    serverName = s2.getServerName().getServerName();
-    unclaimed = rq3.getUnClaimedQueueIds(serverName);
-    String queue3 = rq3.claimQueue(serverName, unclaimed.get(0)).getFirst();
-    rq3.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
+    ServerName serverName = server.getServerName();
+    List<String> unclaimed = queueStorage.getAllQueues(serverName);
+    queueStorage.claimQueue(serverName, unclaimed.get(0), s1.getServerName());
+    queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
+
+    serverName = s1.getServerName();
+    unclaimed = queueStorage.getAllQueues(serverName);
+    queueStorage.claimQueue(serverName, unclaimed.get(0), s2.getServerName());
+    queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
+
+    serverName = s2.getServerName();
+    unclaimed = queueStorage.getAllQueues(serverName);
+    String queue3 =
+        queueStorage.claimQueue(serverName, unclaimed.get(0), s3.getServerName()).getFirst();
+    queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
+
     ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3);
     List<ServerName> result = replicationQueueInfo.getDeadRegionServers();
     // verify

Reply via email to