Repository: hbase
Updated Branches:
  refs/heads/branch-2 205016ca7 -> e2ce252b5


http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
new file mode 100644
index 0000000..4733706
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -0,0 +1,407 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This class provides an implementation of the ReplicationQueues interface using ZooKeeper. The
+ * base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of
+ * all outstanding WAL files on this region server that need to be replicated. The myQueuesZnode is
+ * the regionserver name (a concatenation of the region server's hostname, client port and start
+ * code). For example:
+ *
+ * /hbase/replication/rs/hostname.example.org,6020,1234
+ *
+ * Within this znode, the region server maintains a set of WAL replication queues. These queues are
+ * represented by child znodes named using their given queue id. For example:
+ *
+ * /hbase/replication/rs/hostname.example.org,6020,1234/1
+ * /hbase/replication/rs/hostname.example.org,6020,1234/2
+ *
+ * Each queue has one child znode for every WAL that still needs to be replicated. The value of
+ * these WAL child znodes is the latest position that has been replicated. This position is updated
+ * every time a WAL entry is replicated. For example:
+ *
+ * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
+ */
+@InterfaceAudience.Private
+public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
+
+  /** Znode containing all replication queues for this region server. */
+  private String myQueuesZnode;
+
+  private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
+
+  public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) {
+    this(args.getZk(), args.getConf(), args.getAbortable());
+  }
+
+  public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf,
+      Abortable abortable) {
+    super(zk, conf, abortable);
+  }
+
+  @Override
+  public void init(String serverName) throws ReplicationException {
+    this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
+    try {
+      if (ZKUtil.checkExists(this.zookeeper, this.myQueuesZnode) < 0) {
+        ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
+      }
+    } catch (KeeperException e) {
+      throw new ReplicationException("Could not initialize replication 
queues.", e);
+    }
+    if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
+      try {
+        if (ZKUtil.checkExists(this.zookeeper, this.hfileRefsZNode) < 0) {
+          ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
+        }
+      } catch (KeeperException e) {
+        throw new ReplicationException("Could not initialize hfile references replication queue.",
+            e);
+      }
+    }
+  }
+
+  @Override
+  public void removeQueue(String queueId) {
+    try {
+      ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.myQueuesZnode, queueId));
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", 
e);
+    }
+  }
+
+  @Override
+  public void addLog(String queueId, String filename) throws ReplicationException {
+    String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+    znode = ZKUtil.joinZNode(znode, filename);
+    try {
+      ZKUtil.createWithParents(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      throw new ReplicationException(
+          "Could not add log because znode could not be created. queueId=" + 
queueId
+              + ", filename=" + filename);
+    }
+  }
+
+  @Override
+  public void removeLog(String queueId, String filename) {
+    try {
+      String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+      znode = ZKUtil.joinZNode(znode, filename);
+      ZKUtil.deleteNode(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to remove wal from queue (queueId=" + 
queueId + ", filename="
+          + filename + ")", e);
+    }
+  }
+
+  @Override
+  public void setLogPosition(String queueId, String filename, long position) {
+    try {
+      String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+      znode = ZKUtil.joinZNode(znode, filename);
+      // Why serialize String of Long and not Long as bytes?
+      ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to write replication wal position 
(filename=" + filename
+          + ", position=" + position + ")", e);
+    }
+  }
+
+  @Override
+  public long getLogPosition(String queueId, String filename) throws ReplicationException {
+    String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+    String znode = ZKUtil.joinZNode(clusterZnode, filename);
+    byte[] bytes = null;
+    try {
+      bytes = ZKUtil.getData(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Internal Error: could not get position 
in log for queueId="
+          + queueId + ", filename=" + filename, e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return 0;
+    }
+    try {
+      return ZKUtil.parseWALPositionFrom(bytes);
+    } catch (DeserializationException de) {
+      LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and 
wal=" + filename
+          + " znode content, continuing.");
+    }
+    // if we can not parse the position, start at the beginning of the wal file
+    // again
+    return 0;
+  }
+
+  @Override
+  public boolean isThisOurRegionServer(String regionserver) {
+    return ZKUtil.joinZNode(this.queuesZNode, regionserver).equals(this.myQueuesZnode);
+  }
+
+  @Override
+  public List<String> getUnClaimedQueueIds(String regionserver) {
+    if (isThisOurRegionServer(regionserver)) {
+      return null;
+    }
+    String rsZnodePath = ZKUtil.joinZNode(this.queuesZNode, regionserver);
+    List<String> queues = null;
+    try {
+      queues = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZnodePath);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to getUnClaimedQueueIds for RS" + 
regionserver, e);
+    }
+    return queues;
+  }
+
+  @Override
+  public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
+    LOG.info("Atomically moving " + regionserver + "/" + queueId + "'s WALs to 
my queue");
+    return moveQueueUsingMulti(regionserver, queueId);
+  }
+
+  @Override
+  public void removeReplicatorIfQueueIsEmpty(String regionserver) {
+    String rsPath = ZKUtil.joinZNode(this.queuesZNode, regionserver);
+    try {
+      List<String> list = ZKUtil.listChildrenNoWatch(this.zookeeper, rsPath);
+      if (list != null && list.isEmpty()) {
+        ZKUtil.deleteNode(this.zookeeper, rsPath);
+      }
+    } catch (KeeperException e) {
+      LOG.warn("Got error while removing replicator", e);
+    }
+  }
+
+  @Override
+  public void removeAllQueues() {
+    try {
+      ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
+    } catch (KeeperException e) {
+      // if the znode is already expired, don't bother going further
+      if (e instanceof KeeperException.SessionExpiredException) {
+        return;
+      }
+      this.abortable.abort("Failed to delete replication queues for region 
server: "
+          + this.myQueuesZnode, e);
+    }
+  }
+
+  @Override
+  public List<String> getLogsInQueue(String queueId) {
+    String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get list of wals for queueId=" + 
queueId, e);
+    }
+    return result;
+  }
+
+  @Override
+  public List<String> getAllQueues() {
+    List<String> listOfQueues = null;
+    try {
+      listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get a list of queues for region server: "
+          + this.myQueuesZnode, e);
+    }
+    return listOfQueues == null ? new ArrayList<>() : listOfQueues;
+  }
+
+  /**
+   * It "atomically" copies one peer's wals queue from another dead region 
server and returns them
+   * all sorted. The new peer id is equal to the old peer id appended with the 
dead server's znode.
+   * @param znode pertaining to the region server to copy the queues from
+   * @peerId peerId pertaining to the queue need to be copied
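+   * <p>For example, claiming queue "1" from dead server "hostname.example.org,6020,1234" results
+   * in the new queue id "1-hostname.example.org,6020,1234" under this server's znode.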
+   */
+  private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) {
+    try {
+      // hbase/replication/rs/deadrs
+      String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
+      List<ZKUtilOp> listOfOps = new ArrayList<>();
+      ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+
+      String newPeerId = peerId + "-" + znode;
+      String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
+      // check the logs queue for the old peer cluster
+      String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
+      List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
+
+      if (!peerExists(replicationQueueInfo.getPeerId())) {
+        LOG.warn("Peer " + replicationQueueInfo.getPeerId() +
+                " didn't exist, will move its queue to avoid the failure of 
multi op");
+        for (String wal : wals) {
+          String oldWalZnode = ZKUtil.joinZNode(oldClusterZnode, wal);
+          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
+        }
+        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
+        ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
+        return null;
+      }
+
+      SortedSet<String> logQueue = new TreeSet<>();
+      if (wals == null || wals.isEmpty()) {
+        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
+      } else {
+        // create the new cluster znode
+        ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
+        listOfOps.add(op);
+        // get the offset of the logs and set it to new znodes
+        for (String wal : wals) {
+          String oldWalZnode = ZKUtil.joinZNode(oldClusterZnode, wal);
+          byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode);
+          LOG.debug("Creating " + wal + " with data " + 
Bytes.toString(logOffset));
+          String newLogZnode = ZKUtil.joinZNode(newPeerZnode, wal);
+          listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
+          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
+          logQueue.add(wal);
+        }
+        // add delete op for peer
+        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
+
+        if (LOG.isTraceEnabled())
+          LOG.trace(" The multi list size is: " + listOfOps.size());
+      }
+      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
+
+      LOG.info("Atomically moved " + znode + "/" + peerId + "'s WALs to my 
queue");
+      return new Pair<>(newPeerId, logQueue);
+    } catch (KeeperException e) {
+      // Multi call failed; it looks like some other regionserver took away the logs.
+      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
+    } catch (InterruptedException e) {
+      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
+      Thread.currentThread().interrupt();
+    }
+    return null;
+  }
+
+  @Override
+  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
+      throws ReplicationException {
+    String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+    boolean debugEnabled = LOG.isDebugEnabled();
+    if (debugEnabled) {
+      LOG.debug("Adding hfile references " + pairs + " in queue " + peerZnode);
+    }
+
+    int size = pairs.size();
+    List<ZKUtilOp> listOfOps = new ArrayList<>(size);
+
+    for (int i = 0; i < size; i++) {
+      listOfOps.add(ZKUtilOp.createAndFailSilent(
+        ZKUtil.joinZNode(peerZnode, pairs.get(i).getSecond().getName()),
+        HConstants.EMPTY_BYTE_ARRAY));
+    }
+    if (debugEnabled) {
+      LOG.debug(" The multi list size for adding hfile references in zk for 
node " + peerZnode
+          + " is " + listOfOps.size());
+    }
+    try {
+      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to create hfile reference znode=" 
+ e.getPath(), e);
+    }
+  }
+
+  @Override
+  public void removeHFileRefs(String peerId, List<String> files) {
+    String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+    boolean debugEnabled = LOG.isDebugEnabled();
+    if (debugEnabled) {
+      LOG.debug("Removing hfile references " + files + " from queue " + 
peerZnode);
+    }
+
+    int size = files.size();
+    List<ZKUtilOp> listOfOps = new ArrayList<>(size);
+
+    for (int i = 0; i < size; i++) {
+      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i))));
+    }
+    if (debugEnabled) {
+      LOG.debug(" The multi list size for removing hfile references in zk for 
node " + peerZnode
+          + " is " + listOfOps.size());
+    }
+    try {
+      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
+    } catch (KeeperException e) {
+      LOG.error("Failed to remove hfile reference znode=" + e.getPath(), e);
+    }
+  }
+
+  @Override
+  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
+    String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+    try {
+      if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
+        LOG.info("Adding peer " + peerId + " to hfile reference queue.");
+        ZKUtil.createWithParents(this.zookeeper, peerZnode);
+      }
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to add peer " + peerId + " to 
hfile reference queue.",
+          e);
+    }
+  }
+
+  @Override
+  public void removePeerFromHFileRefs(String peerId) {
+    final String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+    try {
+      if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Peer " + peerZnode + " not found in hfile reference 
queue.");
+        }
+        return;
+      } else {
+        LOG.info("Removing peer " + peerZnode + " from hfile reference 
queue.");
+        ZKUtil.deleteNodeRecursively(this.zookeeper, peerZnode);
+      }
+    } catch (KeeperException e) {
+      LOG.error("Ignoring the exception to remove peer " + peerId + " from 
hfile reference queue.",
+        e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
new file mode 100644
index 0000000..47b780d
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
@@ -0,0 +1,155 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+
+/**
+ * This is a base class for maintaining replication state in zookeeper.
+ */
+@InterfaceAudience.Private
+public abstract class ReplicationStateZKBase {
+
+  /**
+   * The name of the znode that contains the replication status of a remote slave (i.e. peer)
+   * cluster.
+   */
+  protected final String peerStateNodeName;
+  /** The name of the base znode that contains all replication state. */
+  protected final String replicationZNode;
+  /** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */
+  protected final String peersZNode;
+  /** The name of the znode that contains all replication queues */
+  protected final String queuesZNode;
+  /** The name of the znode that contains queues of hfile references to be replicated */
+  protected final String hfileRefsZNode;
+  /** The cluster key of the local cluster */
+  protected final String ourClusterKey;
+  /** The name of the znode that contains tableCFs */
+  protected final String tableCFsNodeName;
+
+  protected final ZooKeeperWatcher zookeeper;
+  protected final Configuration conf;
+  protected final Abortable abortable;
+
+  // Public for testing
+  public static final byte[] ENABLED_ZNODE_BYTES =
+      toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
+  public static final byte[] DISABLED_ZNODE_BYTES =
+      toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
+  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
+      "zookeeper.znode.replication.hfile.refs";
+  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
+
+  public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf,
+      Abortable abortable) {
+    this.zookeeper = zookeeper;
+    this.conf = conf;
+    this.abortable = abortable;
+
+    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
+    String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
+    String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
+    String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
+      ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
+    this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
+    this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
+    this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
+    this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.znodePaths.baseZNode,
+      replicationZNodeName);
+    this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
+    this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName);
+    this.hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName);
+  }
+
+  public List<String> getListOfReplicators() {
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get list of replicators", e);
+    }
+    return result;
+  }
+
+  /**
+   * @param state the replication state to serialize
+   * @return Serialized protobuf of <code>state</code> with pb magic prefix prepended suitable for
+   *         use as content of a peer-state znode under a peer cluster id as in
+   *         /hbase/replication/peers/PEER_ID/peer-state.
+   */
+  protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) {
+    ReplicationProtos.ReplicationState msg =
+        ReplicationProtos.ReplicationState.newBuilder().setState(state).build();
+    // There is no toByteArray on this pb Message?
+    // A 16 byte buffer is enough for this small message.
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      CodedOutputStream cos = CodedOutputStream.newInstance(baos, 16);
+      msg.writeTo(cos);
+      cos.flush();
+      baos.flush();
+      return ProtobufUtil.prependPBMagic(baos.toByteArray());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected boolean peerExists(String id) throws KeeperException {
+    return ZKUtil.checkExists(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
+  }
+
+  /**
+   * Determine if a ZK path points to a peer node.
+   * @param path path to be checked
+   * @return true if the path points to a peer node, otherwise false
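+   *
+   * <p>For example, with the default layout the peer node /hbase/replication/peers/1 sits exactly
+   * one path component below peersZNode, so this check passes for it.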
+   */
+  protected boolean isPeerPath(String path) {
+    return path.split("/").length == peersZNode.split("/").length + 1;
+  }
+
+  @VisibleForTesting
+  protected String getTableCFsNode(String id) {
+    return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName));
+  }
+
+  @VisibleForTesting
+  protected String getPeerStateNode(String id) {
+    return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
+  }
+
+  @VisibleForTesting
+  protected String getPeerNode(String id) {
+    return ZKUtil.joinZNode(this.peersZNode, id);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
new file mode 100644
index 0000000..fa1d2d6
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
@@ -0,0 +1,441 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/*
+ * Abstract class that provides an interface to the Replication Table, which is currently
+ * being used for WAL offset tracking.
+ * The basic schema of this table will store each individual queue as a
+ * separate row. The row key will be a unique identifier of the creating server's name and the
+ * queueId. Each queue must have the following two columns:
+ *  COL_QUEUE_OWNER: tracks which server is currently responsible for tracking the queue
+ *  COL_QUEUE_OWNER_HISTORY: a "|" delimited list of the previous servers that have owned this
+ *    queue. The most recent previous owner is the leftmost entry.
+ * They will also have columns mapping [WAL filename : offset]
+ * The most flexible method of interacting with the Replication Table is by calling
+ * getOrBlockOnReplicationTable() which will return a new copy of the Replication Table. It is up
+ * to the caller to close the returned table.
+ */
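+// An illustrative row, assuming a queue with id "1" created by server
+// "hostname.example.org,6020,1234" (see the constants below for the actual family/qualifiers):
+//   row key: "1-hostname.example.org,6020,1234"
+//   q:o -> name of the current owning server
+//   q:h -> "mostRecentPreviousOwner|olderPreviousOwner"
+//   q:<WAL filename> -> replication offset in that WAL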
+@InterfaceAudience.Private
+abstract class ReplicationTableBase {
+
+  /** Name of the HBase Table used for tracking replication */
+  public static final TableName REPLICATION_TABLE_NAME =
+    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");
+
+  // Column family and column names for Queues in the Replication Table
+  public static final byte[] CF_QUEUE = Bytes.toBytes("q");
+  public static final byte[] COL_QUEUE_OWNER = Bytes.toBytes("o");
+  public static final byte[] COL_QUEUE_OWNER_HISTORY = Bytes.toBytes("h");
+
+  // Column Descriptor for the Replication Table
+  private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR =
+    new HColumnDescriptor(CF_QUEUE).setMaxVersions(1)
+      .setInMemory(true)
+      .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+        // TODO: Figure out which bloom filter to use
+      .setBloomFilterType(BloomType.NONE);
+
+  // The value used to delimit the queueId and server name inside of a queue's row key. Currently a
+  // hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens.
+  // See HBASE-11394.
+  public static final String ROW_KEY_DELIMITER = "-";
+
+  // The value used to delimit server names in the queue history list
+  public static final String QUEUE_HISTORY_DELIMITER = "|";
+
+  /*
+  * Make sure that HBase table operations for replication have a high number of retries. This is
+  * because the server is aborted if any HBase table operation fails. Each RPC will be attempted
+  * 3600 times before exiting. This provides each operation with 2 hours of retries
+  * before the server is aborted.
+  */
+  private static final int CLIENT_RETRIES = 3600;
+  private static final int RPC_TIMEOUT = 2000;
+  private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT;
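+  // Sanity check on the arithmetic above: CLIENT_RETRIES * RPC_TIMEOUT = 3600 * 2000 ms
+  // = 7,200,000 ms, i.e. the two hours of retries described in the comment.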
+
+  // We only need a single thread to initialize the Replication Table
+  private static final int NUM_INITIALIZE_WORKERS = 1;
+
+  protected final Configuration conf;
+  protected final Abortable abortable;
+  private final Connection connection;
+  private final Executor executor;
+  private volatile CountDownLatch replicationTableInitialized;
+
+  public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException {
+    this.conf = new Configuration(conf);
+    this.abortable = abort;
+    decorateConf();
+    this.connection = ConnectionFactory.createConnection(this.conf);
+    this.executor = setUpExecutor();
+    this.replicationTableInitialized = new CountDownLatch(1);
+    createReplicationTableInBackground();
+  }
+
+  /**
+   * Modify the connection's config so that operations run on the Replication Table have
+   * a larger number of retries
+   */
+  private void decorateConf() {
+    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES);
+  }
+
+  /**
+   * Sets up the thread pool executor used to build the Replication Table in the background
+   * @return the configured executor
+   */
+  private Executor setUpExecutor() {
+    ThreadPoolExecutor tempExecutor = new ThreadPoolExecutor(NUM_INITIALIZE_WORKERS,
+        NUM_INITIALIZE_WORKERS, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
+    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+    tfb.setNameFormat("ReplicationTableExecutor-%d");
+    tfb.setDaemon(true);
+    tempExecutor.setThreadFactory(tfb.build());
+    return tempExecutor;
+  }
+
+  /**
+   * Get whether the Replication Table has been successfully initialized yet
+   * @return whether the Replication Table is initialized
+   */
+  public boolean getInitializationStatus() {
+    return replicationTableInitialized.getCount() == 0;
+  }
+
+  /**
+   * Increases the RPC and operations timeouts for the Replication Table
+   */
+  private Table setReplicationTableTimeOuts(Table replicationTable) {
+    replicationTable.setRpcTimeout(RPC_TIMEOUT);
+    replicationTable.setOperationTimeout(OPERATION_TIMEOUT);
+    return replicationTable;
+  }
+
+  /**
+   * Build the row key for the given queueId. This will uniquely identify it from all other queues
+   * in the cluster.
+   * @param serverName The owner of the queue
+   * @param queueId String identifier of the queue
+   * @return String representation of the queue's row key
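+   *
+   * <p>For example, queueId "1" owned by "hostname.example.org,6020,1234" yields the row key
+   * "1-hostname.example.org,6020,1234".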
+   */
+  protected String buildQueueRowKey(String serverName, String queueId) {
+    return queueId + ROW_KEY_DELIMITER + serverName;
+  }
+
+  /**
+   * Parse the original queueId from a row key
+   * @param rowKey String representation of a queue's row key
+   * @return the original queueId
+   */
+  protected String getRawQueueIdFromRowKey(String rowKey) {
+    return rowKey.split(ROW_KEY_DELIMITER)[0];
+  }
+
+  /**
+   * Returns a queue's row key given either its raw or reclaimed queueId
+   *
+   * @param serverName the name of the server that owns the queue
+   * @param queueId queueId of the queue
+   * @return byte representation of the queue's row key
+   */
+  protected byte[] queueIdToRowKey(String serverName, String queueId) {
+    // Cluster id's are guaranteed to have no hyphens, so if the passed in queueId has no hyphen
+    // then this is not a reclaimed queue.
+    if (!queueId.contains(ROW_KEY_DELIMITER)) {
+      return Bytes.toBytes(buildQueueRowKey(serverName, queueId));
+      // If the queueId contained some hyphen it was reclaimed. In this case, the queueId is the
+      // queue's row key
+    } else {
+      return Bytes.toBytes(queueId);
+    }
+  }
+
+  /**
+   * Creates a "|" delimited record of the queue's past region server owners.
+   *
+   * @param originalHistory the queue's original owner history
+   * @param oldServer the name of the server that used to own the queue
+   * @return the queue's new owner history
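+   *
+   * <p>For example, if the history was "serverB|serverA" and oldServer is "serverC", the new
+   * history is "serverC|serverB|serverA".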
+   */
+  protected String buildClaimedQueueHistory(String originalHistory, String oldServer) {
+    return oldServer + QUEUE_HISTORY_DELIMITER + originalHistory;
+  }
+
+  /**
+   * Get a list of all region servers that have outstanding replication queues. These servers could
+   * be alive, dead or from a previous run of the cluster.
+   * @return a list of server names
+   */
+  protected List<String> getListOfReplicators() {
+    // scan all of the queues and return a list of all unique OWNER values
+    Set<String> peerServers = new HashSet<>();
+    ResultScanner allQueuesInCluster = null;
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      Scan scan = new Scan();
+      scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
+      allQueuesInCluster = replicationTable.getScanner(scan);
+      for (Result queue : allQueuesInCluster) {
+        peerServers.add(Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER)));
+      }
+    } catch (IOException e) {
+      String errMsg = "Failed getting list of replicators";
+      abortable.abort(errMsg, e);
+    } finally {
+      if (allQueuesInCluster != null) {
+        allQueuesInCluster.close();
+      }
+    }
+    return new ArrayList<>(peerServers);
+  }
+
+  protected List<String> getAllQueues(String serverName) {
+    List<String> allQueues = new ArrayList<>();
+    ResultScanner queueScanner = null;
+    try {
+      queueScanner = getQueuesBelongingToServer(serverName);
+      for (Result queue : queueScanner) {
+        String rowKey = Bytes.toString(queue.getRow());
+        // If the queue does not have an Owner History, then we must be its original owner. So we
+        // want to return its queueId in raw form
+        if (Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER_HISTORY)).length() == 0) {
+          allQueues.add(getRawQueueIdFromRowKey(rowKey));
+        } else {
+          allQueues.add(rowKey);
+        }
+      }
+      return allQueues;
+    } catch (IOException e) {
+      String errMsg = "Failed getting list of all replication queues for 
serverName=" + serverName;
+      abortable.abort(errMsg, e);
+      return null;
+    } finally {
+      if (queueScanner != null) {
+        queueScanner.close();
+      }
+    }
+  }
+
+  protected List<String> getLogsInQueue(String serverName, String queueId) {
+    String rowKey = queueId;
+    if (!queueId.contains(ROW_KEY_DELIMITER)) {
+      rowKey = buildQueueRowKey(serverName, queueId);
+    }
+    return getLogsInQueue(Bytes.toBytes(rowKey));
+  }
+
+  protected List<String> getLogsInQueue(byte[] rowKey) {
+    String errMsg = "Failed getting logs in queue queueId=" + 
Bytes.toString(rowKey);
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      Get getQueue = new Get(rowKey);
+      Result queue = replicationTable.get(getQueue);
+      if (queue == null || queue.isEmpty()) {
+        abortable.abort(errMsg, new ReplicationException(errMsg));
+        return null;
+      }
+      return readWALsFromResult(queue);
+    } catch (IOException e) {
+      abortable.abort(errMsg, e);
+      return null;
+    }
+  }
+
+  /**
+   * Read all of the WALs from a queue into a list
+   *
+   * @param queue HBase query result containing the queue
+   * @return a list of all the WAL filenames
+   */
+  protected List<String> readWALsFromResult(Result queue) {
+    List<String> wals = new ArrayList<>();
+    Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
+    for (byte[] cQualifier : familyMap.keySet()) {
+      // Ignore the meta data fields of the queue
+      if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier,
+          COL_QUEUE_OWNER_HISTORY)) {
+        continue;
+      }
+      wals.add(Bytes.toString(cQualifier));
+    }
+    return wals;
+  }
+
+  /**
+   * Get the queue ids and metadata (Owner and History) for the queues belonging to the named
+   * server
+   *
+   * @param server name of the server
+   * @return a ResultScanner over the QueueIds belonging to the server
+   * @throws IOException
+   */
+  protected ResultScanner getQueuesBelongingToServer(String server) throws IOException {
+    Scan scan = new Scan();
+    SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
+      CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server));
+    scan.setFilter(filterMyQueues);
+    scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
+    scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY);
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      ResultScanner results = replicationTable.getScanner(scan);
+      return results;
+    }
+  }
+
+  /**
+   * Attempts to acquire the Replication Table. This operation will block until it is assigned by
+   * the CreateReplicationTableWorker thread. It is up to the caller of this method to close the
+   * returned Table
+   * @return the Replication Table when it is created
+   * @throws IOException
+   */
+  protected Table getOrBlockOnReplicationTable() throws IOException {
+    // Sleep until the Replication Table becomes available
+    try {
+      replicationTableInitialized.await();
+    } catch (InterruptedException e) {
+      String errMsg = "Unable to acquire the Replication Table due to 
InterruptedException: " +
+          e.getMessage();
+      throw new InterruptedIOException(errMsg);
+    }
+    return getAndSetUpReplicationTable();
+  }
+
+  /**
+   * Creates a new copy of the Replication Table and sets up the proper Table time outs for it
+   *
+   * @return the Replication Table
+   * @throws IOException
+   */
+  private Table getAndSetUpReplicationTable() throws IOException {
+    Table replicationTable = connection.getTable(REPLICATION_TABLE_NAME);
+    setReplicationTableTimeOuts(replicationTable);
+    return replicationTable;
+  }
+
+  /**
+   * Builds the Replication Table in a background thread. Any method accessing the Replication
+   * Table should do so through getOrBlockOnReplicationTable()
+   * should do so through getOrBlockOnReplicationTable()
+   *
+   * @throws IOException if the Replication Table takes too long to build
+   */
+  private void createReplicationTableInBackground() throws IOException {
+    executor.execute(new CreateReplicationTableWorker());
+  }
+
+  /**
+   * Attempts to build the Replication Table. Will continue blocking until we have a valid
+   * Table for the Replication Table.
+   */
+  private class CreateReplicationTableWorker implements Runnable {
+
+    private Admin admin;
+
+    @Override
+    public void run() {
+      try {
+        admin = connection.getAdmin();
+        if (!replicationTableExists()) {
+          createReplicationTable();
+        }
+        int maxRetries = conf.getInt("hbase.replication.queues.createtable.retries.number",
+            CLIENT_RETRIES);
+        RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, RPC_TIMEOUT);
+        RetryCounter retryCounter = counterFactory.create();
+        while (!replicationTableExists()) {
+          retryCounter.sleepUntilNextRetry();
+          if (!retryCounter.shouldRetry()) {
+            throw new IOException("Unable to acquire the Replication Table");
+          }
+        }
+        replicationTableInitialized.countDown();
+      } catch (IOException | InterruptedException e) {
+        abortable.abort("Failed building Replication Table", e);
+      }
+    }
+
+    /**
+     * Create the replication table with the provided HColumnDescriptor
+     * REPLICATION_COL_DESCRIPTOR defined in this class
+     *
+     * @throws IOException
+     */
+    private void createReplicationTable() throws IOException {
+      HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME);
+      replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR);
+      try {
+        admin.createTable(replicationTableDescriptor);
+      } catch (TableExistsException e) {
+        // In this case we can just continue as normal
+      }
+    }
+
+    /**
+     * Checks whether the Replication Table exists yet
+     *
+     * @return whether the Replication Table exists
+     */
+    private boolean replicationTableExists() {
+      try {
+        return admin.tableExists(REPLICATION_TABLE_NAME);
+      } catch (IOException e) {
+        return false;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
new file mode 100644
index 0000000..51d7473
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This is the interface for a Replication Tracker. A replication tracker provides the facility to
+ * subscribe and track events that reflect a change in replication state. These events are used by
+ * the ReplicationSourceManager to coordinate replication tasks such as addition/deletion of queues
+ * and queue failover. These events are defined in the ReplicationListener interface. If a class
+ * would like to listen to replication events it must implement the ReplicationListener interface
+ * and register itself with a Replication Tracker.
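+ *
+ * <p>A minimal usage sketch (the listener instance here is hypothetical):
+ * <pre>
+ *   ReplicationTracker tracker = ...; // e.g. a ReplicationTrackerZKImpl
+ *   tracker.registerListener(myReplicationListener);
+ *   List&lt;String&gt; otherServers = tracker.getListOfRegionServers();
+ * </pre>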
+ */
+@InterfaceAudience.Private
+public interface ReplicationTracker {
+
+  /**
+   * Register a replication listener to receive replication events.
+   * @param listener the listener to register
+   */
+  public void registerListener(ReplicationListener listener);
+
+  public void removeListener(ReplicationListener listener);
+
+  /**
+   * Returns a list of other live region servers in the cluster.
+   * @return List of region servers.
+   */
+  public List<String> getListOfRegionServers();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
new file mode 100644
index 0000000..9865d83
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
@@ -0,0 +1,250 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This class is a ZooKeeper implementation of the ReplicationTracker interface. This class is
+ * responsible for handling replication events that are defined in the ReplicationListener
+ * interface.
+ */
+@InterfaceAudience.Private
+public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements ReplicationTracker {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationTrackerZKImpl.class);
+  // All about stopping
+  private final Stoppable stopper;
+  // listeners to be notified
+  private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>();
+  // List of all the other region servers in this cluster
+  private final ArrayList<String> otherRegionServers = new ArrayList<>();
+  private final ReplicationPeers replicationPeers;
+
+  public ReplicationTrackerZKImpl(ZooKeeperWatcher zookeeper,
+      final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
+      Stoppable stopper) {
+    super(zookeeper, conf, abortable);
+    this.replicationPeers = replicationPeers;
+    this.stopper = stopper;
+    this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
+    this.zookeeper.registerListener(new PeersWatcher(this.zookeeper));
+  }
+
+  @Override
+  public void registerListener(ReplicationListener listener) {
+    listeners.add(listener);
+  }
+
+  @Override
+  public void removeListener(ReplicationListener listener) {
+    listeners.remove(listener);
+  }
+
+  /**
+   * Return a snapshot of the current region servers.
+   */
+  @Override
+  public List<String> getListOfRegionServers() {
+    refreshOtherRegionServersList();
+
+    List<String> list = null;
+    synchronized (otherRegionServers) {
+      list = new ArrayList<>(otherRegionServers);
+    }
+    return list;
+  }
+
+  /**
+   * Watcher used to be notified of another region server's death in the local cluster. It
+   * initiates the process to transfer the queues if it is able to grab the lock.
+   */
+  public class OtherRegionServerWatcher extends ZooKeeperListener {
+
+    /**
+     * Construct a ZooKeeper event listener.
+     */
+    public OtherRegionServerWatcher(ZooKeeperWatcher watcher) {
+      super(watcher);
+    }
+
+    /**
+     * Called when a new node has been created.
+     * @param path full path of the new node
+     */
+    public void nodeCreated(String path) {
+      refreshListIfRightPath(path);
+    }
+
+    /**
+     * Called when a node has been deleted
+     * @param path full path of the deleted node
+     */
+    public void nodeDeleted(String path) {
+      if (stopper.isStopped()) {
+        return;
+      }
+      boolean cont = refreshListIfRightPath(path);
+      if (!cont) {
+        return;
+      }
+      LOG.info(path + " znode expired, triggering replicatorRemoved event");
+      for (ReplicationListener rl : listeners) {
+        rl.regionServerRemoved(getZNodeName(path));
+      }
+    }
+
+    /**
+     * Called when an existing node has a child node added or removed.
+     * @param path full path of the node whose children have changed
+     */
+    public void nodeChildrenChanged(String path) {
+      if (stopper.isStopped()) {
+        return;
+      }
+      refreshListIfRightPath(path);
+    }
+
+    private boolean refreshListIfRightPath(String path) {
+      if (!path.startsWith(this.watcher.znodePaths.rsZNode)) {
+        return false;
+      }
+      return refreshOtherRegionServersList();
+    }
+  }
+
+  /**
+   * Watcher used to follow the creation and deletion of peer clusters.
+   */
+  public class PeersWatcher extends ZooKeeperListener {
+
+    /**
+     * Construct a ZooKeeper event listener.
+     */
+    public PeersWatcher(ZooKeeperWatcher watcher) {
+      super(watcher);
+    }
+
+    /**
+     * Called when a node has been deleted
+     * @param path full path of the deleted node
+     */
+    public void nodeDeleted(String path) {
+      List<String> peers = refreshPeersList(path);
+      if (peers == null) {
+        return;
+      }
+      if (isPeerPath(path)) {
+        String id = getZNodeName(path);
+        LOG.info(path + " znode expired, triggering peerRemoved event");
+        for (ReplicationListener rl : listeners) {
+          rl.peerRemoved(id);
+        }
+      }
+    }
+
+    /**
+     * Called when an existing node has a child node added or removed.
+     * @param path full path of the node whose children have changed
+     */
+    public void nodeChildrenChanged(String path) {
+      List<String> peers = refreshPeersList(path);
+      if (peers == null) {
+        return;
+      }
+      LOG.info(path + " znode expired, triggering peerListChanged event");
+      for (ReplicationListener rl : listeners) {
+        rl.peerListChanged(peers);
+      }
+    }
+  }
+
+  /**
+   * Verify if this event is meant for us, and if so then get the latest peers' list from ZK. Also
+   * reset the watches.
+   * @param path path to check against
+   * @return A list of peers' identifiers if the event concerns this watcher, else null.
+   */
+  private List<String> refreshPeersList(String path) {
+    if (!path.startsWith(getPeersZNode())) {
+      return null;
+    }
+    return this.replicationPeers.getAllPeerIds();
+  }
+
+  private String getPeersZNode() {
+    return this.peersZNode;
+  }
+
+  /**
+   * Extracts the znode name of a peer cluster from a ZK path
+   * @param fullPath Path to extract the id from
+   * @return the id or an empty string if path is invalid
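+   *
+   * <p>For example, "/hbase/replication/peers/1" yields "1".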
+   */
+  private String getZNodeName(String fullPath) {
+    String[] parts = fullPath.split("/");
+    return parts.length > 0 ? parts[parts.length - 1] : "";
+  }
+
+  /**
+   * Reads the list of region servers from ZK and atomically clears our local view of it and
+   * replaces it with the updated list.
+   * @return true if the local list of the other region servers was updated with the ZK data (even
+   *         if it was empty), false if the data was missing in ZK
+   */
+  private boolean refreshOtherRegionServersList() {
+    List<String> newRsList = getRegisteredRegionServers();
+    if (newRsList == null) {
+      return false;
+    } else {
+      synchronized (otherRegionServers) {
+        otherRegionServers.clear();
+        otherRegionServers.addAll(newRsList);
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Get a list of all the other region servers in this cluster and set a watch
+   * @return a list of server names
+   */
+  private List<String> getRegisteredRegionServers() {
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.znodePaths.rsZNode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Get list of registered region servers", e);
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
new file mode 100644
index 0000000..3507547
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Implements the ReplicationQueuesClient interface on top of the Replication Table. It utilizes
+ * the ReplicationTableBase to access the Replication Table.
+ */
+@InterfaceAudience.Private
+public class TableBasedReplicationQueuesClientImpl extends ReplicationTableBase
+  implements ReplicationQueuesClient {
+
+  public TableBasedReplicationQueuesClientImpl(ReplicationQueuesClientArguments args)
+    throws IOException {
+    super(args.getConf(), args.getAbortable());
+  }
+  public TableBasedReplicationQueuesClientImpl(Configuration conf,
+                                               Abortable abortable) throws 
IOException {
+    super(conf, abortable);
+  }
+
+  @Override
+  public void init() throws ReplicationException {
+    // no-op
+  }
+
+  @Override
+  public List<String> getListOfReplicators() {
+    return super.getListOfReplicators();
+  }
+
+  @Override
+  public List<String> getLogsInQueue(String serverName, String queueId) {
+    return super.getLogsInQueue(serverName, queueId);
+  }
+
+  @Override
+  public List<String> getAllQueues(String serverName) {
+    return super.getAllQueues(serverName);
+  }
+
+  @Override
+  public Set<String> getAllWALs() {
+    Set<String> allWals = new HashSet<>();
+    ResultScanner allQueues = null;
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      allQueues = replicationTable.getScanner(new Scan());
+      for (Result queue : allQueues) {
+        for (String wal : readWALsFromResult(queue)) {
+          allWals.add(wal);
+        }
+      }
+    } catch (IOException e) {
+      String errMsg = "Failed getting all WAL's in Replication Table";
+      abortable.abort(errMsg, e);
+    } finally {
+      if (allQueues != null) {
+        allQueues.close();
+      }
+    }
+    return allWals;
+  }
+
+  @Override
+  public int getHFileRefsNodeChangeVersion() throws KeeperException {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public List<String> getReplicableHFiles(String peerId) throws KeeperException {
+    // TODO
+    throw new NotImplementedException();
+  }
+}
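
For callers, the client above composes into a full inventory of replication state: replicators
from getListOfReplicators(), their queues from getAllQueues(), and each queue's WALs from
getLogsInQueue(). A hedged usage sketch (the QueueDump class is illustrative and null/error
handling is deliberately minimal; each call can return null when data is missing):

    import java.util.List;

    import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;

    public final class QueueDump {
      /** Prints every replicator/queue/WAL triple known to the queues client. */
      static void dump(ReplicationQueuesClient client) throws Exception {
        List<String> replicators = client.getListOfReplicators();
        if (replicators == null) {
          return;                  // nothing registered, or the lookup failed
        }
        for (String replicator : replicators) {
          for (String queueId : client.getAllQueues(replicator)) {
            for (String wal : client.getLogsInQueue(replicator, queueId)) {
              System.out.println(replicator + "/" + queueId + " -> " + wal);
            }
          }
        }
      }
    }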

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
new file mode 100644
index 0000000..bf55e8c
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
@@ -0,0 +1,450 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * This class provides an implementation of the ReplicationQueues interface using an HBase table
+ * "Replication Table". It utilizes the ReplicationTableBase to access the Replication Table.
+ */
+@InterfaceAudience.Private
+public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
+  implements ReplicationQueues {
+
+  private static final Log LOG = LogFactory.getLog(TableBasedReplicationQueuesImpl.class);
+
+  // Common byte values used in replication offset tracking
+  private static final byte[] INITIAL_OFFSET_BYTES = Bytes.toBytes(0L);
+  private static final byte[] EMPTY_STRING_BYTES = Bytes.toBytes("");
+
+  private String serverName = null;
+  private byte[] serverNameBytes = null;
+
+  // TODO: Only use this variable temporarily. Eventually we want to use HBase to store all
+  // TODO: replication information
+  private ReplicationStateZKBase replicationState;
+
+  public TableBasedReplicationQueuesImpl(ReplicationQueuesArguments args) throws IOException {
+    this(args.getConf(), args.getAbortable(), args.getZk());
+  }
+
+  public TableBasedReplicationQueuesImpl(Configuration conf, Abortable abort, ZooKeeperWatcher zkw)
+    throws IOException {
+    super(conf, abort);
+    replicationState = new ReplicationStateZKBase(zkw, conf, abort) {};
+  }
+
+  @Override
+  public void init(String serverName) throws ReplicationException {
+    this.serverName = serverName;
+    this.serverNameBytes = Bytes.toBytes(serverName);
+  }
+
+  @Override
+  public List<String> getListOfReplicators() {
+    return super.getListOfReplicators();
+  }
+
+  @Override
+  public void removeQueue(String queueId) {
+    try {
+      byte[] rowKey = queueIdToRowKey(queueId);
+      if (checkQueueExists(queueId)) {
+        Delete deleteQueue = new Delete(rowKey);
+        safeQueueUpdate(deleteQueue);
+      } else {
+        LOG.info("No logs were registered for queue id=" + queueId + " so no 
rows were removed " +
+            "from the replication table while removing the queue");
+      }
+    } catch (IOException | ReplicationException e) {
+      String errMsg = "Failed removing queue queueId=" + queueId;
+      abortable.abort(errMsg, e);
+    }
+  }
+
+  @Override
+  public void addLog(String queueId, String filename) throws ReplicationException {
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      if (!checkQueueExists(queueId)) {
+        // Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values
+        Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId)));
+        putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER, serverNameBytes);
+        putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, EMPTY_STRING_BYTES);
+        putNewQueue.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
+        replicationTable.put(putNewQueue);
+      } else {
+        // Otherwise simply add the new log and offset as a new column
+        Put putNewLog = new Put(queueIdToRowKey(queueId));
+        putNewLog.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
+        safeQueueUpdate(putNewLog);
+      }
+    } catch (IOException | ReplicationException e) {
+      String errMsg = "Failed adding log queueId=" + queueId + " filename=" + 
filename;
+      abortable.abort(errMsg, e);
+    }
+  }
+
+  @Override
+  public void removeLog(String queueId, String filename) {
+    try {
+      byte[] rowKey = queueIdToRowKey(queueId);
+      Delete delete = new Delete(rowKey);
+      delete.addColumns(CF_QUEUE, Bytes.toBytes(filename));
+      safeQueueUpdate(delete);
+    } catch (IOException | ReplicationException e) {
+      String errMsg = "Failed removing log queueId=" + queueId + " filename=" 
+ filename;
+      abortable.abort(errMsg, e);
+    }
+  }
+
+  @Override
+  public void setLogPosition(String queueId, String filename, long position) {
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      byte[] rowKey = queueIdToRowKey(queueId);
+      // Check that the log exists. addLog() must have been called before setLogPosition().
+      Get checkLogExists = new Get(rowKey);
+      checkLogExists.addColumn(CF_QUEUE, Bytes.toBytes(filename));
+      if (!replicationTable.exists(checkLogExists)) {
+        String errMsg = "Could not set position of non-existent log from 
queueId=" + queueId +
+          ", filename=" + filename;
+        abortable.abort(errMsg, new ReplicationException(errMsg));
+        return;
+      }
+      // Update the log offset if it exists
+      Put walAndOffset = new Put(rowKey);
+      walAndOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename), Bytes.toBytes(position));
+      safeQueueUpdate(walAndOffset);
+    } catch (IOException | ReplicationException e) {
+      String errMsg = "Failed writing log position queueId=" + queueId + 
"filename=" +
+        filename + " position=" + position;
+      abortable.abort(errMsg, e);
+    }
+  }
+
+  @Override
+  public long getLogPosition(String queueId, String filename) throws ReplicationException {
+    try {
+      byte[] rowKey = queueIdToRowKey(queueId);
+      Get getOffset = new Get(rowKey);
+      getOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename));
+      Result result = getResultIfOwner(getOffset);
+      if (result == null || !result.containsColumn(CF_QUEUE, Bytes.toBytes(filename))) {
+        throw new ReplicationException("Got a null or empty result while getting log position " +
+          "queueId=" + queueId + ", filename=" + filename);
+      }
+      return Bytes.toLong(result.getValue(CF_QUEUE, Bytes.toBytes(filename)));
+    } catch (IOException e) {
+      throw new ReplicationException("Could not get position in log for 
queueId=" + queueId +
+        ", filename=" + filename);
+    }
+  }
+
+  @Override
+  public void removeAllQueues() {
+    List<String> myQueueIds = getAllQueues();
+    for (String queueId : myQueueIds) {
+      removeQueue(queueId);
+    }
+  }
+
+  @Override
+  public List<String> getLogsInQueue(String queueId) {
+    String errMsg = "Failed getting logs in queue queueId=" + queueId;
+    byte[] rowKey = queueIdToRowKey(queueId);
+    List<String> logs = new ArrayList<>();
+    try {
+      Get getQueue = new Get(rowKey);
+      Result queue = getResultIfOwner(getQueue);
+      if (queue == null || queue.isEmpty()) {
+        String errMsgLostOwnership = "Failed getting logs for queue queueId=" +
+            Bytes.toString(rowKey) + " because the queue was missing or we lost ownership";
+        abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership));
+        return null;
+      }
+      Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
+      for (byte[] cQualifier : familyMap.keySet()) {
+        if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) ||
+            Arrays.equals(cQualifier, COL_QUEUE_OWNER_HISTORY)) {
+          continue;
+        }
+        logs.add(Bytes.toString(cQualifier));
+      }
+    } catch (IOException e) {
+      abortable.abort(errMsg, e);
+      return null;
+    }
+    return logs;
+  }
+
+  @Override
+  public List<String> getAllQueues() {
+    return getAllQueues(serverName);
+  }
+
+  @Override
+  public List<String> getUnClaimedQueueIds(String regionserver) {
+    if (isThisOurRegionServer(regionserver)) {
+      return null;
+    }
+    try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)) {
+      List<String> res = new ArrayList<>();
+      for (Result queue : queuesToClaim) {
+        String rowKey = Bytes.toString(queue.getRow());
+        res.add(rowKey);
+      }
+      return res.isEmpty() ? null : res;
+    } catch (IOException e) {
+      String errMsg = "Failed getUnClaimedQueueIds";
+      abortable.abort(errMsg, e);
+    }
+    return null;
+  }
+
+  @Override
+  public void removeReplicatorIfQueueIsEmpty(String regionserver) {
+    // Do nothing here
+  }
+
+  @Override
+  public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
+    if (isThisOurRegionServer(regionserver)) {
+      return null;
+    }
+
+    try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)) {
+      for (Result queue : queuesToClaim) {
+        String rowKey = Bytes.toString(queue.getRow());
+        if (!rowKey.equals(queueId)) {
+          continue;
+        }
+        if (attemptToClaimQueue(queue, regionserver)) {
+          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(rowKey);
+          if (replicationState.peerExists(replicationQueueInfo.getPeerId())) {
+            SortedSet<String> sortedLogs = new TreeSet<>();
+            List<String> logs = getLogsInQueue(queue.getRow());
+            for (String log : logs) {
+              sortedLogs.add(log);
+            }
+            LOG.info(serverName + " has claimed queue " + rowKey + " from " + 
regionserver);
+            return new Pair<>(rowKey, sortedLogs);
+          } else {
+            // Delete orphaned queues
+            removeQueue(Bytes.toString(queue.getRow()));
+            LOG.info(serverName + " has deleted abandoned queue " + queueId + 
" from " +
+              regionserver);
+          }
+        }
+      }
+    } catch (IOException | KeeperException e) {
+      String errMsg = "Failed claiming queues for regionserver=" + 
regionserver;
+      abortable.abort(errMsg, e);
+    }
+    return null;
+  }
+
+  @Override
+  public boolean isThisOurRegionServer(String regionserver) {
+    return this.serverName.equals(regionserver);
+  }
+
+  @Override
+  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void removePeerFromHFileRefs(String peerId) {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
+      throws ReplicationException {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void removeHFileRefs(String peerId, List<String> files) {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  private String buildQueueRowKey(String queueId) {
+    return buildQueueRowKey(serverName, queueId);
+  }
+
+  /**
+   * Convenience method that gets the row key of the queue specified by queueId
+   * @param queueId queueId of a queue in this server
+   * @return the row key of the queue in the Replication Table
+   */
+  private byte[] queueIdToRowKey(String queueId) {
+    return queueIdToRowKey(serverName, queueId);
+  }
+
+  /**
+   * See safeQueueUpdate(RowMutations mutate)
+   *
+   * @param put Row mutation to perform on the queue
+   */
+  private void safeQueueUpdate(Put put) throws ReplicationException, IOException {
+    RowMutations mutations = new RowMutations(put.getRow());
+    mutations.add(put);
+    safeQueueUpdate(mutations);
+  }
+
+  /**
+   * See safeQueueUpdate(RowMutations mutate)
+   *
+   * @param delete Row mutation to perform on the queue
+   */
+  private void safeQueueUpdate(Delete delete) throws ReplicationException, IOException {
+    RowMutations mutations = new RowMutations(delete.getRow());
+    mutations.add(delete);
+    safeQueueUpdate(mutations);
+  }
+
+  /**
+   * Attempt to mutate a given queue in the Replication Table with a checkAndPut on the OWNER
+   * column of the queue. Abort the server if this checkAndPut fails, which means we have somehow
+   * lost ownership of the column or an IOException occurred during the transaction.
+   *
+   * @param mutate Mutation to perform on a given queue
+   */
+  private void safeQueueUpdate(RowMutations mutate) throws ReplicationException, IOException {
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(),
+          CF_QUEUE, COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate);
+      if (!updateSuccess) {
+        throw new ReplicationException("Failed to update Replication Table 
because we lost queue " +
+            " ownership");
+      }
+    }
+  }
+
+  /**
+   * Check if the queue specified by queueId is stored in HBase
+   *
+   * @param queueId Either raw or reclaimed format of the queueId
+   * @return Whether the queue is stored in HBase
+   * @throws IOException
+   */
+  private boolean checkQueueExists(String queueId) throws IOException {
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      byte[] rowKey = queueIdToRowKey(queueId);
+      return replicationTable.exists(new Get(rowKey));
+    }
+  }
+
+  /**
+   * Attempt to claim the given queue with a checkAndPut on the OWNER column. We check that the
+   * recently killed server is still the OWNER before we claim it.
+   *
+   * @param queue The queue that we are trying to claim
+   * @param originalServer The server that originally owned the queue
+   * @return Whether we successfully claimed the queue
+   * @throws IOException
+   */
+  private boolean attemptToClaimQueue(Result queue, String originalServer) throws IOException {
+    Put putQueueNameAndHistory = new Put(queue.getRow());
+    putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER, Bytes.toBytes(serverName));
+    String newOwnerHistory = buildClaimedQueueHistory(Bytes.toString(queue.getValue(CF_QUEUE,
+      COL_QUEUE_OWNER_HISTORY)), originalServer);
+    putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY,
+        Bytes.toBytes(newOwnerHistory));
+    RowMutations claimAndRenameQueue = new RowMutations(queue.getRow());
+    claimAndRenameQueue.add(putQueueNameAndHistory);
+    // Attempt to claim ownership for this queue by checking if the current OWNER is the original
+    // server. If it is not then another RS has already claimed it. If it is we set ourselves as
+    // the new owner and update the queue's history
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      boolean success = replicationTable.checkAndMutate(queue.getRow(),
+          CF_QUEUE, COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(originalServer),
+          claimAndRenameQueue);
+      return success;
+    }
+  }
+
+  /**
+   * Attempts to run a Get on some queue. Will only return a non-null result if we currently own
+   * the queue.
+   * the queue.
+   *
+   * @param get The Get that we want to query
+   * @return The result of the Get if this server is the owner of the queue. Else it returns null.
+   * @throws IOException
+   */
+  private Result getResultIfOwner(Get get) throws IOException {
+    Scan scan = new Scan(get);
+    // Check if the Get currently contains all columns or only specific columns
+    if (scan.getFamilyMap().size() > 0) {
+      // Add the OWNER column if the scan is already only over specific columns
+      scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
+    }
+    scan.setMaxResultSize(1);
+    SingleColumnValueFilter checkOwner = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
+      CompareFilter.CompareOp.EQUAL, serverNameBytes);
+    scan.setFilter(checkOwner);
+    ResultScanner scanner = null;
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      scanner = replicationTable.getScanner(scan);
+      Result result = scanner.next();
+      return (result == null || result.isEmpty()) ? null : result;
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+    }
+  }
+}
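
The ownership handoff in claimQueue()/attemptToClaimQueue() reduces to a compare-and-swap on the
queue's OWNER cell: the mutation is applied only if the cell still names the dead server, so two
region servers racing to claim the same queue cannot both win. A standalone sketch of that CAS
against the same Table.checkAndMutate API the patch uses (the OwnershipClaim class and its
parameter names are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.RowMutations;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public final class OwnershipClaim {
      /**
       * Atomically flips an owner cell from expectedOwner to newOwner.
       * @return false if another claimant changed the cell first
       */
      static boolean claim(Table table, byte[] row, byte[] family, byte[] ownerCol,
          String expectedOwner, String newOwner) throws IOException {
        Put takeOwnership = new Put(row);
        takeOwnership.addColumn(family, ownerCol, Bytes.toBytes(newOwner));
        RowMutations mutations = new RowMutations(row);
        mutations.add(takeOwnership);
        // Server-side compare-and-swap: applies only if the owner cell still matches
        return table.checkAndMutate(row, family, ownerCol,
            CompareFilter.CompareOp.EQUAL, Bytes.toBytes(expectedOwner), mutations);
      }
    }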

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-server/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
index d48dec8..b0468d6 100644
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ -378,6 +378,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-replication</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-prefix-tree</artifactId>
       <scope>runtime</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
index 0e619ea..ae4e7cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
@@ -27,8 +27,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.AbstractService;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
-
 /**
  * A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending this
  * class rather than implementing {@link ReplicationEndpoint} directly for better backwards

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 65da6eb..0ac0f27 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,6 +61,7 @@
   </licenses>
 
   <modules>
+    <module>hbase-replication</module>
     <module>hbase-resource-bundle</module>
     <module>hbase-server</module>
     <module>hbase-thrift</module>
@@ -1564,6 +1565,11 @@
         <scope>test</scope>
       </dependency>
       <dependency>
+        <artifactId>hbase-replication</artifactId>
+        <groupId>org.apache.hbase</groupId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
         <artifactId>hbase-server</artifactId>
         <groupId>org.apache.hbase</groupId>
         <version>${project.version}</version>
