Repository: hbase
Updated Branches:
  refs/heads/master b006e41a3 -> 152594560


HBASE-16036 Made Replication Table creation non-blocking.

All ReplicationTableBase methods that need to access the Replication Table 
will still block until it has been created.
Also refactored ReplicationSourceManager so that abandoned queue adoption is 
run in the background too so that it does not block HRegionServer 
initialization.

Signed-off-by: Elliott Clark <ecl...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/15259456
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/15259456
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/15259456

Branch: refs/heads/master
Commit: 152594560e29549642587b850320f5d66339b747
Parents: b006e41
Author: Joseph Hwang <j...@fb.com>
Authored: Wed Jun 15 14:35:56 2016 -0700
Committer: Elliott Clark <ecl...@apache.org>
Committed: Tue Jun 21 13:05:50 2016 -0700

----------------------------------------------------------------------
 .../replication/ReplicationQueuesArguments.java |   4 +
 .../ReplicationQueuesClientArguments.java       |   5 +
 .../hbase/replication/ReplicationTableBase.java | 186 ++++++++++++++-----
 .../TableBasedReplicationQueuesClientImpl.java  |   3 +-
 .../TableBasedReplicationQueuesImpl.java        | 106 +++++------
 .../regionserver/ReplicationSourceManager.java  |  43 +++--
 .../TestReplicationStateHBaseImpl.java          |   2 +-
 .../replication/TestReplicationTableBase.java   | 109 +++++++++++
 .../TestReplicationSourceManager.java           | 160 ++++------------
 .../TestReplicationSourceManagerZkImpl.java     | 152 +++++++++++++++
 ...tTableBasedReplicationSourceManagerImpl.java |  60 ++++++
 11 files changed, 579 insertions(+), 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
