/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.util.List;
import java.util.Set;

import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;

/**
 * Read-only client view of the replication queues stored in ZooKeeper.
 * Listing operations abort the process via {@link Abortable} on ZooKeeper
 * failure (and rethrow), since an inconsistent view of the queues is not safe
 * to act on.
 */
@InterfaceAudience.Private
public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
    ReplicationQueuesClient {

  // FIX: was a package-private, non-static, non-final instance field
  // ("Log LOG = ..."), which allocates one logger per instance and is
  // accidentally visible to the package. Loggers should be class-scoped
  // constants.
  private static final Log LOG = LogFactory.getLog(ReplicationQueuesClientZKImpl.class);

  public ReplicationQueuesClientZKImpl(ReplicationQueuesClientArguments args) {
    this(args.getZk(), args.getConf(), args.getAbortable());
  }

  public ReplicationQueuesClientZKImpl(final ZooKeeperWatcher zk, Configuration conf,
      Abortable abortable) {
    super(zk, conf, abortable);
  }

  /**
   * Ensures the base queues znode exists, creating it (with parents) if absent.
   * @throws ReplicationException wrapping any ZooKeeper failure
   */
  @Override
  public void init() throws ReplicationException {
    try {
      if (ZKUtil.checkExists(this.zookeeper, this.queuesZNode) < 0) {
        ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Internal error while initializing a queues client", e);
    }
  }

  /**
   * Lists the WAL znodes under {@code queuesZNode/serverName/queueId}.
   * @return child names, or null if the queue znode does not exist
   * @throws KeeperException after aborting, on any ZooKeeper error
   */
  @Override
  public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {
    String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
    znode = ZKUtil.joinZNode(znode, queueId);
    List<String> result = null;
    try {
      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get list of wals for queueId=" + queueId
          + " and serverName=" + serverName, e);
      throw e;
    }
    return result;
  }

  /**
   * Lists the queue ids owned by the given region server.
   * @return child names, or null if the server znode does not exist
   * @throws KeeperException after aborting, on any ZooKeeper error
   */
  @Override
  public List<String> getAllQueues(String serverName) throws KeeperException {
    String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
    List<String> result = null;
    try {
      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e);
      throw e;
    }
    return result;
  }

  /**
   * Load all wals in all replication queues from ZK. This method guarantees to return a
   * snapshot which contains all WALs in the zookeeper at the start of this call even there
   * is concurrent queue failover. However, some newly created WALs during the call may
   * not be included.
   * <p>
   * Consistency is achieved optimistically: the cversion of the queues znode is read
   * before and after the scan, and the scan is retried until the two match.
   */
  @Override
  public Set<String> getAllWALs() throws KeeperException {
    for (int retry = 0; ; retry++) {
      int v0 = getQueuesZNodeCversion();
      List<String> rss = getListOfReplicators();
      if (rss == null || rss.isEmpty()) {
        LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
        return ImmutableSet.of();
      }
      Set<String> wals = Sets.newHashSet();
      for (String rs : rss) {
        List<String> listOfPeers = getAllQueues(rs);
        // if rs just died, this will be null
        if (listOfPeers == null) {
          continue;
        }
        for (String id : listOfPeers) {
          List<String> peersWals = getLogsInQueue(rs, id);
          if (peersWals != null) {
            wals.addAll(peersWals);
          }
        }
      }
      int v1 = getQueuesZNodeCversion();
      if (v0 == v1) {
        return wals;
      }
      LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
          v0, v1, retry));
    }
  }

  /**
   * @return the child-version (cversion) of the queues znode; bumped whenever a
   *         queue is added or removed, used by {@link #getAllWALs()} to detect
   *         concurrent modification
   */
  public int getQueuesZNodeCversion() throws KeeperException {
    try {
      Stat stat = new Stat();
      ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
      return stat.getCversion();
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get stat of replication rs node", e);
      throw e;
    }
  }

  /** @return the cversion of the hfile-references znode (same role as above). */
  @Override
  public int getHFileRefsNodeChangeVersion() throws KeeperException {
    Stat stat = new Stat();
    try {
      ZKUtil.getDataNoWatch(this.zookeeper, this.hfileRefsZNode, stat);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get stat of replication hfile references node.", e);
      throw e;
    }
    return stat.getCversion();
  }

  /**
   * @return peer ids that currently have hfile-reference queues, or null if the
   *         hfile-refs znode does not exist
   */
  @Override
  public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException {
    List<String> result = null;
    try {
      result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.hfileRefsZNode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get list of all peers in hfile references node.", e);
      throw e;
    }
    return result;
  }

  /**
   * @return hfile names queued for replication to the given peer, or null if
   *         the peer's hfile-refs znode does not exist
   */
  @Override
  public List<String> getReplicableHFiles(String peerId) throws KeeperException {
    String znode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
    List<String> result = null;
    try {
      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get list of hfile references for peerId=" + peerId, e);
      throw e;
    }
    return result;
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

/**
 * This class provides an implementation of the ReplicationQueues
 * interface using ZooKeeper. The
 * base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of
 * all outstanding WAL files on this region server that need to be replicated. The myQueuesZnode is
 * the regionserver name (a concatenation of the region server's hostname, client port and start
 * code). For example:
 *
 * /hbase/replication/rs/hostname.example.org,6020,1234
 *
 * Within this znode, the region server maintains a set of WAL replication queues. These queues are
 * represented by child znodes named using there give queue id. For example:
 *
 * /hbase/replication/rs/hostname.example.org,6020,1234/1
 * /hbase/replication/rs/hostname.example.org,6020,1234/2
 *
 * Each queue has one child znode for every WAL that still needs to be replicated. The value of
 * these WAL child znodes is the latest position that has been replicated. This position is updated
 * every time a WAL entry is replicated. For example:
 *
 * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
 */
@InterfaceAudience.Private
public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {

  /** Znode containing all replication queues for this region server. */
  private String myQueuesZnode;

  private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);

  public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) {
    this(args.getZk(), args.getConf(), args.getAbortable());
  }

  public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf,
      Abortable abortable) {
    super(zk, conf, abortable);
  }

  /**
   * Creates this server's queues znode (and, when bulk-load replication is
   * enabled, the hfile-refs znode) if they do not already exist.
   */
  @Override
  public void init(String serverName) throws ReplicationException {
    this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
    try {
      if (ZKUtil.checkExists(this.zookeeper, this.myQueuesZnode) < 0) {
        ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Could not initialize replication queues.", e);
    }
    if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
      try {
        if (ZKUtil.checkExists(this.zookeeper, this.hfileRefsZNode) < 0) {
          ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
        }
      } catch (KeeperException e) {
        throw new ReplicationException("Could not initialize hfile references replication queue.",
            e);
      }
    }
  }

  /** Recursively deletes the given queue under this server's znode. */
  @Override
  public void removeQueue(String queueId) {
    try {
      ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.myQueuesZnode, queueId));
    } catch (KeeperException e) {
      this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e);
    }
  }

  /**
   * Registers a WAL file under the given queue.
   * @throws ReplicationException wrapping the ZooKeeper failure
   */
  @Override
  public void addLog(String queueId, String filename) throws ReplicationException {
    String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
    znode = ZKUtil.joinZNode(znode, filename);
    try {
      ZKUtil.createWithParents(this.zookeeper, znode);
    } catch (KeeperException e) {
      // FIX: the original threw without chaining 'e', discarding the root
      // cause of the ZooKeeper failure.
      throw new ReplicationException(
          "Could not add log because znode could not be created. queueId=" + queueId
              + ", filename=" + filename, e);
    }
  }

  /** Removes a WAL znode from the given queue; aborts on ZooKeeper failure. */
  @Override
  public void removeLog(String queueId, String filename) {
    try {
      String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
      znode = ZKUtil.joinZNode(znode, filename);
      ZKUtil.deleteNode(this.zookeeper, znode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
          + filename + ")", e);
    }
  }

  /** Persists the replicated-up-to position for a WAL in the given queue. */
  @Override
  public void setLogPosition(String queueId, String filename, long position) {
    try {
      String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
      znode = ZKUtil.joinZNode(znode, filename);
      // Why serialize String of Long and not Long as bytes?
      ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
    } catch (KeeperException e) {
      this.abortable.abort("Failed to write replication wal position (filename=" + filename
          + ", position=" + position + ")", e);
    }
  }

  /**
   * Reads the stored position for a WAL.
   * @return the stored position, or 0 if interrupted or the stored value
   *         cannot be parsed (restart from the beginning of the wal)
   */
  @Override
  public long getLogPosition(String queueId, String filename) throws ReplicationException {
    String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
    String znode = ZKUtil.joinZNode(clusterZnode, filename);
    byte[] bytes = null;
    try {
      bytes = ZKUtil.getData(this.zookeeper, znode);
    } catch (KeeperException e) {
      throw new ReplicationException("Internal Error: could not get position in log for queueId="
          + queueId + ", filename=" + filename, e);
    } catch (InterruptedException e) {
      // Preserve the interrupt for the caller; report "start over".
      Thread.currentThread().interrupt();
      return 0;
    }
    try {
      return ZKUtil.parseWALPositionFrom(bytes);
    } catch (DeserializationException de) {
      LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and wal=" + filename
          + " znode content, continuing.");
    }
    // if we can not parse the position, start at the beginning of the wal file
    // again
    return 0;
  }

  @Override
  public boolean isThisOurRegionServer(String regionserver) {
    return ZKUtil.joinZNode(this.queuesZNode, regionserver).equals(this.myQueuesZnode);
  }

  /**
   * @return queue ids still present under a (presumably dead) region server's
   *         znode, or null if that server is ourselves or on ZooKeeper failure
   */
  @Override
  public List<String> getUnClaimedQueueIds(String regionserver) {
    if (isThisOurRegionServer(regionserver)) {
      return null;
    }
    String rsZnodePath = ZKUtil.joinZNode(this.queuesZNode, regionserver);
    List<String> queues = null;
    try {
      queues = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZnodePath);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to getUnClaimedQueueIds for RS" + regionserver, e);
    }
    return queues;
  }

  @Override
  public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
    LOG.info("Atomically moving " + regionserver + "/" + queueId + "'s WALs to my queue");
    return moveQueueUsingMulti(regionserver, queueId);
  }

  /** Deletes a dead region server's znode once all its queues are claimed. */
  @Override
  public void removeReplicatorIfQueueIsEmpty(String regionserver) {
    String rsPath = ZKUtil.joinZNode(this.queuesZNode, regionserver);
    try {
      List<String> list = ZKUtil.listChildrenNoWatch(this.zookeeper, rsPath);
      if (list != null && list.isEmpty()){
        ZKUtil.deleteNode(this.zookeeper, rsPath);
      }
    } catch (KeeperException e) {
      LOG.warn("Got error while removing replicator", e);
    }
  }

  @Override
  public void removeAllQueues() {
    try {
      ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
    } catch (KeeperException e) {
      // if the znode is already expired, don't bother going further
      if (e instanceof KeeperException.SessionExpiredException) {
        return;
      }
      this.abortable.abort("Failed to delete replication queues for region server: "
          + this.myQueuesZnode, e);
    }
  }

  @Override
  public List<String> getLogsInQueue(String queueId) {
    String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
    List<String> result = null;
    try {
      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get list of wals for queueId=" + queueId, e);
    }
    return result;
  }

  /** @return this server's queue ids; never null (empty list on miss). */
  @Override
  public List<String> getAllQueues() {
    List<String> listOfQueues = null;
    try {
      listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get a list of queues for region server: "
          + this.myQueuesZnode, e);
    }
    return listOfQueues == null ? new ArrayList<>() : listOfQueues;
  }

  /**
   * It "atomically" copies one peer's wals queue from another dead region server and returns them
   * all sorted. The new peer id is equal to the old peer id appended with the dead server's znode.
   * @param znode pertaining to the region server to copy the queues from
   * @param peerId pertaining to the queue need to be copied
   * @return pair of the new queue id and its sorted wals, or null when the peer
   *         no longer exists or another server claimed the queue first
   */
  private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) {
    try {
      // hbase/replication/rs/deadrs
      String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
      List<ZKUtilOp> listOfOps = new ArrayList<>();
      ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);

      String newPeerId = peerId + "-" + znode;
      String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
      // check the logs queue for the old peer cluster
      String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
      List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);

      if (!peerExists(replicationQueueInfo.getPeerId())) {
        LOG.warn("Peer " + replicationQueueInfo.getPeerId() +
            " didn't exist, will move its queue to avoid the failure of multi op");
        // FIX: guard against a null wals list (queue znode may have vanished),
        // which previously caused an NPE in this branch.
        if (wals != null) {
          for (String wal : wals) {
            String oldWalZnode = ZKUtil.joinZNode(oldClusterZnode, wal);
            listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
          }
        }
        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
        ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
        return null;
      }

      SortedSet<String> logQueue = new TreeSet<>();
      if (wals == null || wals.isEmpty()) {
        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
      } else {
        // create the new cluster znode
        ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
        listOfOps.add(op);
        // get the offset of the logs and set it to new znodes
        for (String wal : wals) {
          String oldWalZnode = ZKUtil.joinZNode(oldClusterZnode, wal);
          byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode);
          LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset));
          String newLogZnode = ZKUtil.joinZNode(newPeerZnode, wal);
          listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
          logQueue.add(wal);
        }
        // add delete op for peer
        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));

        if (LOG.isTraceEnabled()) {
          LOG.trace(" The multi list size is: " + listOfOps.size());
        }
      }
      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);

      LOG.info("Atomically moved " + znode + "/" + peerId + "'s WALs to my queue");
      return new Pair<>(newPeerId, logQueue);
    } catch (KeeperException e) {
      // Multi call failed; it looks like some other regionserver took away the logs.
      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
    } catch (InterruptedException e) {
      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
      Thread.currentThread().interrupt();
    }
    return null;
  }

  /**
   * Registers hfile references (by file name) under the peer's hfile-refs
   * znode in a single multi operation.
   */
  @Override
  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
      throws ReplicationException {
    String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
    boolean debugEnabled = LOG.isDebugEnabled();
    if (debugEnabled) {
      LOG.debug("Adding hfile references " + pairs + " in queue " + peerZnode);
    }

    int size = pairs.size();
    List<ZKUtilOp> listOfOps = new ArrayList<>(size);

    for (int i = 0; i < size; i++) {
      listOfOps.add(ZKUtilOp.createAndFailSilent(
        ZKUtil.joinZNode(peerZnode, pairs.get(i).getSecond().getName()),
        HConstants.EMPTY_BYTE_ARRAY));
    }
    if (debugEnabled) {
      LOG.debug(" The multi list size for adding hfile references in zk for node " + peerZnode
          + " is " + listOfOps.size());
    }
    try {
      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to create hfile reference znode=" + e.getPath(), e);
    }
  }

  /** Deletes the given hfile-reference znodes in a single multi operation. */
  @Override
  public void removeHFileRefs(String peerId, List<String> files) {
    String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
    boolean debugEnabled = LOG.isDebugEnabled();
    if (debugEnabled) {
      LOG.debug("Removing hfile references " + files + " from queue " + peerZnode);
    }

    int size = files.size();
    List<ZKUtilOp> listOfOps = new ArrayList<>(size);

    for (int i = 0; i < size; i++) {
      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i))));
    }
    if (debugEnabled) {
      LOG.debug(" The multi list size for removing hfile references in zk for node " + peerZnode
          + " is " + listOfOps.size());
    }
    try {
      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
    } catch (KeeperException e) {
      LOG.error("Failed to remove hfile reference znode=" + e.getPath(), e);
    }
  }

  @Override
  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
    String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
    try {
      if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
        LOG.info("Adding peer " + peerId + " to hfile reference queue.");
        ZKUtil.createWithParents(this.zookeeper, peerZnode);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
          e);
    }
  }

  @Override
  public void removePeerFromHFileRefs(String peerId) {
    final String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
    try {
      if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Peer " + peerZnode + " not found in hfile reference queue.");
        }
        return;
      } else {
        LOG.info("Removing peer " + peerZnode + " from hfile reference queue.");
        ZKUtil.deleteNodeRecursively(this.zookeeper, peerZnode);
      }
    } catch (KeeperException e) {
      LOG.error("Ignoring the exception to remove peer " + peerId + " from hfile reference queue.",
        e);
    }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;


