HBASE-15974 Create a ReplicationQueuesClientHBaseImpl

Building on HBASE-15958.
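Provided a table-based ReplicationQueuesClient implementation,
TableBasedReplicationQueuesClientImpl, that relies on the HBase
Replication Table to track WAL queues.
Replaced ReplicationQueuesHBaseImpl with TableBasedReplicationQueuesImpl and
refactored a large section of it out into a new ReplicationTableBase class
that handles Replication Table operations.
ReplicationFactory now builds the ReplicationQueuesClient from a
ReplicationQueuesClientArguments object. The implementation class is read from
"hbase.region.replica.replication.ReplicationQueuesClientType" and defaults to
ReplicationQueuesClientZKImpl.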

Signed-off-by: Elliott Clark <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2093aade
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2093aade
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2093aade

Branch: refs/heads/hbase-12439
Commit: 2093aadec1433c1376d1a9a6166e94a257a811b2
Parents: ae5fe1e
Author: Joseph Hwang <[email protected]>
Authored: Thu Jun 9 16:16:38 2016 -0700
Committer: Elliott Clark <[email protected]>
Committed: Wed Jun 15 10:43:14 2016 -0700

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    |   4 +-
 .../hbase/replication/ReplicationFactory.java   |   9 +-
 .../replication/ReplicationQueuesClient.java    |  10 +-
 .../ReplicationQueuesClientArguments.java       |  35 +
 .../ReplicationQueuesClientZKImpl.java          |  51 +-
 .../replication/ReplicationQueuesHBaseImpl.java | 644 -------------------
 .../hbase/replication/ReplicationTableBase.java | 351 ++++++++++
 .../TableBasedReplicationQueuesClientImpl.java  | 111 ++++
 .../TableBasedReplicationQueuesImpl.java        | 437 +++++++++++++
 .../master/ReplicationHFileCleaner.java         |   8 +-
 .../master/ReplicationLogCleaner.java           |  46 +-
 .../hbase/util/hbck/ReplicationChecker.java     |   6 +-
 .../hbase/master/cleaner/TestLogsCleaner.java   |   3 +-
 .../TestReplicationStateHBaseImpl.java          |  90 ++-
 .../replication/TestReplicationStateZKImpl.java |   6 +-
 .../TestReplicationSourceManager.java           |   7 +-
 16 files changed, 1101 insertions(+), 717 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index d062448..e0985bd 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -54,6 +54,7 @@ import 
org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
@@ -122,7 +123,8 @@ public class ReplicationAdmin implements Closeable {
       zkw = createZooKeeperWatcher();
       try {
         this.replicationQueuesClient =
-            ReplicationFactory.getReplicationQueuesClient(zkw, conf, 
this.connection);
+            ReplicationFactory.getReplicationQueuesClient(new 
ReplicationQueuesClientArguments(conf,
+            this.connection, zkw));
         this.replicationQueuesClient.init();
         this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, 
conf,
           this.replicationQueuesClient, this.connection);

http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index e264a4d..38f9f30 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -38,9 +38,12 @@ public class ReplicationFactory {
     return (ReplicationQueues) 
ConstructorUtils.invokeConstructor(classToBuild, args);
   }
 
-  public static ReplicationQueuesClient getReplicationQueuesClient(final 
ZooKeeperWatcher zk,
-      Configuration conf, Abortable abortable) {
-    return new ReplicationQueuesClientZKImpl(zk, conf, abortable);
+  public static ReplicationQueuesClient getReplicationQueuesClient(
+      ReplicationQueuesClientArguments args)
+    throws Exception {
+    Class<?> classToBuild = args.getConf().getClass("hbase.region.replica." +
+      "replication.ReplicationQueuesClientType", 
ReplicationQueuesClientZKImpl.class);
+    return (ReplicationQueuesClient) 
ConstructorUtils.invokeConstructor(classToBuild, args);
   }
 
   public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher 
zk, Configuration conf,

http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
index 7fa3bbb..6d8900e 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -61,11 +62,12 @@ public interface ReplicationQueuesClient {
   List<String> getAllQueues(String serverName) throws KeeperException;
 
   /**
-   * Get the cversion of replication rs node. This can be used as optimistic 
locking to get a
-   * consistent snapshot of the replication queues.
-   * @return cversion of replication rs node
+   * Load all wals in all replication queues from ZK. This method guarantees 
to return a
+   * snapshot which contains all WALs in the zookeeper at the start of this 
call even there
+   * is concurrent queue failover. However, some newly created WALs during the 
call may
+   * not be included.
    */
-  int getQueuesZNodeCversion() throws KeeperException;
+   Set<String> getAllWALs() throws KeeperException;
 
   /**
    * Get the change version number of replication hfile references node. This 
can be used as

http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
new file mode 100644
index 0000000..8a61993
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
[email protected]
+public class ReplicationQueuesClientArguments extends 
ReplicationQueuesArguments {
+  public ReplicationQueuesClientArguments(Configuration conf, Abortable abort,
+     ZooKeeperWatcher zk) {
+    super(conf, abort, zk);
+  }
+  public ReplicationQueuesClientArguments(Configuration conf, Abortable abort) 
{
+    super(conf, abort);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index cc407e3..b0ded7d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -19,7 +19,12 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.util.List;
+import java.util.Set;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
@@ -32,6 +37,12 @@ import org.apache.zookeeper.data.Stat;
 public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase 
implements
     ReplicationQueuesClient {
 
+  Log LOG = LogFactory.getLog(ReplicationQueuesClientZKImpl.class);
+
+  public ReplicationQueuesClientZKImpl(ReplicationQueuesClientArguments args) {
+    this(args.getZk(), args.getConf(), args.getAbortable());
+  }
+
   public ReplicationQueuesClientZKImpl(final ZooKeeperWatcher zk, 
Configuration conf,
       Abortable abortable) {
     super(zk, conf, abortable);
@@ -74,7 +85,45 @@ public class ReplicationQueuesClientZKImpl extends 
ReplicationStateZKBase implem
     return result;
   }
 
-  @Override public int getQueuesZNodeCversion() throws KeeperException {
+  @Override
+  public Set<String> getAllWALs() throws KeeperException {
+    /**
+     * Load all wals in all replication queues from ZK. This method guarantees 
to return a
+     * snapshot which contains all WALs in the zookeeper at the start of this 
call even there
+     * is concurrent queue failover. However, some newly created WALs during 
the call may
+     * not be included.
+     */
+    for (int retry = 0; ; retry++) {
+      int v0 = getQueuesZNodeCversion();
+      List<String> rss = getListOfReplicators();
+      if (rss == null) {
+        LOG.debug("Didn't find any region server that replicates, won't 
prevent any deletions.");
+        return ImmutableSet.of();
+      }
+      Set<String> wals = Sets.newHashSet();
+      for (String rs : rss) {
+        List<String> listOfPeers = getAllQueues(rs);
+        // if rs just died, this will be null
+        if (listOfPeers == null) {
+          continue;
+        }
+        for (String id : listOfPeers) {
+          List<String> peersWals = getLogsInQueue(rs, id);
+          if (peersWals != null) {
+            wals.addAll(peersWals);
+          }
+        }
+      }
+      int v1 = getQueuesZNodeCversion();
+      if (v0 == v1) {
+        return wals;
+      }
+      LOG.info(String.format("Replication queue node cversion changed from %d 
to %d, retry = %d",
+        v0, v1, retry));
+    }
+  }
+
+  public int getQueuesZNodeCversion() throws KeeperException {
     try {
       Stat stat = new Stat();
       ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);

http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
deleted file mode 100644
index 34a5289..0000000
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
+++ /dev/null
@@ -1,644 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.NamespaceDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.RetryCounterFactory;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * This class provides an implementation of the ReplicationQueues interface 
using an HBase table
- * "Replication Table". The basic schema of this table will store each 
individual queue as a
- * seperate row. The row key will be a unique identifier of the creating 
server's name and the
- * queueId. Each queue must have the following two columns:
- *  COL_OWNER: tracks which server is currently responsible for tracking the 
queue
- *  COL_QUEUE_ID: tracks the queue's id as stored in ReplicationSource
- * They will also have columns mapping [WAL filename : offset]
- * One key difference from the ReplicationQueuesZkImpl is that when queues are 
reclaimed we
- * simply return its HBase row key as its new "queueId"
- */
-
[email protected]
-public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
-    implements ReplicationQueues {
-
-  private static final Log LOG = 
LogFactory.getLog(ReplicationQueuesHBaseImpl.class);
-
-  /** Name of the HBase Table used for tracking replication*/
-  public static final TableName REPLICATION_TABLE_NAME =
-    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, 
"replication");
-
-  // Column family and column names for the Replication Table
-  private static final byte[] CF = Bytes.toBytes("r");
-  private static final byte[] COL_OWNER = Bytes.toBytes("o");
-  private static final byte[] COL_OWNER_HISTORY = Bytes.toBytes("h");
-
-  // The value used to delimit the queueId and server name inside of a queue's 
row key. Currently a
-  // hyphen, because it is guaranteed that queueId (which is a cluster id) 
cannot contain hyphens.
-  // See HBASE-11394.
-  private static String ROW_KEY_DELIMITER = "-";
-
-  // Column Descriptor for the Replication Table
-  private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR =
-    new HColumnDescriptor(CF).setMaxVersions(1)
-      .setInMemory(true)
-      .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
-        // TODO: Figure out which bloom filter to use
-      .setBloomFilterType(BloomType.NONE)
-      .setCacheDataInL1(true);
-
-  // Common byte values used in replication offset tracking
-  private static final byte[] INITIAL_OFFSET_BYTES = Bytes.toBytes(0L);
-  private static final byte[] EMPTY_STRING_BYTES = Bytes.toBytes("");
-
-  /*
-   * Make sure that HBase table operations for replication have a high number 
of retries. This is
-   * because the server is aborted if any HBase table operation fails. Each 
RPC will be attempted
-   * 3600 times before exiting. This provides each operation with 2 hours of 
retries
-   * before the server is aborted.
-   */
-  private static final int CLIENT_RETRIES = 3600;
-  private static final int RPC_TIMEOUT = 2000;
-  private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT;
-
-  private Configuration modifiedConf;
-  private Admin admin;
-  private Connection connection;
-  private Table replicationTable;
-  private String serverName = null;
-  private byte[] serverNameBytes = null;
-
-  public ReplicationQueuesHBaseImpl(ReplicationQueuesArguments args) {
-    this(args.getConf(), args.getAbortable(), args.getZk());
-  }
-
-  public ReplicationQueuesHBaseImpl(Configuration conf, Abortable abort, 
ZooKeeperWatcher zkw) {
-    super(zkw, conf, abort);
-    modifiedConf = new Configuration(conf);
-    modifiedConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 
CLIENT_RETRIES);
-  }
-
-  @Override
-  public void init(String serverName) throws ReplicationException {
-    try {
-      this.serverName = serverName;
-      this.serverNameBytes = Bytes.toBytes(serverName);
-      // Modify the connection's config so that the Replication Table it 
returns has a much higher
-      // number of client retries
-      this.connection = ConnectionFactory.createConnection(modifiedConf);
-      this.admin = connection.getAdmin();
-      replicationTable = createAndGetReplicationTable();
-      replicationTable.setRpcTimeout(RPC_TIMEOUT);
-      replicationTable.setOperationTimeout(OPERATION_TIMEOUT);
-    } catch (IOException e) {
-      throw new ReplicationException(e);
-    }
-  }
-
-  @Override
-  public void removeQueue(String queueId) {
-
-    try {
-      byte[] rowKey = queueIdToRowKey(queueId);
-      Delete deleteQueue = new Delete(rowKey);
-      safeQueueUpdate(deleteQueue);
-    } catch (IOException | ReplicationException e) {
-      String errMsg = "Failed removing queue queueId=" + queueId;
-      abortable.abort(errMsg, e);
-    }
-  }
-
-  @Override
-  public void addLog(String queueId, String filename) throws 
ReplicationException {
-    try {
-      if (!checkQueueExists(queueId)) {
-        // Each queue will have an Owner, OwnerHistory, and a collection of 
[WAL:offset] key values
-        Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId)));
-        putNewQueue.addColumn(CF, COL_OWNER, serverNameBytes);
-        putNewQueue.addColumn(CF, COL_OWNER_HISTORY, EMPTY_STRING_BYTES);
-        putNewQueue.addColumn(CF, Bytes.toBytes(filename), 
INITIAL_OFFSET_BYTES);
-        replicationTable.put(putNewQueue);
-      } else {
-        // Otherwise simply add the new log and offset as a new column
-        Put putNewLog = new Put(queueIdToRowKey(queueId));
-        putNewLog.addColumn(CF, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
-        safeQueueUpdate(putNewLog);
-      }
-    } catch (IOException | ReplicationException e) {
-      String errMsg = "Failed adding log queueId=" + queueId + " filename=" + 
filename;
-      abortable.abort(errMsg, e);
-    }
-  }
-
-  @Override
-  public void removeLog(String queueId, String filename) {
-    try {
-      byte[] rowKey = queueIdToRowKey(queueId);
-      Delete delete = new Delete(rowKey);
-      delete.addColumns(CF, Bytes.toBytes(filename));
-      safeQueueUpdate(delete);
-    } catch (IOException | ReplicationException e) {
-      String errMsg = "Failed removing log queueId=" + queueId + " filename=" 
+ filename;
-      abortable.abort(errMsg, e);
-    }
-  }
-
-  @Override
-  public void setLogPosition(String queueId, String filename, long position) {
-    try {
-      byte[] rowKey = queueIdToRowKey(queueId);
-      // Check that the log exists. addLog() must have been called before 
setLogPosition().
-      Get checkLogExists = new Get(rowKey);
-      checkLogExists.addColumn(CF, Bytes.toBytes(filename));
-      if (!replicationTable.exists(checkLogExists)) {
-        String errMsg = "Could not set position of non-existent log from 
queueId=" + queueId +
-          ", filename=" + filename;
-        abortable.abort(errMsg, new ReplicationException(errMsg));
-        return;
-      }
-      // Update the log offset if it exists
-      Put walAndOffset = new Put(rowKey);
-      walAndOffset.addColumn(CF, Bytes.toBytes(filename), 
Bytes.toBytes(position));
-      safeQueueUpdate(walAndOffset);
-    } catch (IOException | ReplicationException e) {
-      String errMsg = "Failed writing log position queueId=" + queueId + 
"filename=" +
-        filename + " position=" + position;
-      abortable.abort(errMsg, e);
-    }
-  }
-
-  @Override
-  public long getLogPosition(String queueId, String filename) throws 
ReplicationException {
-    try {
-      byte[] rowKey = queueIdToRowKey(queueId);
-      Get getOffset = new Get(rowKey);
-      getOffset.addColumn(CF, Bytes.toBytes(filename));
-      Result result = getResultIfOwner(getOffset);
-      if (result == null || !result.containsColumn(CF, 
Bytes.toBytes(filename))) {
-        throw new ReplicationException("Could not read empty result while 
getting log position " +
-            "queueId=" + queueId + ", filename=" + filename);
-      }
-      return Bytes.toLong(result.getValue(CF, Bytes.toBytes(filename)));
-    } catch (IOException e) {
-      throw new ReplicationException("Could not get position in log for 
queueId=" + queueId +
-          ", filename=" + filename);
-    }
-  }
-
-  @Override
-  public void removeAllQueues() {
-    List<String> myQueueIds = getAllQueues();
-    for (String queueId : myQueueIds) {
-      removeQueue(queueId);
-    }
-  }
-
-  @Override
-  public List<String> getLogsInQueue(String queueId) {
-    byte[] rowKey = queueIdToRowKey(queueId);
-    return getLogsInQueue(rowKey);
-  }
-
-  private List<String> getLogsInQueue(byte[] rowKey) {
-    String errMsg = "Could not get logs in queue queueId=" + 
Bytes.toString(rowKey);
-    try {
-      Get getQueue = new Get(rowKey);
-      Result queue = getResultIfOwner(getQueue);
-      // The returned queue could be null if we have lost ownership of it
-      if (queue == null) {
-        abortable.abort(errMsg, new ReplicationException(errMsg));
-        return null;
-      }
-      return readWALsFromResult(queue);
-    } catch (IOException e) {
-      abortable.abort(errMsg, e);
-      return null;
-    }
-  }
-
-  @Override
-  public List<String> getAllQueues() {
-    List<String> allQueues = new ArrayList<String>();
-    ResultScanner queueScanner = null;
-    try {
-      queueScanner = this.getQueuesBelongingToServer(serverName);
-      for (Result queue : queueScanner) {
-        String rowKey =  Bytes.toString(queue.getRow());
-        // If the queue does not have a Owner History, then we must be its 
original owner. So we
-        // want to return its queueId in raw form
-        if (Bytes.toString(queue.getValue(CF, COL_OWNER_HISTORY)).length() == 
0) {
-          allQueues.add(getRawQueueIdFromRowKey(rowKey));
-        } else {
-          allQueues.add(rowKey);
-        }
-      }
-      return allQueues;
-    } catch (IOException e) {
-      String errMsg = "Failed getting list of all replication queues";
-      abortable.abort(errMsg, e);
-      return null;
-    } finally {
-      if (queueScanner != null) {
-        queueScanner.close();
-      }
-    }
-  }
-
-  @Override
-  public Map<String, Set<String>> claimQueues(String regionserver) {
-    Map<String, Set<String>> queues = new HashMap<>();
-    if (isThisOurRegionServer(regionserver)) {
-      return queues;
-    }
-    ResultScanner queuesToClaim = null;
-    try {
-      queuesToClaim = this.getQueuesBelongingToServer(regionserver);
-      for (Result queue : queuesToClaim) {
-        if (attemptToClaimQueue(queue, regionserver)) {
-          String rowKey = Bytes.toString(queue.getRow());
-          ReplicationQueueInfo replicationQueueInfo = new 
ReplicationQueueInfo(rowKey);
-          if (peerExists(replicationQueueInfo.getPeerId())) {
-            Set<String> sortedLogs = new HashSet<String>();
-            List<String> logs = getLogsInQueue(queue.getRow());
-            for (String log : logs) {
-              sortedLogs.add(log);
-            }
-            queues.put(rowKey, sortedLogs);
-            LOG.info(serverName + " has claimed queue " + rowKey + " from " + 
regionserver);
-          } else {
-            // Delete orphaned queues
-            removeQueue(Bytes.toString(queue.getRow()));
-            LOG.info(serverName + " has deleted abandoned queue " + rowKey + " 
from " +
-                regionserver);
-          }
-        }
-      }
-    } catch (IOException | KeeperException e) {
-      String errMsg = "Failed claiming queues for regionserver=" + 
regionserver;
-      abortable.abort(errMsg, e);
-      queues.clear();
-    } finally {
-      if (queuesToClaim != null) {
-        queuesToClaim.close();
-      }
-    }
-    return queues;
-  }
-
-  @Override
-  public List<String> getListOfReplicators() {
-    // scan all of the queues and return a list of all unique OWNER values
-    Set<String> peerServers = new HashSet<String>();
-    ResultScanner allQueuesInCluster = null;
-    try {
-      Scan scan = new Scan();
-      scan.addColumn(CF, COL_OWNER);
-      allQueuesInCluster = replicationTable.getScanner(scan);
-      for (Result queue : allQueuesInCluster) {
-        peerServers.add(Bytes.toString(queue.getValue(CF, COL_OWNER)));
-      }
-    } catch (IOException e) {
-      String errMsg = "Failed getting list of replicators";
-      abortable.abort(errMsg, e);
-    } finally {
-      if (allQueuesInCluster != null) {
-        allQueuesInCluster.close();
-      }
-    }
-    return new ArrayList<String>(peerServers);
-  }
-
-  @Override
-  public boolean isThisOurRegionServer(String regionserver) {
-    return this.serverName.equals(regionserver);
-  }
-
-  @Override
-  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
-    // TODO
-    throw new NotImplementedException();
-  }
-
-  @Override
-  public void removePeerFromHFileRefs(String peerId) {
-    // TODO
-    throw new NotImplementedException();
-  }
-
-  @Override
-  public void addHFileRefs(String peerId, List<String> files) throws 
ReplicationException {
-    // TODO
-    throw new NotImplementedException();
-  }
-
-  @Override
-  public void removeHFileRefs(String peerId, List<String> files) {
-    // TODO
-    throw new NotImplementedException();
-  }
-
-  /**
-   * Gets the Replication Table. Builds and blocks until the table is 
available if the Replication
-   * Table does not exist.
-   *
-   * @return the Replication Table
-   * @throws IOException if the Replication Table takes too long to build
-   */
-  private Table createAndGetReplicationTable() throws IOException {
-    if (!replicationTableExists()) {
-      createReplicationTable();
-    }
-    int maxRetries = 
conf.getInt("replication.queues.createtable.retries.number", 100);
-    RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, 
100);
-    RetryCounter retryCounter = counterFactory.create();
-    while (!replicationTableExists()) {
-      try {
-        retryCounter.sleepUntilNextRetry();
-        if (!retryCounter.shouldRetry()) {
-          throw new IOException("Unable to acquire the Replication Table");
-        }
-      } catch (InterruptedException e) {
-        return null;
-      }
-    }
-    return connection.getTable(REPLICATION_TABLE_NAME);
-  }
-
-  /**
-   * Checks whether the Replication Table exists yet
-   *
-   * @return whether the Replication Table exists
-   * @throws IOException
-   */
-  private boolean replicationTableExists() {
-    try {
-      return admin.tableExists(REPLICATION_TABLE_NAME);
-    } catch (IOException e) {
-      return false;
-    }
-  }
-
-  /**
-   * Create the replication table with the provided HColumnDescriptor 
REPLICATION_COL_DESCRIPTOR
-   * in ReplicationQueuesHBaseImpl
-   *
-   * @throws IOException
-   */
-  private void createReplicationTable() throws IOException {
-    HTableDescriptor replicationTableDescriptor = new 
HTableDescriptor(REPLICATION_TABLE_NAME);
-    replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR);
-    admin.createTable(replicationTableDescriptor);
-  }
-
-  /**
-   * Build the row key for the given queueId. This will uniquely identify it 
from all other queues
-   * in the cluster.
-   * @param serverName The owner of the queue
-   * @param queueId String identifier of the queue
-   * @return String representation of the queue's row key
-   */
-  private String buildQueueRowKey(String serverName, String queueId) {
-    return queueId + ROW_KEY_DELIMITER + serverName;
-  }
-
-  private String buildQueueRowKey(String queueId) {
-    return buildQueueRowKey(serverName, queueId);
-  }
-
-  /**
-   * Parse the original queueId from a row key
-   * @param rowKey String representation of a queue's row key
-   * @return the original queueId
-   */
-  private String getRawQueueIdFromRowKey(String rowKey) {
-    return rowKey.split(ROW_KEY_DELIMITER)[0];
-  }
-
-  /**
-   * See safeQueueUpdate(RowMutations mutate)
-   *
-   * @param put Row mutation to perform on the queue
-   */
-  private void safeQueueUpdate(Put put) throws ReplicationException, 
IOException {
-    RowMutations mutations = new RowMutations(put.getRow());
-    mutations.add(put);
-    safeQueueUpdate(mutations);
-  }
-
-  /**
-   * See safeQueueUpdate(RowMutations mutate)
-   *
-   * @param delete Row mutation to perform on the queue
-   */
-  private void safeQueueUpdate(Delete delete) throws ReplicationException,
-      IOException{
-    RowMutations mutations = new RowMutations(delete.getRow());
-    mutations.add(delete);
-    safeQueueUpdate(mutations);
-  }
-
-  /**
-   * Attempt to mutate a given queue in the Replication Table with a 
checkAndPut on the OWNER column
-   * of the queue. Abort the server if this checkAndPut fails: which means we 
have somehow lost
-   * ownership of the column or an IO Exception has occurred during the 
transaction.
-   *
-   * @param mutate Mutation to perform on a given queue
-   */
-  private void safeQueueUpdate(RowMutations mutate) throws 
ReplicationException, IOException{
-    boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), 
CF, COL_OWNER,
-      CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate);
-    if (!updateSuccess) {
-      throw new ReplicationException("Failed to update Replication Table 
because we lost queue " +
-        " ownership");
-    }
-  }
-
-  /**
-   * Returns a queue's row key given either its raw or reclaimed queueId
-   *
-   * @param queueId queueId of the queue
-   * @return byte representation of the queue's row key
-   */
-  private byte[] queueIdToRowKey(String queueId) {
-    // Cluster id's are guaranteed to have no hyphens, so if the passed in 
queueId has no hyphen
-    // then this is not a reclaimed queue.
-    if (!queueId.contains(ROW_KEY_DELIMITER)) {
-      return Bytes.toBytes(buildQueueRowKey(queueId));
-      // If the queueId contained some hyphen it was reclaimed. In this case, 
the queueId is the
-      // queue's row key
-    } else {
-      return Bytes.toBytes(queueId);
-    }
-  }
-
-  /**
-   * Get the QueueIds belonging to the named server from the ReplicationTable
-   *
-   * @param server name of the server
-   * @return a ResultScanner over the QueueIds belonging to the server
-   * @throws IOException
-   */
-  private ResultScanner getQueuesBelongingToServer(String server) throws 
IOException {
-    Scan scan = new Scan();
-    SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF, 
COL_OWNER,
-      CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server));
-    scan.setFilter(filterMyQueues);
-    scan.addColumn(CF, COL_OWNER);
-    scan.addColumn(CF, COL_OWNER_HISTORY);
-    ResultScanner results = replicationTable.getScanner(scan);
-    return results;
-  }
-
-  /**
-   * Check if the queue specified by queueId is stored in HBase
-   *
-   * @param queueId Either raw or reclaimed format of the queueId
-   * @return Whether the queue is stored in HBase
-   * @throws IOException
-   */
-  private boolean checkQueueExists(String queueId) throws IOException {
-    byte[] rowKey = queueIdToRowKey(queueId);
-    return replicationTable.exists(new Get(rowKey));
-  }
-
-  /**
-   * Read all of the WAL's from a queue into a list
-   *
-   * @param queue HBase query result containing the queue
-   * @return a list of all the WAL filenames
-   */
-  private List<String> readWALsFromResult(Result queue) {
-    List<String> wals = new ArrayList<>();
-    Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF);
-    for(byte[] cQualifier : familyMap.keySet()) {
-      // Ignore the meta data fields of the queue
-      if (Arrays.equals(cQualifier, COL_OWNER) || Arrays.equals(cQualifier, 
COL_OWNER_HISTORY)) {
-        continue;
-      }
-      wals.add(Bytes.toString(cQualifier));
-    }
-    return wals;
-  }
-
-  /**
-   * Attempt to claim the given queue with a checkAndPut on the OWNER column. 
We check that the
-   * recently killed server is still the OWNER before we claim it.
-   *
-   * @param queue The queue that we are trying to claim
-   * @param originalServer The server that originally owned the queue
-   * @return Whether we successfully claimed the queue
-   * @throws IOException
-   */
-  private boolean attemptToClaimQueue (Result queue, String originalServer) 
throws IOException{
-    Put putQueueNameAndHistory = new Put(queue.getRow());
-    putQueueNameAndHistory.addColumn(CF, COL_OWNER, Bytes.toBytes(serverName));
-    String newOwnerHistory = 
buildClaimedQueueHistory(Bytes.toString(queue.getValue(CF,
-      COL_OWNER_HISTORY)), originalServer);
-    putQueueNameAndHistory.addColumn(CF, COL_OWNER_HISTORY, 
Bytes.toBytes(newOwnerHistory));
-    RowMutations claimAndRenameQueue = new RowMutations(queue.getRow());
-    claimAndRenameQueue.add(putQueueNameAndHistory);
-    // Attempt to claim ownership for this queue by checking if the current 
OWNER is the original
-    // server. If it is not then another RS has already claimed it. If it is 
we set ourselves as the
-    // new owner and update the queue's history
-    boolean success = replicationTable.checkAndMutate(queue.getRow(), CF, 
COL_OWNER,
-      CompareFilter.CompareOp.EQUAL, Bytes.toBytes(originalServer), 
claimAndRenameQueue);
-    return success;
-  }
-
-  /**
-   * Creates a "|" delimited record of the queue's past region server owners.
-   *
-   * @param originalHistory the queue's original owner history
-   * @param oldServer the name of the server that used to own the queue
-   * @return the queue's new owner history
-   */
-  private String buildClaimedQueueHistory(String originalHistory, String 
oldServer) {
-    return originalHistory + "|" + oldServer;
-  }
-
-  /**
-   * Attempts to run a Get on some queue. Will only return a non-null result 
if we currently own
-   * the queue.
-   *
-   * @param get The get that we want to query
-   * @return The result of the get if this server is the owner of the queue. 
Else it returns null
-   * @throws IOException
-   */
-  private Result getResultIfOwner(Get get) throws IOException {
-    Scan scan = new Scan(get);
-    // Check if the Get currently contains all columns or only specific columns
-    if (scan.getFamilyMap().size() > 0) {
-      // Add the OWNER column if the scan is already only over specific columns
-      scan.addColumn(CF, COL_OWNER);
-    }
-    scan.setMaxResultSize(1);
-    SingleColumnValueFilter checkOwner = new SingleColumnValueFilter(CF, 
COL_OWNER,
-      CompareFilter.CompareOp.EQUAL, serverNameBytes);
-    scan.setFilter(checkOwner);
-    ResultScanner scanner = null;
-    try {
-      scanner = replicationTable.getScanner(scan);
-      Result result = scanner.next();
-      return (result == null || result.isEmpty()) ? null : result;
-    } finally {
-      if (scanner != null) {
-        scanner.close();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
new file mode 100644
index 0000000..c1506cd
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
@@ -0,0 +1,351 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Abstract class that provides an interface to the Replication Table, which is currently used for
+ * WAL offset tracking.
+ * The basic schema of this table stores each individual queue as a separate row. The row key is a
+ * unique identifier built from the queueId and the name of the server that created the queue.
+ * Each queue must have the following two columns:
+ *  COL_QUEUE_OWNER: tracks which server is currently responsible for tracking the queue
+ *  COL_QUEUE_OWNER_HISTORY: a "|" delimited list of the previous servers that have owned this
+ *    queue. The most recent previous owner is the leftmost entry.
+ * Queues also have columns mapping [WAL filename : offset].
+ */
+
+@InterfaceAudience.Private
+abstract class ReplicationTableBase {
+
+  /** Name of the HBase Table used for tracking replication */
+  public static final TableName REPLICATION_TABLE_NAME =
+    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");
+
+  // Column family and column names for Queues in the Replication Table
+  public static final byte[] CF_QUEUE = Bytes.toBytes("q");
+  public static final byte[] COL_QUEUE_OWNER = Bytes.toBytes("o");
+  public static final byte[] COL_QUEUE_OWNER_HISTORY = Bytes.toBytes("h");
+
+  // Column Descriptor for the Replication Table: one version per cell, in-memory, and local
+  // replication scope
+  private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR =
+    new HColumnDescriptor(CF_QUEUE).setMaxVersions(1)
+      .setInMemory(true)
+      .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+      // TODO: Figure out which bloom filter to use
+      .setBloomFilterType(BloomType.NONE);
+
+  // The value used to delimit the queueId and server name inside of a queue's row key. Currently a
+  // hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens.
+  // See HBASE-11394.
+  public static final String ROW_KEY_DELIMITER = "-";
+
+  // The value used to delimit server names in the queue history list
+  public static final String QUEUE_HISTORY_DELIMITER = "|";
+
+  /*
+   * Make sure that HBase table operations for replication have a high number of retries. This is
+   * because the server is aborted if any HBase table operation fails. Each RPC will be attempted
+   * up to 3600 times, and the operation timeout (CLIENT_RETRIES * RPC_TIMEOUT ms) gives each
+   * operation 2 hours of retries before the server is aborted.
+   */
+  private static final int CLIENT_RETRIES = 3600;
+  private static final int RPC_TIMEOUT = 2000;
+  private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT;
+
+  protected final Table replicationTable;
+  protected final Configuration conf;
+  protected final Abortable abortable;
+  private final Admin admin;
+  private final Connection connection;
+
+  /**
+   * Opens a dedicated connection and acquires the Replication Table, creating it if necessary.
+   *
+   * @param conf configuration to copy; the copy is given a high client retry count
+   * @param abort used to abort the server when a Replication Table operation fails
+   * @throws IOException if the connection cannot be created or the Replication Table does not
+   *           become available in time
+   */
+  public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException {
+    this.conf = new Configuration(conf);
+    this.abortable = abort;
+    decorateConf();
+    this.connection = ConnectionFactory.createConnection(this.conf);
+    try {
+      this.admin = connection.getAdmin();
+      this.replicationTable = createAndGetReplicationTable();
+    } catch (IOException | RuntimeException e) {
+      // Do not leak the dedicated connection when construction fails
+      try {
+        connection.close();
+      } catch (IOException closeFailure) {
+        e.addSuppressed(closeFailure);
+      }
+      throw e;
+    }
+    setTableTimeOuts();
+  }
+
+  /**
+   * Modify the connection's config so that operations run on the Replication Table have a larger
+   * number of retries
+   */
+  private void decorateConf() {
+    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES);
+  }
+
+  /**
+   * Increases the RPC and operation timeouts for the Replication Table
+   */
+  private void setTableTimeOuts() {
+    replicationTable.setRpcTimeout(RPC_TIMEOUT);
+    replicationTable.setOperationTimeout(OPERATION_TIMEOUT);
+  }
+
+  /**
+   * Build the row key for the given queueId. This will uniquely identify it from all other queues
+   * in the cluster.
+   * @param serverName The owner of the queue
+   * @param queueId String identifier of the queue
+   * @return String representation of the queue's row key
+   */
+  protected String buildQueueRowKey(String serverName, String queueId) {
+    return queueId + ROW_KEY_DELIMITER + serverName;
+  }
+
+  /**
+   * Parse the original queueId from a row key, i.e. everything before the first delimiter.
+   * @param rowKey String representation of a queue's row key
+   * @return the original queueId; the whole row key if it contains no delimiter
+   */
+  protected String getRawQueueIdFromRowKey(String rowKey) {
+    // Plain indexOf instead of String.split: no regex compilation and no intermediate array
+    int delimiterIndex = rowKey.indexOf(ROW_KEY_DELIMITER);
+    return delimiterIndex < 0 ? rowKey : rowKey.substring(0, delimiterIndex);
+  }
+
+  /**
+   * Returns a queue's row key given either its raw or reclaimed queueId
+   *
+   * @param serverName name of the server the raw queueId belongs to
+   * @param queueId queueId of the queue, either raw or reclaimed (already a full row key)
+   * @return byte representation of the queue's row key
+   */
+  protected byte[] queueIdToRowKey(String serverName, String queueId) {
+    // Cluster id's are guaranteed to have no hyphens, so if the passed in queueId has no hyphen
+    // then this is not a reclaimed queue.
+    if (!queueId.contains(ROW_KEY_DELIMITER)) {
+      return Bytes.toBytes(buildQueueRowKey(serverName, queueId));
+    }
+    // If the queueId contained some hyphen it was reclaimed. In this case, the queueId is the
+    // queue's row key
+    return Bytes.toBytes(queueId);
+  }
+
+  /**
+   * Creates a "|" delimited record of the queue's past region server owners. The newest previous
+   * owner is prepended, so it becomes the leftmost entry.
+   *
+   * @param originalHistory the queue's original owner history; null is treated as empty
+   * @param oldServer the name of the server that used to own the queue
+   * @return the queue's new owner history
+   */
+  protected String buildClaimedQueueHistory(String originalHistory, String oldServer) {
+    // A missing history cell would otherwise be recorded as the literal string "null"
+    String history = originalHistory == null ? "" : originalHistory;
+    return oldServer + QUEUE_HISTORY_DELIMITER + history;
+  }
+
+  /**
+   * Get a list of all region servers that have outstanding replication queues. These servers could
+   * be alive, dead or from a previous run of the cluster.
+   * Aborts via the Abortable if the scan fails, returning whatever was collected up to that point.
+   * @return a list of unique server names
+   */
+  protected List<String> getListOfReplicators() {
+    // scan all of the queues and return a list of all unique OWNER values
+    Set<String> peerServers = new HashSet<String>();
+    Scan scan = new Scan();
+    scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
+    try (ResultScanner allQueuesInCluster = replicationTable.getScanner(scan)) {
+      for (Result queue : allQueuesInCluster) {
+        peerServers.add(Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER)));
+      }
+    } catch (IOException e) {
+      String errMsg = "Failed getting list of replicators";
+      abortable.abort(errMsg, e);
+    }
+    return new ArrayList<String>(peerServers);
+  }
+
+  /**
+   * Get the ids of all queues currently owned by the given server. Queues that were never claimed
+   * are reported by their raw queueId; reclaimed queues are reported by their full row key.
+   *
+   * @param serverName name of the owning server
+   * @return the queue ids, or null if the scan failed (the Abortable has then been aborted)
+   */
+  protected List<String> getAllQueues(String serverName) {
+    List<String> allQueues = new ArrayList<String>();
+    try (ResultScanner queueScanner = getQueuesBelongingToServer(serverName)) {
+      for (Result queue : queueScanner) {
+        String rowKey = Bytes.toString(queue.getRow());
+        byte[] ownerHistory = queue.getValue(CF_QUEUE, COL_QUEUE_OWNER_HISTORY);
+        // If the queue has no (or an empty) Owner History, its current owner is its original
+        // owner, so we return its queueId in raw form. A missing cell must not cause an NPE.
+        if (ownerHistory == null || ownerHistory.length == 0) {
+          allQueues.add(getRawQueueIdFromRowKey(rowKey));
+        } else {
+          allQueues.add(rowKey);
+        }
+      }
+      return allQueues;
+    } catch (IOException e) {
+      String errMsg = "Failed getting list of all replication queues for serverName=" + serverName;
+      abortable.abort(errMsg, e);
+      return null;
+    }
+  }
+
+  /**
+   * Get the WAL filenames stored in a queue.
+   *
+   * @param serverName name of the server a raw queueId belongs to
+   * @param queueId raw or reclaimed queueId
+   * @return the WAL filenames, or null if the queue could not be read (the Abortable has then
+   *         been aborted)
+   */
+  protected List<String> getLogsInQueue(String serverName, String queueId) {
+    return getLogsInQueue(queueIdToRowKey(serverName, queueId));
+  }
+
+  /**
+   * Get the WAL filenames stored in the queue with the given row key. A missing queue is treated
+   * as a fatal error and aborts via the Abortable.
+   *
+   * @param rowKey the queue's row key
+   * @return the WAL filenames, or null on failure
+   */
+  protected List<String> getLogsInQueue(byte[] rowKey) {
+    String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey);
+    try {
+      Get getQueue = new Get(rowKey);
+      Result queue = replicationTable.get(getQueue);
+      if (queue == null || queue.isEmpty()) {
+        abortable.abort(errMsg, new ReplicationException(errMsg));
+        return null;
+      }
+      return readWALsFromResult(queue);
+    } catch (IOException e) {
+      abortable.abort(errMsg, e);
+      return null;
+    }
+  }
+
+  /**
+   * Read all of the WAL's from a queue into a list
+   *
+   * @param queue HBase query result containing the queue
+   * @return a list of all the WAL filenames; empty if the result holds no queue columns
+   */
+  protected List<String> readWALsFromResult(Result queue) {
+    List<String> wals = new ArrayList<>();
+    Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
+    if (familyMap == null) {
+      // getFamilyMap returns null for an empty Result
+      return wals;
+    }
+    for (byte[] cQualifier : familyMap.keySet()) {
+      // Ignore the meta data fields of the queue
+      if (Arrays.equals(cQualifier, COL_QUEUE_OWNER)
+          || Arrays.equals(cQualifier, COL_QUEUE_OWNER_HISTORY)) {
+        continue;
+      }
+      wals.add(Bytes.toString(cQualifier));
+    }
+    return wals;
+  }
+
+  /**
+   * Get the queue id's and meta data (Owner and History) for the queues belonging to the named
+   * server
+   *
+   * @param server name of the server
+   * @return a ResultScanner over the QueueIds belonging to the server; the caller must close it
+   * @throws IOException if the scanner cannot be opened
+   */
+  private ResultScanner getQueuesBelongingToServer(String server) throws IOException {
+    Scan scan = new Scan();
+    SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
+      CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server));
+    // By default rows lacking the OWNER column pass the filter; such rows belong to no server
+    filterMyQueues.setFilterIfMissing(true);
+    scan.setFilter(filterMyQueues);
+    scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
+    scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY);
+    return replicationTable.getScanner(scan);
+  }
+
+  /**
+   * Gets the Replication Table, creating it first if it does not exist, and blocks until the table
+   * is available.
+   *
+   * @return the Replication Table
+   * @throws InterruptedIOException if interrupted while waiting for the table
+   * @throws IOException if the Replication Table cannot be created or takes too long to become
+   *           available
+   */
+  private Table createAndGetReplicationTable() throws IOException {
+    if (!replicationTableExists()) {
+      try {
+        createReplicationTable();
+      } catch (TableExistsException ignored) {
+        // Another server created the table concurrently; wait for it to become available below
+      }
+    }
+    int maxRetries = conf.getInt("replication.queues.createtable.retries.number", 100);
+    RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, 100);
+    RetryCounter retryCounter = counterFactory.create();
+    while (!replicationTableAvailable()) {
+      try {
+        retryCounter.sleepUntilNextRetry();
+      } catch (InterruptedException e) {
+        // Restore the interrupt status and fail, rather than returning a null table that would
+        // surface later as a NullPointerException
+        Thread.currentThread().interrupt();
+        InterruptedIOException interrupted =
+          new InterruptedIOException("Interrupted while waiting for the Replication Table");
+        interrupted.initCause(e);
+        throw interrupted;
+      }
+      if (!retryCounter.shouldRetry()) {
+        throw new IOException("Unable to acquire the Replication Table");
+      }
+    }
+    return connection.getTable(REPLICATION_TABLE_NAME);
+  }
+
+  /**
+   * Create the replication table with the column descriptor REPLICATION_COL_DESCRIPTOR
+   * @throws TableExistsException if the table was created by someone else in the meantime
+   * @throws IOException if the table cannot be created
+   */
+  private void createReplicationTable() throws IOException {
+    HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME);
+    replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR);
+    admin.createTable(replicationTableDescriptor);
+  }
+
+  /**
+   * Checks whether the Replication Table exists yet
+   *
+   * @return whether the Replication Table exists; false if the check itself fails
+   */
+  private boolean replicationTableExists() {
+    try {
+      return admin.tableExists(REPLICATION_TABLE_NAME);
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Checks whether the Replication Table exists and all of its regions are available
+   *
+   * @return whether the Replication Table is available; false if the check itself fails
+   */
+  private boolean replicationTableAvailable() {
+    try {
+      return admin.isTableAvailable(REPLICATION_TABLE_NAME);
+    } catch (IOException e) {
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
new file mode 100644
index 0000000..55dfdd8
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Implements the ReplicationQueuesClient interface on top of the Replication Table. It utilizes
+ * the ReplicationTableBase to access the Replication Table.
+ */
+@InterfaceAudience.Private
+public class TableBasedReplicationQueuesClientImpl extends ReplicationTableBase
+  implements ReplicationQueuesClient {
+
+  /**
+   * @param args supplies the Configuration and Abortable to use
+   * @throws IOException if the Replication Table cannot be reached or created
+   */
+  public TableBasedReplicationQueuesClientImpl(ReplicationQueuesClientArguments args)
+    throws IOException {
+    super(args.getConf(), args.getAbortable());
+  }
+
+  /**
+   * @param conf cluster configuration
+   * @param abortable aborted when a Replication Table operation fails
+   * @throws IOException if the Replication Table cannot be reached or created
+   */
+  public TableBasedReplicationQueuesClientImpl(Configuration conf, Abortable abortable)
+    throws IOException {
+    super(conf, abortable);
+  }
+
+  @Override
+  public void init() throws ReplicationException {
+    // no-op: the ReplicationTableBase constructor already acquired the Replication Table
+  }
+
+  @Override
+  public List<String> getListOfReplicators() {
+    return super.getListOfReplicators();
+  }
+
+  @Override
+  public List<String> getLogsInQueue(String serverName, String queueId) {
+    return super.getLogsInQueue(serverName, queueId);
+  }
+
+  @Override
+  public List<String> getAllQueues(String serverName) {
+    return super.getAllQueues(serverName);
+  }
+
+  /**
+   * Collects the WAL filenames referenced by every queue in the Replication Table.
+   * NOTE(review): on an IOException the Abortable is aborted and the WALs gathered so far are
+   * still returned; callers deciding which WALs are safe to delete should confirm that a
+   * partial set cannot be mistaken for a complete one.
+   *
+   * @return the set of WAL filenames across all queues
+   */
+  @Override
+  public Set<String> getAllWALs() {
+    Set<String> allWals = new HashSet<String>();
+    Scan scan = new Scan();
+    // Queue rows only live in CF_QUEUE; owner/history columns are filtered by readWALsFromResult
+    scan.addFamily(CF_QUEUE);
+    try (ResultScanner allQueues = replicationTable.getScanner(scan)) {
+      for (Result queue : allQueues) {
+        allWals.addAll(readWALsFromResult(queue));
+      }
+    } catch (IOException e) {
+      String errMsg = "Failed getting all WAL's in Replication Table";
+      abortable.abort(errMsg, e);
+    }
+    return allWals;
+  }
+
+  @Override
+  public int getHFileRefsNodeChangeVersion() throws KeeperException {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public List<String> getReplicableHFiles(String peerId) throws KeeperException {
+    // TODO
+    throw new NotImplementedException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
new file mode 100644
index 0000000..6ea7801
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
@@ -0,0 +1,437 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class provides an implementation of the ReplicationQueues interface using an HBase table,
+ * the "Replication Table". It utilizes the ReplicationTableBase to access the Replication Table.
+ */
[email protected]
+public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
+  implements ReplicationQueues {
+
+  private static final Log LOG = 
LogFactory.getLog(TableBasedReplicationQueuesImpl.class);
+
+  // Common byte values used in replication offset tracking
+  private static final byte[] INITIAL_OFFSET_BYTES = Bytes.toBytes(0L);
+  private static final byte[] EMPTY_STRING_BYTES = Bytes.toBytes("");
+
+  private String serverName = null;
+  private byte[] serverNameBytes = null;
+
+  // TODO: Only use this variable temporarily. Eventually we want to use HBase 
to store all
+  // TODO: replication information
+  private ReplicationStateZKBase replicationState;
+
+  public TableBasedReplicationQueuesImpl(ReplicationQueuesArguments args) 
throws IOException {
+    this(args.getConf(), args.getAbortable(), args.getZk());
+  }
+
+  public TableBasedReplicationQueuesImpl(Configuration conf, Abortable abort, 
ZooKeeperWatcher zkw)
+    throws IOException {
+    super(conf, abort);
+    replicationState = new ReplicationStateZKBase(zkw, conf, abort) {};
+  }
+
+  @Override
+  public void init(String serverName) throws ReplicationException {
+    this.serverName = serverName;
+    this.serverNameBytes = Bytes.toBytes(serverName);
+  }
+
+  @Override
+  public List<String> getListOfReplicators() {
+    return super.getListOfReplicators();
+  }
+
+  @Override
+  public void removeQueue(String queueId) {
+
+    try {
+      byte[] rowKey = queueIdToRowKey(queueId);
+      Delete deleteQueue = new Delete(rowKey);
+      safeQueueUpdate(deleteQueue);
+    } catch (IOException | ReplicationException e) {
+      String errMsg = "Failed removing queue queueId=" + queueId;
+      abortable.abort(errMsg, e);
+    }
+  }
+
+  @Override
+  public void addLog(String queueId, String filename) throws 
ReplicationException {
+    try {
+      if (!checkQueueExists(queueId)) {
+        // Each queue will have an Owner, OwnerHistory, and a collection of 
[WAL:offset] key values
+        Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId)));
+        putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER, serverNameBytes);
+        putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, 
EMPTY_STRING_BYTES);
+        putNewQueue.addColumn(CF_QUEUE, Bytes.toBytes(filename), 
INITIAL_OFFSET_BYTES);
+        replicationTable.put(putNewQueue);
+      } else {
+        // Otherwise simply add the new log and offset as a new column
+        Put putNewLog = new Put(queueIdToRowKey(queueId));
+        putNewLog.addColumn(CF_QUEUE, Bytes.toBytes(filename), 
INITIAL_OFFSET_BYTES);
+        safeQueueUpdate(putNewLog);
+      }
+    } catch (IOException | ReplicationException e) {
+      String errMsg = "Failed adding log queueId=" + queueId + " filename=" + 
filename;
+      abortable.abort(errMsg, e);
+    }
+  }
+
+  @Override
+  public void removeLog(String queueId, String filename) {
+    try {
+      byte[] rowKey = queueIdToRowKey(queueId);
+      Delete delete = new Delete(rowKey);
+      delete.addColumns(CF_QUEUE, Bytes.toBytes(filename));
+      safeQueueUpdate(delete);
+    } catch (IOException | ReplicationException e) {
+      String errMsg = "Failed removing log queueId=" + queueId + " filename=" 
+ filename;
+      abortable.abort(errMsg, e);
+    }
+  }
+
+  @Override
+  public void setLogPosition(String queueId, String filename, long position) {
+    try {
+      byte[] rowKey = queueIdToRowKey(queueId);
+      // Check that the log exists. addLog() must have been called before 
setLogPosition().
+      Get checkLogExists = new Get(rowKey);
+      checkLogExists.addColumn(CF_QUEUE, Bytes.toBytes(filename));
+      if (!replicationTable.exists(checkLogExists)) {
+        String errMsg = "Could not set position of non-existent log from 
queueId=" + queueId +
+          ", filename=" + filename;
+        abortable.abort(errMsg, new ReplicationException(errMsg));
+        return;
+      }
+      // Update the log offset if it exists
+      Put walAndOffset = new Put(rowKey);
+      walAndOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename), 
Bytes.toBytes(position));
+      safeQueueUpdate(walAndOffset);
+    } catch (IOException | ReplicationException e) {
+      String errMsg = "Failed writing log position queueId=" + queueId + 
"filename=" +
+        filename + " position=" + position;
+      abortable.abort(errMsg, e);
+    }
+  }
+
+  @Override
+  public long getLogPosition(String queueId, String filename) throws 
ReplicationException {
+    try {
+      byte[] rowKey = queueIdToRowKey(queueId);
+      Get getOffset = new Get(rowKey);
+      getOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename));
+      Result result = getResultIfOwner(getOffset);
+      if (result == null || !result.containsColumn(CF_QUEUE, 
Bytes.toBytes(filename))) {
+        throw new ReplicationException("Could not read empty result while 
getting log position " +
+          "queueId=" + queueId + ", filename=" + filename);
+      }
+      return Bytes.toLong(result.getValue(CF_QUEUE, Bytes.toBytes(filename)));
+    } catch (IOException e) {
+      throw new ReplicationException("Could not get position in log for 
queueId=" + queueId +
+        ", filename=" + filename);
+    }
+  }
+
+  @Override
+  public void removeAllQueues() {
+    List<String> myQueueIds = getAllQueues();
+    for (String queueId : myQueueIds) {
+      removeQueue(queueId);
+    }
+  }
+
+  @Override
+  public List<String> getLogsInQueue(String queueId) {
+    byte[] rowKey = queueIdToRowKey(queueId);
+    return getLogsInQueueAndCheckOwnership(rowKey);
+  }
+
+  @Override
+  public List<String> getAllQueues() {
+    return getAllQueues(serverName);
+  }
+
+  @Override
+  public Map<String, Set<String>> claimQueues(String regionserver) {
+    Map<String, Set<String>> queues = new HashMap<>();
+    if (isThisOurRegionServer(regionserver)) {
+      return queues;
+    }
+    ResultScanner queuesToClaim = null;
+    try {
+      queuesToClaim = getAllQueuesScanner(regionserver);
+      for (Result queue : queuesToClaim) {
+        if (attemptToClaimQueue(queue, regionserver)) {
+          String rowKey = Bytes.toString(queue.getRow());
+          ReplicationQueueInfo replicationQueueInfo = new 
ReplicationQueueInfo(rowKey);
+          if (replicationState.peerExists(replicationQueueInfo.getPeerId())) {
+            Set<String> sortedLogs = new HashSet<String>();
+            List<String> logs = getLogsInQueue(queue.getRow());
+            for (String log : logs) {
+              sortedLogs.add(log);
+            }
+            queues.put(rowKey, sortedLogs);
+            LOG.info(serverName + " has claimed queue " + rowKey + " from " + 
regionserver);
+          } else {
+            // Delete orphaned queues
+            removeQueue(Bytes.toString(queue.getRow()));
+            LOG.info(serverName + " has deleted abandoned queue " + rowKey + " 
from " +
+              regionserver);
+          }
+        }
+      }
+    } catch (IOException | KeeperException e) {
+      String errMsg = "Failed claiming queues for regionserver=" + 
regionserver;
+      abortable.abort(errMsg, e);
+      queues.clear();
+    } finally {
+      if (queuesToClaim != null) {
+        queuesToClaim.close();
+      }
+    }
+    return queues;
+  }
+
+  /**
+   * Get the QueueIds belonging to the named server from the 
ReplicationTableBase
+   *
+   * @param server name of the server
+   * @return a ResultScanner over the QueueIds belonging to the server
+   * @throws IOException
+   */
+  private ResultScanner getAllQueuesScanner(String server) throws IOException {
+    Scan scan = new Scan();
+    SingleColumnValueFilter filterMyQueues = new 
SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
+      CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server));
+    scan.setFilter(filterMyQueues);
+    scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
+    scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY);
+    ResultScanner results = replicationTable.getScanner(scan);
+    return results;
+  }
+
+  @Override
+  public boolean isThisOurRegionServer(String regionserver) {
+    return this.serverName.equals(regionserver);
+  }
+
+  @Override
+  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void removePeerFromHFileRefs(String peerId) {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void addHFileRefs(String peerId, List<String> files) throws 
ReplicationException {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void removeHFileRefs(String peerId, List<String> files) {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  private List<String> getLogsInQueueAndCheckOwnership(byte[] rowKey) {
+    String errMsg = "Failed getting logs in queue queueId=" + 
Bytes.toString(rowKey);
+    List<String> logs = new ArrayList<String>();
+    try {
+      Get getQueue = new Get(rowKey);
+      Result queue = getResultIfOwner(getQueue);
+      if (queue == null || queue.isEmpty()) {
+        String errMsgLostOwnership = "Failed getting logs for queue queueId=" +
+          Bytes.toString(rowKey) + " because the queue was missing or we lost 
ownership";
+        abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership));
+        return null;
+      }
+      Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
+      for(byte[] cQualifier : familyMap.keySet()) {
+        if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || 
Arrays.equals(cQualifier,
+            COL_QUEUE_OWNER_HISTORY)) {
+          continue;
+        }
+        logs.add(Bytes.toString(cQualifier));
+      }
+    } catch (IOException e) {
+      abortable.abort(errMsg, e);
+      return null;
+    }
+    return logs;
+  }
+
+  private String buildQueueRowKey(String queueId) {
+    return buildQueueRowKey(serverName, queueId);
+  }
+
+  /**
+   * Convenience method that gets the row key of the queue specified by queueId
+   * @param queueId queueId of a queue in this server
+   * @return the row key of the queue in the Replication Table
+   */
+  private byte[] queueIdToRowKey(String queueId) {
+    return queueIdToRowKey(serverName, queueId);
+  }
+
+  /**
+   * See safeQueueUpdate(RowMutations mutate)
+   *
+   * @param put Row mutation to perform on the queue
+   */
+  private void safeQueueUpdate(Put put) throws ReplicationException, 
IOException {
+    RowMutations mutations = new RowMutations(put.getRow());
+    mutations.add(put);
+    safeQueueUpdate(mutations);
+  }
+
+  /**
+   * See safeQueueUpdate(RowMutations mutate)
+   *
+   * @param delete Row mutation to perform on the queue
+   */
+  private void safeQueueUpdate(Delete delete) throws ReplicationException,
+    IOException{
+    RowMutations mutations = new RowMutations(delete.getRow());
+    mutations.add(delete);
+    safeQueueUpdate(mutations);
+  }
+
+  /**
+   * Attempt to mutate a given queue in the Replication Table with a 
checkAndPut on the OWNER column
+   * of the queue. Abort the server if this checkAndPut fails: which means we 
have somehow lost
+   * ownership of the column or an IO Exception has occurred during the 
transaction.
+   *
+   * @param mutate Mutation to perform on a given queue
+   */
+  private void safeQueueUpdate(RowMutations mutate) throws 
ReplicationException, IOException{
+    boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), 
CF_QUEUE,
+        COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, serverNameBytes, 
mutate);
+    if (!updateSuccess) {
+      throw new ReplicationException("Failed to update Replication Table 
because we lost queue " +
+        " ownership");
+    }
+  }
+
+  /**
+   * Check if the queue specified by queueId is stored in HBase
+   *
+   * @param queueId Either raw or reclaimed format of the queueId
+   * @return Whether the queue is stored in HBase
+   * @throws IOException
+   */
+  private boolean checkQueueExists(String queueId) throws IOException {
+    byte[] rowKey = queueIdToRowKey(queueId);
+    return replicationTable.exists(new Get(rowKey));
+  }
+
+  /**
+   * Attempt to claim the given queue with a checkAndPut on the OWNER column. 
We check that the
+   * recently killed server is still the OWNER before we claim it.
+   *
+   * @param queue The queue that we are trying to claim
+   * @param originalServer The server that originally owned the queue
+   * @return Whether we successfully claimed the queue
+   * @throws IOException
+   */
+  private boolean attemptToClaimQueue (Result queue, String originalServer) 
throws IOException{
+    Put putQueueNameAndHistory = new Put(queue.getRow());
+    putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER, 
Bytes.toBytes(serverName));
+    String newOwnerHistory = 
buildClaimedQueueHistory(Bytes.toString(queue.getValue(CF_QUEUE,
+      COL_QUEUE_OWNER_HISTORY)), originalServer);
+    putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY,
+        Bytes.toBytes(newOwnerHistory));
+    RowMutations claimAndRenameQueue = new RowMutations(queue.getRow());
+    claimAndRenameQueue.add(putQueueNameAndHistory);
+    // Attempt to claim ownership for this queue by checking if the current 
OWNER is the original
+    // server. If it is not then another RS has already claimed it. If it is 
we set ourselves as the
+    // new owner and update the queue's history
+    boolean success = replicationTable.checkAndMutate(queue.getRow(), 
CF_QUEUE, COL_QUEUE_OWNER,
+      CompareFilter.CompareOp.EQUAL, Bytes.toBytes(originalServer), 
claimAndRenameQueue);
+    return success;
+  }
+
+  /**
+   * Attempts to run a Get on some queue. Will only return a non-null result 
if we currently own
+   * the queue.
+   *
+   * @param get The Get that we want to query
+   * @return The result of the Get if this server is the owner of the queue. 
Else it returns null.
+   * @throws IOException
+   */
+  private Result getResultIfOwner(Get get) throws IOException {
+    Scan scan = new Scan(get);
+    // Check if the Get currently contains all columns or only specific columns
+    if (scan.getFamilyMap().size() > 0) {
+      // Add the OWNER column if the scan is already only over specific columns
+      scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
+    }
+    scan.setMaxResultSize(1);
+    SingleColumnValueFilter checkOwner = new SingleColumnValueFilter(CF_QUEUE, 
COL_QUEUE_OWNER,
+      CompareFilter.CompareOp.EQUAL, serverNameBytes);
+    scan.setFilter(checkOwner);
+    ResultScanner scanner = null;
+    try {
+      scanner = replicationTable.getScanner(scan);
+      Result result = scanner.next();
+      return (result == null || result.isEmpty()) ? null : result;
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
index 5df9379..a7b2f26 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
@@ -34,6 +34,7 @@ import 
org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
@@ -141,15 +142,16 @@ public class ReplicationHFileCleaner extends 
BaseHFileCleanerDelegate {
     super.setConf(conf);
     try {
       initReplicationQueuesClient(conf, zk);
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOG.error("Error while configuring " + this.getClass().getName(), e);
     }
   }
 
   private void initReplicationQueuesClient(Configuration conf, 
ZooKeeperWatcher zk)
-      throws ZooKeeperConnectionException, IOException {
+      throws Exception {
     this.zkw = zk;
-    this.rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, new 
WarnOnlyAbortable());
+    this.rqc = ReplicationFactory.getReplicationQueuesClient(new 
ReplicationQueuesClientArguments(
+        conf, new WarnOnlyAbortable(), zkw));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 9ecba11..9e724db 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -31,6 +31,7 @@ import 
org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import java.io.IOException;
 import java.util.Collections;
@@ -67,7 +68,7 @@ public class ReplicationLogCleaner extends 
BaseLogCleanerDelegate {
     try {
       // The concurrently created new WALs may not be included in the return 
list,
       // but they won't be deleted because they're not in the checking set.
-      wals = loadWALsFromQueues();
+      wals = replicationQueues.getAllWALs();
     } catch (KeeperException e) {
       LOG.warn("Failed to read zookeeper, skipping checking deletable files");
       return Collections.emptyList();
@@ -88,43 +89,6 @@ public class ReplicationLogCleaner extends 
BaseLogCleanerDelegate {
       }});
   }
 
-  /**
-   * Load all wals in all replication queues from ZK. This method guarantees 
to return a
-   * snapshot which contains all WALs in the zookeeper at the start of this 
call even there
-   * is concurrent queue failover. However, some newly created WALs during the 
call may
-   * not be included.
-   */
-  private Set<String> loadWALsFromQueues() throws KeeperException {
-    for (int retry = 0; ; retry++) {
-      int v0 = replicationQueues.getQueuesZNodeCversion();
-      List<String> rss = replicationQueues.getListOfReplicators();
-      if (rss == null) {
-        LOG.debug("Didn't find any region server that replicates, won't 
prevent any deletions.");
-        return ImmutableSet.of();
-      }
-      Set<String> wals = Sets.newHashSet();
-      for (String rs : rss) {
-        List<String> listOfPeers = replicationQueues.getAllQueues(rs);
-        // if rs just died, this will be null
-        if (listOfPeers == null) {
-          continue;
-        }
-        for (String id : listOfPeers) {
-          List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
-          if (peersWals != null) {
-            wals.addAll(peersWals);
-          }
-        }
-      }
-      int v1 = replicationQueues.getQueuesZNodeCversion();
-      if (v0 == v1) {
-        return wals;
-      }
-      LOG.info(String.format("Replication queue node cversion changed from %d 
to %d, retry = %d",
-          v0, v1, retry));
-    }
-  }
-
   @Override
   public void setConf(Configuration config) {
     // If replication is disabled, keep all members null
@@ -148,10 +112,10 @@ public class ReplicationLogCleaner extends 
BaseLogCleanerDelegate {
     super.setConf(conf);
     try {
       this.zkw = zk;
-      this.replicationQueues = 
ReplicationFactory.getReplicationQueuesClient(zkw, conf,
-          new WarnOnlyAbortable());
+      this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(
+          new ReplicationQueuesClientArguments(conf, new WarnOnlyAbortable(), 
zkw));
       this.replicationQueues.init();
-    } catch (ReplicationException e) {
+    } catch (Exception e) {
       LOG.error("Error while configuring " + this.getClass().getName(), e);
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
index e472558..8d66c8f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.util.HBaseFsck;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
@@ -67,13 +68,14 @@ public class ReplicationChecker {
     try {
       this.zkw = zkw;
       this.errorReporter = errorReporter;
-      this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, 
conf, connection);
+      this.queuesClient = ReplicationFactory.getReplicationQueuesClient(
+          new ReplicationQueuesClientArguments(conf, connection, zkw));
       this.queuesClient.init();
       this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, 
conf, this.queuesClient,
         connection);
       this.replicationPeers.init();
       this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, connection);
-    } catch (ReplicationException e) {
+    } catch (Exception e) {
       throw new IOException("failed to construct ReplicationChecker", e);
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 18950a2..eecaae1 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -167,7 +168,7 @@ public class TestLogsCleaner {
     ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
     cleaner.setConf(conf);
 
-    ReplicationQueuesClient rqcMock = 
Mockito.mock(ReplicationQueuesClient.class);
+    ReplicationQueuesClientZKImpl rqcMock = 
Mockito.mock(ReplicationQueuesClientZKImpl.class);
     Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4);
 
     Field rqc = 
ReplicationLogCleaner.class.getDeclaredField("replicationQueues");

Reply via email to