HBASE-11442 ReplicationSourceManager doesn't clean up the queues for recovered sources (Virag Kothari)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ed1a7896 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ed1a7896 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ed1a7896 Branch: refs/heads/0.98 Commit: ed1a7896e41c88764cd360afaa315f979db9e989 Parents: 3a52381 Author: Enis Soztutar <e...@apache.org> Authored: Mon Jul 14 16:46:11 2014 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Mon Jul 14 16:54:45 2014 -0700 ---------------------------------------------------------------------- .../regionserver/ReplicationSourceManager.java | 51 ++++++++++++++------ .../replication/ReplicationSourceDummy.java | 4 +- .../TestReplicationSourceManager.java | 46 ++++++++++++++++-- 3 files changed, 83 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/ed1a7896/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 30640ba..f823457 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -30,11 +30,12 @@ import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import 
java.util.concurrent.TimeUnit; -import java.util.concurrent.CopyOnWriteArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -83,6 +84,8 @@ public class ReplicationSourceManager implements ReplicationListener { private final Stoppable stopper; // All logs we are currently tracking private final Map<String, SortedSet<String>> hlogsById; + // Logs for recovered sources we are currently tracking + private final Map<String, SortedSet<String>> hlogsByIdRecoveredQueues; private final Configuration conf; private final FileSystem fs; // The path to the latest log we saw, for new coming sources @@ -123,6 +126,7 @@ public class ReplicationSourceManager implements ReplicationListener { this.replicationTracker = replicationTracker; this.stopper = stopper; this.hlogsById = new HashMap<String, SortedSet<String>>(); + this.hlogsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>(); this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>(); this.conf = conf; this.fs = fs; @@ -174,20 +178,29 @@ public class ReplicationSourceManager implements ReplicationListener { * @param id id of the peer cluster * @param queueRecovered Whether this is a recovered queue */ - public void cleanOldLogs(String key, - String id, - boolean queueRecovered) { - synchronized (this.hlogsById) { - SortedSet<String> hlogs = this.hlogsById.get(id); - if (queueRecovered || hlogs.first().equals(key)) { - return; + public void cleanOldLogs(String key, String id, boolean queueRecovered) { + if (queueRecovered) { + SortedSet<String> hlogs = hlogsByIdRecoveredQueues.get(id); + if (hlogs != null && !hlogs.first().equals(key)) { + cleanOldLogs(hlogs, key, id); } - SortedSet<String> hlogSet = hlogs.headSet(key); - for (String hlog : hlogSet) { - this.replicationQueues.removeLog(id, hlog); + } else { + synchronized (this.hlogsById) { + SortedSet<String> hlogs = hlogsById.get(id); + if (!hlogs.first().equals(key)) { + cleanOldLogs(hlogs, key, 
id); + } } - hlogSet.clear(); } + } + + private void cleanOldLogs(SortedSet<String> hlogs, String key, String id) { + SortedSet<String> hlogSet = hlogs.headSet(key); + LOG.debug("Removing " + hlogSet.size() + " logs in the list: " + hlogSet); + for (String hlog : hlogSet) { + this.replicationQueues.removeLog(id, hlog); + } + hlogSet.clear(); } /** @@ -279,6 +292,14 @@ public class ReplicationSourceManager implements ReplicationListener { protected Map<String, SortedSet<String>> getHLogs() { return Collections.unmodifiableMap(hlogsById); } + + /** + * Get a copy of the hlogs of the recovered sources on this rs + * @return a sorted set of hlog names + */ + protected Map<String, SortedSet<String>> getHlogsByIdRecoveredQueues() { + return Collections.unmodifiableMap(hlogsByIdRecoveredQueues); + } /** * Get a list of all the normal sources of this rs @@ -297,7 +318,6 @@ public class ReplicationSourceManager implements ReplicationListener { } void preLogRoll(Path newLog) throws IOException { - synchronized (this.hlogsById) { String name = newLog.getName(); for (ReplicationSourceInterface source : this.sources) { @@ -385,6 +405,7 @@ public class ReplicationSourceManager implements ReplicationListener { LOG.info("Done with the recovered queue " + src.getPeerClusterZnode()); this.oldsources.remove(src); deleteSource(src.getPeerClusterZnode(), false); + this.hlogsByIdRecoveredQueues.remove(src.getPeerClusterZnode()); } /** @@ -516,10 +537,12 @@ public class ReplicationSourceManager implements ReplicationListener { break; } oldsources.add(src); - for (String hlog : entry.getValue()) { + SortedSet<String> hlogsSet = entry.getValue(); + for (String hlog : hlogsSet) { src.enqueueLog(new Path(oldLogDir, hlog)); } src.startup(); + hlogsByIdRecoveredQueues.put(peerId, hlogsSet); } catch (IOException e) { // TODO manage it LOG.error("Failed creating a source", e); 
http://git-wip-us.apache.org/repos/asf/hbase/blob/ed1a7896/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index fa3dda6..a2d4a8d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -78,7 +78,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { @Override public String getPeerClusterId() { - return peerClusterId; + String[] parts = peerClusterId.split("-", 2); + return parts.length != 1 ? + parts[0] : peerClusterId; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/ed1a7896/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index b9e4d8f..9441dfa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -27,6 +27,8 @@ import java.util.Collection; import java.util.List; import java.util.SortedMap; import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; @@ -54,10 +56,12 @@ import 
org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -69,6 +73,8 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import com.google.common.collect.Sets; + @Category(MediumTests.class) public class TestReplicationSourceManager { @@ -136,14 +142,14 @@ public class TestReplicationSourceManager { ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES); ZKClusterId.setClusterId(zkw, new ClusterId()); - - replication = new Replication(new DummyServer(), fs, logDir, oldLogDir); - manager = replication.getReplicationManager(); fs = FileSystem.get(conf); oldLogDir = new Path(utility.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); logDir = new Path(utility.getDataTestDir(), HConstants.HREGION_LOGDIR_NAME); + replication = new Replication(new DummyServer(), fs, logDir, oldLogDir); + manager = replication.getReplicationManager(); + logName = HConstants.HREGION_LOGDIR_NAME; manager.addSource(slaveId); @@ -272,6 +278,40 @@ public class TestReplicationSourceManager { assertEquals(1, populatedMap); server.abort("", null); } + + @Test + public void testCleanupFailoverQueues() throws Exception { + final Server server = new 
DummyServer("hostname1.example.org"); + ReplicationQueues rq = + ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), + server); + rq.init(server.getServerName().toString()); + // populate some znodes in the peer znode + SortedSet<String> files = new TreeSet<String>(); + files.add("log1"); + files.add("log2"); + for (String file : files) { + rq.addLog("1", file); + } + Server s1 = new DummyServer("dummyserver1.example.org"); + ReplicationQueues rq1 = + ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); + rq1.init(s1.getServerName().toString()); + ReplicationPeers rp1 = + ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1); + rp1.init(); + NodeFailoverWorker w1 = + manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID( + new Long(1), new Long(2))); + w1.start(); + w1.join(5000); + assertEquals(1, manager.getHlogsByIdRecoveredQueues().size()); + String id = "1-" + server.getServerName().getServerName(); + assertEquals(files, manager.getHlogsByIdRecoveredQueues().get(id)); + manager.cleanOldLogs("log2", id, true); + // log1 should be deleted + assertEquals(Sets.newHashSet("log2"), manager.getHlogsByIdRecoveredQueues().get(id)); + } @Test public void testNodeFailoverDeadServerParsing() throws Exception {