/**
 * This is a base class for maintaining replication state in zookeeper.
 * It resolves and holds the layout of the replication znode tree
 * (peers, queues, hfile references) from configuration.
 */
@InterfaceAudience.Private
public abstract class ReplicationStateZKBase {

  /**
   * The name of the znode that contains the replication status of a remote slave (i.e. peer)
   * cluster.
   */
  protected final String peerStateNodeName;
  /** The name of the base znode that contains all replication state. */
  protected final String replicationZNode;
  /** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */
  protected final String peersZNode;
  /** The name of the znode that contains all replication queues */
  protected final String queuesZNode;
  /** The name of the znode that contains queues of hfile references to be replicated */
  protected final String hfileRefsZNode;
  /** The cluster key of the local cluster */
  protected final String ourClusterKey;
  /** The name of the znode that contains tableCFs */
  protected final String tableCFsNodeName;

  protected final ZooKeeperWatcher zookeeper;
  protected final Configuration conf;
  protected final Abortable abortable;

  // Public for testing
  public static final byte[] ENABLED_ZNODE_BYTES =
      toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
  public static final byte[] DISABLED_ZNODE_BYTES =
      toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
      "zookeeper.znode.replication.hfile.refs";
  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";

  public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf,
      Abortable abortable) {
    this.zookeeper = zookeeper;
    this.conf = conf;
    this.abortable = abortable;

    // Leaf-node names are all individually overridable via configuration.
    this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
    this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
    this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);

    // Resolve the replication subtree root, then its three children.
    String replicationNodeName = conf.get("zookeeper.znode.replication", "replication");
    this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.znodePaths.baseZNode,
      replicationNodeName);
    this.peersZNode =
        ZKUtil.joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.peers", "peers"));
    this.queuesZNode =
        ZKUtil.joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.rs", "rs"));
    this.hfileRefsZNode = ZKUtil.joinZNode(replicationZNode,
      conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
        ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT));
  }

  /**
   * @return the region server names that currently own replication queues, or
   *         null on ZooKeeper failure (after aborting)
   */
  public List<String> getListOfReplicators() {
    List<String> replicators = null;
    try {
      replicators = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get list of replicators", e);
    }
    return replicators;
  }

  /**
   * @param state the replication state to serialize
   * @return Serialized protobuf of <code>state</code> with pb magic prefix prepended suitable for
   *         use as content of a peer-state znode under a peer cluster id as in
   *         /hbase/replication/peers/PEER_ID/peer-state.
   */
  protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) {
    ReplicationProtos.ReplicationState msg =
        ReplicationProtos.ReplicationState.newBuilder().setState(state).build();
    // There is no toByteArray on this pb Message, so stream it through a
    // CodedOutputStream with a small (16-byte) buffer; the payload is tiny.
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
      CodedOutputStream cos = CodedOutputStream.newInstance(baos, 16);
      msg.writeTo(cos);
      cos.flush();
      baos.flush();
      return ProtobufUtil.prependPBMagic(baos.toByteArray());
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /** @return true if a peer with the given id is registered under peersZNode. */
  protected boolean peerExists(String id) throws KeeperException {
    return ZKUtil.checkExists(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
  }

  /**
   * Determine if a ZK path points to a peer node.
   * @param path path to be checked
   * @return true if the path points to a peer node, otherwise false
   */
  protected boolean isPeerPath(String path) {
    // A peer node is exactly one level below peersZNode; compare path depths.
    int peerDepth = peersZNode.split("/").length;
    return path.split("/").length == peerDepth + 1;
  }

  @VisibleForTesting
  protected String getTableCFsNode(String id) {
    return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName));
  }

  @VisibleForTesting
  protected String getPeerStateNode(String id) {
    return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
  }

  @VisibleForTesting
  protected String getPeerNode(String id) {
    return ZKUtil.joinZNode(this.peersZNode, id);
  }
}
-* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.hadoop.hbase.replication; - -import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.NamespaceDescriptor; -import org.apache.hadoop.hbase.TableExistsException; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.filter.CompareFilter; -import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; -import org.apache.hadoop.hbase.regionserver.BloomType; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.RetryCounter; -import org.apache.hadoop.hbase.util.RetryCounterFactory; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/* - * Abstract class that provides an interface to the Replication Table. 
Which is currently - * being used for WAL offset tracking. - * The basic schema of this table will store each individual queue as a - * seperate row. The row key will be a unique identifier of the creating server's name and the - * queueId. Each queue must have the following two columns: - * COL_QUEUE_OWNER: tracks which server is currently responsible for tracking the queue - * COL_QUEUE_OWNER_HISTORY: a "|" delimited list of the previous server's that have owned this - * queue. The most recent previous owner is the leftmost entry. - * They will also have columns mapping [WAL filename : offset] - * The most flexible method of interacting with the Replication Table is by calling - * getOrBlockOnReplicationTable() which will return a new copy of the Replication Table. It is up - * to the caller to close the returned table. - */ -@InterfaceAudience.Private -abstract class ReplicationTableBase { - - /** Name of the HBase Table used for tracking replication*/ - public static final TableName REPLICATION_TABLE_NAME = - TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"); - - // Column family and column names for Queues in the Replication Table - public static final byte[] CF_QUEUE = Bytes.toBytes("q"); - public static final byte[] COL_QUEUE_OWNER = Bytes.toBytes("o"); - public static final byte[] COL_QUEUE_OWNER_HISTORY = Bytes.toBytes("h"); - - // Column Descriptor for the Replication Table - private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR = - new HColumnDescriptor(CF_QUEUE).setMaxVersions(1) - .setInMemory(true) - .setScope(HConstants.REPLICATION_SCOPE_LOCAL) - // TODO: Figure out which bloom filter to use - .setBloomFilterType(BloomType.NONE); - - // The value used to delimit the queueId and server name inside of a queue's row key. Currently a - // hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens. - // See HBASE-11394. 
- public static final String ROW_KEY_DELIMITER = "-"; - - // The value used to delimit server names in the queue history list - public static final String QUEUE_HISTORY_DELIMITER = "|"; - - /* - * Make sure that HBase table operations for replication have a high number of retries. This is - * because the server is aborted if any HBase table operation fails. Each RPC will be attempted - * 3600 times before exiting. This provides each operation with 2 hours of retries - * before the server is aborted. - */ - private static final int CLIENT_RETRIES = 3600; - private static final int RPC_TIMEOUT = 2000; - private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT; - - // We only need a single thread to initialize the Replication Table - private static final int NUM_INITIALIZE_WORKERS = 1; - - protected final Configuration conf; - protected final Abortable abortable; - private final Connection connection; - private final Executor executor; - private volatile CountDownLatch replicationTableInitialized; - - public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException { - this.conf = new Configuration(conf); - this.abortable = abort; - decorateConf(); - this.connection = ConnectionFactory.createConnection(this.conf); - this.executor = setUpExecutor(); - this.replicationTableInitialized = new CountDownLatch(1); - createReplicationTableInBackground(); - } - - /** - * Modify the connection's config so that operations run on the Replication Table have longer and - * a larger number of retries - */ - private void decorateConf() { - this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES); - } - - /** - * Sets up the thread pool executor used to build the Replication Table in the background - * @return the configured executor - */ - private Executor setUpExecutor() { - ThreadPoolExecutor tempExecutor = new ThreadPoolExecutor(NUM_INITIALIZE_WORKERS, - NUM_INITIALIZE_WORKERS, 100, TimeUnit.MILLISECONDS, new 
LinkedBlockingQueue<>()); - ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); - tfb.setNameFormat("ReplicationTableExecutor-%d"); - tfb.setDaemon(true); - tempExecutor.setThreadFactory(tfb.build()); - return tempExecutor; - } - - /** - * Get whether the Replication Table has been successfully initialized yet - * @return whether the Replication Table is initialized - */ - public boolean getInitializationStatus() { - return replicationTableInitialized.getCount() == 0; - } - - /** - * Increases the RPC and operations timeouts for the Replication Table - */ - private Table setReplicationTableTimeOuts(Table replicationTable) { - replicationTable.setRpcTimeout(RPC_TIMEOUT); - replicationTable.setOperationTimeout(OPERATION_TIMEOUT); - return replicationTable; - } - - /** - * Build the row key for the given queueId. This will uniquely identify it from all other queues - * in the cluster. - * @param serverName The owner of the queue - * @param queueId String identifier of the queue - * @return String representation of the queue's row key - */ - protected String buildQueueRowKey(String serverName, String queueId) { - return queueId + ROW_KEY_DELIMITER + serverName; - } - - /** - * Parse the original queueId from a row key - * @param rowKey String representation of a queue's row key - * @return the original queueId - */ - protected String getRawQueueIdFromRowKey(String rowKey) { - return rowKey.split(ROW_KEY_DELIMITER)[0]; - } - - /** - * Returns a queue's row key given either its raw or reclaimed queueId - * - * @param queueId queueId of the queue - * @return byte representation of the queue's row key - */ - protected byte[] queueIdToRowKey(String serverName, String queueId) { - // Cluster id's are guaranteed to have no hyphens, so if the passed in queueId has no hyphen - // then this is not a reclaimed queue. 
- if (!queueId.contains(ROW_KEY_DELIMITER)) { - return Bytes.toBytes(buildQueueRowKey(serverName, queueId)); - // If the queueId contained some hyphen it was reclaimed. In this case, the queueId is the - // queue's row key - } else { - return Bytes.toBytes(queueId); - } - } - - /** - * Creates a "|" delimited record of the queue's past region server owners. - * - * @param originalHistory the queue's original owner history - * @param oldServer the name of the server that used to own the queue - * @return the queue's new owner history - */ - protected String buildClaimedQueueHistory(String originalHistory, String oldServer) { - return oldServer + QUEUE_HISTORY_DELIMITER + originalHistory; - } - - /** - * Get a list of all region servers that have outstanding replication queues. These servers could - * be alive, dead or from a previous run of the cluster. - * @return a list of server names - */ - protected List<String> getListOfReplicators() { - // scan all of the queues and return a list of all unique OWNER values - Set<String> peerServers = new HashSet<>(); - ResultScanner allQueuesInCluster = null; - try (Table replicationTable = getOrBlockOnReplicationTable()){ - Scan scan = new Scan(); - scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); - allQueuesInCluster = replicationTable.getScanner(scan); - for (Result queue : allQueuesInCluster) { - peerServers.add(Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER))); - } - } catch (IOException e) { - String errMsg = "Failed getting list of replicators"; - abortable.abort(errMsg, e); - } finally { - if (allQueuesInCluster != null) { - allQueuesInCluster.close(); - } - } - return new ArrayList<>(peerServers); - } - - protected List<String> getAllQueues(String serverName) { - List<String> allQueues = new ArrayList<>(); - ResultScanner queueScanner = null; - try { - queueScanner = getQueuesBelongingToServer(serverName); - for (Result queue : queueScanner) { - String rowKey = Bytes.toString(queue.getRow()); - // If the queue 
does not have a Owner History, then we must be its original owner. So we - // want to return its queueId in raw form - if (Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER_HISTORY)).length() == 0) { - allQueues.add(getRawQueueIdFromRowKey(rowKey)); - } else { - allQueues.add(rowKey); - } - } - return allQueues; - } catch (IOException e) { - String errMsg = "Failed getting list of all replication queues for serverName=" + serverName; - abortable.abort(errMsg, e); - return null; - } finally { - if (queueScanner != null) { - queueScanner.close(); - } - } - } - - protected List<String> getLogsInQueue(String serverName, String queueId) { - String rowKey = queueId; - if (!queueId.contains(ROW_KEY_DELIMITER)) { - rowKey = buildQueueRowKey(serverName, queueId); - } - return getLogsInQueue(Bytes.toBytes(rowKey)); - } - - protected List<String> getLogsInQueue(byte[] rowKey) { - String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey); - try (Table replicationTable = getOrBlockOnReplicationTable()) { - Get getQueue = new Get(rowKey); - Result queue = replicationTable.get(getQueue); - if (queue == null || queue.isEmpty()) { - abortable.abort(errMsg, new ReplicationException(errMsg)); - return null; - } - return readWALsFromResult(queue); - } catch (IOException e) { - abortable.abort(errMsg, e); - return null; - } - } - - /** - * Read all of the WAL's from a queue into a list - * - * @param queue HBase query result containing the queue - * @return a list of all the WAL filenames - */ - protected List<String> readWALsFromResult(Result queue) { - List<String> wals = new ArrayList<>(); - Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE); - for (byte[] cQualifier : familyMap.keySet()) { - // Ignore the meta data fields of the queue - if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier, - COL_QUEUE_OWNER_HISTORY)) { - continue; - } - wals.add(Bytes.toString(cQualifier)); - } - return wals; - } - - /** - * Get the queue 
id's and meta data (Owner and History) for the queues belonging to the named - * server - * - * @param server name of the server - * @return a ResultScanner over the QueueIds belonging to the server - * @throws IOException - */ - protected ResultScanner getQueuesBelongingToServer(String server) throws IOException { - Scan scan = new Scan(); - SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER, - CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server)); - scan.setFilter(filterMyQueues); - scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); - scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY); - try (Table replicationTable = getOrBlockOnReplicationTable()) { - ResultScanner results = replicationTable.getScanner(scan); - return results; - } - } - - /** - * Attempts to acquire the Replication Table. This operation will block until it is assigned by - * the CreateReplicationWorker thread. It is up to the caller of this method to close the - * returned Table - * @return the Replication Table when it is created - * @throws IOException - */ - protected Table getOrBlockOnReplicationTable() throws IOException { - // Sleep until the Replication Table becomes available - try { - replicationTableInitialized.await(); - } catch (InterruptedException e) { - String errMsg = "Unable to acquire the Replication Table due to InterruptedException: " + - e.getMessage(); - throw new InterruptedIOException(errMsg); - } - return getAndSetUpReplicationTable(); - } - - /** - * Creates a new copy of the Replication Table and sets up the proper Table time outs for it - * - * @return the Replication Table - * @throws IOException - */ - private Table getAndSetUpReplicationTable() throws IOException { - Table replicationTable = connection.getTable(REPLICATION_TABLE_NAME); - setReplicationTableTimeOuts(replicationTable); - return replicationTable; - } - - /** - * Builds the Replication Table in a background thread. 
Any method accessing the Replication Table - * should do so through getOrBlockOnReplicationTable() - * - * @return the Replication Table - * @throws IOException if the Replication Table takes too long to build - */ - private void createReplicationTableInBackground() throws IOException { - executor.execute(new CreateReplicationTableWorker()); - } - - /** - * Attempts to build the Replication Table. Will continue blocking until we have a valid - * Table for the Replication Table. - */ - private class CreateReplicationTableWorker implements Runnable { - - private Admin admin; - - @Override - public void run() { - try { - admin = connection.getAdmin(); - if (!replicationTableExists()) { - createReplicationTable(); - } - int maxRetries = conf.getInt("hbase.replication.queues.createtable.retries.number", - CLIENT_RETRIES); - RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, RPC_TIMEOUT); - RetryCounter retryCounter = counterFactory.create(); - while (!replicationTableExists()) { - retryCounter.sleepUntilNextRetry(); - if (!retryCounter.shouldRetry()) { - throw new IOException("Unable to acquire the Replication Table"); - } - } - replicationTableInitialized.countDown(); - } catch (IOException | InterruptedException e) { - abortable.abort("Failed building Replication Table", e); - } - } - - /** - * Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR - * in TableBasedReplicationQueuesImpl - * - * @throws IOException - */ - private void createReplicationTable() throws IOException { - HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME); - replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR); - try { - admin.createTable(replicationTableDescriptor); - } catch (TableExistsException e) { - // In this case we can just continue as normal - } - } - - /** - * Checks whether the Replication Table exists yet - * - * @return whether the Replication Table exists - * @throws 
IOException - */ - private boolean replicationTableExists() { - try { - return admin.tableExists(REPLICATION_TABLE_NAME); - } catch (IOException e) { - return false; - } - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java deleted file mode 100644 index 51d7473..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import java.util.List; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -/** - * This is the interface for a Replication Tracker. A replication tracker provides the facility to - * subscribe and track events that reflect a change in replication state. 
These events are used by - * the ReplicationSourceManager to coordinate replication tasks such as addition/deletion of queues - * and queue failover. These events are defined in the ReplicationListener interface. If a class - * would like to listen to replication events it must implement the ReplicationListener interface - * and register itself with a Replication Tracker. - */ -@InterfaceAudience.Private -public interface ReplicationTracker { - - /** - * Register a replication listener to receive replication events. - * @param listener - */ - public void registerListener(ReplicationListener listener); - - public void removeListener(ReplicationListener listener); - - /** - * Returns a list of other live region servers in the cluster. - * @return List of region servers. - */ - public List<String> getListOfRegionServers(); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java deleted file mode 100644 index 9865d83..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; - -/** - * This class is a ZooKeeper implementation of the ReplicationTracker interface. This class is - * responsible for handling replication events that are defined in the ReplicationListener - * interface. 
- */ -@InterfaceAudience.Private -public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements ReplicationTracker { - - private static final Log LOG = LogFactory.getLog(ReplicationTrackerZKImpl.class); - // All about stopping - private final Stoppable stopper; - // listeners to be notified - private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>(); - // List of all the other region servers in this cluster - private final ArrayList<String> otherRegionServers = new ArrayList<>(); - private final ReplicationPeers replicationPeers; - - public ReplicationTrackerZKImpl(ZooKeeperWatcher zookeeper, - final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable, - Stoppable stopper) { - super(zookeeper, conf, abortable); - this.replicationPeers = replicationPeers; - this.stopper = stopper; - this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper)); - this.zookeeper.registerListener(new PeersWatcher(this.zookeeper)); - } - - @Override - public void registerListener(ReplicationListener listener) { - listeners.add(listener); - } - - @Override - public void removeListener(ReplicationListener listener) { - listeners.remove(listener); - } - - /** - * Return a snapshot of the current region servers. - */ - @Override - public List<String> getListOfRegionServers() { - refreshOtherRegionServersList(); - - List<String> list = null; - synchronized (otherRegionServers) { - list = new ArrayList<>(otherRegionServers); - } - return list; - } - - /** - * Watcher used to be notified of the other region server's death in the local cluster. It - * initiates the process to transfer the queues if it is able to grab the lock. - */ - public class OtherRegionServerWatcher extends ZooKeeperListener { - - /** - * Construct a ZooKeeper event listener. - */ - public OtherRegionServerWatcher(ZooKeeperWatcher watcher) { - super(watcher); - } - - /** - * Called when a new node has been created. 
- * @param path full path of the new node - */ - public void nodeCreated(String path) { - refreshListIfRightPath(path); - } - - /** - * Called when a node has been deleted - * @param path full path of the deleted node - */ - public void nodeDeleted(String path) { - if (stopper.isStopped()) { - return; - } - boolean cont = refreshListIfRightPath(path); - if (!cont) { - return; - } - LOG.info(path + " znode expired, triggering replicatorRemoved event"); - for (ReplicationListener rl : listeners) { - rl.regionServerRemoved(getZNodeName(path)); - } - } - - /** - * Called when an existing node has a child node added or removed. - * @param path full path of the node whose children have changed - */ - public void nodeChildrenChanged(String path) { - if (stopper.isStopped()) { - return; - } - refreshListIfRightPath(path); - } - - private boolean refreshListIfRightPath(String path) { - if (!path.startsWith(this.watcher.znodePaths.rsZNode)) { - return false; - } - return refreshOtherRegionServersList(); - } - } - - /** - * Watcher used to follow the creation and deletion of peer clusters. - */ - public class PeersWatcher extends ZooKeeperListener { - - /** - * Construct a ZooKeeper event listener. - */ - public PeersWatcher(ZooKeeperWatcher watcher) { - super(watcher); - } - - /** - * Called when a node has been deleted - * @param path full path of the deleted node - */ - public void nodeDeleted(String path) { - List<String> peers = refreshPeersList(path); - if (peers == null) { - return; - } - if (isPeerPath(path)) { - String id = getZNodeName(path); - LOG.info(path + " znode expired, triggering peerRemoved event"); - for (ReplicationListener rl : listeners) { - rl.peerRemoved(id); - } - } - } - - /** - * Called when an existing node has a child node added or removed. 
- * @param path full path of the node whose children have changed - */ - public void nodeChildrenChanged(String path) { - List<String> peers = refreshPeersList(path); - if (peers == null) { - return; - } - LOG.info(path + " znode expired, triggering peerListChanged event"); - for (ReplicationListener rl : listeners) { - rl.peerListChanged(peers); - } - } - } - - /** - * Verify if this event is meant for us, and if so then get the latest peers' list from ZK. Also - * reset the watches. - * @param path path to check against - * @return A list of peers' identifiers if the event concerns this watcher, else null. - */ - private List<String> refreshPeersList(String path) { - if (!path.startsWith(getPeersZNode())) { - return null; - } - return this.replicationPeers.getAllPeerIds(); - } - - private String getPeersZNode() { - return this.peersZNode; - } - - /** - * Extracts the znode name of a peer cluster from a ZK path - * @param fullPath Path to extract the id from - * @return the id or an empty string if path is invalid - */ - private String getZNodeName(String fullPath) { - String[] parts = fullPath.split("/"); - return parts.length > 0 ? parts[parts.length - 1] : ""; - } - - /** - * Reads the list of region servers from ZK and atomically clears our local view of it and - * replaces it with the updated list. 
- * @return true if the local list of the other region servers was updated with the ZK data (even - * if it was empty), false if the data was missing in ZK - */ - private boolean refreshOtherRegionServersList() { - List<String> newRsList = getRegisteredRegionServers(); - if (newRsList == null) { - return false; - } else { - synchronized (otherRegionServers) { - otherRegionServers.clear(); - otherRegionServers.addAll(newRsList); - } - } - return true; - } - - /** - * Get a list of all the other region servers in this cluster and set a watch - * @return a list of server nanes - */ - private List<String> getRegisteredRegionServers() { - List<String> result = null; - try { - result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.znodePaths.rsZNode); - } catch (KeeperException e) { - this.abortable.abort("Get list of registered region servers", e); - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java deleted file mode 100644 index 3507547..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import org.apache.commons.lang.NotImplementedException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.zookeeper.KeeperException; - -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** - * Implements the ReplicationQueuesClient interface on top of the Replication Table. It utilizes - * the ReplicationTableBase to access the Replication Table. 
- */ -@InterfaceAudience.Private -public class TableBasedReplicationQueuesClientImpl extends ReplicationTableBase - implements ReplicationQueuesClient { - - public TableBasedReplicationQueuesClientImpl(ReplicationQueuesClientArguments args) - throws IOException { - super(args.getConf(), args.getAbortable()); - } - public TableBasedReplicationQueuesClientImpl(Configuration conf, - Abortable abortable) throws IOException { - super(conf, abortable); - } - - @Override - public void init() throws ReplicationException{ - // no-op - } - - @Override - public List<String> getListOfReplicators() { - return super.getListOfReplicators(); - } - - @Override - public List<String> getLogsInQueue(String serverName, String queueId) { - return super.getLogsInQueue(serverName, queueId); - } - - @Override - public List<String> getAllQueues(String serverName) { - return super.getAllQueues(serverName); - } - - @Override - public Set<String> getAllWALs() { - Set<String> allWals = new HashSet<>(); - ResultScanner allQueues = null; - try (Table replicationTable = getOrBlockOnReplicationTable()) { - allQueues = replicationTable.getScanner(new Scan()); - for (Result queue : allQueues) { - for (String wal : readWALsFromResult(queue)) { - allWals.add(wal); - } - } - } catch (IOException e) { - String errMsg = "Failed getting all WAL's in Replication Table"; - abortable.abort(errMsg, e); - } finally { - if (allQueues != null) { - allQueues.close(); - } - } - return allWals; - } - - @Override - public int getHFileRefsNodeChangeVersion() throws KeeperException { - // TODO - throw new NotImplementedException(); - } - - @Override - public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException { - // TODO - throw new NotImplementedException(); - } - - @Override - public List<String> getReplicableHFiles(String peerId) throws KeeperException { - // TODO - throw new NotImplementedException(); - } -} 
http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java deleted file mode 100644 index bf55e8c..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java +++ /dev/null @@ -1,450 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ -package org.apache.hadoop.hbase.replication; - -import org.apache.commons.lang.NotImplementedException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.RowMutations; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.filter.CompareFilter; -import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -/** - * This class provides an implementation of the ReplicationQueues interface using an HBase table - * "Replication Table". It utilizes the ReplicationTableBase to access the Replication Table. 
- */ -@InterfaceAudience.Private -public class TableBasedReplicationQueuesImpl extends ReplicationTableBase - implements ReplicationQueues { - - private static final Log LOG = LogFactory.getLog(TableBasedReplicationQueuesImpl.class); - - // Common byte values used in replication offset tracking - private static final byte[] INITIAL_OFFSET_BYTES = Bytes.toBytes(0L); - private static final byte[] EMPTY_STRING_BYTES = Bytes.toBytes(""); - - private String serverName = null; - private byte[] serverNameBytes = null; - - // TODO: Only use this variable temporarily. Eventually we want to use HBase to store all - // TODO: replication information - private ReplicationStateZKBase replicationState; - - public TableBasedReplicationQueuesImpl(ReplicationQueuesArguments args) throws IOException { - this(args.getConf(), args.getAbortable(), args.getZk()); - } - - public TableBasedReplicationQueuesImpl(Configuration conf, Abortable abort, ZooKeeperWatcher zkw) - throws IOException { - super(conf, abort); - replicationState = new ReplicationStateZKBase(zkw, conf, abort) {}; - } - - @Override - public void init(String serverName) throws ReplicationException { - this.serverName = serverName; - this.serverNameBytes = Bytes.toBytes(serverName); - } - - @Override - public List<String> getListOfReplicators() { - return super.getListOfReplicators(); - } - - @Override - public void removeQueue(String queueId) { - try { - byte[] rowKey = queueIdToRowKey(queueId); - if (checkQueueExists(queueId)) { - Delete deleteQueue = new Delete(rowKey); - safeQueueUpdate(deleteQueue); - } else { - LOG.info("No logs were registered for queue id=" + queueId + " so no rows were removed " + - "from the replication table while removing the queue"); - } - } catch (IOException | ReplicationException e) { - String errMsg = "Failed removing queue queueId=" + queueId; - abortable.abort(errMsg, e); - } - } - - @Override - public void addLog(String queueId, String filename) throws ReplicationException { - try (Table 
replicationTable = getOrBlockOnReplicationTable()) { - if (!checkQueueExists(queueId)) { - // Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values - Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId))); - putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER, serverNameBytes); - putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, EMPTY_STRING_BYTES); - putNewQueue.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); - replicationTable.put(putNewQueue); - } else { - // Otherwise simply add the new log and offset as a new column - Put putNewLog = new Put(queueIdToRowKey(queueId)); - putNewLog.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); - safeQueueUpdate(putNewLog); - } - } catch (IOException | ReplicationException e) { - String errMsg = "Failed adding log queueId=" + queueId + " filename=" + filename; - abortable.abort(errMsg, e); - } - } - - @Override - public void removeLog(String queueId, String filename) { - try { - byte[] rowKey = queueIdToRowKey(queueId); - Delete delete = new Delete(rowKey); - delete.addColumns(CF_QUEUE, Bytes.toBytes(filename)); - safeQueueUpdate(delete); - } catch (IOException | ReplicationException e) { - String errMsg = "Failed removing log queueId=" + queueId + " filename=" + filename; - abortable.abort(errMsg, e); - } - } - - @Override - public void setLogPosition(String queueId, String filename, long position) { - try (Table replicationTable = getOrBlockOnReplicationTable()) { - byte[] rowKey = queueIdToRowKey(queueId); - // Check that the log exists. addLog() must have been called before setLogPosition(). 
- Get checkLogExists = new Get(rowKey); - checkLogExists.addColumn(CF_QUEUE, Bytes.toBytes(filename)); - if (!replicationTable.exists(checkLogExists)) { - String errMsg = "Could not set position of non-existent log from queueId=" + queueId + - ", filename=" + filename; - abortable.abort(errMsg, new ReplicationException(errMsg)); - return; - } - // Update the log offset if it exists - Put walAndOffset = new Put(rowKey); - walAndOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename), Bytes.toBytes(position)); - safeQueueUpdate(walAndOffset); - } catch (IOException | ReplicationException e) { - String errMsg = "Failed writing log position queueId=" + queueId + "filename=" + - filename + " position=" + position; - abortable.abort(errMsg, e); - } - } - - @Override - public long getLogPosition(String queueId, String filename) throws ReplicationException { - try { - byte[] rowKey = queueIdToRowKey(queueId); - Get getOffset = new Get(rowKey); - getOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename)); - Result result = getResultIfOwner(getOffset); - if (result == null || !result.containsColumn(CF_QUEUE, Bytes.toBytes(filename))) { - throw new ReplicationException("Could not read empty result while getting log position " + - "queueId=" + queueId + ", filename=" + filename); - } - return Bytes.toLong(result.getValue(CF_QUEUE, Bytes.toBytes(filename))); - } catch (IOException e) { - throw new ReplicationException("Could not get position in log for queueId=" + queueId + - ", filename=" + filename); - } - } - - @Override - public void removeAllQueues() { - List<String> myQueueIds = getAllQueues(); - for (String queueId : myQueueIds) { - removeQueue(queueId); - } - } - - @Override - public List<String> getLogsInQueue(String queueId) { - String errMsg = "Failed getting logs in queue queueId=" + queueId; - byte[] rowKey = queueIdToRowKey(queueId); - List<String> logs = new ArrayList<>(); - try { - Get getQueue = new Get(rowKey); - Result queue = getResultIfOwner(getQueue); - if (queue 
== null || queue.isEmpty()) { - String errMsgLostOwnership = "Failed getting logs for queue queueId=" + - Bytes.toString(rowKey) + " because the queue was missing or we lost ownership"; - abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership)); - return null; - } - Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE); - for(byte[] cQualifier : familyMap.keySet()) { - if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier, - COL_QUEUE_OWNER_HISTORY)) { - continue; - } - logs.add(Bytes.toString(cQualifier)); - } - } catch (IOException e) { - abortable.abort(errMsg, e); - return null; - } - return logs; - } - - @Override - public List<String> getAllQueues() { - return getAllQueues(serverName); - } - - @Override public List<String> getUnClaimedQueueIds(String regionserver) { - if (isThisOurRegionServer(regionserver)) { - return null; - } - try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)) { - List<String> res = new ArrayList<>(); - for (Result queue : queuesToClaim) { - String rowKey = Bytes.toString(queue.getRow()); - res.add(rowKey); - } - return res.isEmpty() ? 
null : res; - } catch (IOException e) { - String errMsg = "Failed getUnClaimedQueueIds"; - abortable.abort(errMsg, e); - } - return null; - } - - @Override public void removeReplicatorIfQueueIsEmpty(String regionserver) { - // Do nothing here - } - - @Override - public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) { - if (isThisOurRegionServer(regionserver)) { - return null; - } - - try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)){ - for (Result queue : queuesToClaim) { - String rowKey = Bytes.toString(queue.getRow()); - if (!rowKey.equals(queueId)){ - continue; - } - if (attemptToClaimQueue(queue, regionserver)) { - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(rowKey); - if (replicationState.peerExists(replicationQueueInfo.getPeerId())) { - SortedSet<String> sortedLogs = new TreeSet<>(); - List<String> logs = getLogsInQueue(queue.getRow()); - for (String log : logs) { - sortedLogs.add(log); - } - LOG.info(serverName + " has claimed queue " + rowKey + " from " + regionserver); - return new Pair<>(rowKey, sortedLogs); - } else { - // Delete orphaned queues - removeQueue(Bytes.toString(queue.getRow())); - LOG.info(serverName + " has deleted abandoned queue " + queueId + " from " + - regionserver); - } - } - } - } catch (IOException | KeeperException e) { - String errMsg = "Failed claiming queues for regionserver=" + regionserver; - abortable.abort(errMsg, e); - } - return null; - } - - @Override - public boolean isThisOurRegionServer(String regionserver) { - return this.serverName.equals(regionserver); - } - - @Override - public void addPeerToHFileRefs(String peerId) throws ReplicationException { - // TODO - throw new NotImplementedException(); - } - - @Override - public void removePeerFromHFileRefs(String peerId) { - // TODO - throw new NotImplementedException(); - } - - @Override - public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) - throws ReplicationException 
{ - // TODO - throw new NotImplementedException(); - } - - @Override - public void removeHFileRefs(String peerId, List<String> files) { - // TODO - throw new NotImplementedException(); - } - - private String buildQueueRowKey(String queueId) { - return buildQueueRowKey(serverName, queueId); - } - - /** - * Convenience method that gets the row key of the queue specified by queueId - * @param queueId queueId of a queue in this server - * @return the row key of the queue in the Replication Table - */ - private byte[] queueIdToRowKey(String queueId) { - return queueIdToRowKey(serverName, queueId); - } - - /** - * See safeQueueUpdate(RowMutations mutate) - * - * @param put Row mutation to perform on the queue - */ - private void safeQueueUpdate(Put put) throws ReplicationException, IOException { - RowMutations mutations = new RowMutations(put.getRow()); - mutations.add(put); - safeQueueUpdate(mutations); - } - - /** - * See safeQueueUpdate(RowMutations mutate) - * - * @param delete Row mutation to perform on the queue - */ - private void safeQueueUpdate(Delete delete) throws ReplicationException, - IOException{ - RowMutations mutations = new RowMutations(delete.getRow()); - mutations.add(delete); - safeQueueUpdate(mutations); - } - - /** - * Attempt to mutate a given queue in the Replication Table with a checkAndPut on the OWNER column - * of the queue. Abort the server if this checkAndPut fails: which means we have somehow lost - * ownership of the column or an IO Exception has occurred during the transaction. 
- * - * @param mutate Mutation to perform on a given queue - */ - private void safeQueueUpdate(RowMutations mutate) throws ReplicationException, IOException{ - try (Table replicationTable = getOrBlockOnReplicationTable()) { - boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), - CF_QUEUE, COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate); - if (!updateSuccess) { - throw new ReplicationException("Failed to update Replication Table because we lost queue " + - " ownership"); - } - } - } - - /** - * Check if the queue specified by queueId is stored in HBase - * - * @param queueId Either raw or reclaimed format of the queueId - * @return Whether the queue is stored in HBase - * @throws IOException - */ - private boolean checkQueueExists(String queueId) throws IOException { - try (Table replicationTable = getOrBlockOnReplicationTable()) { - byte[] rowKey = queueIdToRowKey(queueId); - return replicationTable.exists(new Get(rowKey)); - } - } - - /** - * Attempt to claim the given queue with a checkAndPut on the OWNER column. We check that the - * recently killed server is still the OWNER before we claim it. 
- * - * @param queue The queue that we are trying to claim - * @param originalServer The server that originally owned the queue - * @return Whether we successfully claimed the queue - * @throws IOException - */ - private boolean attemptToClaimQueue (Result queue, String originalServer) throws IOException{ - Put putQueueNameAndHistory = new Put(queue.getRow()); - putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER, Bytes.toBytes(serverName)); - String newOwnerHistory = buildClaimedQueueHistory(Bytes.toString(queue.getValue(CF_QUEUE, - COL_QUEUE_OWNER_HISTORY)), originalServer); - putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, - Bytes.toBytes(newOwnerHistory)); - RowMutations claimAndRenameQueue = new RowMutations(queue.getRow()); - claimAndRenameQueue.add(putQueueNameAndHistory); - // Attempt to claim ownership for this queue by checking if the current OWNER is the original - // server. If it is not then another RS has already claimed it. If it is we set ourselves as the - // new owner and update the queue's history - try (Table replicationTable = getOrBlockOnReplicationTable()) { - boolean success = replicationTable.checkAndMutate(queue.getRow(), - CF_QUEUE, COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(originalServer), - claimAndRenameQueue); - return success; - } - } - - /** - * Attempts to run a Get on some queue. Will only return a non-null result if we currently own - * the queue. - * - * @param get The Get that we want to query - * @return The result of the Get if this server is the owner of the queue. Else it returns null. 
- * @throws IOException - */ - private Result getResultIfOwner(Get get) throws IOException { - Scan scan = new Scan(get); - // Check if the Get currently contains all columns or only specific columns - if (scan.getFamilyMap().size() > 0) { - // Add the OWNER column if the scan is already only over specific columns - scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); - } - scan.setMaxResultSize(1); - SingleColumnValueFilter checkOwner = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER, - CompareFilter.CompareOp.EQUAL, serverNameBytes); - scan.setFilter(checkOwner); - ResultScanner scanner = null; - try (Table replicationTable = getOrBlockOnReplicationTable()) { - scanner = replicationTable.getScanner(scan); - Result result = scanner.next(); - return (result == null || result.isEmpty()) ? null : result; - } finally { - if (scanner != null) { - scanner.close(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index dafe421..1de3652 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; @@ -1772,23 +1771,18 @@ 
public class ZKUtil { */ private static void getReplicationZnodesDump(ZooKeeperWatcher zkw, StringBuilder sb) throws KeeperException { - String replicationZNodeName = zkw.getConfiguration().get("zookeeper.znode.replication", - "replication"); - String replicationZnode = joinZNode(zkw.znodePaths.baseZNode, replicationZNodeName); + String replicationZnode = zkw.znodePaths.replicationZNode; if (ZKUtil.checkExists(zkw, replicationZnode) == -1) return; // do a ls -r on this znode sb.append("\n").append(replicationZnode).append(": "); List<String> children = ZKUtil.listChildrenNoWatch(zkw, replicationZnode); for (String child : children) { String znode = joinZNode(replicationZnode, child); - if (child.equals(zkw.getConfiguration().get("zookeeper.znode.replication.peers", "peers"))) { + if (znode.equals(zkw.znodePaths.peersZNode)) { appendPeersZnodes(zkw, znode, sb); - } else if (child.equals(zkw.getConfiguration(). - get("zookeeper.znode.replication.rs", "rs"))) { + } else if (znode.equals(zkw.znodePaths.queuesZNode)) { appendRSZnodes(zkw, znode, sb); - } else if (child.equals(zkw.getConfiguration().get( - ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, - ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT))) { + } else if (znode.equals(zkw.znodePaths.hfileRefsZNode)) { appendHFileRefsZnodes(zkw, znode, sb); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java index 4c66b8f..5608eb5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java @@ -79,6 +79,15 @@ public class ZNodePaths { // znode of 
indicating master maintenance mode public final String masterMaintZNode; + // znode containing all replication state. + public final String replicationZNode; + // znode containing a list of all remote slave (i.e. peer) clusters. + public final String peersZNode; + // znode containing all replication queues + public final String queuesZNode; + // znode containing queues of hfile references to be replicated + public final String hfileRefsZNode; + public ZNodePaths(Configuration conf) { baseZNode = conf.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT); ImmutableMap.Builder<Integer, String> builder = ImmutableMap.builder(); @@ -113,6 +122,15 @@ public class ZNodePaths { conf.get("zookeeper.znode.namespace", "namespace")); masterMaintZNode = ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.masterMaintenance", "master-maintenance")); + replicationZNode = + ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.replication", "replication")); + peersZNode = + ZKUtil.joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.peers", "peers")); + queuesZNode = + ZKUtil.joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.rs", "rs")); + hfileRefsZNode = + ZKUtil.joinZNode(replicationZNode, + conf.get("zookeeper.znode.replication.hfile.refs", "hfile-refs")); } @Override @@ -125,7 +143,9 @@ public class ZNodePaths { + ", balancerZNode=" + balancerZNode + ", regionNormalizerZNode=" + regionNormalizerZNode + ", switchZNode=" + switchZNode + ", tableLockZNode=" + tableLockZNode + ", recoveringRegionsZNode=" + recoveringRegionsZNode + ", namespaceZNode=" - + namespaceZNode + ", masterMaintZNode=" + masterMaintZNode + "]"; + + namespaceZNode + ", masterMaintZNode=" + masterMaintZNode + ", replicationZNode=" + + replicationZNode + ", peersZNode=" + peersZNode + ", queuesZNode=" + queuesZNode + + ", hfileRefsZNode=" + hfileRefsZNode + "]"; } /**