Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 7214bad10 -> 909f06147


HBASE-16096 Backport. Cleanly remove replication peers from ZooKeeper.

Signed-off-by: Elliott Clark <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/909f0614
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/909f0614
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/909f0614

Branch: refs/heads/branch-1.3
Commit: 909f06147741f232d76bc411ffdd09f2627ca014
Parents: 7214bad
Author: Joseph Hwang <[email protected]>
Authored: Thu Jun 30 15:18:33 2016 -0700
Committer: Elliott Clark <[email protected]>
Committed: Thu Jul 28 11:30:51 2016 -0700

----------------------------------------------------------------------
 .../regionserver/ReplicationSourceManager.java  | 13 +++++--
 .../TestReplicationSourceManager.java           | 40 ++++++++++++++++++++
 2 files changed, 50 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/909f0614/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index dcc3634..996c518 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -572,9 +572,10 @@ public class ReplicationSourceManager implements 
ReplicationListener {
           srcToRemove.add(src);
         }
       }
-      if (srcToRemove.size() == 0) {
-        LOG.error("The queue we wanted to close is missing " + id);
-        return;
+      if (srcToRemove.isEmpty()) {
+        LOG.error("The peer we wanted to remove is missing a 
ReplicationSourceInterface. " +
+            "This could mean that ReplicationSourceInterface initialization 
failed for this peer " +
+            "and that replication on this peer may not be caught up. peerId=" 
+ id);
       }
       for (ReplicationSourceInterface toRemove : srcToRemove) {
         toRemove.terminate(terminateMessage);
@@ -752,6 +753,12 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   }
 
   /**
+   * Get the ReplicationPeers used by this ReplicationSourceManager
+   * @return the ReplicationPeers used by this ReplicationSourceManager
+   */
+  public ReplicationPeers getReplicationPeers() {return this.replicationPeers;}
+
+  /**
    * Get a string representation of all the sources' metrics
    */
   public String getStats() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/909f0614/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index f11e1e9..268d173 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -64,6 +64,7 @@ import 
org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -494,6 +495,45 @@ public class TestReplicationSourceManager {
       scopes.containsKey(f2));
   }
 
+  /**
+   * Test whether calling removePeer() on a ReplicationSourceManager that 
failed on initializing the
+   * corresponding ReplicationSourceInterface correctly cleans up the 
corresponding
+   * replication queue and ReplicationPeer.
+   * See HBASE-16096.
+   * @throws Exception
+   */
+  @Test
+  public void testPeerRemovalCleanup() throws Exception{
+    String replicationSourceImplName = 
conf.get("replication.replicationsource.implementation");
+    try {
+      DummyServer server = new DummyServer();
+      ReplicationQueues rq =
+          ReplicationFactory.getReplicationQueues(server.getZooKeeper(), 
server.getConfiguration(),
+              server);
+      rq.init(server.getServerName().toString());
+      // Purposely fail ReplicationSourceManager.addSource() by causing 
ReplicationSourceInterface
+      // initialization to throw an exception.
+      conf.set("replication.replicationsource.implementation", 
"fakeReplicationSourceImpl");
+      ReplicationPeers rp = manager.getReplicationPeers();
+      // Set up the znode and ReplicationPeer for the fake peer
+      rp.addPeer("FakePeer", new 
ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"), null);
+      rp.peerAdded("FakePeer");
+      // Have ReplicationSourceManager add the fake peer. It should fail to 
initialize a
+      // ReplicationSourceInterface.
+      List<String> fakePeers = new ArrayList<>();
+      fakePeers.add("FakePeer");
+      manager.peerListChanged(fakePeers);
+      // Create a replication queue for the fake peer
+      rq.addLog("FakePeer", "FakeFile");
+      // Removing the peer should remove both the replication queue and the 
ReplicationPeer
+      manager.removePeer("FakePeer");
+      assertFalse(rq.getAllQueues().contains("FakePeer"));
+      assertNull(rp.getPeer("FakePeer"));
+    } finally {
+      conf.set("replication.replicationsource.implementation", 
replicationSourceImplName);
+    }
+  }
+
   private WALEdit getBulkLoadWALEdit() {
     // 1. Create store files for the families
     Map<byte[], List<Path>> storeFiles = new HashMap<>(1);

Reply via email to