Repository: hbase
Updated Branches:
  refs/heads/master 104f58701 -> 00095a2ef


http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 4a36e13..6d75fec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -70,7 +70,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -170,9 +170,9 @@ public abstract class TestReplicationSourceManager {
             + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
     ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
     ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
-      ReplicationUtils.PEER_STATE_ENABLED_BYTES);
+      ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
     ZKUtil.createWithParents(zkw, "/hbase/replication/state");
-    ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationUtils.PEER_STATE_ENABLED_BYTES);
+    ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
 
     ZKClusterId.setClusterId(zkw, new ClusterId());
     FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());

http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java
deleted file mode 100644
index 461420e..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.storage;
-
-import static org.hamcrest.CoreMatchers.hasItems;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.ReplicationUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.zookeeper.KeeperException;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
-
-/**
- * White box testing for replication state interfaces. Implementations should extend this class, and
- * initialize the interfaces properly.
- */
-public abstract class TestReplicationStateBasic {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class);
-
-  protected ReplicationQueueStorage rqs;
-  protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345);
-  protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345);
-  protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345);
-  protected ReplicationPeers rp;
-  protected static final String ID_ONE = "1";
-  protected static final String ID_TWO = "2";
-  protected static String KEY_ONE;
-  protected static String KEY_TWO;
-
-  // For testing when we try to replicate to ourself
-  protected String OUR_KEY;
-
-  protected static int zkTimeoutCount;
-  protected static final int ZK_MAX_COUNT = 300;
-  protected static final int ZK_SLEEP_INTERVAL = 100; // millis
-
-  @Test
-  public void testReplicationQueueStorage() throws ReplicationException {
-    // Test methods with empty state
-    assertEquals(0, rqs.getListOfReplicators().size());
-    assertTrue(rqs.getWALsInQueue(server1, "qId1").isEmpty());
-    assertTrue(rqs.getAllQueues(server1).isEmpty());
-
-    /*
-     * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each --
-     * server2: zero queues
-     */
-    rqs.addWAL(server1, "qId1", "trash");
-    rqs.removeWAL(server1, "qId1", "trash");
-    rqs.addWAL(server1,"qId2", "filename1");
-    rqs.addWAL(server1,"qId3", "filename2");
-    rqs.addWAL(server1,"qId3", "filename3");
-    rqs.addWAL(server2,"trash", "trash");
-    rqs.removeQueue(server2,"trash");
-
-    List<ServerName> reps = rqs.getListOfReplicators();
-    assertEquals(2, reps.size());
-    assertTrue(server1.getServerName(), reps.contains(server1));
-    assertTrue(server2.getServerName(), reps.contains(server2));
-
-    assertTrue(rqs.getWALsInQueue(ServerName.valueOf("bogus", 12345, 12345), "bogus").isEmpty());
-    assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
-    assertEquals(0, rqs.getWALsInQueue(server1, "qId1").size());
-    assertEquals(1, rqs.getWALsInQueue(server1, "qId2").size());
-    assertEquals("filename1", rqs.getWALsInQueue(server1, "qId2").get(0));
-
-    assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 12345, -1L)).isEmpty());
-    assertEquals(0, rqs.getAllQueues(server2).size());
-    List<String> list = rqs.getAllQueues(server1);
-    assertEquals(3, list.size());
-    assertTrue(list.contains("qId2"));
-    assertTrue(list.contains("qId3"));
-  }
-
-  private void removeAllQueues(ServerName serverName) throws ReplicationException {
-    for (String queue: rqs.getAllQueues(serverName)) {
-      rqs.removeQueue(serverName, queue);
-    }
-  }
-  @Test
-  public void testReplicationQueues() throws ReplicationException {
-    // Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
-    rp.init();
-
-    rqs.removeQueue(server1, "bogus");
-    rqs.removeWAL(server1, "bogus", "bogus");
-    removeAllQueues(server1);
-    assertEquals(0, rqs.getAllQueues(server1).size());
-    assertEquals(0, rqs.getWALPosition(server1, "bogus", "bogus"));
-    assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
-    assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 1234, 12345)).isEmpty());
-
-    populateQueues();
-
-    assertEquals(3, rqs.getListOfReplicators().size());
-    assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
-    assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
-    assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
-    rqs.setWALPosition(server3, "qId5", "filename4", 354L, null);
-    assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));
-
-    assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
-    assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
-    assertEquals(0, rqs.getAllQueues(server1).size());
-    assertEquals(1, rqs.getAllQueues(server2).size());
-    assertEquals(5, rqs.getAllQueues(server3).size());
-
-    assertEquals(0, rqs.getAllQueues(server1).size());
-    rqs.removeReplicatorIfQueueIsEmpty(server1);
-    assertEquals(2, rqs.getListOfReplicators().size());
-
-    List<String> queues = rqs.getAllQueues(server3);
-    assertEquals(5, queues.size());
-    for (String queue : queues) {
-      rqs.claimQueue(server3, queue, server2);
-    }
-    rqs.removeReplicatorIfQueueIsEmpty(server3);
-    assertEquals(1, rqs.getListOfReplicators().size());
-
-    assertEquals(6, rqs.getAllQueues(server2).size());
-    removeAllQueues(server2);
-    rqs.removeReplicatorIfQueueIsEmpty(server2);
-    assertEquals(0, rqs.getListOfReplicators().size());
-  }
-
-  @Test
-  public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
-    rp.init();
-
-    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
-    files1.add(new Pair<>(null, new Path("file_1")));
-    files1.add(new Pair<>(null, new Path("file_2")));
-    files1.add(new Pair<>(null, new Path("file_3")));
-    assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
-    assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
-    rp.getPeerStorage().addPeer(ID_ONE,
-            ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
-    rqs.addPeerToHFileRefs(ID_ONE);
-    rqs.addHFileRefs(ID_ONE, files1);
-    assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
-    assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
-    List<String> hfiles2 = new ArrayList<>(files1.size());
-    for (Pair<Path, Path> p : files1) {
-      hfiles2.add(p.getSecond().getName());
-    }
-    String removedString = hfiles2.remove(0);
-    rqs.removeHFileRefs(ID_ONE, hfiles2);
-    assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size());
-    hfiles2 = new ArrayList<>(1);
-    hfiles2.add(removedString);
-    rqs.removeHFileRefs(ID_ONE, hfiles2);
-    assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size());
-    rp.getPeerStorage().removePeer(ID_ONE);
-  }
-
-  @Test
-  public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
-    rp.init();
-    rp.getPeerStorage().addPeer(ID_ONE,
-      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
-    rqs.addPeerToHFileRefs(ID_ONE);
-    rp.getPeerStorage().addPeer(ID_TWO,
-      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true);
-    rqs.addPeerToHFileRefs(ID_TWO);
-
-    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
-    files1.add(new Pair<>(null, new Path("file_1")));
-    files1.add(new Pair<>(null, new Path("file_2")));
-    files1.add(new Pair<>(null, new Path("file_3")));
-    rqs.addHFileRefs(ID_ONE, files1);
-    rqs.addHFileRefs(ID_TWO, files1);
-    assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size());
-    assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
-    assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
-
-    rp.getPeerStorage().removePeer(ID_ONE);
-    rqs.removePeerFromHFileRefs(ID_ONE);
-    assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
-    assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
-    assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
-
-    rp.getPeerStorage().removePeer(ID_TWO);
-    rqs.removePeerFromHFileRefs(ID_TWO);
-    assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
-    assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty());
-  }
-
-  @Test
-  public void testReplicationPeers() throws Exception {
-    rp.init();
-
-    try {
-      rp.getPeerStorage().setPeerState("bogus", true);
-      fail("Should have thrown an ReplicationException when passed a non-exist bogus peerId");
-    } catch (ReplicationException e) {
-    }
-    try {
-      rp.getPeerStorage().setPeerState("bogus", false);
-      fail("Should have thrown an ReplicationException when passed a non-exist bogus peerId");
-    } catch (ReplicationException e) {
-    }
-
-    try {
-      assertFalse(rp.addPeer("bogus"));
-      fail("Should have thrown an ReplicationException when creating a bogus peerId "
-          + "with null peer config");
-    } catch (ReplicationException e) {
-    }
-
-    assertNumberOfPeers(0);
-
-    // Add some peers
-    rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
-    assertNumberOfPeers(1);
-    rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true);
-    assertNumberOfPeers(2);
-
-    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils
-        .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf())));
-    rp.getPeerStorage().removePeer(ID_ONE);
-    rp.removePeer(ID_ONE);
-    assertNumberOfPeers(1);
-
-    // Add one peer
-    rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
-    rp.addPeer(ID_ONE);
-    assertNumberOfPeers(2);
-    assertTrue(rp.getPeer(ID_ONE).isPeerEnabled());
-    rp.getPeerStorage().setPeerState(ID_ONE, false);
-    // now we do not rely on zk watcher to trigger the state change so we need to trigger it
-    // manually...
-    ReplicationPeerImpl peer = rp.getPeer(ID_ONE);
-    rp.refreshPeerState(peer.getId());
-    assertEquals(PeerState.DISABLED, peer.getPeerState());
-    assertConnectedPeerStatus(false, ID_ONE);
-    rp.getPeerStorage().setPeerState(ID_ONE, true);
-    // now we do not rely on zk watcher to trigger the state change so we need to trigger it
-    // manually...
-    rp.refreshPeerState(peer.getId());
-    assertEquals(PeerState.ENABLED, peer.getPeerState());
-    assertConnectedPeerStatus(true, ID_ONE);
-
-    // Disconnect peer
-    rp.removePeer(ID_ONE);
-    assertNumberOfPeers(2);
-  }
-
-  private String getFileName(String base, int i) {
-    return String.format(base + "-%04d", i);
-  }
-
-  @Test
-  public void testPersistLogPositionAndSeqIdAtomically() throws Exception {
-    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
-    assertTrue(rqs.getAllQueues(serverName1).isEmpty());
-    String queue1 = "1";
-    String region0 = "region0", region1 = "region1";
-    for (int i = 0; i < 10; i++) {
-      rqs.addWAL(serverName1, queue1, getFileName("file1", i));
-    }
-    List<String> queueIds = rqs.getAllQueues(serverName1);
-    assertEquals(1, queueIds.size());
-    assertThat(queueIds, hasItems("1"));
-
-    List<String> wals1 = rqs.getWALsInQueue(serverName1, queue1);
-    assertEquals(10, wals1.size());
-    for (int i = 0; i < 10; i++) {
-      assertThat(wals1, hasItems(getFileName("file1", i)));
-    }
-
-    for (int i = 0; i < 10; i++) {
-      assertEquals(0, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
-    }
-    assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region0, queue1));
-    assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region1, queue1));
-
-    for (int i = 0; i < 10; i++) {
-      rqs.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100,
-        ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L));
-    }
-
-    for (int i = 0; i < 10; i++) {
-      assertEquals((i + 1) * 100, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
-    }
-    assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
-    assertEquals(1000L, rqs.getLastSequenceId(region1, queue1));
-  }
-
-  protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
-    // we can first check if the value was changed in the store, if it wasn't then fail right away
-    if (status != rp.getPeerStorage().isPeerEnabled(peerId)) {
-      fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
-    }
-    while (true) {
-      if (status == rp.getPeer(peerId).isPeerEnabled()) {
-        return;
-      }
-      if (zkTimeoutCount < ZK_MAX_COUNT) {
-        LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
-            + ", sleeping and trying again.");
-        Thread.sleep(ZK_SLEEP_INTERVAL);
-      } else {
-        fail("Timed out waiting for ConnectedPeerStatus to be " + status);
-      }
-    }
-  }
-
-  protected void assertNumberOfPeers(int total) throws ReplicationException {
-    assertEquals(total, rp.getPeerStorage().listPeerIds().size());
-  }
-
-  /*
-   * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2,
-   * 3, 4, 5 log files respectively
-   */
-  protected void populateQueues() throws ReplicationException {
-    rqs.addWAL(server1, "trash", "trash");
-    rqs.removeQueue(server1, "trash");
-
-    rqs.addWAL(server2, "qId1", "trash");
-    rqs.removeWAL(server2, "qId1", "trash");
-
-    for (int i = 1; i < 6; i++) {
-      for (int j = 0; j < i; j++) {
-        rqs.addWAL(server3, "qId" + i, "filename" + j);
-      }
-      // Add peers for the corresponding queues so they are not orphans
-      rp.getPeerStorage().addPeer("qId" + i,
-        ReplicationPeerConfig.newBuilder().setClusterKey("localhost:2818:/bogus" + i).build(),
-        true);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java
deleted file mode 100644
index d073669..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.storage;
-
-import java.io.IOException;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ClusterId;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.replication.TableReplicationPeerStorage;
-import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.TableReplicationStorageBase;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.experimental.categories.Category;
-
-@Category({ ReplicationTests.class, MediumTests.class })
-public class TestReplicationStateTableImpl extends TestReplicationStateBasic {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestReplicationStateTableImpl.class);
-
-  private static Configuration conf;
-  private static HBaseTestingUtility utility = new HBaseTestingUtility();
-  private static ZKWatcher zkw;
-  private static Connection connection;
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    conf = utility.getConfiguration();
-    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
-    conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
-    utility.startMiniCluster();
-
-    // After the HBase Mini cluster startup, we set the storage implementation to table based
-    // implementation. Otherwise, we cannot setup the HBase Mini Cluster because the master will
-    // list peers before finish its initialization, and if master cannot finish initialization, the
-    // meta cannot be online, in other hand, if meta cannot be online, the list peers never success
-    // when using table based replication. a dead loop happen.
-    // Our UTs are written for testing storage layer, so no problem here.
-    conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
-      TableReplicationPeerStorage.class.getName());
-    conf.set(ReplicationStorageFactory.REPLICATION_QUEUE_STORAGE_IMPL,
-      TableReplicationQueueStorage.class.getName());
-
-    zkw = utility.getZooKeeperWatcher();
-    connection = ConnectionFactory.createConnection(conf);
-
-    KEY_ONE = initPeerClusterState("/hbase1");
-    KEY_TWO = initPeerClusterState("/hbase2");
-  }
-
-  private static String initPeerClusterState(String baseZKNode)
-      throws IOException, KeeperException {
-    // Add a dummy region server and set up the cluster id
-    Configuration testConf = new Configuration(conf);
-    testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
-    ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null);
-    String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, "hostname1.example.org:1234");
-    ZKUtil.createWithParents(zkw1, fakeRs);
-    ZKClusterId.setClusterId(zkw1, new ClusterId());
-    return ZKConfig.getZooKeeperClusterKey(testConf);
-  }
-
-  @Before
-  public void setUp() throws IOException {
-    rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
-    rp = ReplicationFactory.getReplicationPeers(zkw, conf);
-    OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
-
-    // Create hbase:replication meta table.
-    try (Admin admin = connection.getAdmin()) {
-      TableDescriptor table =
-          TableReplicationStorageBase.createReplicationTableDescBuilder(conf).build();
-      admin.createTable(table);
-    }
-  }
-
-  @After
-  public void tearDown() throws KeeperException, IOException {
-    // Drop the hbase:replication meta table.
-    utility.deleteTable(TableReplicationStorageBase.REPLICATION_TABLE);
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    if (connection != null) {
-      IOUtils.closeQuietly(connection);
-    }
-    utility.shutdownMiniZKCluster();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java
deleted file mode 100644
index 993f2fb..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.storage;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ClusterId;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseZKTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.experimental.categories.Category;
-
-@Category({ ReplicationTests.class, MediumTests.class })
-public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestReplicationStateZKImpl.class);
-
-  private static Configuration conf;
-  private static HBaseZKTestingUtility utility;
-  private static ZKWatcher zkw;
-  private static String replicationZNode;
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    utility = new HBaseZKTestingUtility();
-    utility.startMiniZKCluster();
-    conf = utility.getConfiguration();
-    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
-    zkw = utility.getZooKeeperWatcher();
-    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
-    replicationZNode = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, replicationZNodeName);
-    KEY_ONE = initPeerClusterState("/hbase1");
-    KEY_TWO = initPeerClusterState("/hbase2");
-  }
-
-  private static String initPeerClusterState(String baseZKNode)
-      throws IOException, KeeperException {
-    // Add a dummy region server and set up the cluster id
-    Configuration testConf = new Configuration(conf);
-    testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
-    ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null);
-    String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, "hostname1.example.org:1234");
-    ZKUtil.createWithParents(zkw1, fakeRs);
-    ZKClusterId.setClusterId(zkw1, new ClusterId());
-    return ZKConfig.getZooKeeperClusterKey(testConf);
-  }
-
-  @Before
-  public void setUp() {
-    zkTimeoutCount = 0;
-    rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
-    rp = ReplicationFactory.getReplicationPeers(zkw, conf);
-    OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
-  }
-
-  @After
-  public void tearDown() throws KeeperException, IOException {
-    ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    utility.shutdownMiniZKCluster();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java
deleted file mode 100644
index 190eef4..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.storage;
-
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toSet;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.stream.Stream;
-
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseZKTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ ReplicationTests.class, MediumTests.class })
-public class TestZKReplicationPeerStorage {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class);
-
-  private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility();
-
-  private static ZKReplicationPeerStorage STORAGE;
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    UTIL.startMiniZKCluster();
-    STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
-  }
-
-  @AfterClass
-  public static void tearDown() throws IOException {
-    UTIL.shutdownMiniZKCluster();
-  }
-
-  private Set<String> randNamespaces(Random rand) {
-    return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
-        .collect(toSet());
-  }
-
-  private Map<TableName, List<String>> randTableCFs(Random rand) {
-    int size = rand.nextInt(5);
-    Map<TableName, List<String>> map = new HashMap<>();
-    for (int i = 0; i < size; i++) {
-      TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
-      List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
-          .limit(rand.nextInt(5)).collect(toList());
-      map.put(tn, cfs);
-    }
-    return map;
-  }
-
-  private ReplicationPeerConfig getConfig(int seed) {
-    Random rand = new Random(seed);
-    return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong()))
-        .setReplicationEndpointImpl(Long.toHexString(rand.nextLong()))
-        .setNamespaces(randNamespaces(rand)).setExcludeNamespaces(randNamespaces(rand))
-        .setTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean())
-        .setBandwidth(rand.nextInt(1000)).build();
-  }
-
-  private void assertSetEquals(Set<String> expected, Set<String> actual) {
-    if (expected == null || expected.size() == 0) {
-      assertTrue(actual == null || actual.size() == 0);
-      return;
-    }
-    assertEquals(expected.size(), actual.size());
-    expected.forEach(s -> assertTrue(actual.contains(s)));
-  }
-
-  private void assertMapEquals(Map<TableName, List<String>> expected,
-      Map<TableName, List<String>> actual) {
-    if (expected == null || expected.size() == 0) {
-      assertTrue(actual == null || actual.size() == 0);
-      return;
-    }
-    assertEquals(expected.size(), actual.size());
-    expected.forEach((expectedTn, expectedCFs) -> {
-      List<String> actualCFs = actual.get(expectedTn);
-      if (expectedCFs == null || expectedCFs.size() == 0) {
-        assertTrue(actual.containsKey(expectedTn));
-        assertTrue(actualCFs == null || actualCFs.size() == 0);
-      } else {
-        assertNotNull(actualCFs);
-        assertEquals(expectedCFs.size(), actualCFs.size());
-        for (Iterator<String> expectedIt = expectedCFs.iterator(), actualIt = actualCFs.iterator();
-          expectedIt.hasNext();) {
-          assertEquals(expectedIt.next(), actualIt.next());
-        }
-      }
-    });
-  }
-
-  private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) {
-    assertEquals(expected.getClusterKey(), actual.getClusterKey());
-    assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl());
-    assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
-    assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces());
-    assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
-    assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
-    assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
-    assertEquals(expected.getBandwidth(), actual.getBandwidth());
-  }
-
-  @Test
-  public void test() throws ReplicationException {
-    int peerCount = 10;
-    for (int i = 0; i < peerCount; i++) {
-      STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0);
-    }
-    List<String> peerIds = STORAGE.listPeerIds();
-    assertEquals(peerCount, peerIds.size());
-    for (String peerId : peerIds) {
-      int seed = Integer.parseInt(peerId);
-      assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId));
-    }
-    for (int i = 0; i < peerCount; i++) {
-      STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
-    }
-    for (String peerId : peerIds) {
-      int seed = Integer.parseInt(peerId);
-      assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId));
-    }
-    for (int i = 0; i < peerCount; i++) {
-      assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
-    }
-    for (int i = 0; i < peerCount; i++) {
-      STORAGE.setPeerState(Integer.toString(i), i % 2 != 0);
-    }
-    for (int i = 0; i < peerCount; i++) {
-      assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i)));
-    }
-    String toRemove = Integer.toString(peerCount / 2);
-    STORAGE.removePeer(toRemove);
-    peerIds = STORAGE.listPeerIds();
-    assertEquals(peerCount - 1, peerIds.size());
-    assertFalse(peerIds.contains(toRemove));
-
-    try {
-      STORAGE.getPeerConfig(toRemove);
-      fail("Should throw a ReplicationException when get peer config of a peerId");
-    } catch (ReplicationException e) {
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java
deleted file mode 100644
index 780ff2a..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.storage;
-
-import static org.hamcrest.CoreMatchers.hasItems;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedSet;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseZKTestingUtility;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ ReplicationTests.class, MediumTests.class })
-public class TestZKReplicationQueueStorage {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class);
-
-  private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility();
-
-  private static ZKReplicationQueueStorage STORAGE;
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    UTIL.startMiniZKCluster();
-    STORAGE = new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
-  }
-
-  @AfterClass
-  public static void tearDown() throws IOException {
-    UTIL.shutdownMiniZKCluster();
-  }
-
-  @After
-  public void tearDownAfterTest() throws ReplicationException {
-    for (ServerName serverName : STORAGE.getListOfReplicators()) {
-      for (String queue : STORAGE.getAllQueues(serverName)) {
-        STORAGE.removeQueue(serverName, queue);
-      }
-      STORAGE.removeReplicatorIfQueueIsEmpty(serverName);
-    }
-    for (String peerId : STORAGE.getAllPeersFromHFileRefsQueue()) {
-      STORAGE.removePeerFromHFileRefs(peerId);
-    }
-  }
-
-  private ServerName getServerName(int i) {
-    return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i);
-  }
-
-  @Test
-  public void testReplicator() throws ReplicationException {
-    assertTrue(STORAGE.getListOfReplicators().isEmpty());
-    String queueId = "1";
-    for (int i = 0; i < 10; i++) {
-      STORAGE.addWAL(getServerName(i), queueId, "file" + i);
-    }
-    List<ServerName> replicators = STORAGE.getListOfReplicators();
-    assertEquals(10, replicators.size());
-    for (int i = 0; i < 10; i++) {
-      assertThat(replicators, hasItems(getServerName(i)));
-    }
-    for (int i = 0; i < 5; i++) {
-      STORAGE.removeQueue(getServerName(i), queueId);
-    }
-    for (int i = 0; i < 10; i++) {
-      STORAGE.removeReplicatorIfQueueIsEmpty(getServerName(i));
-    }
-    replicators = STORAGE.getListOfReplicators();
-    assertEquals(5, replicators.size());
-    for (int i = 5; i < 10; i++) {
-      assertThat(replicators, hasItems(getServerName(i)));
-    }
-  }
-
-  private String getFileName(String base, int i) {
-    return String.format(base + "-%04d", i);
-  }
-
-  @Test
-  public void testAddRemoveLog() throws ReplicationException {
-    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
-    assertTrue(STORAGE.getAllQueues(serverName1).isEmpty());
-    String queue1 = "1";
-    String queue2 = "2";
-    for (int i = 0; i < 10; i++) {
-      STORAGE.addWAL(serverName1, queue1, getFileName("file1", i));
-      STORAGE.addWAL(serverName1, queue2, getFileName("file2", i));
-    }
-    List<String> queueIds = STORAGE.getAllQueues(serverName1);
-    assertEquals(2, queueIds.size());
-    assertThat(queueIds, hasItems("1", "2"));
-
-    List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1);
-    List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
-    assertEquals(10, wals1.size());
-    assertEquals(10, wals2.size());
-    for (int i = 0; i < 10; i++) {
-      assertThat(wals1, hasItems(getFileName("file1", i)));
-      assertThat(wals2, hasItems(getFileName("file2", i)));
-    }
-
-    for (int i = 0; i < 10; i++) {
-      assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
-      assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
-      STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, null);
-      STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10,
-        null);
-    }
-
-    for (int i = 0; i < 10; i++) {
-      assertEquals((i + 1) * 100,
-        STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
-      assertEquals((i + 1) * 100 + 10,
-        STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
-    }
-
-    for (int i = 0; i < 10; i++) {
-      if (i % 2 == 0) {
-        STORAGE.removeWAL(serverName1, queue1, getFileName("file1", i));
-      } else {
-        STORAGE.removeWAL(serverName1, queue2, getFileName("file2", i));
-      }
-    }
-
-    queueIds = STORAGE.getAllQueues(serverName1);
-    assertEquals(2, queueIds.size());
-    assertThat(queueIds, hasItems("1", "2"));
-
-    ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
-    Pair<String, SortedSet<String>> peer1 = STORAGE.claimQueue(serverName1, "1", serverName2);
-
-    assertEquals("1-" + serverName1.getServerName(), peer1.getFirst());
-    assertEquals(5, peer1.getSecond().size());
-    int i = 1;
-    for (String wal : peer1.getSecond()) {
-      assertEquals(getFileName("file1", i), wal);
-      assertEquals((i + 1) * 100,
-        STORAGE.getWALPosition(serverName2, peer1.getFirst(), getFileName("file1", i)));
-      i += 2;
-    }
-
-    queueIds = STORAGE.getAllQueues(serverName1);
-    assertEquals(1, queueIds.size());
-    assertThat(queueIds, hasItems("2"));
-    wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
-    assertEquals(5, wals2.size());
-    for (i = 0; i < 10; i += 2) {
-      assertThat(wals2, hasItems(getFileName("file2", i)));
-    }
-
-    queueIds = STORAGE.getAllQueues(serverName2);
-    assertEquals(1, queueIds.size());
-    assertThat(queueIds, hasItems(peer1.getFirst()));
-    wals1 = STORAGE.getWALsInQueue(serverName2, peer1.getFirst());
-    assertEquals(5, wals1.size());
-    for (i = 1; i < 10; i += 2) {
-      assertThat(wals1, hasItems(getFileName("file1", i)));
-    }
-
-    Set<String> allWals = STORAGE.getAllWALs();
-    assertEquals(10, allWals.size());
-    for (i = 0; i < 10; i++) {
-      assertThat(allWals, hasItems(i % 2 == 0 ? getFileName("file2", i) : getFileName("file1", i)));
-    }
-  }
-
-  // For HBASE-12865
-  @Test
-  public void testClaimQueueChangeCversion() throws ReplicationException, KeeperException {
-    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
-    STORAGE.addWAL(serverName1, "1", "file");
-
-    int v0 = STORAGE.getQueuesZNodeCversion();
-    ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
-    STORAGE.claimQueue(serverName1, "1", serverName2);
-    int v1 = STORAGE.getQueuesZNodeCversion();
-    // cversion should increase by 1 since a child node is deleted
-    assertEquals(1, v1 - v0);
-  }
-
-  private ZKReplicationQueueStorage createWithUnstableCversion() throws IOException {
-    return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) {
-
-      private int called = 0;
-
-      @Override
-      public int getQueuesZNodeCversion() throws KeeperException {
-        if (called < 4) {
-          called++;
-        }
-        return called;
-      }
-    };
-  }
-
-  @Test
-  public void testGetAllWALsCversionChange() throws IOException, ReplicationException {
-    ZKReplicationQueueStorage storage = createWithUnstableCversion();
-    storage.addWAL(getServerName(0), "1", "file");
-    // This should return eventually when cversion stabilizes
-    Set<String> allWals = storage.getAllWALs();
-    assertEquals(1, allWals.size());
-    assertThat(allWals, hasItems("file"));
-  }
-
-  // For HBASE-14621
-  @Test
-  public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException {
-    ZKReplicationQueueStorage storage = createWithUnstableCversion();
-    storage.addPeerToHFileRefs("1");
-    Path p = new Path("/test");
-    storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p)));
-    // This should return eventually when cversion stabilizes
-    Set<String> allHFileRefs = storage.getAllHFileRefs();
-    assertEquals(1, allHFileRefs.size());
-    assertThat(allHFileRefs, hasItems("test"));
-  }
-}

Reply via email to