index 4fdc4e7..12fc6a1 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
@@ -23,6 +23,10 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
+/**
+ * Wrapper around common arguments used to construct ReplicationQueues. Used 
to construct various
+ * ReplicationQueues Implementations with different constructor arguments by 
reflection.
+ */
 @InterfaceAudience.Private
 public class ReplicationQueuesArguments {
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
index 8a61993..834f831 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
@@ -23,6 +23,11 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
+/**
+ * Wrapper around common arguments used to construct ReplicationQueuesClient. 
Used to construct
+ * various ReplicationQueuesClient Implementations with different constructor 
arguments by
+ * reflection.
+ */
 @InterfaceAudience.Private
 public class ReplicationQueuesClientArguments extends 
ReplicationQueuesArguments {
   public ReplicationQueuesClientArguments(Configuration conf, Abortable abort,

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
index c1506cd..61bb041 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
@@ -18,12 +18,14 @@
 */
 package org.apache.hadoop.hbase.replication;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
@@ -42,12 +44,18 @@ import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /*
  * Abstract class that provides an interface to the Replication Table. Which 
is currently
@@ -59,8 +67,10 @@ import java.util.Set;
  *  COL_QUEUE_OWNER_HISTORY: a "|" delimited list of the previous server's 
that have owned this
  *    queue. The most recent previous owner is the leftmost entry.
  * They will also have columns mapping [WAL filename : offset]
+ * The most flexible method of interacting with the Replication Table is by 
calling
+ * getOrBlockOnReplicationTable() which will return a new copy of the 
Replication Table. It is up
+ * to the caller to close the returned table.
  */
-
 @InterfaceAudience.Private
 abstract class ReplicationTableBase {
 
@@ -99,20 +109,23 @@ abstract class ReplicationTableBase {
   private static final int RPC_TIMEOUT = 2000;
   private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT;
 
-  protected final Table replicationTable;
+  // We only need a single thread to initialize the Replication Table
+  private static final int NUM_INITIALIZE_WORKERS = 1;
+
   protected final Configuration conf;
   protected final Abortable abortable;
-  private final Admin admin;
   private final Connection connection;
+  private final Executor executor;
+  private volatile CountDownLatch replicationTableInitialized;
 
   public ReplicationTableBase(Configuration conf, Abortable abort) throws 
IOException {
     this.conf = new Configuration(conf);
     this.abortable = abort;
     decorateConf();
     this.connection = ConnectionFactory.createConnection(this.conf);
-    this.admin = connection.getAdmin();
-    this.replicationTable = createAndGetReplicationTable();
-    setTableTimeOuts();
+    this.executor = setUpExecutor();
+    this.replicationTableInitialized = new CountDownLatch(1);
+    createReplicationTableInBackground();
   }
 
   /**
@@ -124,11 +137,34 @@ abstract class ReplicationTableBase {
   }
 
   /**
+   * Sets up the thread pool executor used to build the Replication Table in 
the background
+   * @return the configured executor
+   */
+  private Executor setUpExecutor() {
+    ThreadPoolExecutor tempExecutor = new 
ThreadPoolExecutor(NUM_INITIALIZE_WORKERS,
+        NUM_INITIALIZE_WORKERS, 100, TimeUnit.MILLISECONDS, new 
LinkedBlockingQueue<Runnable>());
+    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+    tfb.setNameFormat("ReplicationTableExecutor-%d");
+    tfb.setDaemon(true);
+    tempExecutor.setThreadFactory(tfb.build());
+    return tempExecutor;
+  }
+
+  /**
+   * Get whether the Replication Table has been successfully initialized yet
+   * @return whether the Replication Table is initialized
+   */
+  public boolean getInitializationStatus() {
+    return replicationTableInitialized.getCount() == 0;
+  }
+
+  /**
    * Increases the RPC and operations timeouts for the Replication Table
    */
-  private void setTableTimeOuts() {
+  private Table setReplicationTableTimeOuts(Table replicationTable) {
     replicationTable.setRpcTimeout(RPC_TIMEOUT);
     replicationTable.setOperationTimeout(OPERATION_TIMEOUT);
+    return replicationTable;
   }
 
   /**
@@ -189,7 +225,7 @@ abstract class ReplicationTableBase {
     // scan all of the queues and return a list of all unique OWNER values
     Set<String> peerServers = new HashSet<String>();
     ResultScanner allQueuesInCluster = null;
-    try {
+    try (Table replicationTable = getOrBlockOnReplicationTable()){
       Scan scan = new Scan();
       scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
       allQueuesInCluster = replicationTable.getScanner(scan);
@@ -244,7 +280,7 @@ abstract class ReplicationTableBase {
 
   protected List<String> getLogsInQueue(byte[] rowKey) {
     String errMsg = "Failed getting logs in queue queueId=" + 
Bytes.toString(rowKey);
-    try {
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
       Get getQueue = new Get(rowKey);
       Result queue = replicationTable.get(getQueue);
       if (queue == null || queue.isEmpty()) {
@@ -286,66 +322,120 @@ abstract class ReplicationTableBase {
    * @return a ResultScanner over the QueueIds belonging to the server
    * @throws IOException
    */
-  private ResultScanner getQueuesBelongingToServer(String server) throws 
IOException {
+  protected ResultScanner getQueuesBelongingToServer(String server) throws 
IOException {
     Scan scan = new Scan();
     SingleColumnValueFilter filterMyQueues = new 
SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
       CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server));
     scan.setFilter(filterMyQueues);
     scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
     scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY);
-    ResultScanner results = replicationTable.getScanner(scan);
-    return results;
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      ResultScanner results = replicationTable.getScanner(scan);
+      return results;
+    }
   }
 
   /**
-   * Gets the Replication Table. Builds and blocks until the table is 
available if the Replication
-   * Table does not exist.
-   *
-   * @return the Replication Table
-   * @throws IOException if the Replication Table takes too long to build
+   * Attempts to acquire the Replication Table. This operation will block 
until it is assigned by
+   * the CreateReplicationWorker thread. It is up to the caller of this method 
to close the
+   * returned Table
+   * @return the Replication Table when it is created
+   * @throws IOException
    */
-  private Table createAndGetReplicationTable() throws IOException {
-    if (!replicationTableExists()) {
-      createReplicationTable();
-    }
-    int maxRetries = 
conf.getInt("replication.queues.createtable.retries.number", 100);
-    RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, 
100);
-    RetryCounter retryCounter = counterFactory.create();
-    while (!replicationTableExists()) {
-      try {
-        retryCounter.sleepUntilNextRetry();
-        if (!retryCounter.shouldRetry()) {
-          throw new IOException("Unable to acquire the Replication Table");
-        }
-      } catch (InterruptedException e) {
-        return null;
-      }
+  protected Table getOrBlockOnReplicationTable() throws IOException {
+    // Sleep until the Replication Table becomes available
+    try {
+      replicationTableInitialized.await();
+    } catch (InterruptedException e) {
+      String errMsg = "Unable to acquire the Replication Table due to 
InterruptedException: " +
+          e.getMessage();
+      throw new InterruptedIOException(errMsg);
     }
-    return connection.getTable(REPLICATION_TABLE_NAME);
+    return getAndSetUpReplicationTable();
   }
 
   /**
-   * Create the replication table with the provided HColumnDescriptor 
REPLICATION_COL_DESCRIPTOR
-   * in TableBasedReplicationQueuesImpl
+   * Creates a new copy of the Replication Table and sets up the proper Table 
time outs for it
+   *
+   * @return the Replication Table
    * @throws IOException
    */
-  private void createReplicationTable() throws IOException {
-    HTableDescriptor replicationTableDescriptor = new 
HTableDescriptor(REPLICATION_TABLE_NAME);
-    replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR);
-    admin.createTable(replicationTableDescriptor);
+  private Table getAndSetUpReplicationTable() throws IOException {
+    Table replicationTable = connection.getTable(REPLICATION_TABLE_NAME);
+    setReplicationTableTimeOuts(replicationTable);
+    return replicationTable;
   }
 
   /**
-   * Checks whether the Replication Table exists yet
+   * Builds the Replication Table in a background thread. Any method accessing 
the Replication Table
+   * should do so through getOrBlockOnReplicationTable()
    *
-   * @return whether the Replication Table exists
-   * @throws IOException
+   * @return the Replication Table
+   * @throws IOException if the Replication Table takes too long to build
    */
-  private boolean replicationTableExists() {
-    try {
-      return admin.tableExists(REPLICATION_TABLE_NAME);
-    } catch (IOException e) {
-      return false;
+  private void createReplicationTableInBackground() throws IOException {
+    executor.execute(new CreateReplicationTableWorker());
+  }
+
+  /**
+   * Attempts to build the Replication Table. Will continue blocking until we 
have a valid
+   * Table for the Replication Table.
+   */
+  private class CreateReplicationTableWorker implements Runnable {
+
+    private Admin admin;
+
+    @Override
+    public void run() {
+      try {
+        admin = connection.getAdmin();
+        if (!replicationTableExists()) {
+          createReplicationTable();
+        }
+        int maxRetries = 
conf.getInt("hbase.replication.queues.createtable.retries.number",
+            CLIENT_RETRIES);
+        RetryCounterFactory counterFactory = new 
RetryCounterFactory(maxRetries, RPC_TIMEOUT);
+        RetryCounter retryCounter = counterFactory.create();
+        while (!replicationTableExists()) {
+          retryCounter.sleepUntilNextRetry();
+          if (!retryCounter.shouldRetry()) {
+            throw new IOException("Unable to acquire the Replication Table");
+          }
+        }
+        replicationTableInitialized.countDown();
+      } catch (IOException | InterruptedException e) {
+        abortable.abort("Failed building Replication Table", e);
+      }
+    }
+
+    /**
+     * Create the replication table with the provided HColumnDescriptor 
REPLICATION_COL_DESCRIPTOR
+     * in TableBasedReplicationQueuesImpl
+     *
+     * @throws IOException
+     */
+    private void createReplicationTable() throws IOException {
+      HTableDescriptor replicationTableDescriptor = new 
HTableDescriptor(REPLICATION_TABLE_NAME);
+      replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR);
+      try {
+        admin.createTable(replicationTableDescriptor);
+      } catch (TableExistsException e) {
+        // In this case we can just continue as normal
+      }
+    }
+
+    /**
+     * Checks whether the Replication Table exists yet
+     *
+     * @return whether the Replication Table exists
+     * @throws IOException
+     */
+    private boolean replicationTableExists() {
+      try {
+        return admin.tableExists(REPLICATION_TABLE_NAME);
+      } catch (IOException e) {
+        return false;
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
index 55dfdd8..dcbed7a 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
@@ -25,6 +25,7 @@ import 
org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.zookeeper.KeeperException;
 
 import java.io.IOException;
@@ -73,7 +74,7 @@ public class TableBasedReplicationQueuesClientImpl extends 
ReplicationTableBase
   public Set<String> getAllWALs() {
     Set<String> allWals = new HashSet<String>();
     ResultScanner allQueues = null;
-    try {
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
       allQueues = replicationTable.getScanner(new Scan());
       for (Result queue : allQueues) {
         for (String wal : readWALsFromResult(queue)) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
index 6ea7801..28fa967 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -105,7 +106,7 @@ public class TableBasedReplicationQueuesImpl extends 
ReplicationTableBase
 
   @Override
   public void addLog(String queueId, String filename) throws 
ReplicationException {
-    try {
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
       if (!checkQueueExists(queueId)) {
         // Each queue will have an Owner, OwnerHistory, and a collection of 
[WAL:offset] key values
         Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId)));
@@ -140,7 +141,7 @@ public class TableBasedReplicationQueuesImpl extends 
ReplicationTableBase
 
   @Override
   public void setLogPosition(String queueId, String filename, long position) {
-    try {
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
       byte[] rowKey = queueIdToRowKey(queueId);
       // Check that the log exists. addLog() must have been called before 
setLogPosition().
       Get checkLogExists = new Get(rowKey);
@@ -190,8 +191,31 @@ public class TableBasedReplicationQueuesImpl extends 
ReplicationTableBase
 
   @Override
   public List<String> getLogsInQueue(String queueId) {
+    String errMsg = "Failed getting logs in queue queueId=" + queueId;
     byte[] rowKey = queueIdToRowKey(queueId);
-    return getLogsInQueueAndCheckOwnership(rowKey);
+    List<String> logs = new ArrayList<String>();
+    try {
+      Get getQueue = new Get(rowKey);
+      Result queue = getResultIfOwner(getQueue);
+      if (queue == null || queue.isEmpty()) {
+        String errMsgLostOwnership = "Failed getting logs for queue queueId=" +
+            Bytes.toString(rowKey) + " because the queue was missing or we 
lost ownership";
+        abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership));
+        return null;
+      }
+      Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
+      for(byte[] cQualifier : familyMap.keySet()) {
+        if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || 
Arrays.equals(cQualifier,
+            COL_QUEUE_OWNER_HISTORY)) {
+          continue;
+        }
+        logs.add(Bytes.toString(cQualifier));
+      }
+    } catch (IOException e) {
+      abortable.abort(errMsg, e);
+      return null;
+    }
+    return logs;
   }
 
   @Override
@@ -207,7 +231,7 @@ public class TableBasedReplicationQueuesImpl extends 
ReplicationTableBase
     }
     ResultScanner queuesToClaim = null;
     try {
-      queuesToClaim = getAllQueuesScanner(regionserver);
+      queuesToClaim = getQueuesBelongingToServer(regionserver);
       for (Result queue : queuesToClaim) {
         if (attemptToClaimQueue(queue, regionserver)) {
           String rowKey = Bytes.toString(queue.getRow());
@@ -240,24 +264,6 @@ public class TableBasedReplicationQueuesImpl extends 
ReplicationTableBase
     return queues;
   }
 
-  /**
-   * Get the QueueIds belonging to the named server from the 
ReplicationTableBase
-   *
-   * @param server name of the server
-   * @return a ResultScanner over the QueueIds belonging to the server
-   * @throws IOException
-   */
-  private ResultScanner getAllQueuesScanner(String server) throws IOException {
-    Scan scan = new Scan();
-    SingleColumnValueFilter filterMyQueues = new 
SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
-      CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server));
-    scan.setFilter(filterMyQueues);
-    scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
-    scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY);
-    ResultScanner results = replicationTable.getScanner(scan);
-    return results;
-  }
-
   @Override
   public boolean isThisOurRegionServer(String regionserver) {
     return this.serverName.equals(regionserver);
@@ -287,33 +293,6 @@ public class TableBasedReplicationQueuesImpl extends 
ReplicationTableBase
     throw new NotImplementedException();
   }
 
-  private List<String> getLogsInQueueAndCheckOwnership(byte[] rowKey) {
-    String errMsg = "Failed getting logs in queue queueId=" + 
Bytes.toString(rowKey);
-    List<String> logs = new ArrayList<String>();
-    try {
-      Get getQueue = new Get(rowKey);
-      Result queue = getResultIfOwner(getQueue);
-      if (queue == null || queue.isEmpty()) {
-        String errMsgLostOwnership = "Failed getting logs for queue queueId=" +
-          Bytes.toString(rowKey) + " because the queue was missing or we lost 
ownership";
-        abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership));
-        return null;
-      }
-      Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
-      for(byte[] cQualifier : familyMap.keySet()) {
-        if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || 
Arrays.equals(cQualifier,
-            COL_QUEUE_OWNER_HISTORY)) {
-          continue;
-        }
-        logs.add(Bytes.toString(cQualifier));
-      }
-    } catch (IOException e) {
-      abortable.abort(errMsg, e);
-      return null;
-    }
-    return logs;
-  }
-
   private String buildQueueRowKey(String queueId) {
     return buildQueueRowKey(serverName, queueId);
   }
@@ -358,11 +337,13 @@ public class TableBasedReplicationQueuesImpl extends 
ReplicationTableBase
    * @param mutate Mutation to perform on a given queue
    */
   private void safeQueueUpdate(RowMutations mutate) throws 
ReplicationException, IOException{
-    boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), 
CF_QUEUE,
-        COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, serverNameBytes, 
mutate);
-    if (!updateSuccess) {
-      throw new ReplicationException("Failed to update Replication Table 
because we lost queue " +
-        " ownership");
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(),
+          CF_QUEUE, COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, 
serverNameBytes, mutate);
+      if (!updateSuccess) {
+        throw new ReplicationException("Failed to update Replication Table 
because we lost queue " +
+            " ownership");
+      }
     }
   }
 
@@ -374,8 +355,10 @@ public class TableBasedReplicationQueuesImpl extends 
ReplicationTableBase
    * @throws IOException
    */
   private boolean checkQueueExists(String queueId) throws IOException {
-    byte[] rowKey = queueIdToRowKey(queueId);
-    return replicationTable.exists(new Get(rowKey));
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      byte[] rowKey = queueIdToRowKey(queueId);
+      return replicationTable.exists(new Get(rowKey));
+    }
   }
 
   /**
@@ -399,9 +382,12 @@ public class TableBasedReplicationQueuesImpl extends 
ReplicationTableBase
     // Attempt to claim ownership for this queue by checking if the current 
OWNER is the original
     // server. If it is not then another RS has already claimed it. If it is 
we set ourselves as the
     // new owner and update the queue's history
-    boolean success = replicationTable.checkAndMutate(queue.getRow(), 
CF_QUEUE, COL_QUEUE_OWNER,
-      CompareFilter.CompareOp.EQUAL, Bytes.toBytes(originalServer), 
claimAndRenameQueue);
-    return success;
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      boolean success = replicationTable.checkAndMutate(queue.getRow(),
+          CF_QUEUE, COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, 
Bytes.toBytes(originalServer),
+          claimAndRenameQueue);
+      return success;
+    }
   }
 
   /**
@@ -424,7 +410,7 @@ public class TableBasedReplicationQueuesImpl extends 
ReplicationTableBase
       CompareFilter.CompareOp.EQUAL, serverNameBytes);
     scan.setFilter(checkOwner);
     ResultScanner scanner = null;
-    try {
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
       scanner = replicationTable.getScanner(scan);
       Result result = scanner.next();
       return (result == null || result.isEmpty()) ? null : result;

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 7532c64..07ee46a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -238,19 +238,11 @@ public class ReplicationSourceManager implements 
ReplicationListener {
         this.replicationQueues.addPeerToHFileRefs(id);
       }
     }
-    List<String> currentReplicators = 
this.replicationQueues.getListOfReplicators();
-    if (currentReplicators == null || currentReplicators.size() == 0) {
-      return;
-    }
-    List<String> otherRegionServers = 
replicationTracker.getListOfRegionServers();
-    LOG.info("Current list of replicators: " + currentReplicators + " other 
RSs: "
-        + otherRegionServers);
-
-    // Look if there's anything to process after a restart
-    for (String rs : currentReplicators) {
-      if (!otherRegionServers.contains(rs)) {
-        transferQueues(rs);
-      }
+    AdoptAbandonedQueuesWorker adoptionWorker = new 
AdoptAbandonedQueuesWorker();
+    try {
+      this.executor.execute(adoptionWorker);
+    } catch (RejectedExecutionException ex) {
+      LOG.info("Cancelling the adoption of abandoned queues because of " + 
ex.getMessage());
     }
   }
 
@@ -705,6 +697,31 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     }
   }
 
+  class AdoptAbandonedQueuesWorker extends Thread{
+
+    public AdoptAbandonedQueuesWorker() {}
+
+    @Override
+    public void run() {
+      List<String> currentReplicators = 
replicationQueues.getListOfReplicators();
+      if (currentReplicators == null || currentReplicators.size() == 0) {
+        return;
+      }
+      List<String> otherRegionServers = 
replicationTracker.getListOfRegionServers();
+      LOG.info("Current list of replicators: " + currentReplicators + " other 
RSs: "
+        + otherRegionServers);
+
+      // Look if there's anything to process after a restart
+      for (String rs : currentReplicators) {
+        if (!otherRegionServers.contains(rs)) {
+          transferQueues(rs);
+        }
+      }
+    }
+  }
+
+
+
   /**
    * Get the directory where wals are archived
    * @return the directory where wals are archived

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
index 3a9a5a5..25f30d8 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
@@ -80,12 +80,12 @@ public class TestReplicationStateHBaseImpl {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     utility = new HBaseTestingUtility();
-    utility.startMiniCluster();
     conf = utility.getConfiguration();
     conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
       TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
     
conf.setClass("hbase.region.replica.replication.ReplicationQueuesClientType",
       TableBasedReplicationQueuesClientImpl.class, 
ReplicationQueuesClient.class);
+    utility.startMiniCluster();
     zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
     String replicationZNodeName = conf.get("zookeeper.znode.replication", 
"replication");
     replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java
new file mode 100644
index 0000000..aa5cfed
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java
@@ -0,0 +1,109 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests ReplicationTableBase behavior when the Master startup is delayed. The table initialization
+ * should be non-blocking, but any method calls that access the table should be blocking.
+ */
+@Category({ReplicationTests.class, MediumTests.class})
+public class TestReplicationTableBase {
+
+  private static long SLEEP_MILLIS = 5000;
+  private static long TIME_OUT_MILLIS = 3000;
+  private static Configuration conf;
+  private static HBaseTestingUtility utility;
+  private static ZooKeeperWatcher zkw;
+  private static ReplicationTableBase rb;
+  private static ReplicationQueues rq;
+  private static ReplicationQueuesClient rqc;
+  private volatile boolean asyncRequestSuccess = false;
+
+  @Test
+  public void testSlowStartup() throws Exception{
+    utility = new HBaseTestingUtility();
+    utility.startMiniZKCluster();
+    conf = utility.getConfiguration();
+    conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
+      TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
+    
conf.setClass("hbase.region.replica.replication.ReplicationQueuesClientType",
+      TableBasedReplicationQueuesClientImpl.class, 
ReplicationQueuesClient.class);
+    zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
+    utility.waitFor(0, TIME_OUT_MILLIS, new 
Waiter.ExplainingPredicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        rb = new ReplicationTableBase(conf, zkw) {};
+        rq = ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(
+          conf, zkw, zkw));
+        rqc = ReplicationFactory.getReplicationQueuesClient(
+          new ReplicationQueuesClientArguments(conf, zkw, zkw));
+        return true;
+      }
+      @Override
+      public String explainFailure() throws Exception {
+        return "Failed to initialize ReplicationTableBase, 
TableBasedReplicationQueuesClient and " +
+          "TableBasedReplicationQueues after a timeout=" + TIME_OUT_MILLIS +
+          " ms. Their initialization " + "should be non-blocking";
+      }
+    });
+    final RequestReplicationQueueData async = new 
RequestReplicationQueueData();
+    async.start();
+    Thread.sleep(SLEEP_MILLIS);
+    // Test that the Replication Table has not been assigned and the methods are blocking
+    assertFalse(rb.getInitializationStatus());
+    assertFalse(asyncRequestSuccess);
+    utility.startMiniCluster();
+    // Test that the methods do return the correct results after getting the table
+    utility.waitFor(0, TIME_OUT_MILLIS, new 
Waiter.ExplainingPredicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        async.join();
+        return true;
+      }
+      @Override
+      public String explainFailure() throws Exception {
+        return "ReplicationQueue failed to return list of replicators even 
after Replication Table "
+          + "was initialized timeout=" + TIME_OUT_MILLIS + " ms";
+      }
+    });
+    assertTrue(asyncRequestSuccess);
+  }
+
+  public class RequestReplicationQueueData extends Thread {
+    @Override
+    public void run() {
+      assertEquals(0, rq.getListOfReplicators().size());
+      asyncRequestSuccess = true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index bf47d4f..4b278bb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -64,13 +65,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
-import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -88,69 +84,63 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import com.google.common.collect.Sets;
 
+/**
+ * An abstract class that tests ReplicationSourceManager. Classes that extend this class should
+ * set up the proper config for this class and initialize the proper cluster using
+ * HBaseTestingUtility.
+ */
 @Category({ReplicationTests.class, MediumTests.class})
-public class TestReplicationSourceManager {
+public abstract class TestReplicationSourceManager {
 
-  private static final Log LOG =
+  protected static final Log LOG =
       LogFactory.getLog(TestReplicationSourceManager.class);
 
-  private static Configuration conf;
+  protected static Configuration conf;
 
-  private static HBaseTestingUtility utility;
+  protected static HBaseTestingUtility utility;
 
-  private static Replication replication;
+  protected static Replication replication;
 
-  private static ReplicationSourceManager manager;
+  protected static ReplicationSourceManager manager;
 
-  private static ZooKeeperWatcher zkw;
+  protected static ZooKeeperWatcher zkw;
 
-  private static HTableDescriptor htd;
+  protected static HTableDescriptor htd;
 
-  private static HRegionInfo hri;
+  protected static HRegionInfo hri;
 
-  private static final byte[] r1 = Bytes.toBytes("r1");
+  protected static final byte[] r1 = Bytes.toBytes("r1");
 
-  private static final byte[] r2 = Bytes.toBytes("r2");
+  protected static final byte[] r2 = Bytes.toBytes("r2");
 
-  private static final byte[] f1 = Bytes.toBytes("f1");
+  protected static final byte[] f1 = Bytes.toBytes("f1");
 
-  private static final byte[] f2 = Bytes.toBytes("f2");
+  protected static final byte[] f2 = Bytes.toBytes("f2");
 
-  private static final TableName test =
+  protected static final TableName test =
       TableName.valueOf("test");
 
-  private static final String slaveId = "1";
-
-  private static FileSystem fs;
-
-  private static Path oldLogDir;
+  protected static final String slaveId = "1";
 
-  private static Path logDir;
+  protected static FileSystem fs;
 
-  private static CountDownLatch latch;
+  protected static Path oldLogDir;
 
-  private static List<String> files = new ArrayList<String>();
-  private static NavigableMap<byte[], Integer> scopes;
+  protected static Path logDir;
 
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
+  protected static CountDownLatch latch;
 
-    conf = HBaseConfiguration.create();
-    conf.set("replication.replicationsource.implementation",
-        ReplicationSourceDummy.class.getCanonicalName());
-    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
-        HConstants.REPLICATION_ENABLE_DEFAULT);
-    conf.setLong("replication.sleep.before.failover", 2000);
-    conf.setInt("replication.source.maxretriesmultiplier", 10);
-    utility = new HBaseTestingUtility(conf);
-    utility.startMiniZKCluster();
+  protected static List<String> files = new ArrayList<String>();
+  protected static NavigableMap<byte[], Integer> scopes;
 
+  protected static void setupZkAndReplication() throws Exception {
+    // The implementing class should set up the conf
+    assertNotNull(conf);
     zkw = new ZooKeeperWatcher(conf, "test", null);
     ZKUtil.createWithParents(zkw, "/hbase/replication");
     ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
@@ -347,7 +337,7 @@ public class TestReplicationSourceManager {
     Server s1 = new DummyServer("dummyserver1.example.org");
     ReplicationQueues rq1 =
         ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(s1.getConfiguration(), s1,
-          s1.getZooKeeper()));
+            s1.getZooKeeper()));
     rq1.init(s1.getServerName().toString());
     ReplicationPeers rp1 =
         ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), 
s1.getConfiguration(), s1);
@@ -356,7 +346,7 @@ public class TestReplicationSourceManager {
         manager.new NodeFailoverWorker(server.getServerName().getServerName(), 
rq1, rp1, new UUID(
             new Long(1), new Long(2)));
     w1.start();
-    w1.join(5000);
+    w1.join(10000);
     assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
     String id = "1-" + server.getServerName().getServerName();
     assertEquals(files, 
manager.getWalsByIdRecoveredQueues().get(id).get(group));
@@ -366,92 +356,6 @@ public class TestReplicationSourceManager {
   }
 
   @Test
-  public void testNodeFailoverDeadServerParsing() throws Exception {
-    LOG.debug("testNodeFailoverDeadServerParsing");
-    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
-    final Server server = new 
DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
-    ReplicationQueues repQueues =
-        ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, server,
-          server.getZooKeeper()));
-    repQueues.init(server.getServerName().toString());
-    // populate some znodes in the peer znode
-    files.add("log1");
-    files.add("log2");
-    for (String file : files) {
-      repQueues.addLog("1", file);
-    }
-
-    // create 3 DummyServers
-    Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
-    Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
-    Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
-
-    // simulate three servers fail sequentially
-    ReplicationQueues rq1 =
-        ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(s1.getConfiguration(), s1,
-          s1.getZooKeeper()));
-    rq1.init(s1.getServerName().toString());
-    Map<String, Set<String>> testMap =
-        rq1.claimQueues(server.getServerName().getServerName());
-    ReplicationQueues rq2 =
-        ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(s2.getConfiguration(), s2,
-          s2.getZooKeeper()));
-    rq2.init(s2.getServerName().toString());
-    testMap = rq2.claimQueues(s1.getServerName().getServerName());
-    ReplicationQueues rq3 =
-        ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(s3.getConfiguration(), s3,
-          s3.getZooKeeper()));
-    rq3.init(s3.getServerName().toString());
-    testMap = rq3.claimQueues(s2.getServerName().getServerName());
-
-    ReplicationQueueInfo replicationQueueInfo = new 
ReplicationQueueInfo(testMap.keySet().iterator().next());
-    List<String> result = replicationQueueInfo.getDeadRegionServers();
-
-    // verify
-    assertTrue(result.contains(server.getServerName().getServerName()));
-    assertTrue(result.contains(s1.getServerName().getServerName()));
-    assertTrue(result.contains(s2.getServerName().getServerName()));
-
-    server.abort("", null);
-  }
-
-  @Test
-  public void testFailoverDeadServerCversionChange() throws Exception {
-    LOG.debug("testFailoverDeadServerCversionChange");
-
-    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
-    final Server s0 = new DummyServer("cversion-change0.example.org");
-    ReplicationQueues repQueues =
-        ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, s0,
-          s0.getZooKeeper()));
-    repQueues.init(s0.getServerName().toString());
-    // populate some znodes in the peer znode
-    files.add("log1");
-    files.add("log2");
-    for (String file : files) {
-      repQueues.addLog("1", file);
-    }
-    // simulate queue transfer
-    Server s1 = new DummyServer("cversion-change1.example.org");
-    ReplicationQueues rq1 =
-        ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(s1.getConfiguration(), s1,
-          s1.getZooKeeper()));
-    rq1.init(s1.getServerName().toString());
-
-    ReplicationQueuesClientZKImpl client =
-        
(ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient(
-        new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, 
s1.getZooKeeper()));
-
-    int v0 = client.getQueuesZNodeCversion();
-    rq1.claimQueues(s0.getServerName().getServerName());
-    int v1 = client.getQueuesZNodeCversion();
-    // cversion should increased by 1 since a child node is deleted
-    assertEquals(v0 + 1, v1);
-
-    s0.abort("", null);
-  }
-
-  @Test
   public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws 
