Revert "HBASE-19665 Add table based replication peers/queues storage back"

This reverts commit 31978c31bbf363d98c50cc6b293105a085888471.

 Conflicts:
        
hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationStorageBase.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/00095a2e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/00095a2e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/00095a2e

Branch: refs/heads/master
Commit: 00095a2ef9442e3fd86c04876c9d91f2f8b23ad8
Parents: 104f587
Author: zhangduo <[email protected]>
Authored: Sat Mar 17 20:25:27 2018 +0800
Committer: zhangduo <[email protected]>
Committed: Sat Mar 17 20:25:27 2018 +0800

----------------------------------------------------------------------
 .../replication/ReplicationPeerStorage.java     |   3 +-
 .../replication/ReplicationStorageFactory.java  |  20 +-
 .../hbase/replication/ReplicationUtils.java     |  13 -
 .../TableReplicationPeerStorage.java            | 171 ------
 .../TableReplicationQueueStorage.java           | 522 -------------------
 .../TableReplicationStorageBase.java            | 127 -----
 .../replication/ZKReplicationPeerStorage.java   |  16 +-
 .../replication/ZKReplicationQueueStorage.java  |   6 +-
 .../replication/TestReplicationStateBasic.java  | 363 +++++++++++++
 .../replication/TestReplicationStateZKImpl.java |  95 ++++
 .../TestZKReplicationPeerStorage.java           | 178 +++++++
 .../TestZKReplicationQueueStorage.java          | 252 +++++++++
 .../TestReplicationSourceManager.java           |   6 +-
 .../storage/TestReplicationStateBasic.java      | 370 -------------
 .../storage/TestReplicationStateTableImpl.java  | 129 -----
 .../storage/TestReplicationStateZKImpl.java     |  98 ----
 .../storage/TestZKReplicationPeerStorage.java   | 182 -------
 .../storage/TestZKReplicationQueueStorage.java  | 255 ---------
 18 files changed, 907 insertions(+), 1899 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
