Repository: hbase
Updated Branches:
  refs/heads/branch-1 6c4980161 -> 961337aad


HBASE-18092: Removing a peer does not properly clean up the ReplicationSourceManager state and metrics

Signed-off-by: tedyu <yuzhih...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/961337aa
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/961337aa
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/961337aa

Branch: refs/heads/branch-1
Commit: 961337aadc3205e875e345b39da02aabb34c921e
Parents: 6c49801
Author: Ashu Pachauri <ashu210...@gmail.com>
Authored: Thu May 25 18:24:38 2017 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Fri Jun 9 11:14:40 2017 -0700

----------------------------------------------------------------------
 .../replication/regionserver/MetricsSource.java |   6 +-
 .../replication/regionserver/Replication.java   |   4 +-
 .../regionserver/ReplicationSource.java         |   5 +-
 .../ReplicationSourceInterface.java             |   7 ++
 .../regionserver/ReplicationSourceManager.java  |  27 ++--
 .../replication/ReplicationSourceDummy.java     |   8 ++
 .../TestReplicationSourceManager.java           | 123 ++++++++++++++-----
 7 files changed, 132 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/961337aa/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index e808570..2d99018 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -41,7 +41,6 @@ public class MetricsSource implements BaseSource {
 
   // tracks last shipped timestamp for each wal group
   private Map<String, Long> lastTimeStamps = new HashMap<String, Long>();
-  private int lastQueueSize = 0;
   private long lastHFileRefsQueueSize = 0;
   private String id;
 
@@ -182,11 +181,12 @@ public class MetricsSource implements BaseSource {
 
   /** Removes all metrics about this Source. */
   public void clear() {
-    singleSourceSource.clear();
+    int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
     globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
+    singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
+    singleSourceSource.clear();
     globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
     lastTimeStamps.clear();
-    lastQueueSize = 0;
     lastHFileRefsQueueSize = 0;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/961337aa/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 991eb2f..b2b403b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -451,9 +451,7 @@ public class Replication extends WALActionsListener.Base implements
     // get source
     List<ReplicationSourceInterface> sources = 
this.replicationManager.getSources();
     for (ReplicationSourceInterface source : sources) {
-      if (source instanceof ReplicationSource) {
-        sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
-      }
+      sourceMetricsList.add(source.getSourceMetrics());
     }
 
     // get old source

http://git-wip-us.apache.org/repos/asf/hbase/blob/961337aa/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 0d52bbe..65ea422 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -490,10 +490,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     return sb.toString();
   }
 
-  /**
-   * Get Replication Source Metrics
-   * @return sourceMetrics
-   */
+  @Override
   public MetricsSource getSourceMetrics() {
     return this.metrics;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/961337aa/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 8d5451c..e7569ed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -120,4 +120,11 @@ public interface ReplicationSourceInterface {
   void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> 
pairs)
       throws ReplicationException;
 
+  /**
+   * Get the associated metrics.
+   *
+   * @return The metrics for this source.
+   */
+  MetricsSource getSourceMetrics();
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/961337aa/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 5b22d04..ed2aa1d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -370,6 +370,20 @@ public class ReplicationSourceManager implements ReplicationListener {
     return this.oldsources;
   }
 
+  /**
+   * Get the normal source for a given peer
+   * @param peerId
+   * @return the normal source for the give peer if it exists, otherwise null.
+   */
+  public ReplicationSourceInterface getSource(String peerId) {
+    for (ReplicationSourceInterface source: getSources()) {
+      if (source.getPeerClusterId().equals(peerId)) {
+        return source;
+      }
+    }
+    return null;
+  }
+
   @VisibleForTesting
   List<String> getAllQueues() {
     return replicationQueues.getAllQueues();
@@ -549,9 +563,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   public void closeRecoveredQueue(ReplicationSourceInterface src) {
     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
-    if (src instanceof ReplicationSource) {
-      ((ReplicationSource) src).getSourceMetrics().clear();
-    }
+    src.getSourceMetrics().clear();
     this.oldsources.remove(src);
     deleteSource(src.getPeerClusterZnode(), false);
     this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
@@ -563,9 +575,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   public void closeQueue(ReplicationSourceInterface src) {
     LOG.info("Done with the queue " + src.getPeerClusterZnode());
-    if (src instanceof ReplicationSource) {
-      ((ReplicationSource) src).getSourceMetrics().clear();
-    }
+    src.getSourceMetrics().clear();
     this.sources.remove(src);
     deleteSource(src.getPeerClusterZnode(), true);
     this.walsById.remove(src.getPeerClusterZnode());
@@ -615,10 +625,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       }
       for (ReplicationSourceInterface toRemove : srcToRemove) {
         toRemove.terminate(terminateMessage);
-        if (toRemove instanceof ReplicationSource) {
-          ((ReplicationSource) toRemove).getSourceMetrics().clear();
-        }
-        this.sources.remove(toRemove);
+        closeQueue(toRemove);
       }
       deleteSource(id, true);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/961337aa/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 57e54d7..ad8c52f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -40,6 +40,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   ReplicationSourceManager manager;
   String peerClusterId;
   Path currentPath;
+  MetricsSource metrics;
 
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager 
manager,
@@ -49,11 +50,13 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
     this.manager = manager;
     this.peerClusterId = peerClusterId;
+    this.metrics = metrics;
   }
 
   @Override
   public void enqueueLog(Path log) {
     this.currentPath = log;
+    metrics.incrSizeOfLogQueue();
   }
 
   @Override
@@ -98,4 +101,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
       throws ReplicationException {
     return;
   }
+
+  @Override
+  public MetricsSource getSourceMetrics() {
+    return metrics;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/961337aa/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 96228e5..b042a57 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -20,13 +20,14 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.Sets;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -523,6 +524,9 @@ public class TestReplicationSourceManager {
   @Test
   public void testPeerRemovalCleanup() throws Exception{
     String replicationSourceImplName = 
conf.get("replication.replicationsource.implementation");
+    final String peerId = "FakePeer";
+    final ReplicationPeerConfig peerConfig =
+        new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase");
     try {
       DummyServer server = new DummyServer();
       final ReplicationQueues rq =
@@ -535,42 +539,105 @@ public class TestReplicationSourceManager {
           FailInitializeDummyReplicationSource.class.getName());
       final ReplicationPeers rp = manager.getReplicationPeers();
       // Set up the znode and ReplicationPeer for the fake peer
-      rp.addPeer("FakePeer", new 
ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"));
-      // Wait for the peer to get created and connected
-      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return (rp.getPeer("FakePeer") != null);
-        }
-      });
+      // Don't wait for replication source to initialize, we know it won't.
+      addPeerAndWait(peerId, peerConfig, false);
 
-      // Make sure that the replication source was not initialized
-      List<ReplicationSourceInterface> sources = manager.getSources();
-      for (ReplicationSourceInterface source : sources) {
-        assertNotEquals("FakePeer", source.getPeerClusterId());
-      }
+      // Sanity check
+      assertNull(manager.getSource(peerId));
 
-      // Removing the peer should remove both the replication queue and the 
ReplicationPeer
-      manager.removePeer("FakePeer");
-      assertFalse(rq.getAllQueues().contains("FakePeer"));
-      assertNull(rp.getPeer("FakePeer"));
+
+      // Create a replication queue for the fake peer
+      rq.addLog(peerId, "FakeFile");
       // Unregister peer, this should remove the peer and clear all queues 
associated with it
       // Need to wait for the ReplicationTracker to pick up the changes and 
notify listeners.
-      rp.removePeer("FakePeer");
-      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          List<String> peers = rp.getAllPeerIds();
-          return (!rq.getAllQueues().contains("FakePeer"))
-              && (rp.getPeer("FakePeer") == null)
-              && (!peers.contains("FakePeer"));
-          }
-      });
+      removePeerAndWait(peerId);
+      assertFalse(rq.getAllQueues().contains(peerId));
     } finally {
       conf.set("replication.replicationsource.implementation", 
replicationSourceImplName);
+      removePeerAndWait(peerId);
     }
   }
 
+  @Test
+  public void testRemovePeerMetricsCleanup() throws Exception {
+    final String peerId = "DummyPeer";
+    final ReplicationPeerConfig peerConfig =
+        new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase");
+    try {
+      addPeerAndWait(peerId, peerConfig, true);
+
+      ReplicationSourceInterface source = manager.getSource(peerId);
+      // Sanity check
+      assertNotNull(source);
+      // Retrieve the global replication metrics source
+      Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
+      f.setAccessible(true);
+      MetricsReplicationSourceSource globalSource =
+          (MetricsReplicationSourceSource)f.get(source.getSourceMetrics());
+      int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
+
+      // Enqueue log and check if metrics updated
+      source.enqueueLog(new Path("abc"));
+      assertEquals(1, source.getSourceMetrics().getSizeOfLogQueue());
+      assertEquals(1 + globalLogQueueSizeInitial, 
globalSource.getSizeOfLogQueue());
+
+      // Removing the peer should reset the global metrics
+      removePeerAndWait(peerId);
+      assertEquals(globalLogQueueSizeInitial, 
globalSource.getSizeOfLogQueue());
+
+      // Adding the same peer back again should reset the single source metrics
+      addPeerAndWait(peerId, peerConfig, true);
+      source = manager.getSource(peerId);
+      assertNotNull(source);
+      assertEquals(0, source.getSourceMetrics().getSizeOfLogQueue());
+      assertEquals(globalLogQueueSizeInitial, 
globalSource.getSizeOfLogQueue());
+    } finally {
+      removePeerAndWait(peerId);
+    }
+  }
+
+  /**
+   * Add a peer and wait for it to initialize
+   * @param peerId
+   * @param peerConfig
+   * @param waitForSource Whether to wait for replication source to initialize
+   * @throws Exception
+   */
+  private void addPeerAndWait(final String peerId, final ReplicationPeerConfig 
peerConfig,
+      final boolean waitForSource) throws Exception {
+    final ReplicationPeers rp = manager.getReplicationPeers();
+    rp.addPeer(peerId, peerConfig);
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() throws Exception {
+        if (waitForSource) {
+          return (manager.getSource(peerId) != null);
+        } else {
+          return (rp.getPeer(peerId) != null);
+        }
+      }
+    });
+  }
+
+  /**
+   * Remove a peer and wait for it to get cleaned up
+   * @param peerId
+   * @throws Exception
+   */
+  private void removePeerAndWait(final String peerId) throws Exception {
+    final ReplicationPeers rp = manager.getReplicationPeers();
+    if (rp.getAllPeerIds().contains(peerId)) {
+      rp.removePeer(peerId);
+    }
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() throws Exception {
+        List<String> peers = rp.getAllPeerIds();
+        return (!manager.getAllQueues().contains(peerId)) && 
(rp.getPeer(peerId) == null)
+            && (!peers.contains(peerId));
+      }
+    });
+  }
+
+
   private WALEdit getBulkLoadWALEdit() {
     // 1. Create store files for the families
     Map<byte[], List<Path>> storeFiles = new HashMap<>(1);

Reply via email to