Exception {
     NavigableMap<byte[], Integer> scope = new TreeMap<byte[], 
Integer>(Bytes.BYTES_COMPARATOR);
     // 1. Get the bulk load wal edit event

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
new file mode 100644
index 0000000..72042b1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
@@ -0,0 +1,152 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the ReplicationSourceManager with ReplicationQueueZkImpl's and
+ * ReplicationQueuesClientZkImpl. Also includes extra tests outside of those in
+ * TestReplicationSourceManager that test ReplicationQueueZkImpl-specific behaviors.
+ */
+@Category({ReplicationTests.class, MediumTests.class})
+public class TestReplicationSourceManagerZkImpl extends 
TestReplicationSourceManager {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",
+      ReplicationSourceDummy.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
+      HConstants.REPLICATION_ENABLE_DEFAULT);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniZKCluster();
+    setupZkAndReplication();
+  }
+
+  // Tests the naming convention of adopted queues for ReplicationQueuesZkImpl
+  @Test
+  public void testNodeFailoverDeadServerParsing() throws Exception {
+    LOG.debug("testNodeFailoverDeadServerParsing");
+    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
+    final Server server = new 
DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
+    ReplicationQueues repQueues =
+      ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, server,
+        server.getZooKeeper()));
+    repQueues.init(server.getServerName().toString());
+    // populate some znodes in the peer znode
+    files.add("log1");
+    files.add("log2");
+    for (String file : files) {
+      repQueues.addLog("1", file);
+    }
+
+    // create 3 DummyServers
+    Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
+    Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
+    Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
+
+    // simulate three servers fail sequentially
+    ReplicationQueues rq1 =
+      ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(s1.getConfiguration(), s1,
+        s1.getZooKeeper()));
+    rq1.init(s1.getServerName().toString());
+    Map<String, Set<String>> testMap =
+      rq1.claimQueues(server.getServerName().getServerName());
+    ReplicationQueues rq2 =
+      ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(s2.getConfiguration(), s2,
+        s2.getZooKeeper()));
+    rq2.init(s2.getServerName().toString());
+    testMap = rq2.claimQueues(s1.getServerName().getServerName());
+    ReplicationQueues rq3 =
+      ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(s3.getConfiguration(), s3,
+        s3.getZooKeeper()));
+    rq3.init(s3.getServerName().toString());
+    testMap = rq3.claimQueues(s2.getServerName().getServerName());
+
+    ReplicationQueueInfo replicationQueueInfo = new 
ReplicationQueueInfo(testMap.keySet().iterator().next());
+    List<String> result = replicationQueueInfo.getDeadRegionServers();
+
+    // verify
+    assertTrue(result.contains(server.getServerName().getServerName()));
+    assertTrue(result.contains(s1.getServerName().getServerName()));
+    assertTrue(result.contains(s2.getServerName().getServerName()));
+
+    server.stop("");
+  }
+
+  @Test
+  public void testFailoverDeadServerCversionChange() throws Exception {
+    LOG.debug("testFailoverDeadServerCversionChange");
+
+    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
+    final Server s0 = new DummyServer("cversion-change0.example.org");
+    ReplicationQueues repQueues =
+      ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, s0,
+        s0.getZooKeeper()));
+    repQueues.init(s0.getServerName().toString());
+    // populate some znodes in the peer znode
+    files.add("log1");
+    files.add("log2");
+    for (String file : files) {
+      repQueues.addLog("1", file);
+    }
+    // simulate queue transfer
+    Server s1 = new DummyServer("cversion-change1.example.org");
+    ReplicationQueues rq1 =
+      ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(s1.getConfiguration(), s1,
+        s1.getZooKeeper()));
+    rq1.init(s1.getServerName().toString());
+
+    ReplicationQueuesClientZKImpl client =
+      
(ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient(
+        new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, 
s1.getZooKeeper()));
+
+    int v0 = client.getQueuesZNodeCversion();
+    rq1.claimQueues(s0.getServerName().getServerName());
+    int v1 = client.getQueuesZNodeCversion();
+    // cversion should increase by 1 since a child node is deleted
+    assertEquals(v0 + 1, v1);
+
+    s0.stop("");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
new file mode 100644
index 0000000..59acfb3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
@@ -0,0 +1,60 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
+import 
org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesClientImpl;
+import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesImpl;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests the ReplicationSourceManager with TableBasedReplicationQueue's and
+ * TableBasedReplicationQueuesClient
+ */
+@Category({ReplicationTests.class, MediumTests.class})
+public class TestTableBasedReplicationSourceManagerImpl extends 
TestReplicationSourceManager {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",
+      ReplicationSourceDummy.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
+      HConstants.REPLICATION_ENABLE_DEFAULT);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+
+    conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
+      TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
+    
conf.setClass("hbase.region.replica.replication.ReplicationQueuesClientType",
+      TableBasedReplicationQueuesClientImpl.class, 
ReplicationQueuesClient.class);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniCluster();
+    setupZkAndReplication();
+  }
+
+}

Reply via email to