Repository: hbase Updated Branches: refs/heads/0.98 e0ede7f4a -> c8812737b
HBASE-11535 ReplicationPeer map is not thread safe (Virag Kothari) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c8812737 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c8812737 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c8812737 Branch: refs/heads/0.98 Commit: c8812737bb53016ec18738061b9c2a18ca8c0ad8 Parents: e0ede7f Author: Jonathan M Hsieh <[email protected]> Authored: Wed Aug 6 11:18:20 2014 -0700 Committer: Jonathan M Hsieh <[email protected]> Committed: Wed Aug 6 11:18:20 2014 -0700 ---------------------------------------------------------------------- .../replication/ReplicationPeersZKImpl.java | 25 ++++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/c8812737/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index b69367a..b7a6447 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -27,6 +27,8 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -86,7 +88,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re Abortable abortable) { super(zk, conf, abortable); this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs"); - this.peerClusters = new HashMap<String, ReplicationPeer>(); + this.peerClusters = new ConcurrentHashMap<String, ReplicationPeer>(); } @Override @@ -195,18 +197,20 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re @Override public Map<String, List<String>> getTableCFs(String id) throws IllegalArgumentException { - if (!this.peerClusters.containsKey(id)) { + ReplicationPeer replicationPeer = this.peerClusters.get(id); + if (replicationPeer == null) { throw new IllegalArgumentException("Peer with id= " + id + " is not connected"); } - return this.peerClusters.get(id).getTableCFs(); + return replicationPeer.getTableCFs(); } @Override public boolean getStatusOfConnectedPeer(String id) { - if (!this.peerClusters.containsKey(id)) { + ReplicationPeer replicationPeer = this.peerClusters.get(id); + if (replicationPeer == null) { throw new IllegalArgumentException("Peer with id= " + id + " is not connected"); - } - return this.peerClusters.get(id).getPeerEnabled().get(); + } + return replicationPeer.getPeerEnabled().get(); } @Override @@ -246,7 +250,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re if (peer == null) { return false; } - this.peerClusters.put(peerId, peer); + ((ConcurrentMap<String, ReplicationPeer>) peerClusters).putIfAbsent(peerId, peer); LOG.info("Added new peer cluster " + peer.getClusterKey()); return true; } @@ -256,7 +260,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re ReplicationPeer rp = this.peerClusters.get(peerId); if (rp != null) { rp.getZkw().close(); - this.peerClusters.remove(peerId); + ((ConcurrentMap<String, ReplicationPeer>) peerClusters).remove(peerId, rp); } } @@ -375,10 +379,11 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re @Override public long getTimestampOfLastChangeToPeer(String peerId) { - if (!peerClusters.containsKey(peerId)) { + ReplicationPeer peer = this.peerClusters.get(peerId); + if (peer == null) { throw new IllegalArgumentException("Unknown peer id: " + peerId); } - return peerClusters.get(peerId).getLastRegionserverUpdate(); + return peer.getLastRegionserverUpdate(); } /**
