Repository: hbase Updated Branches: refs/heads/branch-2 205016ca7 -> e2ce252b5
http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java new file mode 100644 index 0000000..4733706 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -0,0 +1,407 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.ArrayList; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * This class provides an implementation of the ReplicationQueues + * interface using ZooKeeper. The + * base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of + * all outstanding WAL files on this region server that need to be replicated. The myQueuesZnode is + * the regionserver name (a concatenation of the region server's hostname, client port and start + * code). For example: + * + * /hbase/replication/rs/hostname.example.org,6020,1234 + * + * Within this znode, the region server maintains a set of WAL replication queues. These queues are + * represented by child znodes named using their given queue id. For example: + * + * /hbase/replication/rs/hostname.example.org,6020,1234/1 + * /hbase/replication/rs/hostname.example.org,6020,1234/2 + * + * Each queue has one child znode for every WAL that still needs to be replicated. The value of + * these WAL child znodes is the latest position that has been replicated. This position is updated + * every time a WAL entry is replicated.
For example: + * + * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254] + */ +@InterfaceAudience.Private +public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues { + + /** Znode containing all replication queues for this region server. */ + private String myQueuesZnode; + + private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class); + + public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) { + this(args.getZk(), args.getConf(), args.getAbortable()); + } + + public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf, + Abortable abortable) { + super(zk, conf, abortable); + } + + @Override + public void init(String serverName) throws ReplicationException { + this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName); + try { + if (ZKUtil.checkExists(this.zookeeper, this.myQueuesZnode) < 0) { + ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode); + } + } catch (KeeperException e) { + throw new ReplicationException("Could not initialize replication queues.", e); + } + if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, + HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) { + try { + if (ZKUtil.checkExists(this.zookeeper, this.hfileRefsZNode) < 0) { + ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode); + } + } catch (KeeperException e) { + throw new ReplicationException("Could not initialize hfile references replication queue.", + e); + } + } + } + + @Override + public void removeQueue(String queueId) { + try { + ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.myQueuesZnode, queueId)); + } catch (KeeperException e) { + this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e); + } + } + + @Override + public void addLog(String queueId, String filename) throws ReplicationException { + String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); + znode = ZKUtil.joinZNode(znode, filename); + try { + ZKUtil.createWithParents(this.zookeeper, znode); + } catch (KeeperException e) { + throw new ReplicationException( + "Could not add log because znode could not be created. queueId=" + queueId + + ", filename=" + filename, e); + } + } + + @Override + public void removeLog(String queueId, String filename) { + try { + String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); + znode = ZKUtil.joinZNode(znode, filename); + ZKUtil.deleteNode(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename=" + + filename + ")", e); + } + } + + @Override + public void setLogPosition(String queueId, String filename, long position) { + try { + String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); + znode = ZKUtil.joinZNode(znode, filename); + // Why serialize String of Long and not Long as bytes?
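+ // (A plausible answer, inferred from the read path rather than stated in this patch: + // ZKUtil.positionToByteArray pb-serializes the offset with the pb magic prefix, which is + // the format ZKUtil.parseWALPositionFrom() expects in getLogPosition() below.)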
+ ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position)); + } catch (KeeperException e) { + this.abortable.abort("Failed to write replication wal position (filename=" + filename + + ", position=" + position + ")", e); + } + } + + @Override + public long getLogPosition(String queueId, String filename) throws ReplicationException { + String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); + String znode = ZKUtil.joinZNode(clusterZnode, filename); + byte[] bytes = null; + try { + bytes = ZKUtil.getData(this.zookeeper, znode); + } catch (KeeperException e) { + throw new ReplicationException("Internal Error: could not get position in log for queueId=" + + queueId + ", filename=" + filename, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return 0; + } + try { + return ZKUtil.parseWALPositionFrom(bytes); + } catch (DeserializationException de) { + LOG.warn("Failed to parse WALPosition znode content for queueId=" + queueId + " and wal=" + + filename + ", continuing.", de); + } + // if we cannot parse the position, start at the beginning of the wal file + // again + return 0; + } + + @Override + public boolean isThisOurRegionServer(String regionserver) { + return ZKUtil.joinZNode(this.queuesZNode, regionserver).equals(this.myQueuesZnode); + } + + @Override + public List<String> getUnClaimedQueueIds(String regionserver) { + if (isThisOurRegionServer(regionserver)) { + return null; + } + String rsZnodePath = ZKUtil.joinZNode(this.queuesZNode, regionserver); + List<String> queues = null; + try { + queues = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZnodePath); + } catch (KeeperException e) { + this.abortable.abort("Failed to getUnClaimedQueueIds for RS " + regionserver, e); + } + return queues; + } + + @Override + public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) { + LOG.info("Atomically moving " + regionserver + "/" + queueId + "'s WALs to my queue"); + return moveQueueUsingMulti(regionserver, queueId); + } + + @Override + public void removeReplicatorIfQueueIsEmpty(String regionserver) { + String rsPath = ZKUtil.joinZNode(this.queuesZNode, regionserver); + try { + List<String> list = ZKUtil.listChildrenNoWatch(this.zookeeper, rsPath); + if (list != null && list.isEmpty()) { + ZKUtil.deleteNode(this.zookeeper, rsPath); + } + } catch (KeeperException e) { + LOG.warn("Got error while removing replicator", e); + } + } + + @Override + public void removeAllQueues() { + try { + ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode); + } catch (KeeperException e) { + // if the znode is already expired, don't bother going further + if (e instanceof KeeperException.SessionExpiredException) { + return; + } + this.abortable.abort("Failed to delete replication queues for region server: " + + this.myQueuesZnode, e); + } + } + + @Override + public List<String> getLogsInQueue(String queueId) { + String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); + List<String> result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get list of wals for queueId=" + queueId, e); + } + return result; + } + + @Override + public List<String> getAllQueues() { + List<String> listOfQueues = null; + try { + listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get a list of queues for region server: " + + this.myQueuesZnode, e); + } +
return listOfQueues == null ? new ArrayList<>() : listOfQueues; + } + + /** + * It "atomically" copies one peer's WAL queue from a dead region server and returns the WALs + * all sorted. The new peer id is equal to the old peer id appended with the dead server's znode. + * @param znode pertaining to the region server to copy the queues from + * @param peerId peerId pertaining to the queue that needs to be copied + */ + private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) { + try { + // hbase/replication/rs/deadrs + String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode); + List<ZKUtilOp> listOfOps = new ArrayList<>(); + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); + + String newPeerId = peerId + "-" + znode; + String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId); + // check the logs queue for the old peer cluster + String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId); + List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode); + + if (!peerExists(replicationQueueInfo.getPeerId())) { + LOG.warn("Peer " + replicationQueueInfo.getPeerId() + + " didn't exist, will move its queue to avoid the failure of multi op"); + if (wals != null) { + for (String wal : wals) { + String oldWalZnode = ZKUtil.joinZNode(oldClusterZnode, wal); + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode)); + } + } + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); + ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); + return null; + } + + SortedSet<String> logQueue = new TreeSet<>(); + if (wals == null || wals.isEmpty()) { + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); + } else { + // create the new cluster znode + ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY); + listOfOps.add(op); + // get the offset of the logs and set it to new znodes + for (String wal : wals) { + String oldWalZnode = ZKUtil.joinZNode(oldClusterZnode, wal); + byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode); + LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset)); + String newLogZnode = ZKUtil.joinZNode(newPeerZnode, wal); + listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset)); + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode)); + logQueue.add(wal); + } + // add delete op for peer + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); + + if (LOG.isTraceEnabled()) { + LOG.trace("The multi list size is: " + listOfOps.size()); + } + } + ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); + + LOG.info("Atomically moved " + znode + "/" + peerId + "'s WALs to my queue"); + return new Pair<>(newPeerId, logQueue); + } catch (KeeperException e) { + // Multi call failed; it looks like some other regionserver took away the logs.
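+ // Returning null below signals the caller that this queue was not claimed here; whichever + // regionserver won the multi race is now responsible for these WALs.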
+ LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e); + } catch (InterruptedException e) { + LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e); + Thread.currentThread().interrupt(); + } + return null; + } + + @Override + public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) + throws ReplicationException { + String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId); + boolean debugEnabled = LOG.isDebugEnabled(); + if (debugEnabled) { + LOG.debug("Adding hfile references " + pairs + " in queue " + peerZnode); + } + + int size = pairs.size(); + List<ZKUtilOp> listOfOps = new ArrayList<>(size); + + for (int i = 0; i < size; i++) { + listOfOps.add(ZKUtilOp.createAndFailSilent( + ZKUtil.joinZNode(peerZnode, pairs.get(i).getSecond().getName()), + HConstants.EMPTY_BYTE_ARRAY)); + } + if (debugEnabled) { + LOG.debug(" The multi list size for adding hfile references in zk for node " + peerZnode + + " is " + listOfOps.size()); + } + try { + ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); + } catch (KeeperException e) { + throw new ReplicationException("Failed to create hfile reference znode=" + e.getPath(), e); + } + } + + @Override + public void removeHFileRefs(String peerId, List<String> files) { + String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId); + boolean debugEnabled = LOG.isDebugEnabled(); + if (debugEnabled) { + LOG.debug("Removing hfile references " + files + " from queue " + peerZnode); + } + + int size = files.size(); + List<ZKUtilOp> listOfOps = new ArrayList<>(size); + + for (int i = 0; i < size; i++) { + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i)))); + } + if (debugEnabled) { + LOG.debug(" The multi list size for removing hfile references in zk for node " + peerZnode + + " is " + listOfOps.size()); + } + try { + ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); + } catch (KeeperException e) { + LOG.error("Failed to remove hfile reference znode=" + e.getPath(), e); + } + } + + @Override + public void addPeerToHFileRefs(String peerId) throws ReplicationException { + String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId); + try { + if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) { + LOG.info("Adding peer " + peerId + " to hfile reference queue."); + ZKUtil.createWithParents(this.zookeeper, peerZnode); + } + } catch (KeeperException e) { + throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.", + e); + } + } + + @Override + public void removePeerFromHFileRefs(String peerId) { + final String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId); + try { + if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) { + if (LOG.isDebugEnabled()) { + LOG.debug("Peer " + peerZnode + " not found in hfile reference queue."); + } + return; + } else { + LOG.info("Removing peer " + peerZnode + " from hfile reference queue."); + ZKUtil.deleteNodeRecursively(this.zookeeper, peerZnode); + } + } catch (KeeperException e) { + LOG.error("Ignoring the exception to remove peer " + peerId + " from hfile reference queue.", + e); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java new file mode 100644 index 0000000..47b780d --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -0,0 +1,155 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + + +/** + * This is a base class for maintaining replication state in zookeeper. + */ +@InterfaceAudience.Private +public abstract class ReplicationStateZKBase { + + /** + * The name of the znode that contains the replication status of a remote slave (i.e. peer) + * cluster. + */ + protected final String peerStateNodeName; + /** The name of the base znode that contains all replication state. */ + protected final String replicationZNode; + /** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. 
*/ + protected final String peersZNode; + /** The name of the znode that contains all replication queues */ + protected final String queuesZNode; + /** The name of the znode that contains queues of hfile references to be replicated */ + protected final String hfileRefsZNode; + /** The cluster key of the local cluster */ + protected final String ourClusterKey; + /** The name of the znode that contains tableCFs */ + protected final String tableCFsNodeName; + + protected final ZooKeeperWatcher zookeeper; + protected final Configuration conf; + protected final Abortable abortable; + + // Public for testing + public static final byte[] ENABLED_ZNODE_BYTES = + toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); + public static final byte[] DISABLED_ZNODE_BYTES = + toByteArray(ReplicationProtos.ReplicationState.State.DISABLED); + public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY = + "zookeeper.znode.replication.hfile.refs"; + public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs"; + + public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf, + Abortable abortable) { + this.zookeeper = zookeeper; + this.conf = conf; + this.abortable = abortable; + + String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); + String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); + String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); + String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, + ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT); + this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); + this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs"); + this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf); + this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.znodePaths.baseZNode, + replicationZNodeName); + this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); + this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName); + this.hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName); + } + + public List<String> getListOfReplicators() { + List<String> result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get list of replicators", e); + } + return result; + } + + /** + * @param state the state to serialize + * @return Serialized protobuf of <code>state</code> with pb magic prefix prepended suitable for + * use as content of a peer-state znode under a peer cluster id as in + * /hbase/replication/peers/PEER_ID/peer-state. + */ + protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) { + ReplicationProtos.ReplicationState msg = + ReplicationProtos.ReplicationState.newBuilder().setState(state).build(); + // There is no toByteArray on this pb Message? + // A 16 byte buffer, matching the newInstance call below, is plenty for this small message.
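+ // The pb magic prefix prepended below (via ProtobufUtil.prependPBMagic) lets readers of the + // peer-state znode distinguish pb-serialized content from other payloads.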
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + CodedOutputStream cos = CodedOutputStream.newInstance(baos, 16); + msg.writeTo(cos); + cos.flush(); + baos.flush(); + return ProtobufUtil.prependPBMagic(baos.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + protected boolean peerExists(String id) throws KeeperException { + return ZKUtil.checkExists(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)) >= 0; + } + + /** + * Determine if a ZK path points to a peer node. + * @param path path to be checked + * @return true if the path points to a peer node, otherwise false + */ + protected boolean isPeerPath(String path) { + return path.split("/").length == peersZNode.split("/").length + 1; + } + + @VisibleForTesting + protected String getTableCFsNode(String id) { + return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName)); + } + + @VisibleForTesting + protected String getPeerStateNode(String id) { + return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName)); + } + @VisibleForTesting + protected String getPeerNode(String id) { + return ZKUtil.joinZNode(this.peersZNode, id); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java new file mode 100644 index 0000000..fa1d2d6 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java @@ -0,0 +1,441 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/* + * Abstract class that provides an interface to the Replication Table, which is currently + * being used for WAL offset tracking. + * The basic schema of this table will store each individual queue as a + * separate row. The row key will be a unique identifier of the creating server's name and the + * queueId. Each queue must have the following two columns: + * COL_QUEUE_OWNER: tracks which server is currently responsible for tracking the queue + * COL_QUEUE_OWNER_HISTORY: a "|" delimited list of the previous servers that have owned this + * queue. The most recent previous owner is the leftmost entry. + * They will also have columns mapping [WAL filename : offset] + * The most flexible method of interacting with the Replication Table is by calling + * getOrBlockOnReplicationTable() which will return a new copy of the Replication Table. It is up + * to the caller to close the returned table.
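+ * + * An illustrative (made-up) row for a queue "1" created by server hostname.example.org,6020,1234 + * under this schema, using the CF_QUEUE family "q" and the owner/history qualifiers "o"/"h" + * defined below: + * ROW: 1-hostname.example.org,6020,1234 + * q:o = hostname.example.org,6020,1234 + * q:h = "" (empty until the queue is claimed by another server) + * q:<WAL filename> = <replication offset>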
+ */ +@InterfaceAudience.Private +abstract class ReplicationTableBase { + + /** Name of the HBase Table used for tracking replication */ + public static final TableName REPLICATION_TABLE_NAME = + TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"); + + // Column family and column names for Queues in the Replication Table + public static final byte[] CF_QUEUE = Bytes.toBytes("q"); + public static final byte[] COL_QUEUE_OWNER = Bytes.toBytes("o"); + public static final byte[] COL_QUEUE_OWNER_HISTORY = Bytes.toBytes("h"); + + // Column Descriptor for the Replication Table + private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR = + new HColumnDescriptor(CF_QUEUE).setMaxVersions(1) + .setInMemory(true) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL) + // TODO: Figure out which bloom filter to use + .setBloomFilterType(BloomType.NONE); + + // The value used to delimit the queueId and server name inside of a queue's row key. Currently a + // hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens. + // See HBASE-11394. + public static final String ROW_KEY_DELIMITER = "-"; + + // The value used to delimit server names in the queue history list + public static final String QUEUE_HISTORY_DELIMITER = "|"; + + /* + * Make sure that HBase table operations for replication have a high number of retries. This is + * because the server is aborted if any HBase table operation fails. Each RPC will be attempted + * 3600 times before exiting. This provides each operation with 2 hours of retries + * before the server is aborted. + */ + private static final int CLIENT_RETRIES = 3600; + private static final int RPC_TIMEOUT = 2000; + private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT; + + // We only need a single thread to initialize the Replication Table + private static final int NUM_INITIALIZE_WORKERS = 1; + + protected final Configuration conf; + protected final Abortable abortable; + private final Connection connection; + private final Executor executor; + private volatile CountDownLatch replicationTableInitialized; + + public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException { + this.conf = new Configuration(conf); + this.abortable = abort; + decorateConf(); + this.connection = ConnectionFactory.createConnection(this.conf); + this.executor = setUpExecutor(); + this.replicationTableInitialized = new CountDownLatch(1); + createReplicationTableInBackground(); + } + + /** + * Modify the connection's config so that operations run on the Replication Table have longer + * timeouts and a larger number of retries + */ + private void decorateConf() { + this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES); + } + + /** + * Sets up the thread pool executor used to build the Replication Table in the background + * @return the configured executor + */ + private Executor setUpExecutor() { + ThreadPoolExecutor tempExecutor = new ThreadPoolExecutor(NUM_INITIALIZE_WORKERS, + NUM_INITIALIZE_WORKERS, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); + ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); + tfb.setNameFormat("ReplicationTableExecutor-%d"); + tfb.setDaemon(true); + tempExecutor.setThreadFactory(tfb.build()); + return tempExecutor; + } + + /** + * Get whether the Replication Table has been successfully initialized yet + * @return whether the Replication Table is initialized + */ + public boolean getInitializationStatus() { + return replicationTableInitialized.getCount()
== 0; + } + + /** + * Increases the RPC and operations timeouts for the Replication Table + */ + private Table setReplicationTableTimeOuts(Table replicationTable) { + replicationTable.setRpcTimeout(RPC_TIMEOUT); + replicationTable.setOperationTimeout(OPERATION_TIMEOUT); + return replicationTable; + } + + /** + * Build the row key for the given queueId. This will uniquely identify it from all other queues + * in the cluster. + * @param serverName The owner of the queue + * @param queueId String identifier of the queue + * @return String representation of the queue's row key + */ + protected String buildQueueRowKey(String serverName, String queueId) { + return queueId + ROW_KEY_DELIMITER + serverName; + } + + /** + * Parse the original queueId from a row key + * @param rowKey String representation of a queue's row key + * @return the original queueId + */ + protected String getRawQueueIdFromRowKey(String rowKey) { + return rowKey.split(ROW_KEY_DELIMITER)[0]; + } + + /** + * Returns a queue's row key given either its raw or reclaimed queueId + * + * @param serverName name of the server that owns the queue + * @param queueId queueId of the queue + * @return byte representation of the queue's row key + */ + protected byte[] queueIdToRowKey(String serverName, String queueId) { + // Cluster id's are guaranteed to have no hyphens, so if the passed in queueId has no hyphen + // then this is not a reclaimed queue. + if (!queueId.contains(ROW_KEY_DELIMITER)) { + return Bytes.toBytes(buildQueueRowKey(serverName, queueId)); + // If the queueId contained some hyphen it was reclaimed. In this case, the queueId is the + // queue's row key + } else { + return Bytes.toBytes(queueId); + } + } + + /** + * Creates a "|" delimited record of the queue's past region server owners. + * + * @param originalHistory the queue's original owner history + * @param oldServer the name of the server that used to own the queue + * @return the queue's new owner history + */ + protected String buildClaimedQueueHistory(String originalHistory, String oldServer) { + return oldServer + QUEUE_HISTORY_DELIMITER + originalHistory; + } + + /** + * Get a list of all region servers that have outstanding replication queues. These servers could + * be alive, dead or from a previous run of the cluster. + * @return a list of server names + */ + protected List<String> getListOfReplicators() { + // scan all of the queues and return a list of all unique OWNER values + Set<String> peerServers = new HashSet<>(); + ResultScanner allQueuesInCluster = null; + try (Table replicationTable = getOrBlockOnReplicationTable()) { + Scan scan = new Scan(); + scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); + allQueuesInCluster = replicationTable.getScanner(scan); + for (Result queue : allQueuesInCluster) { + peerServers.add(Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER))); + } + } catch (IOException e) { + String errMsg = "Failed getting list of replicators"; + abortable.abort(errMsg, e); + } finally { + if (allQueuesInCluster != null) { + allQueuesInCluster.close(); + } + } + return new ArrayList<>(peerServers); + } + + protected List<String> getAllQueues(String serverName) { + List<String> allQueues = new ArrayList<>(); + ResultScanner queueScanner = null; + try { + queueScanner = getQueuesBelongingToServer(serverName); + for (Result queue : queueScanner) { + String rowKey = Bytes.toString(queue.getRow()); + // If the queue does not have an Owner History, then we must be its original owner.
So we + // want to return its queueId in raw form + if (Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER_HISTORY)).length() == 0) { + allQueues.add(getRawQueueIdFromRowKey(rowKey)); + } else { + allQueues.add(rowKey); + } + } + return allQueues; + } catch (IOException e) { + String errMsg = "Failed getting list of all replication queues for serverName=" + serverName; + abortable.abort(errMsg, e); + return null; + } finally { + if (queueScanner != null) { + queueScanner.close(); + } + } + } + + protected List<String> getLogsInQueue(String serverName, String queueId) { + String rowKey = queueId; + if (!queueId.contains(ROW_KEY_DELIMITER)) { + rowKey = buildQueueRowKey(serverName, queueId); + } + return getLogsInQueue(Bytes.toBytes(rowKey)); + } + + protected List<String> getLogsInQueue(byte[] rowKey) { + String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey); + try (Table replicationTable = getOrBlockOnReplicationTable()) { + Get getQueue = new Get(rowKey); + Result queue = replicationTable.get(getQueue); + if (queue == null || queue.isEmpty()) { + abortable.abort(errMsg, new ReplicationException(errMsg)); + return null; + } + return readWALsFromResult(queue); + } catch (IOException e) { + abortable.abort(errMsg, e); + return null; + } + } + + /** + * Read all of the WALs from a queue into a list + * + * @param queue HBase query result containing the queue + * @return a list of all the WAL filenames + */ + protected List<String> readWALsFromResult(Result queue) { + List<String> wals = new ArrayList<>(); + Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE); + for (byte[] cQualifier : familyMap.keySet()) { + // Ignore the meta data fields of the queue + if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier, + COL_QUEUE_OWNER_HISTORY)) { + continue; + } + wals.add(Bytes.toString(cQualifier)); + } + return wals; + } + + /** + * Get the queue ids and metadata (Owner and History) for the queues belonging to the named + * server + * + * @param server name of the server + * @return a ResultScanner over the QueueIds belonging to the server + * @throws IOException + */ + protected ResultScanner getQueuesBelongingToServer(String server) throws IOException { + Scan scan = new Scan(); + SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER, + CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server)); + scan.setFilter(filterMyQueues); + scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); + scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY); + try (Table replicationTable = getOrBlockOnReplicationTable()) { + ResultScanner results = replicationTable.getScanner(scan); + return results; + } + } + + /** + * Attempts to acquire the Replication Table. This operation will block until it is assigned by + * the CreateReplicationTableWorker thread.
It is up to the caller of this method to close the + * returned Table + * @return the Replication Table when it is created + * @throws IOException + */ + protected Table getOrBlockOnReplicationTable() throws IOException { + // Sleep until the Replication Table becomes available + try { + replicationTableInitialized.await(); + } catch (InterruptedException e) { + String errMsg = "Unable to acquire the Replication Table due to InterruptedException: " + + e.getMessage(); + throw new InterruptedIOException(errMsg); + } + return getAndSetUpReplicationTable(); + } + + /** + * Creates a new copy of the Replication Table and sets up the proper Table timeouts for it + * + * @return the Replication Table + * @throws IOException + */ + private Table getAndSetUpReplicationTable() throws IOException { + Table replicationTable = connection.getTable(REPLICATION_TABLE_NAME); + setReplicationTableTimeOuts(replicationTable); + return replicationTable; + } + + /** + * Builds the Replication Table in a background thread. Any method accessing the Replication Table + * should do so through getOrBlockOnReplicationTable() + * + * @throws IOException if the Replication Table takes too long to build + */ + private void createReplicationTableInBackground() throws IOException { + executor.execute(new CreateReplicationTableWorker()); + } + + /** + * Attempts to build the Replication Table. Will continue blocking until we have a valid + * Table for the Replication Table. + */ + private class CreateReplicationTableWorker implements Runnable { + + private Admin admin; + + @Override + public void run() { + try { + admin = connection.getAdmin(); + if (!replicationTableExists()) { + createReplicationTable(); + } + int maxRetries = conf.getInt("hbase.replication.queues.createtable.retries.number", + CLIENT_RETRIES); + RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, RPC_TIMEOUT); + RetryCounter retryCounter = counterFactory.create(); + while (!replicationTableExists()) { + retryCounter.sleepUntilNextRetry(); + if (!retryCounter.shouldRetry()) { + throw new IOException("Unable to acquire the Replication Table"); + } + } + replicationTableInitialized.countDown(); + } catch (IOException | InterruptedException e) { + abortable.abort("Failed building Replication Table", e); + } + } + + /** + * Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR + * in TableBasedReplicationQueuesImpl + * + * @throws IOException + */ + private void createReplicationTable() throws IOException { + HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME); + replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR); + try { + admin.createTable(replicationTableDescriptor); + } catch (TableExistsException e) { + // In this case we can just continue as normal + } + } + + /** + * Checks whether the Replication Table exists yet + * + * @return whether the Replication Table exists + */ + private boolean replicationTableExists() { + try { + return admin.tableExists(REPLICATION_TABLE_NAME); + } catch (IOException e) { + return false; + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java new file mode 100644 index 0000000..51d7473 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * This is the interface for a Replication Tracker. A replication tracker provides the facility to + * subscribe and track events that reflect a change in replication state. These events are used by + * the ReplicationSourceManager to coordinate replication tasks such as addition/deletion of queues + * and queue failover. These events are defined in the ReplicationListener interface. If a class + * would like to listen to replication events it must implement the ReplicationListener interface + * and register itself with a Replication Tracker. + */ +@InterfaceAudience.Private +public interface ReplicationTracker { + + /** + * Register a replication listener to receive replication events. + * @param listener + */ + public void registerListener(ReplicationListener listener); + + public void removeListener(ReplicationListener listener); + + /** + * Returns a list of other live region servers in the cluster. + * @return List of region servers. + */ + public List<String> getListOfRegionServers(); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java new file mode 100644 index 0000000..9865d83 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java @@ -0,0 +1,250 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * This class is a ZooKeeper implementation of the ReplicationTracker interface. This class is + * responsible for handling replication events that are defined in the ReplicationListener + * interface. + */ +@InterfaceAudience.Private +public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements ReplicationTracker { + + private static final Log LOG = LogFactory.getLog(ReplicationTrackerZKImpl.class); + // All about stopping + private final Stoppable stopper; + // listeners to be notified + private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>(); + // List of all the other region servers in this cluster + private final ArrayList<String> otherRegionServers = new ArrayList<>(); + private final ReplicationPeers replicationPeers; + + public ReplicationTrackerZKImpl(ZooKeeperWatcher zookeeper, + final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable, + Stoppable stopper) { + super(zookeeper, conf, abortable); + this.replicationPeers = replicationPeers; + this.stopper = stopper; + this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper)); + this.zookeeper.registerListener(new PeersWatcher(this.zookeeper)); + } + + @Override + public void registerListener(ReplicationListener listener) { + listeners.add(listener); + } + + @Override + public void removeListener(ReplicationListener listener) { + listeners.remove(listener); + } + + /** + * Return a snapshot of the current region servers. + */ + @Override + public List<String> getListOfRegionServers() { + refreshOtherRegionServersList(); + + List<String> list = null; + synchronized (otherRegionServers) { + list = new ArrayList<>(otherRegionServers); + } + return list; + } + + /** + * Watcher used to be notified of the other region server's death in the local cluster. It + * initiates the process to transfer the queues if it is able to grab the lock. + */ + public class OtherRegionServerWatcher extends ZooKeeperListener { + + /** + * Construct a ZooKeeper event listener. + */ + public OtherRegionServerWatcher(ZooKeeperWatcher watcher) { + super(watcher); + } + + /** + * Called when a new node has been created. 
+ * @param path full path of the new node + */ + public void nodeCreated(String path) { + refreshListIfRightPath(path); + } + + /** + * Called when a node has been deleted + * @param path full path of the deleted node + */ + public void nodeDeleted(String path) { + if (stopper.isStopped()) { + return; + } + boolean cont = refreshListIfRightPath(path); + if (!cont) { + return; + } + LOG.info(path + " znode expired, triggering replicatorRemoved event"); + for (ReplicationListener rl : listeners) { + rl.regionServerRemoved(getZNodeName(path)); + } + } + + /** + * Called when an existing node has a child node added or removed. + * @param path full path of the node whose children have changed + */ + public void nodeChildrenChanged(String path) { + if (stopper.isStopped()) { + return; + } + refreshListIfRightPath(path); + } + + private boolean refreshListIfRightPath(String path) { + if (!path.startsWith(this.watcher.znodePaths.rsZNode)) { + return false; + } + return refreshOtherRegionServersList(); + } + } + + /** + * Watcher used to follow the creation and deletion of peer clusters. + */ + public class PeersWatcher extends ZooKeeperListener { + + /** + * Construct a ZooKeeper event listener. + */ + public PeersWatcher(ZooKeeperWatcher watcher) { + super(watcher); + } + + /** + * Called when a node has been deleted + * @param path full path of the deleted node + */ + public void nodeDeleted(String path) { + List<String> peers = refreshPeersList(path); + if (peers == null) { + return; + } + if (isPeerPath(path)) { + String id = getZNodeName(path); + LOG.info(path + " znode deleted, triggering peerRemoved event"); + for (ReplicationListener rl : listeners) { + rl.peerRemoved(id); + } + } + } + + /** + * Called when an existing node has a child node added or removed. + * @param path full path of the node whose children have changed + */ + public void nodeChildrenChanged(String path) { + List<String> peers = refreshPeersList(path); + if (peers == null) { + return; + } + LOG.info(path + " znode children changed, triggering peerListChanged event"); + for (ReplicationListener rl : listeners) { + rl.peerListChanged(peers); + } + } + } + + /** + * Verify if this event is meant for us, and if so then get the latest peers' list from ZK. Also + * reset the watches. + * @param path path to check against + * @return A list of peers' identifiers if the event concerns this watcher, else null. + */ + private List<String> refreshPeersList(String path) { + if (!path.startsWith(getPeersZNode())) { + return null; + } + return this.replicationPeers.getAllPeerIds(); + } + + private String getPeersZNode() { + return this.peersZNode; + } + + /** + * Extracts the znode name of a peer cluster from a ZK path + * @param fullPath Path to extract the id from + * @return the id or an empty string if path is invalid + */ + private String getZNodeName(String fullPath) { + String[] parts = fullPath.split("/"); + return parts.length > 0 ? parts[parts.length - 1] : ""; + } + + /** + * Reads the list of region servers from ZK and atomically clears our local view of it and + * replaces it with the updated list.
+ * @return true if the local list of the other region servers was updated with the ZK data (even + * if it was empty), false if the data was missing in ZK + */ + private boolean refreshOtherRegionServersList() { + List<String> newRsList = getRegisteredRegionServers(); + if (newRsList == null) { + return false; + } else { + synchronized (otherRegionServers) { + otherRegionServers.clear(); + otherRegionServers.addAll(newRsList); + } + } + return true; + } + + /** + * Get a list of all the other region servers in this cluster and set a watch + * @return a list of server names + */ + private List<String> getRegisteredRegionServers() { + List<String> result = null; + try { + result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.znodePaths.rsZNode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get list of registered region servers", e); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java new file mode 100644 index 0000000..3507547 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java @@ -0,0 +1,112 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Implements the ReplicationQueuesClient interface on top of the Replication Table. It utilizes + * the ReplicationTableBase to access the Replication Table.
+ */ +@InterfaceAudience.Private +public class TableBasedReplicationQueuesClientImpl extends ReplicationTableBase + implements ReplicationQueuesClient { + + public TableBasedReplicationQueuesClientImpl(ReplicationQueuesClientArguments args) + throws IOException { + super(args.getConf(), args.getAbortable()); + } + public TableBasedReplicationQueuesClientImpl(Configuration conf, + Abortable abortable) throws IOException { + super(conf, abortable); + } + + @Override + public void init() throws ReplicationException { + // no-op + } + + @Override + public List<String> getListOfReplicators() { + return super.getListOfReplicators(); + } + + @Override + public List<String> getLogsInQueue(String serverName, String queueId) { + return super.getLogsInQueue(serverName, queueId); + } + + @Override + public List<String> getAllQueues(String serverName) { + return super.getAllQueues(serverName); + } + + @Override + public Set<String> getAllWALs() { + Set<String> allWals = new HashSet<>(); + ResultScanner allQueues = null; + try (Table replicationTable = getOrBlockOnReplicationTable()) { + allQueues = replicationTable.getScanner(new Scan()); + for (Result queue : allQueues) { + for (String wal : readWALsFromResult(queue)) { + allWals.add(wal); + } + } + } catch (IOException e) { + String errMsg = "Failed getting all WALs in Replication Table"; + abortable.abort(errMsg, e); + } finally { + if (allQueues != null) { + allQueues.close(); + } + } + return allWals; + } + + @Override + public int getHFileRefsNodeChangeVersion() throws KeeperException { + // TODO + throw new NotImplementedException(); + } + + @Override + public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException { + // TODO + throw new NotImplementedException(); + } + + @Override + public List<String> getReplicableHFiles(String peerId) throws KeeperException { + // TODO + throw new NotImplementedException(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java new file mode 100644 index 0000000..bf55e8c --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java @@ -0,0 +1,450 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License.
+*/ +package org.apache.hadoop.hbase.replication; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * This class provides an implementation of the ReplicationQueues interface using an HBase table + * "Replication Table". It utilizes the ReplicationTableBase to access the Replication Table. + */ +@InterfaceAudience.Private +public class TableBasedReplicationQueuesImpl extends ReplicationTableBase + implements ReplicationQueues { + + private static final Log LOG = LogFactory.getLog(TableBasedReplicationQueuesImpl.class); + + // Common byte values used in replication offset tracking + private static final byte[] INITIAL_OFFSET_BYTES = Bytes.toBytes(0L); + private static final byte[] EMPTY_STRING_BYTES = Bytes.toBytes(""); + + private String serverName = null; + private byte[] serverNameBytes = null; + + // TODO: Only use this variable temporarily. 
+  // TODO: replication information
+  private ReplicationStateZKBase replicationState;
+
+  public TableBasedReplicationQueuesImpl(ReplicationQueuesArguments args) throws IOException {
+    this(args.getConf(), args.getAbortable(), args.getZk());
+  }
+
+  public TableBasedReplicationQueuesImpl(Configuration conf, Abortable abort,
+    ZooKeeperWatcher zkw) throws IOException {
+    super(conf, abort);
+    replicationState = new ReplicationStateZKBase(zkw, conf, abort) {};
+  }
+
+  @Override
+  public void init(String serverName) throws ReplicationException {
+    this.serverName = serverName;
+    this.serverNameBytes = Bytes.toBytes(serverName);
+  }
+
+  @Override
+  public List<String> getListOfReplicators() {
+    return super.getListOfReplicators();
+  }
+
+  @Override
+  public void removeQueue(String queueId) {
+    try {
+      byte[] rowKey = queueIdToRowKey(queueId);
+      if (checkQueueExists(queueId)) {
+        Delete deleteQueue = new Delete(rowKey);
+        safeQueueUpdate(deleteQueue);
+      } else {
+        LOG.info("No logs were registered for queue id=" + queueId + " so no rows were removed "
+          + "from the replication table while removing the queue");
+      }
+    } catch (IOException | ReplicationException e) {
+      String errMsg = "Failed removing queue queueId=" + queueId;
+      abortable.abort(errMsg, e);
+    }
+  }
+
+  @Override
+  public void addLog(String queueId, String filename) throws ReplicationException {
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      if (!checkQueueExists(queueId)) {
+        // Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values
+        Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId)));
+        putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER, serverNameBytes);
+        putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, EMPTY_STRING_BYTES);
+        putNewQueue.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
+        replicationTable.put(putNewQueue);
+      } else {
+        // Otherwise simply add the new log and offset as a new column
+        Put putNewLog = new Put(queueIdToRowKey(queueId));
+        putNewLog.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
+        safeQueueUpdate(putNewLog);
+      }
+    } catch (IOException | ReplicationException e) {
+      String errMsg = "Failed adding log queueId=" + queueId + " filename=" + filename;
+      abortable.abort(errMsg, e);
+    }
+  }
+
+  @Override
+  public void removeLog(String queueId, String filename) {
+    try {
+      byte[] rowKey = queueIdToRowKey(queueId);
+      Delete delete = new Delete(rowKey);
+      delete.addColumns(CF_QUEUE, Bytes.toBytes(filename));
+      safeQueueUpdate(delete);
+    } catch (IOException | ReplicationException e) {
+      String errMsg = "Failed removing log queueId=" + queueId + " filename=" + filename;
+      abortable.abort(errMsg, e);
+    }
+  }
+
+  @Override
+  public void setLogPosition(String queueId, String filename, long position) {
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      byte[] rowKey = queueIdToRowKey(queueId);
+      // Check that the log exists. addLog() must have been called before setLogPosition().
+      Get checkLogExists = new Get(rowKey);
+      checkLogExists.addColumn(CF_QUEUE, Bytes.toBytes(filename));
+      if (!replicationTable.exists(checkLogExists)) {
+        String errMsg = "Could not set position of non-existent log from queueId=" + queueId
+          + ", filename=" + filename;
+        abortable.abort(errMsg, new ReplicationException(errMsg));
+        return;
+      }
+      // Update the log offset if it exists
+      Put walAndOffset = new Put(rowKey);
+      walAndOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename), Bytes.toBytes(position));
+      safeQueueUpdate(walAndOffset);
+    } catch (IOException | ReplicationException e) {
+      String errMsg = "Failed writing log position queueId=" + queueId + " filename="
+        + filename + " position=" + position;
+      abortable.abort(errMsg, e);
+    }
+  }
+
+  @Override
+  public long getLogPosition(String queueId, String filename) throws ReplicationException {
+    try {
+      byte[] rowKey = queueIdToRowKey(queueId);
+      Get getOffset = new Get(rowKey);
+      getOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename));
+      Result result = getResultIfOwner(getOffset);
+      if (result == null || !result.containsColumn(CF_QUEUE, Bytes.toBytes(filename))) {
+        throw new ReplicationException("Got an empty result while getting log position "
+          + "queueId=" + queueId + ", filename=" + filename);
+      }
+      return Bytes.toLong(result.getValue(CF_QUEUE, Bytes.toBytes(filename)));
+    } catch (IOException e) {
+      throw new ReplicationException("Could not get position in log for queueId=" + queueId
+        + ", filename=" + filename, e);
+    }
+  }
+
+  @Override
+  public void removeAllQueues() {
+    List<String> myQueueIds = getAllQueues();
+    for (String queueId : myQueueIds) {
+      removeQueue(queueId);
+    }
+  }
+
+  @Override
+  public List<String> getLogsInQueue(String queueId) {
+    String errMsg = "Failed getting logs in queue queueId=" + queueId;
+    byte[] rowKey = queueIdToRowKey(queueId);
+    List<String> logs = new ArrayList<>();
+    try {
+      Get getQueue = new Get(rowKey);
+      Result queue = getResultIfOwner(getQueue);
+      if (queue == null || queue.isEmpty()) {
+        String errMsgLostOwnership = "Failed getting logs for queue queueId="
+          + Bytes.toString(rowKey) + " because the queue was missing or we lost ownership";
+        abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership));
+        return null;
+      }
+      Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
+      for (byte[] cQualifier : familyMap.keySet()) {
+        if (Arrays.equals(cQualifier, COL_QUEUE_OWNER)
+            || Arrays.equals(cQualifier, COL_QUEUE_OWNER_HISTORY)) {
+          continue;
+        }
+        logs.add(Bytes.toString(cQualifier));
+      }
+    } catch (IOException e) {
+      abortable.abort(errMsg, e);
+      return null;
+    }
+    return logs;
+  }
+
+  @Override
+  public List<String> getAllQueues() {
+    return getAllQueues(serverName);
+  }
+
+  @Override
+  public List<String> getUnClaimedQueueIds(String regionserver) {
+    if (isThisOurRegionServer(regionserver)) {
+      return null;
+    }
+    try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)) {
+      List<String> res = new ArrayList<>();
+      for (Result queue : queuesToClaim) {
+        String rowKey = Bytes.toString(queue.getRow());
+        res.add(rowKey);
+      }
+      return res.isEmpty() ? null : res;
+    } catch (IOException e) {
+      String errMsg = "Failed getUnClaimedQueueIds";
+      abortable.abort(errMsg, e);
+    }
+    return null;
+  }
+
+  @Override
+  public void removeReplicatorIfQueueIsEmpty(String regionserver) {
+    // Do nothing here
+  }
+
+  @Override
+  public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
+    if (isThisOurRegionServer(regionserver)) {
+      return null;
+    }
+    try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)) {
+      for (Result queue : queuesToClaim) {
+        String rowKey = Bytes.toString(queue.getRow());
+        if (!rowKey.equals(queueId)) {
+          continue;
+        }
+        if (attemptToClaimQueue(queue, regionserver)) {
+          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(rowKey);
+          if (replicationState.peerExists(replicationQueueInfo.getPeerId())) {
+            SortedSet<String> sortedLogs = new TreeSet<>();
+            List<String> logs = getLogsInQueue(queue.getRow());
+            for (String log : logs) {
+              sortedLogs.add(log);
+            }
+            LOG.info(serverName + " has claimed queue " + rowKey + " from " + regionserver);
+            return new Pair<>(rowKey, sortedLogs);
+          } else {
+            // Delete orphaned queues
+            removeQueue(Bytes.toString(queue.getRow()));
+            LOG.info(serverName + " has deleted abandoned queue " + queueId + " from "
+              + regionserver);
+          }
+        }
+      }
+    } catch (IOException | KeeperException e) {
+      String errMsg = "Failed claiming queues for regionserver=" + regionserver;
+      abortable.abort(errMsg, e);
+    }
+    return null;
+  }
+
+  @Override
+  public boolean isThisOurRegionServer(String regionserver) {
+    return this.serverName.equals(regionserver);
+  }
+
+  @Override
+  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void removePeerFromHFileRefs(String peerId) {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
+    throws ReplicationException {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void removeHFileRefs(String peerId, List<String> files) {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  private String buildQueueRowKey(String queueId) {
+    return buildQueueRowKey(serverName, queueId);
+  }
+
+  /**
+   * Convenience method that gets the row key of the queue specified by queueId
+   * @param queueId queueId of a queue in this server
+   * @return the row key of the queue in the Replication Table
+   */
+  private byte[] queueIdToRowKey(String queueId) {
+    return queueIdToRowKey(serverName, queueId);
+  }
+
+  /**
+   * See safeQueueUpdate(RowMutations mutate)
+   * @param put Row mutation to perform on the queue
+   */
+  private void safeQueueUpdate(Put put) throws ReplicationException, IOException {
+    RowMutations mutations = new RowMutations(put.getRow());
+    mutations.add(put);
+    safeQueueUpdate(mutations);
+  }
+
+  /**
+   * See safeQueueUpdate(RowMutations mutate)
+   * @param delete Row mutation to perform on the queue
+   */
+  private void safeQueueUpdate(Delete delete) throws ReplicationException, IOException {
+    RowMutations mutations = new RowMutations(delete.getRow());
+    mutations.add(delete);
+    safeQueueUpdate(mutations);
+  }
+
+  /**
+   * Attempt to mutate a given queue in the Replication Table with a checkAndPut on the OWNER
+   * column of the queue. Abort the server if this checkAndPut fails, which means we have somehow
+   * lost ownership of the queue, or if an IOException occurs during the transaction.
+   * @param mutate Mutation to perform on a given queue
+   */
+  private void safeQueueUpdate(RowMutations mutate) throws ReplicationException, IOException {
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(),
+        CF_QUEUE, COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate);
+      if (!updateSuccess) {
+        throw new ReplicationException("Failed to update Replication Table because we lost "
+          + "queue ownership");
+      }
+    }
+  }
+
+  /**
+   * Check if the queue specified by queueId is stored in HBase
+   * @param queueId Either raw or reclaimed format of the queueId
+   * @return Whether the queue is stored in HBase
+   * @throws IOException
+   */
+  private boolean checkQueueExists(String queueId) throws IOException {
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      byte[] rowKey = queueIdToRowKey(queueId);
+      return replicationTable.exists(new Get(rowKey));
+    }
+  }
+
+  /**
+   * Attempt to claim the given queue with a checkAndPut on the OWNER column. We check that the
+   * recently killed server is still the OWNER before we claim it.
+   * @param queue The queue that we are trying to claim
+   * @param originalServer The server that originally owned the queue
+   * @return Whether we successfully claimed the queue
+   * @throws IOException
+   */
+  private boolean attemptToClaimQueue(Result queue, String originalServer) throws IOException {
+    Put putQueueNameAndHistory = new Put(queue.getRow());
+    putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER, Bytes.toBytes(serverName));
+    String newOwnerHistory = buildClaimedQueueHistory(Bytes.toString(queue.getValue(CF_QUEUE,
+      COL_QUEUE_OWNER_HISTORY)), originalServer);
+    putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY,
+      Bytes.toBytes(newOwnerHistory));
+    RowMutations claimAndRenameQueue = new RowMutations(queue.getRow());
+    claimAndRenameQueue.add(putQueueNameAndHistory);
+    // Attempt to claim ownership of this queue by checking if the current OWNER is still the
+    // original server. If it is not, another RS has already claimed it. If it is, we set
+    // ourselves as the new owner and update the queue's history.
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      return replicationTable.checkAndMutate(queue.getRow(),
+        CF_QUEUE, COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(originalServer),
+        claimAndRenameQueue);
+    }
+  }
+
+  /**
+   * Attempts to run a Get on some queue. Will only return a non-null result if we currently own
+   * the queue.
+   * @param get The Get that we want to query
+   * @return The result of the Get if this server is the owner of the queue, else null
+   * @throws IOException
+   */
+  private Result getResultIfOwner(Get get) throws IOException {
+    Scan scan = new Scan(get);
+    // Check if the Get currently contains all columns or only specific columns
+    if (scan.getFamilyMap().size() > 0) {
+      // Add the OWNER column if the scan is already restricted to specific columns
+      scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
+    }
+    scan.setMaxResultSize(1);
+    SingleColumnValueFilter checkOwner = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
+      CompareFilter.CompareOp.EQUAL, serverNameBytes);
+    scan.setFilter(checkOwner);
+    try (Table replicationTable = getOrBlockOnReplicationTable();
+        ResultScanner scanner = replicationTable.getScanner(scan)) {
+      Result result = scanner.next();
+      return (result == null || result.isEmpty()) ? null : result;
+    }
+  }
+}
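The ownership protocol above (safeQueueUpdate and attemptToClaimQueue) reduces to one atomic compare-and-swap on the OWNER cell. A condensed sketch of that pattern against a bare Table follows; the method name claimQueueSketch, the table name "replication", and the family/qualifier names are placeholders for illustration, not the patch's real constants.

  // Hypothetical illustration of the compare-and-swap used by attemptToClaimQueue().
  boolean claimQueueSketch(Configuration conf) throws IOException {
    byte[] row = Bytes.toBytes("deadserver.example.org,16020,1234/1");
    byte[] cf = Bytes.toBytes("q");            // placeholder column family
    byte[] ownerCol = Bytes.toBytes("owner");  // placeholder qualifier
    Put claim = new Put(row);
    claim.addColumn(cf, ownerCol, Bytes.toBytes("newserver.example.org,16020,5678"));
    RowMutations mutations = new RowMutations(row);
    mutations.add(claim);
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf("replication"))) {
      // Succeeds atomically, and only if the dead server is still the registered
      // owner, so two region servers racing for the same queue cannot both win.
      return table.checkAndMutate(row, cf, ownerCol, CompareFilter.CompareOp.EQUAL,
          Bytes.toBytes("deadserver.example.org,16020,1234"), mutations);
    }
  }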
http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-server/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
index d48dec8..b0468d6 100644
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ -378,6 +378,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-replication</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-prefix-tree</artifactId>
       <scope>runtime</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
index 0e619ea..ae4e7cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
@@ -27,8 +27,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.AbstractService;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
-
 /**
  * A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending this
  * class rather than implementing {@link ReplicationEndpoint} directly for better backwards

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 65da6eb..0ac0f27 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,6 +61,7 @@
   </licenses>
   <modules>
+    <module>hbase-replication</module>
     <module>hbase-resource-bundle</module>
     <module>hbase-server</module>
     <module>hbase-thrift</module>
@@ -1564,6 +1565,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <artifactId>hbase-replication</artifactId>
+      <groupId>org.apache.hbase</groupId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
      <artifactId>hbase-server</artifactId>
      <groupId>org.apache.hbase</groupId>
      <version>${project.version}</version>
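Putting the module wiring and the new implementation together, a region-server-side round trip through the queue API added in this patch might look like the sketch below. The method name demo, the server name, and the WAL name are made up for illustration, and the Configuration/Abortable/ZooKeeperWatcher setup is elided as an assumption; the constructor argument order matches the one defined in TableBasedReplicationQueuesImpl above.

  // Hypothetical end-to-end sketch; not part of this patch.
  void demo(Configuration conf, Abortable abortable, ZooKeeperWatcher zkw)
      throws IOException, ReplicationException {
    ReplicationQueues rq = new TableBasedReplicationQueuesImpl(conf, abortable, zkw);
    rq.init("hostname.example.org,16020,1234");                     // this RS's server name
    rq.addLog("1", "hostname%2C16020%2C1234.12345");                // register WAL at offset 0
    rq.setLogPosition("1", "hostname%2C16020%2C1234.12345", 254L);  // replicated up to 254
    long pos = rq.getLogPosition("1", "hostname%2C16020%2C1234.12345");
    rq.removeLog("1", "hostname%2C16020%2C1234.12345");             // WAL fully shipped
    rq.removeQueue("1");                                            // queue drained
  }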