Repository: hbase Updated Branches: refs/heads/master 2c84b6e17 -> 6dee406bf
HBASE-11535 ReplicationPeer map is not thread safe (Virag Kothari) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6dee406b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6dee406b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6dee406b Branch: refs/heads/master Commit: 6dee406bf359eec31883cdb2714399a8161a4c26 Parents: 2c84b6e Author: Jonathan M Hsieh <[email protected]> Authored: Wed Aug 6 11:16:53 2014 -0700 Committer: Jonathan M Hsieh <[email protected]> Committed: Wed Aug 6 11:16:53 2014 -0700 ---------------------------------------------------------------------- .../hbase/replication/ReplicationPeersZKImpl.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6dee406b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 488d37a..af028fb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -82,7 +84,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re Abortable abortable) { super(zk, conf, abortable); this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs"); - this.peerClusters = new HashMap<String, ReplicationPeerZKImpl>(); + this.peerClusters = new ConcurrentHashMap<String, ReplicationPeerZKImpl>(); } @Override @@ -187,18 +189,20 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re @Override public Map<String, List<String>> getTableCFs(String id) throws IllegalArgumentException { - if (!this.peerClusters.containsKey(id)) { + ReplicationPeer replicationPeer = this.peerClusters.get(id); + if (replicationPeer == null) { throw new IllegalArgumentException("Peer with id= " + id + " is not connected"); } - return this.peerClusters.get(id).getTableCFs(); + return replicationPeer.getTableCFs(); } @Override public boolean getStatusOfPeer(String id) { - if (!this.peerClusters.containsKey(id)) { + ReplicationPeer replicationPeer = this.peerClusters.get(id); + if (replicationPeer == null) { throw new IllegalArgumentException("Peer with id= " + id + " is not connected"); } - return this.peerClusters.get(id).getPeerState() == PeerState.ENABLED; + return replicationPeer.getPeerState() == PeerState.ENABLED; } @Override @@ -359,7 +363,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re public void peerRemoved(String peerId) { ReplicationPeer rp = this.peerClusters.get(peerId); if (rp != null) { - this.peerClusters.remove(peerId); + ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).remove(peerId, rp); } } @@ -385,7 +389,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re if (peer == null) { return false; } - this.peerClusters.put(peerId, peer); + ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).putIfAbsent(peerId, peer); LOG.info("Added new peer cluster " + peer.getPeerConfig().getClusterKey()); return true; }
