Repository: hbase Updated Branches: refs/heads/branch-1.3 7214bad10 -> 909f06147
HBASE-16096 Backport. Cleanly remove replication peers from ZooKeeper. Signed-off-by: Elliott Clark <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/909f0614 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/909f0614 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/909f0614 Branch: refs/heads/branch-1.3 Commit: 909f06147741f232d76bc411ffdd09f2627ca014 Parents: 7214bad Author: Joseph Hwang <[email protected]> Authored: Thu Jun 30 15:18:33 2016 -0700 Committer: Elliott Clark <[email protected]> Committed: Thu Jul 28 11:30:51 2016 -0700 ---------------------------------------------------------------------- .../regionserver/ReplicationSourceManager.java | 13 +++++-- .../TestReplicationSourceManager.java | 40 ++++++++++++++++++++ 2 files changed, 50 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/909f0614/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index dcc3634..996c518 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -572,9 +572,10 @@ public class ReplicationSourceManager implements ReplicationListener { srcToRemove.add(src); } } - if (srcToRemove.size() == 0) { - LOG.error("The queue we wanted to close is missing " + id); - return; + if (srcToRemove.isEmpty()) { + LOG.error("The peer we wanted to remove is missing a ReplicationSourceInterface. " + + "This could mean that ReplicationSourceInterface initialization failed for this peer " + + "and that replication on this peer may not be caught up. peerId=" + id); } for (ReplicationSourceInterface toRemove : srcToRemove) { toRemove.terminate(terminateMessage); @@ -752,6 +753,12 @@ public class ReplicationSourceManager implements ReplicationListener { } /** + * Get the ReplicationPeers used by this ReplicationSourceManager + * @return the ReplicationPeers used by this ReplicationSourceManager + */ + public ReplicationPeers getReplicationPeers() {return this.replicationPeers;} + + /** * Get a string representation of all the sources' metrics */ public String getStats() { http://git-wip-us.apache.org/repos/asf/hbase/blob/909f0614/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index f11e1e9..268d173 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; @@ -494,6 +495,45 @@ public class TestReplicationSourceManager { scopes.containsKey(f2)); } + /** + * Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing the + * corresponding ReplicationSourceInterface correctly cleans up the corresponding + * replication queue and ReplicationPeer. + * See HBASE-16096. + * @throws Exception + */ + @Test + public void testPeerRemovalCleanup() throws Exception{ + String replicationSourceImplName = conf.get("replication.replicationsource.implementation"); + try { + DummyServer server = new DummyServer(); + ReplicationQueues rq = + ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), + server); + rq.init(server.getServerName().toString()); + // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface + // initialization to throw an exception. + conf.set("replication.replicationsource.implementation", "fakeReplicationSourceImpl"); + ReplicationPeers rp = manager.getReplicationPeers(); + // Set up the znode and ReplicationPeer for the fake peer + rp.addPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"), null); + rp.peerAdded("FakePeer"); + // Have ReplicationSourceManager add the fake peer. It should fail to initialize a + // ReplicationSourceInterface. + List<String> fakePeers = new ArrayList<>(); + fakePeers.add("FakePeer"); + manager.peerListChanged(fakePeers); + // Create a replication queue for the fake peer + rq.addLog("FakePeer", "FakeFile"); + // Removing the peer should remove both the replication queue and the ReplicationPeer + manager.removePeer("FakePeer"); + assertFalse(rq.getAllQueues().contains("FakePeer")); + assertNull(rp.getPeer("FakePeer")); + } finally { + conf.set("replication.replicationsource.implementation", replicationSourceImplName); + } + } + private WALEdit getBulkLoadWALEdit() { // 1. Create store files for the families Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
