Repository: hbase Updated Branches: refs/heads/master 39cf42be9 -> e9a278adc
HBASE-20475 Fix the flaky TestReplicationDroppedTables unit test - addendum Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e9a278ad Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e9a278ad Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e9a278ad Branch: refs/heads/master Commit: e9a278adc617a41ad3eefdd419e7618afee6b2b3 Parents: 39cf42b Author: huzheng <open...@gmail.com> Authored: Fri Apr 27 16:40:53 2018 +0800 Committer: huzheng <open...@gmail.com> Committed: Fri Apr 27 21:38:15 2018 +0800 ---------------------------------------------------------------------- .../hbase/replication/ReplicationPeers.java | 5 ++++ .../regionserver/ReplicationSourceManager.java | 28 +++++++++++++++----- 2 files changed, 27 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e9a278ad/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index eacb2f4..4d602ca 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication; import java.util.Collections; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -103,6 +104,10 @@ public class ReplicationPeers { return Collections.unmodifiableSet(peerCache.keySet()); } + public Map<String, ReplicationPeerImpl> getPeerCache() { + return Collections.unmodifiableMap(peerCache); + } + public 
PeerState refreshPeerState(String peerId) throws ReplicationException { ReplicationPeerImpl peer = peerCache.get(peerId); if (peer == null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/e9a278ad/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 3ecc50a..70cd986 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationListener; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; +import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; @@ -627,11 +628,25 @@ public class ReplicationSourceManager implements ReplicationListener { class NodeFailoverWorker extends Thread { private final ServerName deadRS; + // After claim the queues from dead region server, the NodeFailoverWorker will skip to start + // the RecoveredReplicationSource if the peer has been removed. but there's possible that + // remove a peer with peerId = 2 and add a peer with peerId = 2 again during the + // NodeFailoverWorker. So we need a deep copied <peerId, peer> map to decide whether we + // should start the RecoveredReplicationSource. 
If the latest peer is not the same peer that existed when + // NodeFailoverWorker began, we should skip starting the RecoveredReplicationSource; otherwise + // the rs will abort (See HBASE-20475). + private final Map<String, ReplicationPeerImpl> peersSnapshot; @VisibleForTesting public NodeFailoverWorker(ServerName deadRS) { super("Failover-for-" + deadRS); this.deadRS = deadRS; + peersSnapshot = new HashMap<>(replicationPeers.getPeerCache()); + } + + private boolean isOldPeer(String peerId, ReplicationPeerImpl newPeerRef) { + ReplicationPeerImpl oldPeerRef = peersSnapshot.get(peerId); + return oldPeerRef != null && oldPeerRef == newPeerRef; } @Override @@ -691,16 +706,16 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId); String actualPeerId = replicationQueueInfo.getPeerId(); - ReplicationPeer peer = replicationPeers.getPeer(actualPeerId); - if (peer == null) { - LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS + - ", peer is null"); + ReplicationPeerImpl peer = replicationPeers.getPeer(actualPeerId); + if (peer == null || !isOldPeer(actualPeerId, peer)) { + LOG.warn("Skipping failover for peer {} of node {}, peer is null", actualPeerId, + deadRS); abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId)); continue; } if (server instanceof ReplicationSyncUp.DummyServer && peer.getPeerState().equals(PeerState.DISABLED)) { - LOG.warn("Peer {} is disbaled. ReplicationSyncUp tool will skip " + LOG.warn("Peer {} is disabled. 
ReplicationSyncUp tool will skip " + "replicating data to this peer.", actualPeerId); continue; @@ -721,7 +736,8 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationSourceInterface src = createSource(queueId, peer); // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer synchronized (oldsources) { - if (!replicationPeers.getAllPeerIds().contains(src.getPeerId())) { + peer = replicationPeers.getPeer(src.getPeerId()); + if (peer == null || !isOldPeer(src.getPeerId(), peer)) { src.terminate("Recovered queue doesn't belong to any current peer"); removeRecoveredSource(src); continue;