index 4684f08..1adda02 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
@@ -42,8 +42,7 @@ public interface ReplicationPeerStorage {
 
   /**
    * Set the state of peer, {@code true} to {@code ENABLED}, otherwise to 
{@code DISABLED}.
-   * @throws ReplicationException if there are errors accessing the storage 
service or peer does not
-   *           exist.
+   * @throws ReplicationException if there are errors accessing the storage 
service.
    */
   void setPeerState(String peerId, boolean enabled) throws 
ReplicationException;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
index cbfec3b..462cfed 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.replication;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -30,15 +29,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public final class ReplicationStorageFactory {
 
-  public static final String REPLICATION_PEER_STORAGE_IMPL = 
"hbase.replication.peer.storage.impl";
-  public static final String DEFAULT_REPLICATION_PEER_STORAGE_IMPL =
-      ZKReplicationPeerStorage.class.getName();
-
-  public static final String REPLICATION_QUEUE_STORAGE_IMPL =
-      "hbase.replication.queue.storage.impl";
-  public static final String DEFAULT_REPLICATION_QUEUE_STORAGE_IMPL =
-      ZKReplicationQueueStorage.class.getName();
-
   private ReplicationStorageFactory() {
   }
 
@@ -46,10 +36,7 @@ public final class ReplicationStorageFactory {
    * Create a new {@link ReplicationPeerStorage}.
    */
   public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, 
Configuration conf) {
-    String peerStorageClass =
-        conf.get(REPLICATION_PEER_STORAGE_IMPL, 
DEFAULT_REPLICATION_PEER_STORAGE_IMPL);
-    return ReflectionUtils.instantiateWithCustomCtor(peerStorageClass,
-      new Class[] { ZKWatcher.class, Configuration.class }, new Object[] { zk, 
conf });
+    return new ZKReplicationPeerStorage(zk, conf);
   }
 
   /**
@@ -57,9 +44,6 @@ public final class ReplicationStorageFactory {
    */
   public static ReplicationQueueStorage getReplicationQueueStorage(ZKWatcher 
zk,
       Configuration conf) {
-    String queueStorageClass =
-        conf.get(REPLICATION_QUEUE_STORAGE_IMPL, 
DEFAULT_REPLICATION_QUEUE_STORAGE_IMPL);
-    return ReflectionUtils.instantiateWithCustomCtor(queueStorageClass,
-      new Class[] { ZKWatcher.class, Configuration.class }, new Object[] { zk, 
conf });
+    return new ZKReplicationQueueStorage(zk, conf);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 2e86c17..e2479e0 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import static 
org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.toByteArray;
-
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
@@ -32,19 +30,12 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
-
 /**
  * Helper class for replication.
  */
 @InterfaceAudience.Private
 public final class ReplicationUtils {
 
-  public static final byte[] PEER_STATE_ENABLED_BYTES =
-      toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
-  public static final byte[] PEER_STATE_DISABLED_BYTES =
-      toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
-
   private ReplicationUtils() {
   }
 
@@ -182,8 +173,4 @@ public final class ReplicationUtils {
       return tableCFs != null && tableCFs.containsKey(tableName);
     }
   }
-
-  public static String parsePeerIdFromQueueId(String queueId) {
-    return new ReplicationQueueInfo(queueId).getPeerId();
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationPeerStorage.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationPeerStorage.java
deleted file mode 100644
index ee7969b..0000000
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationPeerStorage.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import static 
org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_DISABLED_BYTES;
-import static 
org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_ENABLED_BYTES;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Table based replication peer storage.
- */
[email protected]
-public class TableReplicationPeerStorage extends TableReplicationStorageBase
-    implements ReplicationPeerStorage {
-
-  public TableReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) 
throws IOException {
-    super(zookeeper, conf);
-  }
-
-  @Override
-  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean 
enabled)
-      throws ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      Put put = new Put(Bytes.toBytes(peerId));
-      put.addColumn(FAMILY_PEER, QUALIFIER_PEER_CONFIG,
-        ReplicationPeerConfigUtil.toByteArray(peerConfig));
-      put.addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE,
-        enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES);
-      table.put(put);
-    } catch (IOException e) {
-      throw new ReplicationException("Failed to add peer " + peerId, e);
-    }
-  }
-
-  @Override
-  public void removePeer(String peerId) throws ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      Delete delete = new Delete(Bytes.toBytes(peerId));
-      table.delete(delete);
-    } catch (IOException e) {
-      throw new ReplicationException("Failed to remove peer " + peerId, e);
-    }
-  }
-
-  // TODO make it to be a checkExistAndMutate operation.
-  private boolean peerExist(String peerId, Table table) throws IOException {
-    Get get = new Get(Bytes.toBytes(peerId));
-    get.addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE);
-    return table.exists(get);
-  }
-
-  @Override
-  public void setPeerState(String peerId, boolean enabled) throws 
ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      if (!peerExist(peerId, table)) {
-        throw new ReplicationException("Peer " + peerId + " does not exist.");
-      }
-      Put put = new Put(Bytes.toBytes(peerId));
-      put.addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE,
-        enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES);
-      table.put(put);
-    } catch (IOException e) {
-      throw new ReplicationException(
-          "Failed to set peer state, peerId=" + peerId + ", state=" + enabled, 
e);
-    }
-  }
-
-  @Override
-  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
-      throws ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      if (!peerExist(peerId, table)) {
-        throw new ReplicationException("Peer " + peerId + " does not exist.");
-      }
-      Put put = new Put(Bytes.toBytes(peerId));
-      put.addColumn(FAMILY_PEER, QUALIFIER_PEER_CONFIG,
-        ReplicationPeerConfigUtil.toByteArray(peerConfig));
-      table.put(put);
-    } catch (IOException e) {
-      throw new ReplicationException("Failed to update peer configuration, 
peerId=" + peerId, e);
-    }
-  }
-
-  @Override
-  public List<String> listPeerIds() throws ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      Scan scan = new Scan().addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE);
-      try (ResultScanner scanner = table.getScanner(scan)) {
-        List<String> peerIds = new ArrayList<>();
-        for (Result r : scanner) {
-          peerIds.add(Bytes.toString(r.getRow()));
-        }
-        return peerIds;
-      }
-    } catch (IOException e) {
-      throw new ReplicationException("Failed to list peers", e);
-    }
-  }
-
-  @Override
-  public boolean isPeerEnabled(String peerId) throws ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      Get get = new Get(Bytes.toBytes(peerId)).addColumn(FAMILY_PEER, 
QUALIFIER_PEER_STATE);
-      Result r = table.get(get);
-      if (r == null) {
-        throw new ReplicationException("Peer " + peerId + " does not found");
-      }
-      return Arrays.equals(PEER_STATE_ENABLED_BYTES, r.getValue(FAMILY_PEER, 
QUALIFIER_PEER_STATE));
-    } catch (IOException e) {
-      throw new ReplicationException("Failed to read the peer state, peerId=" 
+ peerId, e);
-    }
-  }
-
-  @Override
-  public ReplicationPeerConfig getPeerConfig(String peerId) throws 
ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      Get get = new Get(Bytes.toBytes(peerId)).addColumn(FAMILY_PEER, 
QUALIFIER_PEER_CONFIG);
-      Result r = table.get(get);
-      if (r == null) {
-        throw new ReplicationException("Peer " + peerId + " does not found");
-      }
-      byte[] data = r.getValue(FAMILY_PEER, QUALIFIER_PEER_CONFIG);
-      if (data == null || data.length == 0) {
-        throw new ReplicationException(
-            "Replication peer config data shouldn't be empty, peerId=" + 
peerId);
-      }
-      try {
-        return ReplicationPeerConfigUtil.parsePeerFrom(data);
-      } catch (DeserializationException e) {
-        throw new ReplicationException(
-            "Failed to parse replication peer config for peer with id=" + 
peerId, e);
-      }
-    } catch (IOException e) {
-      throw new ReplicationException(
-          "Failed to read the peer configuration in hbase:replication, 
peerId=" + peerId, e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
deleted file mode 100644
index abb279d..0000000
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.CollectionUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Table based replication queue storage.
- */
[email protected]
-public class TableReplicationQueueStorage extends TableReplicationStorageBase
-    implements ReplicationQueueStorage {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(TableReplicationQueueStorage.class);
-
-  public TableReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) 
throws IOException {
-    super(zookeeper, conf);
-  }
-
-  /**
-   * Serialize the {fileName, position} pair into a byte array.
-   */
-  private static byte[] makeByteArray(String fileName, long position) {
-    byte[] data = new byte[Bytes.SIZEOF_INT + fileName.length() + 
Bytes.SIZEOF_LONG];
-    int pos = 0;
-    pos = Bytes.putInt(data, pos, fileName.length());
-    pos = Bytes.putBytes(data, pos, Bytes.toBytes(fileName), 0, 
fileName.length());
-    pos = Bytes.putLong(data, pos, position);
-    assert pos == data.length;
-    return data;
-  }
-
-  /**
-   * Deserialize the byte array into a {filename, position} pair.
-   */
-  private static Pair<String, Long> parseFileNameAndPosition(byte[] data, int 
offset)
-      throws IOException {
-    if (data == null) {
-      throw new IOException("The byte array shouldn't be null");
-    }
-    int pos = offset;
-    int len = Bytes.toInt(data, pos, Bytes.SIZEOF_INT);
-    pos += Bytes.SIZEOF_INT;
-    if (pos + len > data.length) {
-      throw new IllegalArgumentException("offset (" + pos + ") + length (" + 
len + ") exceed the"
-          + " capacity of the array: " + data.length);
-    }
-    String fileName = Bytes.toString(Bytes.copy(data, pos, len));
-    pos += len;
-    long position = Bytes.toLong(data, pos, Bytes.SIZEOF_LONG);
-    return new Pair<>(fileName, position);
-  }
-
-  @Override
-  public void removeQueue(ServerName serverName, String queueId) throws 
ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      Delete delete = new Delete(getServerNameRowKey(serverName));
-      delete.addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId));
-      // Delete all <fileName, position> pairs.
-      delete.addColumns(FAMILY_WAL, Bytes.toBytes(queueId), 
HConstants.LATEST_TIMESTAMP);
-      table.delete(delete);
-    } catch (IOException e) {
-      throw new ReplicationException(
-          "Failed to remove wal from queue, serverName=" + serverName + ", 
queueId=" + queueId, e);
-    }
-  }
-
-  @Override
-  public void addWAL(ServerName serverName, String queueId, String fileName)
-      throws ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      Put put = new Put(getServerNameRowKey(serverName));
-      put.addColumn(FAMILY_RS_STATE, QUALIFIER_STATE_ENABLED, 
HConstants.EMPTY_BYTE_ARRAY);
-      put.addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId), 
HConstants.EMPTY_BYTE_ARRAY);
-      put.addColumn(FAMILY_WAL, Bytes.toBytes(queueId), 
makeByteArray(fileName, 0L));
-      table.put(put);
-    } catch (IOException e) {
-      throw new ReplicationException("Failed to add wal to queue, serverName=" 
+ serverName
-          + ", queueId=" + queueId + ", fileName=" + fileName, e);
-    }
-  }
-
-  @Override
-  public void removeWAL(ServerName serverName, String queueId, String fileName)
-      throws ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      Optional<WALCell> walCell = getWALsInQueue0(table, serverName, 
queueId).stream()
-          .filter(w -> w.fileNameMatch(fileName)).findFirst();
-      if (walCell.isPresent()) {
-        Delete delete = new 
Delete(getServerNameRowKey(walCell.get().serverName))
-            .addColumn(FAMILY_WAL, Bytes.toBytes(queueId), 
walCell.get().cellTimestamp);
-        table.delete(delete);
-      } else {
-        LOG.warn(fileName + " has already been deleted when removing log");
-      }
-    } catch (IOException e) {
-      throw new ReplicationException("Failed to remove wal from queue, 
serverName=" + serverName
-          + ", queueId=" + queueId + ", fileName=" + fileName, e);
-    }
-  }
-
-  @Override
-  public void setWALPosition(ServerName serverName, String queueId, String 
fileName, long position,
-      Map<String, Long> lastSeqIds) throws ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      Optional<WALCell> walCell = getWALsInQueue0(table, serverName, 
queueId).stream()
-          .filter(w -> w.fileNameMatch(fileName)).findFirst();
-      if (walCell.isPresent()) {
-        List<Put> puts = new ArrayList<>();
-        Put put = new 
Put(getServerNameRowKey(serverName)).addColumn(FAMILY_WAL,
-              Bytes.toBytes(walCell.get().queueId), 
walCell.get().cellTimestamp,
-              makeByteArray(fileName, position));
-        puts.add(put);
-        // Update the last pushed sequence id for each region in a batch.
-        String peerId = ReplicationUtils.parsePeerIdFromQueueId(queueId);
-        if (lastSeqIds != null && lastSeqIds.size() > 0) {
-          for (Map.Entry<String, Long> e : lastSeqIds.entrySet()) {
-            Put regionPut = new 
Put(Bytes.toBytes(peerId)).addColumn(FAMILY_REGIONS,
-              getRegionQualifier(e.getKey()), Bytes.toBytes(e.getValue()));
-            puts.add(regionPut);
-          }
-        }
-        table.put(puts);
-      } else {
-        throw new ReplicationException("WAL file " + fileName + " does not 
found under queue "
-            + queueId + " for server " + serverName);
-      }
-    } catch (IOException e) {
-      throw new ReplicationException(
-          "Failed to set wal position and last sequence ids, serverName=" + 
serverName
-              + ", queueId=" + queueId + ", fileName=" + fileName + ", 
position=" + position,
-          e);
-    }
-  }
-
-  @Override
-  public long getLastSequenceId(String encodedRegionName, String peerId)
-      throws ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      Get get = new Get(Bytes.toBytes(peerId));
-      get.addColumn(FAMILY_REGIONS, getRegionQualifier(encodedRegionName));
-      Result r = table.get(get);
-      if (r == null || r.listCells() == null) {
-        return HConstants.NO_SEQNUM;
-      }
-      return Bytes.toLong(r.getValue(FAMILY_REGIONS, 
getRegionQualifier(encodedRegionName)));
-    } catch (IOException e) {
-      throw new ReplicationException(
-          "Failed to get last sequence id, region=" + encodedRegionName + ", 
peerId=" + peerId, e);
-    }
-  }
-
-  @Override
-  public long getWALPosition(ServerName serverName, String queueId, String 
fileName)
-      throws ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      Optional<WALCell> walCell = getWALsInQueue0(table, serverName, 
queueId).stream()
-          .filter(w -> w.fileNameMatch(fileName)).findFirst();
-      if (walCell.isPresent()) {
-        return walCell.get().position;
-      } else {
-        LOG.warn("WAL " + fileName + " does not found under queue " + queueId 
+ " for server "
-            + serverName);
-        return 0;
-      }
-    } catch (IOException e) {
-      throw new ReplicationException("Failed to get wal position. serverName=" 
+ serverName
-          + ", queueId=" + queueId + ", fileName=" + fileName, e);
-    }
-  }
-
-  /**
-   * Each cell in column wal:{queueId} will be parsed to a WALCell. The 
WALCell will be more
-   * friendly to upper layer.
-   */
-  private static final class WALCell {
-    ServerName serverName;
-    String queueId;
-    String wal;
-    long position;
-    long cellTimestamp;// Timestamp of the cell
-
-    private WALCell(ServerName serverName, String queueId, String wal, long 
position,
-        long cellTimestamp) {
-      this.serverName = serverName;
-      this.queueId = queueId;
-      this.wal = wal;
-      this.position = position;
-      this.cellTimestamp = cellTimestamp;
-    }
-
-    public static WALCell create(Cell cell) throws IOException {
-      ServerName serverName = ServerName.parseServerName(
-        Bytes.toString(cell.getRowArray(), cell.getRowOffset(), 
cell.getRowLength()));
-      String queueId = Bytes.toString(cell.getQualifierArray(), 
cell.getQualifierOffset(),
-        cell.getQualifierLength());
-      Pair<String, Long> fileAndPos =
-          parseFileNameAndPosition(cell.getValueArray(), 
cell.getValueOffset());
-      return new WALCell(serverName, queueId, fileAndPos.getFirst(), 
fileAndPos.getSecond(),
-          cell.getTimestamp());
-    }
-
-    public boolean fileNameMatch(String fileName) {
-      return StringUtils.equals(wal, fileName);
-    }
-  }
-
-  /**
-   * Parse the WALCell list from a HBase result.
-   */
-  private List<WALCell> result2WALCells(Result r) throws IOException {
-    List<WALCell> wals = new ArrayList<>();
-    if (r != null && r.listCells() != null && r.listCells().size() > 0) {
-      for (Cell cell : r.listCells()) {
-        wals.add(WALCell.create(cell));
-      }
-    }
-    return wals;
-  }
-
-  /**
-   * List all WALs for the specific region server and queueId.
-   */
-  private List<WALCell> getWALsInQueue0(Table table, ServerName serverName, 
String queueId)
-      throws IOException {
-    Get get = new Get(getServerNameRowKey(serverName)).addColumn(FAMILY_WAL, 
Bytes.toBytes(queueId))
-        .readAllVersions();
-    return result2WALCells(table.get(get));
-  }
-
-  @Override
-  public List<String> getWALsInQueue(ServerName serverName, String queueId)
-      throws ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      return getWALsInQueue0(table, serverName, queueId).stream().map(p -> 
p.wal)
-          .collect(Collectors.toList());
-    } catch (IOException e) {
-      throw new ReplicationException(
-          "Failed to get wals in queue. serverName=" + serverName + ", 
queueId=" + queueId, e);
-    }
-  }
-
-  @Override
-  public List<String> getAllQueues(ServerName serverName) throws 
ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      List<String> queues = new ArrayList<>();
-      Get get = new 
Get(getServerNameRowKey(serverName)).addFamily(FAMILY_QUEUE);
-      Result r = table.get(get);
-      if (r != null && r.listCells() != null && r.listCells().size() > 0) {
-        for (Cell c : r.listCells()) {
-          String queue =
-              Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), 
c.getQualifierLength());
-          queues.add(queue);
-        }
-      }
-      return queues;
-    } catch (IOException e) {
-      throw new ReplicationException("Failed to get all queues. serverName=" + 
serverName, e);
-    }
-  }
-
-  @Override
-  public Pair<String, SortedSet<String>> claimQueue(ServerName 
sourceServerName, String queueId,
-      ServerName destServerName) throws ReplicationException {
-    LOG.info(
-      "Atomically moving " + sourceServerName + "/" + queueId + "'s WALs to " 
+ destServerName);
-    try (Table table = getReplicationMetaTable()) {
-      // Create an enabled region server for destination.
-      byte[] destServerNameRowKey = getServerNameRowKey(destServerName);
-      byte[] srcServerNameRowKey = getServerNameRowKey(sourceServerName);
-      Put put = new Put(destServerNameRowKey).addColumn(FAMILY_RS_STATE, 
QUALIFIER_STATE_ENABLED,
-        HConstants.EMPTY_BYTE_ARRAY);
-      table.put(put);
-      List<WALCell> wals = getWALsInQueue0(table, sourceServerName, queueId);
-      String newQueueId = queueId + "-" + sourceServerName;
-      // Remove the queue in source region server if wal set of the queue is 
empty.
-      if (CollectionUtils.isEmpty(wals)) {
-        Delete delete =
-            new Delete(srcServerNameRowKey).addColumn(FAMILY_QUEUE, 
Bytes.toBytes(queueId))
-                .addColumns(FAMILY_WAL, Bytes.toBytes(queueId), 
HConstants.LATEST_TIMESTAMP);
-        table.delete(delete);
-        LOG.info("Removed " + sourceServerName + "/" + queueId + " since it's 
empty");
-        return new Pair<>(newQueueId, Collections.emptySortedSet());
-      }
-      // Transfer all wals from source region server to destination region 
server in a batch.
-      List<Mutation> mutations = new ArrayList<>();
-      // a. Create queue for destination server.
-      mutations.add(new Put(destServerNameRowKey).addColumn(FAMILY_QUEUE, 
Bytes.toBytes(newQueueId),
-        HConstants.EMPTY_BYTE_ARRAY));
-      SortedSet<String> logQueue = new TreeSet<>();
-      for (WALCell wal : wals) {
-        byte[] data = makeByteArray(wal.wal, wal.cellTimestamp);
-        // b. Add wal to destination server.
-        mutations.add(
-          new Put(destServerNameRowKey).addColumn(FAMILY_WAL, 
Bytes.toBytes(newQueueId), data));
-        // c. Remove wal from source server.
-        mutations.add(new Delete(srcServerNameRowKey).addColumn(FAMILY_WAL, 
Bytes.toBytes(queueId),
-          wal.cellTimestamp));
-        logQueue.add(wal.wal);
-      }
-      // d. Remove the queue of source server.
-      mutations
-          .add(new Delete(srcServerNameRowKey).addColumn(FAMILY_QUEUE, 
Bytes.toBytes(queueId)));
-      Object[] results = new Object[mutations.size()];
-      table.batch(mutations, results);
-      boolean allSuccess = Stream.of(results).allMatch(r -> r != null);
-      if (!allSuccess) {
-        throw new ReplicationException("Claim queue queueId=" + queueId + " 
from "
-            + sourceServerName + " to " + destServerName + " failed, not all 
mutations success.");
-      }
-      LOG.info(
-        "Atomically moved " + sourceServerName + "/" + queueId + "'s WALs to " 
+ destServerName);
-      return new Pair<>(newQueueId, logQueue);
-    } catch (IOException | InterruptedException e) {
-      throw new ReplicationException("Claim queue queueId=" + queueId + " from 
" + sourceServerName
-          + " to " + destServerName + " failed", e);
-    }
-  }
-
-  @Override
-  public void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws 
ReplicationException {
-    // TODO Make this to be a checkAndDelete, and provide a UT for it.
-    try (Table table = getReplicationMetaTable()) {
-      Get get = new 
Get(getServerNameRowKey(serverName)).addFamily(FAMILY_WAL).readAllVersions();
-      Result r = table.get(get);
-      if (r == null || r.listCells() == null || r.listCells().size() == 0) {
-        Delete delete = new Delete(getServerNameRowKey(serverName));
-        table.delete(delete);
-      }
-    } catch (IOException e) {
-      throw new ReplicationException(
-          "Failed to remove replicator when queue is empty, serverName=" + 
serverName, e);
-    }
-  }
-
-  @Override
-  public List<ServerName> getListOfReplicators() throws ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      Scan scan = new Scan().addColumn(FAMILY_RS_STATE, 
QUALIFIER_STATE_ENABLED).readVersions(1);
-      Set<ServerName> serverNames = new HashSet<>();
-      try (ResultScanner scanner = table.getScanner(scan)) {
-        for (Result r : scanner) {
-          if (r.listCells().size() > 0) {
-            Cell firstCell = r.listCells().get(0);
-            String serverName = Bytes.toString(firstCell.getRowArray(), 
firstCell.getRowOffset(),
-              firstCell.getRowLength());
-            serverNames.add(ServerName.parseServerName(serverName));
-          }
-        }
-      }
-      return new ArrayList<>(serverNames);
-    } catch (IOException e) {
-      throw new ReplicationException("Failed to get list of replicators", e);
-    }
-  }
-
-  @Override
-  public Set<String> getAllWALs() throws ReplicationException {
-    Set<String> walSet = new HashSet<>();
-    try (Table table = getReplicationMetaTable()) {
-      Scan scan = new Scan().addFamily(FAMILY_WAL).readAllVersions();
-      try (ResultScanner scanner = table.getScanner(scan)) {
-        for (Result r : scanner) {
-          result2WALCells(r).forEach(w -> walSet.add(w.wal));
-        }
-      }
-      return walSet;
-    } catch (IOException e) {
-      throw new ReplicationException("Failed to get all wals", e);
-    }
-  }
-
-  @Override
-  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
-    // Need to do nothing.
-  }
-
-  @Override
-  public void removePeerFromHFileRefs(String peerId) throws 
ReplicationException {
-    // Need to do nothing.
-  }
-
-  @Override
-  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
-      throws ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      List<Put> puts = new ArrayList<>();
-      for (Pair<Path, Path> p : pairs) {
-        Put put = new Put(Bytes.toBytes(peerId));
-        put.addColumn(FAMILY_HFILE_REFS, 
Bytes.toBytes(p.getSecond().getName()),
-          HConstants.EMPTY_BYTE_ARRAY);
-        puts.add(put);
-      }
-      table.put(puts);
-    } catch (IOException e) {
-      throw new ReplicationException("Failed to add hfile refs, peerId=" + 
peerId, e);
-    }
-  }
-
-  @Override
-  public void removeHFileRefs(String peerId, List<String> files) throws 
ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      List<Delete> deletes = new ArrayList<>();
-      for (String file : files) {
-        Delete delete = new Delete(Bytes.toBytes(peerId));
-        delete.addColumns(FAMILY_HFILE_REFS, Bytes.toBytes(file));
-        deletes.add(delete);
-      }
-      table.delete(deletes);
-    } catch (IOException e) {
-      throw new ReplicationException("Failed to remove hfile refs, peerId=" + 
peerId, e);
-    }
-  }
-
-  @Override
-  public List<String> getAllPeersFromHFileRefsQueue() throws 
ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      Set<String> peers = new HashSet<>();
-      Scan scan = new Scan().addFamily(FAMILY_HFILE_REFS);
-      try (ResultScanner scanner = table.getScanner(scan)) {
-        for (Result r : scanner) {
-          if (r.listCells().size() > 0) {
-            Cell firstCell = r.listCells().get(0);
-            String peerId = Bytes.toString(firstCell.getRowArray(), 
firstCell.getRowOffset(),
-              firstCell.getRowLength());
-            peers.add(peerId);
-          }
-        }
-      }
-      return new ArrayList<>(peers);
-    } catch (IOException e) {
-      throw new ReplicationException("Faield to get all peers by reading 
hbase:replication meta",
-          e);
-    }
-  }
-
-  @Override
-  public List<String> getReplicableHFiles(String peerId) throws 
ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      Get get = new Get(Bytes.toBytes(peerId)).addFamily(FAMILY_HFILE_REFS);
-      Result r = table.get(get);
-      List<String> hfiles = new ArrayList<>();
-      if (r != null && r.listCells() != null) {
-        for (Cell c : r.listCells()) {
-          hfiles.add(
-            Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), 
c.getQualifierLength()));
-        }
-      }
-      return hfiles;
-    } catch (IOException e) {
-      throw new ReplicationException("Failed to get replicable hfiles, 
peerId=" + peerId, e);
-    }
-  }
-
-  @Override
-  public Set<String> getAllHFileRefs() throws ReplicationException {
-    try (Table table = getReplicationMetaTable()) {
-      Scan scan = new Scan().addFamily(FAMILY_HFILE_REFS);
-      try (ResultScanner scanner = table.getScanner(scan)) {
-        Set<String> hfileSet = new HashSet<>();
-        for (Result r : scanner) {
-          for (Cell c : r.listCells()) {
-            String hfile = Bytes.toString(c.getQualifierArray(), 
c.getQualifierOffset(),
-              c.getQualifierLength());
-            hfileSet.add(hfile);
-          }
-        }
-        return hfileSet;
-      }
-    } catch (IOException e) {
-      throw new ReplicationException("Failed to get all hfile refs", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationStorageBase.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationStorageBase.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationStorageBase.java
deleted file mode 100644
index 40c10d5..0000000
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationStorageBase.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.NamespaceDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-
-import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
[email protected]
-public class TableReplicationStorageBase {
-  protected final ZKWatcher zookeeper;
-  protected final Configuration conf;
-
-  public static final TableName REPLICATION_TABLE =
-      TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, 
"replication");
-
-  // Peer family, the row key would be peer id.
-  public static final byte[] FAMILY_PEER = Bytes.toBytes("peer");
-  public static final byte[] QUALIFIER_PEER_CONFIG = Bytes.toBytes("config");
-  public static final byte[] QUALIFIER_PEER_STATE = Bytes.toBytes("state");
-
-  // Region server state family, the row key would be name of region server.
-  public static final byte[] FAMILY_RS_STATE = Bytes.toBytes("rs_state");
-  public static final byte[] QUALIFIER_STATE_ENABLED = 
Bytes.toBytes("enabled");
-
-  // Queue and wal family, the row key would be name of region server.
-  public static final byte[] FAMILY_QUEUE = Bytes.toBytes("queue");
-  public static final byte[] FAMILY_WAL = Bytes.toBytes("wal");
-
-  // HFile-Refs family, the row key would be peer id.
-  public static final byte[] FAMILY_HFILE_REFS = Bytes.toBytes("hfile-refs");
-
-  // Region family, the row key would be peer id.
-  public static final byte[] FAMILY_REGIONS = Bytes.toBytes("regions");
-
-  private Connection connection;
-
-  protected static byte[] getServerNameRowKey(ServerName serverName) {
-    return Bytes.toBytes(serverName.toString());
-  }
-
-  protected static byte[] getRegionQualifier(String encodedRegionName) {
-    return Bytes.toBytes(encodedRegionName);
-  }
-
-  @VisibleForTesting
-  public static TableDescriptorBuilder createReplicationTableDescBuilder(final 
Configuration conf)
-      throws IOException {
-    int metaMaxVersion =
-        conf.getInt(HConstants.HBASE_META_VERSIONS, 
HConstants.DEFAULT_HBASE_META_VERSIONS);
-    int metaBlockSize =
-        conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, 
HConstants.DEFAULT_HBASE_META_BLOCK_SIZE);
-    return TableDescriptorBuilder
-        .newBuilder(REPLICATION_TABLE)
-        .setColumnFamily(
-          
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_PEER).setMaxVersions(metaMaxVersion)
-              .setInMemory(true).setBlocksize(metaBlockSize)
-              
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
-              .build())
-        .setColumnFamily(
-          
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_RS_STATE).setMaxVersions(metaMaxVersion)
-              .setInMemory(true).setBlocksize(metaBlockSize)
-              
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
-              .build())
-        .setColumnFamily(
-          
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_QUEUE).setMaxVersions(metaMaxVersion)
-              .setInMemory(true).setBlocksize(metaBlockSize)
-              
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
-              .build())
-        .setColumnFamily(
-          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_WAL)
-              .setMaxVersions(HConstants.ALL_VERSIONS).setInMemory(true)
-              
.setBlocksize(metaBlockSize).setScope(HConstants.REPLICATION_SCOPE_LOCAL)
-              .setBloomFilterType(BloomType.NONE).build())
-        .setColumnFamily(
-          
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_REGIONS).setMaxVersions(metaMaxVersion)
-              .setInMemory(true).setBlocksize(metaBlockSize)
-              
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
-              .build())
-        .setColumnFamily(
-          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_HFILE_REFS)
-              
.setMaxVersions(metaMaxVersion).setInMemory(true).setBlocksize(metaBlockSize)
-              
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
-              .build());
-  }
-
-  protected TableReplicationStorageBase(ZKWatcher zookeeper, Configuration 
conf)
-      throws IOException {
-    this.zookeeper = zookeeper;
-    this.conf = conf;
-    this.connection = ConnectionFactory.createConnection(conf);
-  }
-
-  protected Table getReplicationMetaTable() throws IOException {
-    return this.connection.getTable(REPLICATION_TABLE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
index 138f14a..a53500a 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import static 
org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_DISABLED_BYTES;
-import static 
org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_ENABLED_BYTES;
-
 import java.util.Arrays;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
@@ -34,6 +31,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+
 /**
  * ZK based replication peer storage.
  */
@@ -47,6 +46,11 @@ public class ZKReplicationPeerStorage extends 
ZKReplicationStorageBase
   public static final String PEERS_STATE_ZNODE = 
"zookeeper.znode.replication.peers.state";
   public static final String PEERS_STATE_ZNODE_DEFAULT = "peer-state";
 
+  public static final byte[] ENABLED_ZNODE_BYTES =
+    toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
+  public static final byte[] DISABLED_ZNODE_BYTES =
+    toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
+
   /**
    * The name of the znode that contains the replication status of a remote 
slave (i.e. peer)
    * cluster.
@@ -85,7 +89,7 @@ public class ZKReplicationPeerStorage extends 
ZKReplicationStorageBase
           ZKUtilOp.createAndFailSilent(getPeerNode(peerId),
             ReplicationPeerConfigUtil.toByteArray(peerConfig)),
           ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId),
-            enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES)),
+            enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)),
         false);
     } catch (KeeperException e) {
       throw new ReplicationException("Could not add peer with id=" + peerId + 
", peerConfif=>"
@@ -104,7 +108,7 @@ public class ZKReplicationPeerStorage extends 
ZKReplicationStorageBase
 
   @Override
   public void setPeerState(String peerId, boolean enabled) throws 
ReplicationException {
-    byte[] stateBytes = enabled ? PEER_STATE_ENABLED_BYTES : 
PEER_STATE_DISABLED_BYTES;
+    byte[] stateBytes = enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
     try {
       ZKUtil.setData(zookeeper, getPeerStateNode(peerId), stateBytes);
     } catch (KeeperException e) {
@@ -136,7 +140,7 @@ public class ZKReplicationPeerStorage extends 
ZKReplicationStorageBase
   @Override
   public boolean isPeerEnabled(String peerId) throws ReplicationException {
     try {
-      return Arrays.equals(PEER_STATE_ENABLED_BYTES,
+      return Arrays.equals(ENABLED_ZNODE_BYTES,
         ZKUtil.getData(zookeeper, getPeerStateNode(peerId)));
     } catch (KeeperException | InterruptedException e) {
       throw new ReplicationException("Unable to get status of the peer with 
id=" + peerId, e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index 4a1e780..72caf82 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -79,7 +79,7 @@ import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
  * </pre>
  */
 @InterfaceAudience.Private
-public class ZKReplicationQueueStorage extends ZKReplicationStorageBase
+class ZKReplicationQueueStorage extends ZKReplicationStorageBase
     implements ReplicationQueueStorage {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ZKReplicationQueueStorage.class);
@@ -199,7 +199,7 @@ public class ZKReplicationQueueStorage extends 
ZKReplicationStorageBase
       // Persist the max sequence id(s) of regions for serial replication 
atomically.
       if (lastSeqIds != null && lastSeqIds.size() > 0) {
         for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
-          String peerId = ReplicationUtils.parsePeerIdFromQueueId(queueId);
+          String peerId = new ReplicationQueueInfo(queueId).getPeerId();
           String path = 
getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
           /*
            * Make sure the existence of path
@@ -375,7 +375,7 @@ public class ZKReplicationQueueStorage extends 
ZKReplicationStorageBase
 
   // will be overridden in UTs
   @VisibleForTesting
-  public int getQueuesZNodeCversion() throws KeeperException {
+  protected int getQueuesZNodeCversion() throws KeeperException {
     Stat stat = new Stat();
     ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
     return stat.getCversion();

http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
new file mode 100644
index 0000000..5999c1f
--- /dev/null
+++ 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+/**
+ * White box testing for replication state interfaces. Implementations should 
extend this class, and
+ * initialize the interfaces properly.
+ */
+public abstract class TestReplicationStateBasic {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestReplicationStateBasic.class);
+
+  protected ReplicationQueueStorage rqs;
+  protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 
1234, 12345);
+  protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 
1234, 12345);
+  protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 
1234, 12345);
+  protected ReplicationPeers rp;
+  protected static final String ID_ONE = "1";
+  protected static final String ID_TWO = "2";
+  protected static String KEY_ONE;
+  protected static String KEY_TWO;
+
+  // For testing when we try to replicate to ourself
+  protected String OUR_KEY;
+
+  protected static int zkTimeoutCount;
+  protected static final int ZK_MAX_COUNT = 300;
+  protected static final int ZK_SLEEP_INTERVAL = 100; // millis
+
+  @Test
+  public void testReplicationQueueStorage() throws ReplicationException {
+    // Test methods with empty state
+    assertEquals(0, rqs.getListOfReplicators().size());
+    assertTrue(rqs.getWALsInQueue(server1, "qId1").isEmpty());
+    assertTrue(rqs.getAllQueues(server1).isEmpty());
+
+    /*
+     * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 
log files each --
+     * server2: zero queues
+     */
+    rqs.addWAL(server1, "qId1", "trash");
+    rqs.removeWAL(server1, "qId1", "trash");
+    rqs.addWAL(server1,"qId2", "filename1");
+    rqs.addWAL(server1,"qId3", "filename2");
+    rqs.addWAL(server1,"qId3", "filename3");
+    rqs.addWAL(server2,"trash", "trash");
+    rqs.removeQueue(server2,"trash");
+
+    List<ServerName> reps = rqs.getListOfReplicators();
+    assertEquals(2, reps.size());
+    assertTrue(server1.getServerName(), reps.contains(server1));
+    assertTrue(server2.getServerName(), reps.contains(server2));
+
+    assertTrue(rqs.getWALsInQueue(ServerName.valueOf("bogus", 12345, 12345), 
"bogus").isEmpty());
+    assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
+    assertEquals(0, rqs.getWALsInQueue(server1, "qId1").size());
+    assertEquals(1, rqs.getWALsInQueue(server1, "qId2").size());
+    assertEquals("filename1", rqs.getWALsInQueue(server1, "qId2").get(0));
+
+    assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 12345, 
-1L)).isEmpty());
+    assertEquals(0, rqs.getAllQueues(server2).size());
+    List<String> list = rqs.getAllQueues(server1);
+    assertEquals(3, list.size());
+    assertTrue(list.contains("qId2"));
+    assertTrue(list.contains("qId3"));
+  }
+
+  private void removeAllQueues(ServerName serverName) throws 
ReplicationException {
+    for (String queue: rqs.getAllQueues(serverName)) {
+      rqs.removeQueue(serverName, queue);
+    }
+  }
+  @Test
+  public void testReplicationQueues() throws ReplicationException {
+    // Initialize ReplicationPeer so we can add peers (we don't transfer lone 
queues)
+    rp.init();
+
+    rqs.removeQueue(server1, "bogus");
+    rqs.removeWAL(server1, "bogus", "bogus");
+    removeAllQueues(server1);
+    assertEquals(0, rqs.getAllQueues(server1).size());
+    assertEquals(0, rqs.getWALPosition(server1, "bogus", "bogus"));
+    assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
+    assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 1234, 
12345)).isEmpty());
+
+    populateQueues();
+
+    assertEquals(3, rqs.getListOfReplicators().size());
+    assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
+    assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
+    assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
+    rqs.setWALPosition(server3, "qId5", "filename4", 354L, null);
+    assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));
+
+    assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
+    assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
+    assertEquals(0, rqs.getAllQueues(server1).size());
+    assertEquals(1, rqs.getAllQueues(server2).size());
+    assertEquals(5, rqs.getAllQueues(server3).size());
+
+    assertEquals(0, rqs.getAllQueues(server1).size());
+    rqs.removeReplicatorIfQueueIsEmpty(server1);
+    assertEquals(2, rqs.getListOfReplicators().size());
+
+    List<String> queues = rqs.getAllQueues(server3);
+    assertEquals(5, queues.size());
+    for (String queue : queues) {
+      rqs.claimQueue(server3, queue, server2);
+    }
+    rqs.removeReplicatorIfQueueIsEmpty(server3);
+    assertEquals(1, rqs.getListOfReplicators().size());
+
+    assertEquals(6, rqs.getAllQueues(server2).size());
+    removeAllQueues(server2);
+    rqs.removeReplicatorIfQueueIsEmpty(server2);
+    assertEquals(0, rqs.getListOfReplicators().size());
+  }
+
+  @Test
+  public void testHfileRefsReplicationQueues() throws ReplicationException, 
KeeperException {
+    rp.init();
+
+    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
+    files1.add(new Pair<>(null, new Path("file_1")));
+    files1.add(new Pair<>(null, new Path("file_2")));
+    files1.add(new Pair<>(null, new Path("file_3")));
+    assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
+    assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
+    rp.getPeerStorage().addPeer(ID_ONE,
+            ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), 
true);
+    rqs.addPeerToHFileRefs(ID_ONE);
+    rqs.addHFileRefs(ID_ONE, files1);
+    assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
+    assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
+    List<String> hfiles2 = new ArrayList<>(files1.size());
+    for (Pair<Path, Path> p : files1) {
+      hfiles2.add(p.getSecond().getName());
+    }
+    String removedString = hfiles2.remove(0);
+    rqs.removeHFileRefs(ID_ONE, hfiles2);
+    assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size());
+    hfiles2 = new ArrayList<>(1);
+    hfiles2.add(removedString);
+    rqs.removeHFileRefs(ID_ONE, hfiles2);
+    assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size());
+    rp.getPeerStorage().removePeer(ID_ONE);
+  }
+
+  @Test
+  public void testRemovePeerForHFileRefs() throws ReplicationException, 
KeeperException {
+    rp.init();
+    rp.getPeerStorage().addPeer(ID_ONE,
+      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
+    rqs.addPeerToHFileRefs(ID_ONE);
+    rp.getPeerStorage().addPeer(ID_TWO,
+      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true);
+    rqs.addPeerToHFileRefs(ID_TWO);
+
+    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
+    files1.add(new Pair<>(null, new Path("file_1")));
+    files1.add(new Pair<>(null, new Path("file_2")));
+    files1.add(new Pair<>(null, new Path("file_3")));
+    rqs.addHFileRefs(ID_ONE, files1);
+    rqs.addHFileRefs(ID_TWO, files1);
+    assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size());
+    assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
+    assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
+
+    rp.getPeerStorage().removePeer(ID_ONE);
+    rqs.removePeerFromHFileRefs(ID_ONE);
+    assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
+    assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
+    assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
+
+    rp.getPeerStorage().removePeer(ID_TWO);
+    rqs.removePeerFromHFileRefs(ID_TWO);
+    assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
+    assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty());
+  }
+
+  @Test
+  public void testReplicationPeers() throws Exception {
+    rp.init();
+
+    try {
+      rp.getPeerStorage().setPeerState("bogus", true);
+      fail("Should have thrown an IllegalArgumentException when passed a bogus 
peerId");
+    } catch (ReplicationException e) {
+    }
+    try {
+      rp.getPeerStorage().setPeerState("bogus", false);
+      fail("Should have thrown an IllegalArgumentException when passed a bogus 
peerId");
+    } catch (ReplicationException e) {
+    }
+
+    try {
+      assertFalse(rp.addPeer("bogus"));
+      fail("Should have thrown an ReplicationException when passed a bogus 
peerId");
+    } catch (ReplicationException e) {
+    }
+
+    assertNumberOfPeers(0);
+
+    // Add some peers
+    rp.getPeerStorage().addPeer(ID_ONE, new 
ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
+    assertNumberOfPeers(1);
+    rp.getPeerStorage().addPeer(ID_TWO, new 
ReplicationPeerConfig().setClusterKey(KEY_TWO), true);
+    assertNumberOfPeers(2);
+
+    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils
+        
.getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), 
rp.getConf())));
+    rp.getPeerStorage().removePeer(ID_ONE);
+    rp.removePeer(ID_ONE);
+    assertNumberOfPeers(1);
+
+    // Add one peer
+    rp.getPeerStorage().addPeer(ID_ONE, new 
ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
+    rp.addPeer(ID_ONE);
+    assertNumberOfPeers(2);
+    assertTrue(rp.getPeer(ID_ONE).isPeerEnabled());
+    rp.getPeerStorage().setPeerState(ID_ONE, false);
+    // now we do not rely on zk watcher to trigger the state change so we need 
to trigger it
+    // manually...
+    ReplicationPeerImpl peer = rp.getPeer(ID_ONE);
+    rp.refreshPeerState(peer.getId());
+    assertEquals(PeerState.DISABLED, peer.getPeerState());
+    assertConnectedPeerStatus(false, ID_ONE);
+    rp.getPeerStorage().setPeerState(ID_ONE, true);
+    // now we do not rely on zk watcher to trigger the state change so we need 
to trigger it
+    // manually...
+    rp.refreshPeerState(peer.getId());
+    assertEquals(PeerState.ENABLED, peer.getPeerState());
+    assertConnectedPeerStatus(true, ID_ONE);
+
+    // Disconnect peer
+    rp.removePeer(ID_ONE);
+    assertNumberOfPeers(2);
+  }
+
+  private String getFileName(String base, int i) {
+    return String.format(base + "-%04d", i);
+  }
+
+  @Test
+  public void testPersistLogPositionAndSeqIdAtomically() throws Exception {
+    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
+    assertTrue(rqs.getAllQueues(serverName1).isEmpty());
+    String queue1 = "1";
+    String region0 = "region0", region1 = "region1";
+    for (int i = 0; i < 10; i++) {
+      rqs.addWAL(serverName1, queue1, getFileName("file1", i));
+    }
+    List<String> queueIds = rqs.getAllQueues(serverName1);
+    assertEquals(1, queueIds.size());
+    assertThat(queueIds, hasItems("1"));
+
+    List<String> wals1 = rqs.getWALsInQueue(serverName1, queue1);
+    assertEquals(10, wals1.size());
+    for (int i = 0; i < 10; i++) {
+      assertThat(wals1, hasItems(getFileName("file1", i)));
+    }
+
+    for (int i = 0; i < 10; i++) {
+      assertEquals(0, rqs.getWALPosition(serverName1, queue1, 
getFileName("file1", i)));
+    }
+    assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region0, queue1));
+    assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region1, queue1));
+
+    for (int i = 0; i < 10; i++) {
+      rqs.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) 
* 100,
+        ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L));
+    }
+
+    for (int i = 0; i < 10; i++) {
+      assertEquals((i + 1) * 100, rqs.getWALPosition(serverName1, queue1, 
getFileName("file1", i)));
+    }
+    assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
+    assertEquals(1000L, rqs.getLastSequenceId(region1, queue1));
+  }
+
+  protected void assertConnectedPeerStatus(boolean status, String peerId) 
throws Exception {
+    // we can first check if the value was changed in the store, if it wasn't 
then fail right away
+    if (status != rp.getPeerStorage().isPeerEnabled(peerId)) {
+      fail("ConnectedPeerStatus was " + !status + " but expected " + status + 
" in ZK");
+    }
+    while (true) {
+      if (status == rp.getPeer(peerId).isPeerEnabled()) {
+        return;
+      }
+      if (zkTimeoutCount < ZK_MAX_COUNT) {
+        LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + 
status
+            + ", sleeping and trying again.");
+        Thread.sleep(ZK_SLEEP_INTERVAL);
+      } else {
+        fail("Timed out waiting for ConnectedPeerStatus to be " + status);
+      }
+    }
+  }
+
+  protected void assertNumberOfPeers(int total) throws ReplicationException {
+    assertEquals(total, rp.getPeerStorage().listPeerIds().size());
+  }
+
+  /*
+   * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 
has 5 queues with 1, 2,
+   * 3, 4, 5 log files respectively
+   */
+  protected void populateQueues() throws ReplicationException {
+    rqs.addWAL(server1, "trash", "trash");
+    rqs.removeQueue(server1, "trash");
+
+    rqs.addWAL(server2, "qId1", "trash");
+    rqs.removeWAL(server2, "qId1", "trash");
+
+    for (int i = 1; i < 6; i++) {
+      for (int j = 0; j < i; j++) {
+        rqs.addWAL(server3, "qId" + i, "filename" + j);
+      }
+      // Add peers for the corresponding queues so they are not orphans
+      rp.getPeerStorage().addPeer("qId" + i,
+        
ReplicationPeerConfig.newBuilder().setClusterKey("localhost:2818:/bogus" + 
i).build(),
+        true);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
new file mode 100644
index 0000000..08178f4
--- /dev/null
+++ 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseZKTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationStateZKImpl.class);
+
+  private static Configuration conf;
+  private static HBaseZKTestingUtility utility;
+  private static ZKWatcher zkw;
+  private static String replicationZNode;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    utility = new HBaseZKTestingUtility();
+    utility.startMiniZKCluster();
+    conf = utility.getConfiguration();
+    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+    zkw = utility.getZooKeeperWatcher();
+    String replicationZNodeName = conf.get("zookeeper.znode.replication", 
"replication");
+    replicationZNode = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, 
replicationZNodeName);
+    KEY_ONE = initPeerClusterState("/hbase1");
+    KEY_TWO = initPeerClusterState("/hbase2");
+  }
+
+  private static String initPeerClusterState(String baseZKNode)
+      throws IOException, KeeperException {
+    // Add a dummy region server and set up the cluster id
+    Configuration testConf = new Configuration(conf);
+    testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
+    ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null);
+    String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, 
"hostname1.example.org:1234");
+    ZKUtil.createWithParents(zkw1, fakeRs);
+    ZKClusterId.setClusterId(zkw1, new ClusterId());
+    return ZKConfig.getZooKeeperClusterKey(testConf);
+  }
+
+  @Before
+  public void setUp() {
+    zkTimeoutCount = 0;
+    rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
+    rp = ReplicationFactory.getReplicationPeers(zkw, conf);
+    OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
+  }
+
+  @After
+  public void tearDown() throws KeeperException, IOException {
+    ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    utility.shutdownMiniZKCluster();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
new file mode 100644
index 0000000..3290fb0
--- /dev/null
+++ 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseZKTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestZKReplicationPeerStorage {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class);
+
+  private static final HBaseZKTestingUtility UTIL = new 
HBaseZKTestingUtility();
+
+  private static ZKReplicationPeerStorage STORAGE;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniZKCluster();
+    STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), 
UTIL.getConfiguration());
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    UTIL.shutdownMiniZKCluster();
+  }
+
+  private Set<String> randNamespaces(Random rand) {
+    return Stream.generate(() -> 
Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
+        .collect(toSet());
+  }
+
+  private Map<TableName, List<String>> randTableCFs(Random rand) {
+    int size = rand.nextInt(5);
+    Map<TableName, List<String>> map = new HashMap<>();
+    for (int i = 0; i < size; i++) {
+      TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
+      List<String> cfs = Stream.generate(() -> 
Long.toHexString(rand.nextLong()))
+          .limit(rand.nextInt(5)).collect(toList());
+      map.put(tn, cfs);
+    }
+    return map;
+  }
+
+  private ReplicationPeerConfig getConfig(int seed) {
+    Random rand = new Random(seed);
+    return 
ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong()))
+        .setReplicationEndpointImpl(Long.toHexString(rand.nextLong()))
+        
.setNamespaces(randNamespaces(rand)).setExcludeNamespaces(randNamespaces(rand))
+        
.setTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean())
+        .setBandwidth(rand.nextInt(1000)).build();
+  }
+
+  private void assertSetEquals(Set<String> expected, Set<String> actual) {
+    if (expected == null || expected.size() == 0) {
+      assertTrue(actual == null || actual.size() == 0);
+      return;
+    }
+    assertEquals(expected.size(), actual.size());
+    expected.forEach(s -> assertTrue(actual.contains(s)));
+  }
+
+  private void assertMapEquals(Map<TableName, List<String>> expected,
+      Map<TableName, List<String>> actual) {
+    if (expected == null || expected.size() == 0) {
+      assertTrue(actual == null || actual.size() == 0);
+      return;
+    }
+    assertEquals(expected.size(), actual.size());
+    expected.forEach((expectedTn, expectedCFs) -> {
+      List<String> actualCFs = actual.get(expectedTn);
+      if (expectedCFs == null || expectedCFs.size() == 0) {
+        assertTrue(actual.containsKey(expectedTn));
+        assertTrue(actualCFs == null || actualCFs.size() == 0);
+      } else {
+        assertNotNull(actualCFs);
+        assertEquals(expectedCFs.size(), actualCFs.size());
+        for (Iterator<String> expectedIt = expectedCFs.iterator(), actualIt = 
actualCFs.iterator();
+          expectedIt.hasNext();) {
+          assertEquals(expectedIt.next(), actualIt.next());
+        }
+      }
+    });
+  }
+
+  private void assertConfigEquals(ReplicationPeerConfig expected, 
ReplicationPeerConfig actual) {
+    assertEquals(expected.getClusterKey(), actual.getClusterKey());
+    assertEquals(expected.getReplicationEndpointImpl(), 
actual.getReplicationEndpointImpl());
+    assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
+    assertSetEquals(expected.getExcludeNamespaces(), 
actual.getExcludeNamespaces());
+    assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
+    assertMapEquals(expected.getExcludeTableCFsMap(), 
actual.getExcludeTableCFsMap());
+    assertEquals(expected.replicateAllUserTables(), 
actual.replicateAllUserTables());
+    assertEquals(expected.getBandwidth(), actual.getBandwidth());
+  }
+
+  @Test
+  public void test() throws ReplicationException {
+    int peerCount = 10;
+    for (int i = 0; i < peerCount; i++) {
+      STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0);
+    }
+    List<String> peerIds = STORAGE.listPeerIds();
+    assertEquals(peerCount, peerIds.size());
+    for (String peerId : peerIds) {
+      int seed = Integer.parseInt(peerId);
+      assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId));
+    }
+    for (int i = 0; i < peerCount; i++) {
+      STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
+    }
+    for (String peerId : peerIds) {
+      int seed = Integer.parseInt(peerId);
+      assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId));
+    }
+    for (int i = 0; i < peerCount; i++) {
+      assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
+    }
+    for (int i = 0; i < peerCount; i++) {
+      STORAGE.setPeerState(Integer.toString(i), i % 2 != 0);
+    }
+    for (int i = 0; i < peerCount; i++) {
+      assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i)));
+    }
+    String toRemove = Integer.toString(peerCount / 2);
+    STORAGE.removePeer(toRemove);
+    peerIds = STORAGE.listPeerIds();
+    assertEquals(peerCount - 1, peerIds.size());
+    assertFalse(peerIds.contains(toRemove));
+
+    try {
+      STORAGE.getPeerConfig(toRemove);
+      fail("Should throw a ReplicationException when get peer config of a 
peerId");
+    } catch (ReplicationException e) {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
new file mode 100644
index 0000000..8ff52f3
--- /dev/null
+++ 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseZKTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestZKReplicationQueueStorage {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class);
+
+  private static final HBaseZKTestingUtility UTIL = new 
HBaseZKTestingUtility();
+
+  private static ZKReplicationQueueStorage STORAGE;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniZKCluster();
+    STORAGE = new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), 
UTIL.getConfiguration());
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    UTIL.shutdownMiniZKCluster();
+  }
+
+  @After
+  public void tearDownAfterTest() throws ReplicationException {
+    for (ServerName serverName : STORAGE.getListOfReplicators()) {
+      for (String queue : STORAGE.getAllQueues(serverName)) {
+        STORAGE.removeQueue(serverName, queue);
+      }
+      STORAGE.removeReplicatorIfQueueIsEmpty(serverName);
+    }
+    for (String peerId : STORAGE.getAllPeersFromHFileRefsQueue()) {
+      STORAGE.removePeerFromHFileRefs(peerId);
+    }
+  }
+
+  private ServerName getServerName(int i) {
+    return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i);
+  }
+
+  @Test
+  public void testReplicator() throws ReplicationException {
+    assertTrue(STORAGE.getListOfReplicators().isEmpty());
+    String queueId = "1";
+    for (int i = 0; i < 10; i++) {
+      STORAGE.addWAL(getServerName(i), queueId, "file" + i);
+    }
+    List<ServerName> replicators = STORAGE.getListOfReplicators();
+    assertEquals(10, replicators.size());
+    for (int i = 0; i < 10; i++) {
+      assertThat(replicators, hasItems(getServerName(i)));
+    }
+    for (int i = 0; i < 5; i++) {
+      STORAGE.removeQueue(getServerName(i), queueId);
+    }
+    for (int i = 0; i < 10; i++) {
+      STORAGE.removeReplicatorIfQueueIsEmpty(getServerName(i));
+    }
+    replicators = STORAGE.getListOfReplicators();
+    assertEquals(5, replicators.size());
+    for (int i = 5; i < 10; i++) {
+      assertThat(replicators, hasItems(getServerName(i)));
+    }
+  }
+
+  private String getFileName(String base, int i) {
+    return String.format(base + "-%04d", i);
+  }
+
+  @Test
+  public void testAddRemoveLog() throws ReplicationException {
+    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
+    assertTrue(STORAGE.getAllQueues(serverName1).isEmpty());
+    String queue1 = "1";
+    String queue2 = "2";
+    for (int i = 0; i < 10; i++) {
+      STORAGE.addWAL(serverName1, queue1, getFileName("file1", i));
+      STORAGE.addWAL(serverName1, queue2, getFileName("file2", i));
+    }
+    List<String> queueIds = STORAGE.getAllQueues(serverName1);
+    assertEquals(2, queueIds.size());
+    assertThat(queueIds, hasItems("1", "2"));
+
+    List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1);
+    List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
+    assertEquals(10, wals1.size());
+    assertEquals(10, wals2.size());
+    for (int i = 0; i < 10; i++) {
+      assertThat(wals1, hasItems(getFileName("file1", i)));
+      assertThat(wals2, hasItems(getFileName("file2", i)));
+    }
+
+    for (int i = 0; i < 10; i++) {
+      assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, 
getFileName("file1", i)));
+      assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, 
getFileName("file2", i)));
+      STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i 
+ 1) * 100, null);
+      STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i 
+ 1) * 100 + 10,
+        null);
+    }
+
+    for (int i = 0; i < 10; i++) {
+      assertEquals((i + 1) * 100,
+        STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
+      assertEquals((i + 1) * 100 + 10,
+        STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
+    }
+
+    for (int i = 0; i < 10; i++) {
+      if (i % 2 == 0) {
+        STORAGE.removeWAL(serverName1, queue1, getFileName("file1", i));
+      } else {
+        STORAGE.removeWAL(serverName1, queue2, getFileName("file2", i));
+      }
+    }
+
+    queueIds = STORAGE.getAllQueues(serverName1);
+    assertEquals(2, queueIds.size());
+    assertThat(queueIds, hasItems("1", "2"));
+
+    ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
+    Pair<String, SortedSet<String>> peer1 = STORAGE.claimQueue(serverName1, 
"1", serverName2);
+
+    assertEquals("1-" + serverName1.getServerName(), peer1.getFirst());
+    assertEquals(5, peer1.getSecond().size());
+    int i = 1;
+    for (String wal : peer1.getSecond()) {
+      assertEquals(getFileName("file1", i), wal);
+      assertEquals((i + 1) * 100,
+        STORAGE.getWALPosition(serverName2, peer1.getFirst(), 
getFileName("file1", i)));
+      i += 2;
+    }
+
+    queueIds = STORAGE.getAllQueues(serverName1);
+    assertEquals(1, queueIds.size());
+    assertThat(queueIds, hasItems("2"));
+    wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
+    assertEquals(5, wals2.size());
+    for (i = 0; i < 10; i += 2) {
+      assertThat(wals2, hasItems(getFileName("file2", i)));
+    }
+
+    queueIds = STORAGE.getAllQueues(serverName2);
+    assertEquals(1, queueIds.size());
+    assertThat(queueIds, hasItems(peer1.getFirst()));
+    wals1 = STORAGE.getWALsInQueue(serverName2, peer1.getFirst());
+    assertEquals(5, wals1.size());
+    for (i = 1; i < 10; i += 2) {
+      assertThat(wals1, hasItems(getFileName("file1", i)));
+    }
+
+    Set<String> allWals = STORAGE.getAllWALs();
+    assertEquals(10, allWals.size());
+    for (i = 0; i < 10; i++) {
+      assertThat(allWals, hasItems(i % 2 == 0 ? getFileName("file2", i) : 
getFileName("file1", i)));
+    }
+  }
+
+  // For HBASE-12865
+  @Test
+  public void testClaimQueueChangeCversion() throws ReplicationException, 
KeeperException {
+    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
+    STORAGE.addWAL(serverName1, "1", "file");
+
+    int v0 = STORAGE.getQueuesZNodeCversion();
+    ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
+    STORAGE.claimQueue(serverName1, "1", serverName2);
+    int v1 = STORAGE.getQueuesZNodeCversion();
+    // cversion should increase by 1 since a child node is deleted
+    assertEquals(1, v1 - v0);
+  }
+
+  private ZKReplicationQueueStorage createWithUnstableCversion() throws 
IOException {
+    return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), 
UTIL.getConfiguration()) {
+
+      private int called = 0;
+
+      @Override
+      protected int getQueuesZNodeCversion() throws KeeperException {
+        if (called < 4) {
+          called++;
+        }
+        return called;
+      }
+    };
+  }
+
+  @Test
+  public void testGetAllWALsCversionChange() throws IOException, 
ReplicationException {
+    ZKReplicationQueueStorage storage = createWithUnstableCversion();
+    storage.addWAL(getServerName(0), "1", "file");
+    // This should return eventually when cversion stabilizes
+    Set<String> allWals = storage.getAllWALs();
+    assertEquals(1, allWals.size());
+    assertThat(allWals, hasItems("file"));
+  }
+
+  // For HBASE-14621
+  @Test
+  public void testGetAllHFileRefsCversionChange() throws IOException, 
ReplicationException {
+    ZKReplicationQueueStorage storage = createWithUnstableCversion();
+    storage.addPeerToHFileRefs("1");
+    Path p = new Path("/test");
+    storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p)));
+    // This should return eventually when cversion stabilizes
+    Set<String> allHFileRefs = storage.getAllHFileRefs();
+    assertEquals(1, allHFileRefs.size());
+    assertThat(allHFileRefs, hasItems("test"));
+  }
+}

Reply via email to