http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java index bd6d070..346ff37 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java @@ -39,6 +39,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Set; @@ -58,15 +59,20 @@ public class TestReplicationStateHBaseImpl { private static ReplicationQueues rq1; private static ReplicationQueues rq2; private static ReplicationQueues rq3; + private static ReplicationQueuesClient rqc; private static ReplicationPeers rp; - private static final String server1 = ServerName.valueOf("hostname1.example.org", 1234, 123L) - .toString(); + + private static final String server0 = ServerName.valueOf("hostname0.example.org", 1234, -1L) + .toString(); + private static final String server1 = ServerName.valueOf("hostname1.example.org", 1234, 1L) + .toString(); private static final String server2 = ServerName.valueOf("hostname2.example.org", 1234, 1L) - .toString(); + .toString(); private static final String server3 = ServerName.valueOf("hostname3.example.org", 1234, 1L) - .toString(); + .toString(); + private static DummyServer ds0; private static DummyServer ds1; private static DummyServer ds2; private static DummyServer ds3; @@ -77,9 +83,9 @@ public class TestReplicationStateHBaseImpl { utility.startMiniCluster(); conf = utility.getConfiguration(); conf.setClass("hbase.region.replica.replication.ReplicationQueuesType", - ReplicationQueuesHBaseImpl.class, ReplicationQueues.class); - conf.setClass("hbase.region.replica.replication.ReplicationQueuesType", - ReplicationQueuesHBaseImpl.class, ReplicationQueues.class); + TableBasedReplicationQueuesImpl.class, ReplicationQueues.class); + conf.setClass("hbase.region.replica.replication.ReplicationQueuesClientType", + TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class); zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName); @@ -88,6 +94,9 @@ public class TestReplicationStateHBaseImpl { @Before public void setUp() { try { + ds0 = new DummyServer(server0); + rqc = ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments( + conf, ds0)); ds1 = new DummyServer(server1); rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw)); rq1.init(server1); @@ -99,9 +108,6 @@ public class TestReplicationStateHBaseImpl { rq3.init(server3); rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); rp.init(); - rp.addPeer("Queue1", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1")); - rp.addPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2")); - rp.addPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3")); } catch (Exception e) { fail("testReplicationStateHBaseConstruction received an exception" + e.getMessage()); } @@ -165,13 +171,13 @@ public class TestReplicationStateHBaseImpl { try { rq1.getLogPosition("Queue1", "NotHereWAL"); fail("Replication queue should have thrown a ReplicationException for reading from a " + - "non-existent WAL"); + "non-existent WAL"); } catch (ReplicationException e) { } try { rq1.getLogPosition("NotHereQueue", "NotHereWAL"); fail("Replication queue should have thrown a ReplicationException for reading from a " + - "non-existent queue"); + "non-existent queue"); } catch (ReplicationException e) { } // Test removing logs @@ -198,6 +204,13 @@ public class TestReplicationStateHBaseImpl { @Test public void TestMultipleReplicationQueuesHBaseImpl () { try { + rp.addPeer("Queue1", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1")); + rp.addPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2")); + rp.addPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3")); + } catch (ReplicationException e) { + fail("Failed to add peers to ReplicationPeers"); + } + try { // Test adding in WAL files rq1.addLog("Queue1", "WALLogFile1.1"); rq1.addLog("Queue1", "WALLogFile1.2"); @@ -298,6 +311,56 @@ public class TestReplicationStateHBaseImpl { } } + @Test + public void TestReplicationQueuesClient() throws Exception{ + + // Test ReplicationQueuesClient log tracking + rq1.addLog("Queue1", "WALLogFile1.1"); + assertEquals(1, rqc.getLogsInQueue(server1, "Queue1").size()); + rq1.removeLog("Queue1", "WALLogFile1.1"); + assertEquals(0, rqc.getLogsInQueue(server1, "Queue1").size()); + rq2.addLog("Queue2", "WALLogFile2.1"); + rq2.addLog("Queue2", "WALLogFile2.2"); + assertEquals(2, rqc.getLogsInQueue(server2, "Queue2").size()); + rq3.addLog("Queue1", "WALLogFile1.1"); + rq3.addLog("Queue3", "WALLogFile3.1"); + rq3.addLog("Queue3", "WALLogFile3.2"); + + // Test ReplicationQueueClient log tracking for faulty cases + assertEquals(0, ds0.getAbortCount()); + assertNull(rqc.getLogsInQueue("NotHereServer", "NotHereQueue")); + assertNull(rqc.getLogsInQueue(server1, "NotHereQueue")); + assertNull(rqc.getLogsInQueue("NotHereServer", "WALLogFile1.1")); + assertEquals(3, ds0.getAbortCount()); + // Test ReplicationQueueClient replicators + List<String> replicators = rqc.getListOfReplicators(); + assertEquals(3, replicators.size()); + assertTrue(replicators.contains(server1)); + assertTrue(replicators.contains(server2)); + rq1.removeQueue("Queue1"); + assertEquals(2, rqc.getListOfReplicators().size()); + + // Test ReplicationQueuesClient queue tracking + assertEquals(0, rqc.getAllQueues(server1).size()); + rq1.addLog("Queue2", "WALLogFile2.1"); + rq1.addLog("Queue3", "WALLogFile3.1"); + assertEquals(2, rqc.getAllQueues(server1).size()); + rq1.removeAllQueues(); + assertEquals(0, rqc.getAllQueues(server1).size()); + + // Test ReplicationQueuesClient queue tracking for faulty cases + assertEquals(0, rqc.getAllQueues("NotHereServer").size()); + + // Test ReplicationQueuesClient get all WAL's + assertEquals(5 , rqc.getAllWALs().size()); + rq3.removeLog("Queue1", "WALLogFile1.1"); + assertEquals(4, rqc.getAllWALs().size()); + rq3.removeAllQueues(); + assertEquals(2, rqc.getAllWALs().size()); + rq2.removeAllQueues(); + assertEquals(0, rqc.getAllWALs().size()); + } + @After public void clearQueues() throws Exception{ rq1.removeAllQueues(); @@ -306,6 +369,7 @@ public class TestReplicationStateHBaseImpl { assertEquals(0, rq1.getAllQueues().size()); assertEquals(0, rq2.getAllQueues().size()); assertEquals(0, rq3.getAllQueues().size()); + ds0.resetAbortCount(); ds1.resetAbortCount(); ds2.resetAbortCount(); ds3.resetAbortCount(); @@ -313,7 +377,7 @@ public class TestReplicationStateHBaseImpl { @After public void tearDown() throws KeeperException, IOException { - ZKUtil.deleteNodeRecursively(zkw, replicationZNode); + ZKUtil.deleteNodeRecursively(zkw, replicationZNode); } @AfterClass
http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java index 972a400..a357a1f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -96,11 +96,13 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw)); rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw)); rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw)); + rqc = ReplicationFactory.getReplicationQueuesClient( + new ReplicationQueuesClientArguments(conf, ds1, zkw)); } catch (Exception e) { - // This should not occur, because getReplicationQueues() only throws for ReplicationQueuesHBaseImpl + // This should not occur, because getReplicationQueues() only throws for + // TableBasedReplicationQueuesImpl fail("ReplicationFactory.getReplicationQueues() threw an IO Exception"); } - rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1); rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1); http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index e14fd3c..bf47d4f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -68,6 +68,8 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; @@ -436,8 +438,9 @@ public class TestReplicationSourceManager { s1.getZooKeeper())); rq1.init(s1.getServerName().toString()); - ReplicationQueuesClient client = - ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1); + ReplicationQueuesClientZKImpl client = + (ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient( + new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper())); int v0 = client.getQueuesZNodeCversion(); rq1.claimQueues(s0.getServerName().getServerName());
