HBASE-11963 Synchronize peer cluster replication connection attempts (Maddineni Sukumar)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/128a1cce
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/128a1cce
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/128a1cce

Branch: refs/heads/0.98
Commit: 128a1cce762ef44c73b46bda7e65ffcd78fa16bd
Parents: 9660fb0
Author: Andrew Purtell <apurt...@apache.org>
Authored: Fri Sep 12 17:22:37 2014 -0700
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Fri Sep 12 17:22:37 2014 -0700

----------------------------------------------------------------------
 .../replication/ReplicationPeersZKImpl.java | 40 +++++++++++++-------
 1 file changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/128a1cce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index b7a6447..df0e385 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -296,17 +296,25 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     if (peer == null) {
       return Collections.emptyList();
     }
-    List<ServerName> addresses;
-    try {
-      addresses = fetchSlavesAddresses(peer.getZkw());
-    } catch (KeeperException ke) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Fetch salves addresses failed.", ke);
+    // Synchronize peer cluster connection attempts to avoid races and rate
+    // limit connections when multiple replication sources try to connect to
+    // the peer cluster. If the peer cluster is down we can get out of control
+    // over time.
+    synchronized (peer) {
+      List<ServerName> addresses;
+      try {
+        addresses = fetchSlavesAddresses(peer.getZkw());
+      }
+      catch (KeeperException ke) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Fetch salves addresses failed.", ke);
+        }
+        reconnectPeer(ke, peer);
+        addresses = Collections.emptyList();
       }
-      reconnectPeer(ke, peer);
-      addresses = Collections.emptyList();
+      peer.setRegionServers(addresses);
     }
-    peer.setRegionServers(addresses);
+
     return peer.getRegionServers();
   }

@@ -317,10 +325,16 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       return null;
     }
     UUID peerUUID = null;
-    try {
-      peerUUID = ZKClusterId.getUUIDForCluster(peer.getZkw());
-    } catch (KeeperException ke) {
-      reconnectPeer(ke, peer);
+    // Synchronize peer cluster connection attempts to avoid races and rate
+    // limit connections when multiple replication sources try to connect to
+    // the peer cluster. If the peer cluster is down we can get out of control
+    // over time.
+    synchronized (peer) {
+      try {
+        peerUUID = ZKClusterId.getUUIDForCluster(peer.getZkw());
+      } catch (KeeperException ke) {
+        reconnectPeer(ke, peer);
+      }
     }
     return peerUUID;
   }