Repository: hbase Updated Branches: refs/heads/master 104f58701 -> 00095a2ef
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 4a36e13..6d75fec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -70,7 +70,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; -import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -170,9 +170,9 @@ public abstract class TestReplicationSourceManager { + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1")); ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state"); ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state", - ReplicationUtils.PEER_STATE_ENABLED_BYTES); + ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); ZKUtil.createWithParents(zkw, "/hbase/replication/state"); - ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationUtils.PEER_STATE_ENABLED_BYTES); + ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); ZKClusterId.setClusterId(zkw, new ClusterId()); FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir()); http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java deleted file mode 100644 index 461420e..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java +++ /dev/null @@ -1,370 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.storage; - -import static org.hamcrest.CoreMatchers.hasItems; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; -import org.apache.hadoop.hbase.replication.ReplicationPeers; -import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.replication.ReplicationUtils; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.zookeeper.ZKConfig; -import org.apache.zookeeper.KeeperException; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; - -/** - * White box testing for replication state interfaces. Implementations should extend this class, and - * initialize the interfaces properly. - */ -public abstract class TestReplicationStateBasic { - - private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class); - - protected ReplicationQueueStorage rqs; - protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345); - protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345); - protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345); - protected ReplicationPeers rp; - protected static final String ID_ONE = "1"; - protected static final String ID_TWO = "2"; - protected static String KEY_ONE; - protected static String KEY_TWO; - - // For testing when we try to replicate to ourself - protected String OUR_KEY; - - protected static int zkTimeoutCount; - protected static final int ZK_MAX_COUNT = 300; - protected static final int ZK_SLEEP_INTERVAL = 100; // millis - - @Test - public void testReplicationQueueStorage() throws ReplicationException { - // Test methods with empty state - assertEquals(0, rqs.getListOfReplicators().size()); - assertTrue(rqs.getWALsInQueue(server1, "qId1").isEmpty()); - assertTrue(rqs.getAllQueues(server1).isEmpty()); - - /* - * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each -- - * server2: zero queues - */ - rqs.addWAL(server1, "qId1", "trash"); - rqs.removeWAL(server1, "qId1", "trash"); - rqs.addWAL(server1,"qId2", "filename1"); - rqs.addWAL(server1,"qId3", "filename2"); - rqs.addWAL(server1,"qId3", "filename3"); - rqs.addWAL(server2,"trash", "trash"); - rqs.removeQueue(server2,"trash"); - - List<ServerName> reps = rqs.getListOfReplicators(); - assertEquals(2, reps.size()); - assertTrue(server1.getServerName(), reps.contains(server1)); - assertTrue(server2.getServerName(), reps.contains(server2)); - - assertTrue(rqs.getWALsInQueue(ServerName.valueOf("bogus", 12345, 12345), "bogus").isEmpty()); - assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty()); - assertEquals(0, rqs.getWALsInQueue(server1, "qId1").size()); - assertEquals(1, rqs.getWALsInQueue(server1, "qId2").size()); - assertEquals("filename1", rqs.getWALsInQueue(server1, "qId2").get(0)); - - assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 12345, -1L)).isEmpty()); - assertEquals(0, rqs.getAllQueues(server2).size()); - List<String> list = rqs.getAllQueues(server1); - assertEquals(3, list.size()); - assertTrue(list.contains("qId2")); - assertTrue(list.contains("qId3")); - } - - private void removeAllQueues(ServerName serverName) throws ReplicationException { - for (String queue: rqs.getAllQueues(serverName)) { - rqs.removeQueue(serverName, queue); - } - } - @Test - public void testReplicationQueues() throws ReplicationException { - // Initialize ReplicationPeer so we can add peers (we don't transfer lone queues) - rp.init(); - - rqs.removeQueue(server1, "bogus"); - rqs.removeWAL(server1, "bogus", "bogus"); - removeAllQueues(server1); - assertEquals(0, rqs.getAllQueues(server1).size()); - assertEquals(0, rqs.getWALPosition(server1, "bogus", "bogus")); - assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty()); - assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 1234, 12345)).isEmpty()); - - populateQueues(); - - assertEquals(3, rqs.getListOfReplicators().size()); - assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size()); - assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size()); - assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0")); - rqs.setWALPosition(server3, "qId5", "filename4", 354L, null); - assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4")); - - assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size()); - assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size()); - assertEquals(0, rqs.getAllQueues(server1).size()); - assertEquals(1, rqs.getAllQueues(server2).size()); - assertEquals(5, rqs.getAllQueues(server3).size()); - - assertEquals(0, rqs.getAllQueues(server1).size()); - rqs.removeReplicatorIfQueueIsEmpty(server1); - assertEquals(2, rqs.getListOfReplicators().size()); - - List<String> queues = rqs.getAllQueues(server3); - assertEquals(5, queues.size()); - for (String queue : queues) { - rqs.claimQueue(server3, queue, server2); - } - rqs.removeReplicatorIfQueueIsEmpty(server3); - assertEquals(1, rqs.getListOfReplicators().size()); - - assertEquals(6, rqs.getAllQueues(server2).size()); - removeAllQueues(server2); - rqs.removeReplicatorIfQueueIsEmpty(server2); - assertEquals(0, rqs.getListOfReplicators().size()); - } - - @Test - public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException { - rp.init(); - - List<Pair<Path, Path>> files1 = new ArrayList<>(3); - files1.add(new Pair<>(null, new Path("file_1"))); - files1.add(new Pair<>(null, new Path("file_2"))); - files1.add(new Pair<>(null, new Path("file_3"))); - assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); - assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); - rp.getPeerStorage().addPeer(ID_ONE, - ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true); - rqs.addPeerToHFileRefs(ID_ONE); - rqs.addHFileRefs(ID_ONE, files1); - assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); - assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); - List<String> hfiles2 = new ArrayList<>(files1.size()); - for (Pair<Path, Path> p : files1) { - hfiles2.add(p.getSecond().getName()); - } - String removedString = hfiles2.remove(0); - rqs.removeHFileRefs(ID_ONE, hfiles2); - assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size()); - hfiles2 = new ArrayList<>(1); - hfiles2.add(removedString); - rqs.removeHFileRefs(ID_ONE, hfiles2); - assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size()); - rp.getPeerStorage().removePeer(ID_ONE); - } - - @Test - public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException { - rp.init(); - rp.getPeerStorage().addPeer(ID_ONE, - ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true); - rqs.addPeerToHFileRefs(ID_ONE); - rp.getPeerStorage().addPeer(ID_TWO, - ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true); - rqs.addPeerToHFileRefs(ID_TWO); - - List<Pair<Path, Path>> files1 = new ArrayList<>(3); - files1.add(new Pair<>(null, new Path("file_1"))); - files1.add(new Pair<>(null, new Path("file_2"))); - files1.add(new Pair<>(null, new Path("file_3"))); - rqs.addHFileRefs(ID_ONE, files1); - rqs.addHFileRefs(ID_TWO, files1); - assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size()); - assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); - assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); - - rp.getPeerStorage().removePeer(ID_ONE); - rqs.removePeerFromHFileRefs(ID_ONE); - assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); - assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); - assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); - - rp.getPeerStorage().removePeer(ID_TWO); - rqs.removePeerFromHFileRefs(ID_TWO); - assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); - assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty()); - } - - @Test - public void testReplicationPeers() throws Exception { - rp.init(); - - try { - rp.getPeerStorage().setPeerState("bogus", true); - fail("Should have thrown an ReplicationException when passed a non-exist bogus peerId"); - } catch (ReplicationException e) { - } - try { - rp.getPeerStorage().setPeerState("bogus", false); - fail("Should have thrown an ReplicationException when passed a non-exist bogus peerId"); - } catch (ReplicationException e) { - } - - try { - assertFalse(rp.addPeer("bogus")); - fail("Should have thrown an ReplicationException when creating a bogus peerId " - + "with null peer config"); - } catch (ReplicationException e) { - } - - assertNumberOfPeers(0); - - // Add some peers - rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true); - assertNumberOfPeers(1); - rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true); - assertNumberOfPeers(2); - - assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils - .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf()))); - rp.getPeerStorage().removePeer(ID_ONE); - rp.removePeer(ID_ONE); - assertNumberOfPeers(1); - - // Add one peer - rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true); - rp.addPeer(ID_ONE); - assertNumberOfPeers(2); - assertTrue(rp.getPeer(ID_ONE).isPeerEnabled()); - rp.getPeerStorage().setPeerState(ID_ONE, false); - // now we do not rely on zk watcher to trigger the state change so we need to trigger it - // manually... - ReplicationPeerImpl peer = rp.getPeer(ID_ONE); - rp.refreshPeerState(peer.getId()); - assertEquals(PeerState.DISABLED, peer.getPeerState()); - assertConnectedPeerStatus(false, ID_ONE); - rp.getPeerStorage().setPeerState(ID_ONE, true); - // now we do not rely on zk watcher to trigger the state change so we need to trigger it - // manually... - rp.refreshPeerState(peer.getId()); - assertEquals(PeerState.ENABLED, peer.getPeerState()); - assertConnectedPeerStatus(true, ID_ONE); - - // Disconnect peer - rp.removePeer(ID_ONE); - assertNumberOfPeers(2); - } - - private String getFileName(String base, int i) { - return String.format(base + "-%04d", i); - } - - @Test - public void testPersistLogPositionAndSeqIdAtomically() throws Exception { - ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); - assertTrue(rqs.getAllQueues(serverName1).isEmpty()); - String queue1 = "1"; - String region0 = "region0", region1 = "region1"; - for (int i = 0; i < 10; i++) { - rqs.addWAL(serverName1, queue1, getFileName("file1", i)); - } - List<String> queueIds = rqs.getAllQueues(serverName1); - assertEquals(1, queueIds.size()); - assertThat(queueIds, hasItems("1")); - - List<String> wals1 = rqs.getWALsInQueue(serverName1, queue1); - assertEquals(10, wals1.size()); - for (int i = 0; i < 10; i++) { - assertThat(wals1, hasItems(getFileName("file1", i))); - } - - for (int i = 0; i < 10; i++) { - assertEquals(0, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i))); - } - assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region0, queue1)); - assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region1, queue1)); - - for (int i = 0; i < 10; i++) { - rqs.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, - ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L)); - } - - for (int i = 0; i < 10; i++) { - assertEquals((i + 1) * 100, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i))); - } - assertEquals(900L, rqs.getLastSequenceId(region0, queue1)); - assertEquals(1000L, rqs.getLastSequenceId(region1, queue1)); - } - - protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { - // we can first check if the value was changed in the store, if it wasn't then fail right away - if (status != rp.getPeerStorage().isPeerEnabled(peerId)) { - fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK"); - } - while (true) { - if (status == rp.getPeer(peerId).isPeerEnabled()) { - return; - } - if (zkTimeoutCount < ZK_MAX_COUNT) { - LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status - + ", sleeping and trying again."); - Thread.sleep(ZK_SLEEP_INTERVAL); - } else { - fail("Timed out waiting for ConnectedPeerStatus to be " + status); - } - } - } - - protected void assertNumberOfPeers(int total) throws ReplicationException { - assertEquals(total, rp.getPeerStorage().listPeerIds().size()); - } - - /* - * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2, - * 3, 4, 5 log files respectively - */ - protected void populateQueues() throws ReplicationException { - rqs.addWAL(server1, "trash", "trash"); - rqs.removeQueue(server1, "trash"); - - rqs.addWAL(server2, "qId1", "trash"); - rqs.removeWAL(server2, "qId1", "trash"); - - for (int i = 1; i < 6; i++) { - for (int j = 0; j < i; j++) { - rqs.addWAL(server3, "qId" + i, "filename" + j); - } - // Add peers for the corresponding queues so they are not orphans - rp.getPeerStorage().addPeer("qId" + i, - ReplicationPeerConfig.newBuilder().setClusterKey("localhost:2818:/bogus" + i).build(), - true); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java deleted file mode 100644 index d073669..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.replication.storage; - -import java.io.IOException; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ClusterId; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.replication.ReplicationFactory; -import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; -import org.apache.hadoop.hbase.replication.TableReplicationPeerStorage; -import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage; -import org.apache.hadoop.hbase.replication.TableReplicationStorageBase; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.zookeeper.ZKClusterId; -import org.apache.hadoop.hbase.zookeeper.ZKConfig; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.apache.zookeeper.KeeperException; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.experimental.categories.Category; - -@Category({ ReplicationTests.class, MediumTests.class }) -public class TestReplicationStateTableImpl extends TestReplicationStateBasic { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestReplicationStateTableImpl.class); - - private static Configuration conf; - private static HBaseTestingUtility utility = new HBaseTestingUtility(); - private static ZKWatcher zkw; - private static Connection connection; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - conf = utility.getConfiguration(); - conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); - conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345"); - utility.startMiniCluster(); - - // After the HBase Mini cluster startup, we set the storage implementation to table based - // implementation. Otherwise, we cannot setup the HBase Mini Cluster because the master will - // list peers before finish its initialization, and if master cannot finish initialization, the - // meta cannot be online, in other hand, if meta cannot be online, the list peers never success - // when using table based replication. a dead loop happen. - // Our UTs are written for testing storage layer, so no problem here. - conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, - TableReplicationPeerStorage.class.getName()); - conf.set(ReplicationStorageFactory.REPLICATION_QUEUE_STORAGE_IMPL, - TableReplicationQueueStorage.class.getName()); - - zkw = utility.getZooKeeperWatcher(); - connection = ConnectionFactory.createConnection(conf); - - KEY_ONE = initPeerClusterState("/hbase1"); - KEY_TWO = initPeerClusterState("/hbase2"); - } - - private static String initPeerClusterState(String baseZKNode) - throws IOException, KeeperException { - // Add a dummy region server and set up the cluster id - Configuration testConf = new Configuration(conf); - testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode); - ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null); - String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, "hostname1.example.org:1234"); - ZKUtil.createWithParents(zkw1, fakeRs); - ZKClusterId.setClusterId(zkw1, new ClusterId()); - return ZKConfig.getZooKeeperClusterKey(testConf); - } - - @Before - public void setUp() throws IOException { - rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); - rp = ReplicationFactory.getReplicationPeers(zkw, conf); - OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); - - // Create hbase:replication meta table. - try (Admin admin = connection.getAdmin()) { - TableDescriptor table = - TableReplicationStorageBase.createReplicationTableDescBuilder(conf).build(); - admin.createTable(table); - } - } - - @After - public void tearDown() throws KeeperException, IOException { - // Drop the hbase:replication meta table. - utility.deleteTable(TableReplicationStorageBase.REPLICATION_TABLE); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - if (connection != null) { - IOUtils.closeQuietly(connection); - } - utility.shutdownMiniZKCluster(); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java deleted file mode 100644 index 993f2fb..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.storage; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ClusterId; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseZKTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.replication.ReplicationFactory; -import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.zookeeper.ZKClusterId; -import org.apache.hadoop.hbase.zookeeper.ZKConfig; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.apache.zookeeper.KeeperException; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.experimental.categories.Category; - -@Category({ ReplicationTests.class, MediumTests.class }) -public class TestReplicationStateZKImpl extends TestReplicationStateBasic { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestReplicationStateZKImpl.class); - - private static Configuration conf; - private static HBaseZKTestingUtility utility; - private static ZKWatcher zkw; - private static String replicationZNode; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - utility = new HBaseZKTestingUtility(); - utility.startMiniZKCluster(); - conf = utility.getConfiguration(); - conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); - zkw = utility.getZooKeeperWatcher(); - String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); - replicationZNode = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, replicationZNodeName); - KEY_ONE = initPeerClusterState("/hbase1"); - KEY_TWO = initPeerClusterState("/hbase2"); - } - - private static String initPeerClusterState(String baseZKNode) - throws IOException, KeeperException { - // Add a dummy region server and set up the cluster id - Configuration testConf = new Configuration(conf); - testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode); - ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null); - String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, "hostname1.example.org:1234"); - ZKUtil.createWithParents(zkw1, fakeRs); - ZKClusterId.setClusterId(zkw1, new ClusterId()); - return ZKConfig.getZooKeeperClusterKey(testConf); - } - - @Before - public void setUp() { - zkTimeoutCount = 0; - rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); - rp = ReplicationFactory.getReplicationPeers(zkw, conf); - OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); - } - - @After - public void tearDown() throws KeeperException, IOException { - ZKUtil.deleteNodeRecursively(zkw, replicationZNode); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - utility.shutdownMiniZKCluster(); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java deleted file mode 100644 index 190eef4..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.storage; - -import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toSet; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.stream.Stream; - -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseZKTestingUtility; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ ReplicationTests.class, MediumTests.class }) -public class TestZKReplicationPeerStorage { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class); - - private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); - - private static ZKReplicationPeerStorage STORAGE; - - @BeforeClass - public static void setUp() throws Exception { - UTIL.startMiniZKCluster(); - STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); - } - - @AfterClass - public static void tearDown() throws IOException { - UTIL.shutdownMiniZKCluster(); - } - - private Set<String> randNamespaces(Random rand) { - return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) - .collect(toSet()); - } - - private Map<TableName, List<String>> randTableCFs(Random rand) { - int size = rand.nextInt(5); - Map<TableName, List<String>> map = new HashMap<>(); - for (int i = 0; i < size; i++) { - TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong())); - List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong())) - .limit(rand.nextInt(5)).collect(toList()); - map.put(tn, cfs); - } - return map; - } - - private ReplicationPeerConfig getConfig(int seed) { - Random rand = new Random(seed); - return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong())) - .setReplicationEndpointImpl(Long.toHexString(rand.nextLong())) - .setNamespaces(randNamespaces(rand)).setExcludeNamespaces(randNamespaces(rand)) - .setTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean()) - .setBandwidth(rand.nextInt(1000)).build(); - } - - private void assertSetEquals(Set<String> expected, Set<String> actual) { - if (expected == null || expected.size() == 0) { - assertTrue(actual == null || actual.size() == 0); - return; - } - assertEquals(expected.size(), actual.size()); - expected.forEach(s -> assertTrue(actual.contains(s))); - } - - private void assertMapEquals(Map<TableName, List<String>> expected, - Map<TableName, List<String>> actual) { - if (expected == null || expected.size() == 0) { - assertTrue(actual == null || actual.size() == 0); - return; - } - assertEquals(expected.size(), actual.size()); - expected.forEach((expectedTn, expectedCFs) -> { - List<String> actualCFs = actual.get(expectedTn); - if (expectedCFs == null || expectedCFs.size() == 0) { - assertTrue(actual.containsKey(expectedTn)); - assertTrue(actualCFs == null || actualCFs.size() == 0); - } else { - assertNotNull(actualCFs); - assertEquals(expectedCFs.size(), actualCFs.size()); - for (Iterator<String> expectedIt = expectedCFs.iterator(), actualIt = actualCFs.iterator(); - expectedIt.hasNext();) { - assertEquals(expectedIt.next(), actualIt.next()); - } - } - }); - } - - private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) { - assertEquals(expected.getClusterKey(), actual.getClusterKey()); - assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl()); - assertSetEquals(expected.getNamespaces(), actual.getNamespaces()); - assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces()); - assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap()); - assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); - assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); - assertEquals(expected.getBandwidth(), actual.getBandwidth()); - } - - @Test - public void test() throws ReplicationException { - int peerCount = 10; - for (int i = 0; i < peerCount; i++) { - STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0); - } - List<String> peerIds = STORAGE.listPeerIds(); - assertEquals(peerCount, peerIds.size()); - for (String peerId : peerIds) { - int seed = Integer.parseInt(peerId); - assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId)); - } - for (int i = 0; i < peerCount; i++) { - STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1)); - } - for (String peerId : peerIds) { - int seed = Integer.parseInt(peerId); - assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId)); - } - for (int i = 0; i < peerCount; i++) { - assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i))); - } - for (int i = 0; i < peerCount; i++) { - STORAGE.setPeerState(Integer.toString(i), i % 2 != 0); - } - for (int i = 0; i < peerCount; i++) { - assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i))); - } - String toRemove = Integer.toString(peerCount / 2); - STORAGE.removePeer(toRemove); - peerIds = STORAGE.listPeerIds(); - assertEquals(peerCount - 1, peerIds.size()); - assertFalse(peerIds.contains(toRemove)); - - try { - STORAGE.getPeerConfig(toRemove); - fail("Should throw a ReplicationException when get peer config of a peerId"); - } catch (ReplicationException e) { - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java deleted file mode 100644 index 780ff2a..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java +++ /dev/null @@ -1,255 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.storage; - -import static org.hamcrest.CoreMatchers.hasItems; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.SortedSet; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseZKTestingUtility; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.zookeeper.KeeperException; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ ReplicationTests.class, MediumTests.class }) -public class TestZKReplicationQueueStorage { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class); - - private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); - - private static ZKReplicationQueueStorage STORAGE; - - @BeforeClass - public static void setUp() throws Exception { - UTIL.startMiniZKCluster(); - STORAGE = new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); - } - - @AfterClass - public static void tearDown() throws IOException { - UTIL.shutdownMiniZKCluster(); - } - - @After - public void tearDownAfterTest() throws ReplicationException { - for (ServerName serverName : STORAGE.getListOfReplicators()) { - for (String queue : STORAGE.getAllQueues(serverName)) { - STORAGE.removeQueue(serverName, queue); - } - STORAGE.removeReplicatorIfQueueIsEmpty(serverName); - } - for (String peerId : STORAGE.getAllPeersFromHFileRefsQueue()) { - STORAGE.removePeerFromHFileRefs(peerId); - } - } - - private ServerName getServerName(int i) { - return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i); - } - - @Test - public void testReplicator() throws ReplicationException { - assertTrue(STORAGE.getListOfReplicators().isEmpty()); - String queueId = "1"; - for (int i = 0; i < 10; i++) { - STORAGE.addWAL(getServerName(i), queueId, "file" + i); - } - List<ServerName> replicators = STORAGE.getListOfReplicators(); - assertEquals(10, replicators.size()); - for (int i = 0; i < 10; i++) { - assertThat(replicators, hasItems(getServerName(i))); - } - for (int i = 0; i < 5; i++) { - STORAGE.removeQueue(getServerName(i), queueId); - } - for (int i = 0; i < 10; i++) { - STORAGE.removeReplicatorIfQueueIsEmpty(getServerName(i)); - } - replicators = STORAGE.getListOfReplicators(); - assertEquals(5, replicators.size()); - for (int i = 5; i < 10; i++) { - assertThat(replicators, hasItems(getServerName(i))); - } - } - - private String getFileName(String base, int i) { - return String.format(base + "-%04d", i); - } - - @Test - public void testAddRemoveLog() throws ReplicationException { - ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); - assertTrue(STORAGE.getAllQueues(serverName1).isEmpty()); - String queue1 = "1"; - String queue2 = "2"; - for (int i = 0; i < 10; i++) { - STORAGE.addWAL(serverName1, queue1, getFileName("file1", i)); - STORAGE.addWAL(serverName1, queue2, getFileName("file2", i)); - } - List<String> queueIds = STORAGE.getAllQueues(serverName1); - assertEquals(2, queueIds.size()); - assertThat(queueIds, hasItems("1", "2")); - - List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1); - List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2); - assertEquals(10, wals1.size()); - assertEquals(10, wals2.size()); - for (int i = 0; i < 10; i++) { - assertThat(wals1, hasItems(getFileName("file1", i))); - assertThat(wals2, hasItems(getFileName("file2", i))); - } - - for (int i = 0; i < 10; i++) { - assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i))); - assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i))); - STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, null); - STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10, - null); - } - - for (int i = 0; i < 10; i++) { - assertEquals((i + 1) * 100, - STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i))); - assertEquals((i + 1) * 100 + 10, - STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i))); - } - - for (int i = 0; i < 10; i++) { - if (i % 2 == 0) { - STORAGE.removeWAL(serverName1, queue1, getFileName("file1", i)); - } else { - STORAGE.removeWAL(serverName1, queue2, getFileName("file2", i)); - } - } - - queueIds = STORAGE.getAllQueues(serverName1); - assertEquals(2, queueIds.size()); - assertThat(queueIds, hasItems("1", "2")); - - ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001); - Pair<String, SortedSet<String>> peer1 = STORAGE.claimQueue(serverName1, "1", serverName2); - - assertEquals("1-" + serverName1.getServerName(), peer1.getFirst()); - assertEquals(5, peer1.getSecond().size()); - int i = 1; - for (String wal : peer1.getSecond()) { - assertEquals(getFileName("file1", i), wal); - assertEquals((i + 1) * 100, - STORAGE.getWALPosition(serverName2, peer1.getFirst(), getFileName("file1", i))); - i += 2; - } - - queueIds = STORAGE.getAllQueues(serverName1); - assertEquals(1, queueIds.size()); - assertThat(queueIds, hasItems("2")); - wals2 = STORAGE.getWALsInQueue(serverName1, queue2); - assertEquals(5, wals2.size()); - for (i = 0; i < 10; i += 2) { - assertThat(wals2, hasItems(getFileName("file2", i))); - } - - queueIds = STORAGE.getAllQueues(serverName2); - assertEquals(1, queueIds.size()); - assertThat(queueIds, hasItems(peer1.getFirst())); - wals1 = STORAGE.getWALsInQueue(serverName2, peer1.getFirst()); - assertEquals(5, wals1.size()); - for (i = 1; i < 10; i += 2) { - assertThat(wals1, hasItems(getFileName("file1", i))); - } - - Set<String> allWals = STORAGE.getAllWALs(); - assertEquals(10, allWals.size()); - for (i = 0; i < 10; i++) { - assertThat(allWals, hasItems(i % 2 == 0 ? getFileName("file2", i) : getFileName("file1", i))); - } - } - - // For HBASE-12865 - @Test - public void testClaimQueueChangeCversion() throws ReplicationException, KeeperException { - ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); - STORAGE.addWAL(serverName1, "1", "file"); - - int v0 = STORAGE.getQueuesZNodeCversion(); - ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001); - STORAGE.claimQueue(serverName1, "1", serverName2); - int v1 = STORAGE.getQueuesZNodeCversion(); - // cversion should increase by 1 since a child node is deleted - assertEquals(1, v1 - v0); - } - - private ZKReplicationQueueStorage createWithUnstableCversion() throws IOException { - return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) { - - private int called = 0; - - @Override - public int getQueuesZNodeCversion() throws KeeperException { - if (called < 4) { - called++; - } - return called; - } - }; - } - - @Test - public void testGetAllWALsCversionChange() throws IOException, ReplicationException { - ZKReplicationQueueStorage storage = createWithUnstableCversion(); - storage.addWAL(getServerName(0), "1", "file"); - // This should return eventually when cversion stabilizes - Set<String> allWals = storage.getAllWALs(); - assertEquals(1, allWals.size()); - assertThat(allWals, hasItems("file")); - } - - // For HBASE-14621 - @Test - public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException { - ZKReplicationQueueStorage storage = createWithUnstableCversion(); - storage.addPeerToHFileRefs("1"); - Path p = new Path("/test"); - storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p))); - // This should return eventually when cversion stabilizes - Set<String> allHFileRefs = storage.getAllHFileRefs(); - assertEquals(1, allHFileRefs.size()); - assertThat(allHFileRefs, hasItems("test")); - } -}
