Repository: hbase
Updated Branches:
  refs/heads/0.98 e0ede7f4a -> c8812737b


HBASE-11535 ReplicationPeer map is not thread safe (Virag Kothari)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c8812737
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c8812737
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c8812737

Branch: refs/heads/0.98
Commit: c8812737bb53016ec18738061b9c2a18ca8c0ad8
Parents: e0ede7f
Author: Jonathan M Hsieh <[email protected]>
Authored: Wed Aug 6 11:18:20 2014 -0700
Committer: Jonathan M Hsieh <[email protected]>
Committed: Wed Aug 6 11:18:20 2014 -0700

----------------------------------------------------------------------
 .../replication/ReplicationPeersZKImpl.java     | 25 ++++++++++++--------
 1 file changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c8812737/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index b69367a..b7a6447 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -86,7 +88,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       Abortable abortable) {
     super(zk, conf, abortable);
     this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
-    this.peerClusters = new HashMap<String, ReplicationPeer>();
+    this.peerClusters = new ConcurrentHashMap<String, ReplicationPeer>();
   }
 
   @Override
@@ -195,18 +197,20 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
 
   @Override
   public Map<String, List<String>> getTableCFs(String id) throws IllegalArgumentException {
-    if (!this.peerClusters.containsKey(id)) {
+    ReplicationPeer replicationPeer = this.peerClusters.get(id);
+    if (replicationPeer == null) {
       throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
     }
-    return this.peerClusters.get(id).getTableCFs();
+    return replicationPeer.getTableCFs();
   }
 
   @Override
   public boolean getStatusOfConnectedPeer(String id) {
-    if (!this.peerClusters.containsKey(id)) {
+    ReplicationPeer replicationPeer = this.peerClusters.get(id);
+    if (replicationPeer == null) {
       throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
-    }
-    return this.peerClusters.get(id).getPeerEnabled().get();
+    } 
+    return replicationPeer.getPeerEnabled().get();
   }
 
   @Override
@@ -246,7 +250,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     if (peer == null) {
       return false;
     }
-    this.peerClusters.put(peerId, peer);
+    ((ConcurrentMap<String, ReplicationPeer>) peerClusters).putIfAbsent(peerId, peer);
     LOG.info("Added new peer cluster " + peer.getClusterKey());
     return true;
   }
@@ -256,7 +260,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     ReplicationPeer rp = this.peerClusters.get(peerId);
     if (rp != null) {
       rp.getZkw().close();
-      this.peerClusters.remove(peerId);
+      ((ConcurrentMap<String, ReplicationPeer>) peerClusters).remove(peerId, rp);
     }
   }
 
@@ -375,10 +379,11 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
 
   @Override
   public long getTimestampOfLastChangeToPeer(String peerId) {
-    if (!peerClusters.containsKey(peerId)) {
+    ReplicationPeer peer = this.peerClusters.get(peerId);
+    if (peer == null) {
       throw new IllegalArgumentException("Unknown peer id: " + peerId);
     }
-    return peerClusters.get(peerId).getLastRegionserverUpdate();
+    return peer.getLastRegionserverUpdate();
   }
 
   /**

Reply via email to