This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new a71105997f2 HBASE-27727 Implement filesystem based Replication peer storage (#5165)
a71105997f2 is described below

commit a71105997f2c08c74f3713925847cce3d44134bc
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Apr 13 18:58:02 2023 +0800

    HBASE-27727 Implement filesystem based Replication peer storage (#5165)
    
    Signed-off-by: Liangjun He <[email protected]>
---
 .../hbase/replication/SyncReplicationState.java    |  30 ++
 .../org/apache/hadoop/hbase/util/RotateFile.java   | 197 +++++++++++++
 .../replication/TestReplicationPeerConfig.java     | 133 +++++++++
 .../apache/hadoop/hbase/util/TestRotateFile.java   | 155 ++++++++++
 .../mapreduce/replication/VerifyReplication.java   |   2 +-
 .../src/main/protobuf/HBase.proto                  |   5 +
 hbase-replication/pom.xml                          |   5 +
 .../replication/FSReplicationPeerStorage.java      | 321 +++++++++++++++++++++
 .../hbase/replication/ReplicationFactory.java      |   6 +-
 ...actory.java => ReplicationPeerStorageType.java} |  18 +-
 .../hadoop/hbase/replication/ReplicationPeers.java |   5 +-
 .../replication/ReplicationStorageFactory.java     |  42 ++-
 .../ReplicationPeerStorageTestBase.java            | 204 +++++++++++++
 .../replication/TestFSReplicationPeerStorage.java  |  87 ++++++
 .../replication/TestReplicationStateZKImpl.java    |   6 +-
 .../replication/TestZKReplicationPeerStorage.java  | 290 +------------------
 .../org/apache/hadoop/hbase/master/HMaster.java    |   3 +-
 .../master/replication/ReplicationPeerManager.java |   7 +-
 .../replication/regionserver/Replication.java      |   4 +-
 .../org/apache/hadoop/hbase/util/HBaseFsck.java    |   2 +-
 .../hadoop/hbase/util/hbck/ReplicationChecker.java |   8 +-
 .../cleaner/TestReplicationHFileCleaner.java       |  15 +-
 .../hbase/replication/SyncReplicationTestBase.java |   4 +-
 .../TestReplicationWithFSPeerStorage.java          |  66 +++++
 .../regionserver/TestReplicationSourceManager.java |   9 +-
 .../hbase/util/TestHBaseFsckReplication.java       |   4 +-
 26 files changed, 1316 insertions(+), 312 deletions(-)
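
For reference, a minimal, hypothetical sketch (not part of the patch text) of opting into the new
filesystem-based peer storage through the "hbase.replication.peer.storage.impl" key and FILESYSTEM
type added below; the default stays ZOOKEEPER so upgrades keep their existing peer data. Here
zkWatcher stands for an existing ZKWatcher instance and conf for the cluster configuration:

    // hypothetical caller-side sketch
    Configuration conf = HBaseConfiguration.create();
    conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, "filesystem");
    // the factory resolves the type name (case-insensitively) and reflectively invokes
    // FSReplicationPeerStorage(FileSystem, Configuration)
    ReplicationPeerStorage peerStorage =
      ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), zkWatcher, conf);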

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
index 39bbb20433b..9b7e2126765 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
@@ -17,10 +17,16 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.Arrays;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
+import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
 import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -74,4 +80,28 @@ public enum SyncReplicationState {
     return ReplicationPeerConfigUtil.toSyncReplicationState(ReplicationProtos.SyncReplicationState
       .parseFrom(Arrays.copyOfRange(bytes, ProtobufUtil.lengthOfPBMagic(), bytes.length)));
   }
+
+  public static byte[] toByteArray(SyncReplicationState state, SyncReplicationState newState) {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try {
+      out.write(ProtobufMagic.PB_MAGIC);
+      ReplicationPeerConfigUtil.toSyncReplicationState(state).writeDelimitedTo(out);
+      ReplicationPeerConfigUtil.toSyncReplicationState(newState).writeDelimitedTo(out);
+    } catch (IOException e) {
+      // should not happen, all in memory operations
+      throw new AssertionError(e);
+    }
+    return out.toByteArray();
+  }
+
+  public static Pair<SyncReplicationState, SyncReplicationState>
+    parseStateAndNewStateFrom(byte[] bytes) throws IOException {
+    ByteArrayInputStream in = new ByteArrayInputStream(bytes);
+    ByteStreams.skipFully(in, ProtobufMagic.lengthOfPBMagic());
+    SyncReplicationState state = ReplicationPeerConfigUtil
+      .toSyncReplicationState(ReplicationProtos.SyncReplicationState.parseDelimitedFrom(in));
+    SyncReplicationState newState = ReplicationPeerConfigUtil
+      .toSyncReplicationState(ReplicationProtos.SyncReplicationState.parseDelimitedFrom(in));
+    return Pair.newPair(state, newState);
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/RotateFile.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/RotateFile.java
new file mode 100644
index 00000000000..7256700e9cc
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/RotateFile.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.zip.CRC32;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+/**
+ * A file storage which supports atomic update through two files, i.e., rotating. The implementation
+ * does not require atomic rename.
+ */
[email protected]
+public class RotateFile {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RotateFile.class);
+
+  private final FileSystem fs;
+
+  private final long maxFileSize;
+
+  private final Path[] files = new Path[2];
+
+  // this is used to make sure that we do not go backwards
+  private long prevTimestamp = -1;
+
+  private int nextFile = -1;
+
+  /**
+   * Constructs a new RotateFile object with the given parameters.
+   * @param fs          the file system to use.
+   * @param dir         the directory where the files will be created.
+   * @param name        the base name for the files.
+   * @param maxFileSize the maximum size of each file.
+   */
+  public RotateFile(FileSystem fs, Path dir, String name, long maxFileSize) {
+    this.fs = fs;
+    this.maxFileSize = maxFileSize;
+    this.files[0] = new Path(dir, name + "-0");
+    this.files[1] = new Path(dir, name + "-1");
+  }
+
+  private HBaseProtos.RotateFileData read(Path path) throws IOException {
+    byte[] data;
+    int expectedChecksum;
+    try (FSDataInputStream in = fs.open(path)) {
+      int length = in.readInt();
+      if (length <= 0 || length > maxFileSize) {
+        throw new IOException("Invalid file length " + length
+          + ", either less than 0 or greater than max allowed size " + maxFileSize);
+      }
+      data = new byte[length];
+      in.readFully(data);
+      expectedChecksum = in.readInt();
+    }
+    CRC32 crc32 = new CRC32();
+    crc32.update(data);
+    int calculatedChecksum = (int) crc32.getValue();
+    if (expectedChecksum != calculatedChecksum) {
+      throw new IOException(
+        "Checksum mismatch, expected " + expectedChecksum + ", actual " + calculatedChecksum);
+    }
+    return HBaseProtos.RotateFileData.parseFrom(data);
+  }
+
+  private int select(HBaseProtos.RotateFileData[] datas) {
+    if (datas[0] == null) {
+      return 1;
+    }
+    if (datas[1] == null) {
+      return 0;
+    }
+    return datas[0].getTimestamp() >= datas[1].getTimestamp() ? 0 : 1;
+  }
+
+  /**
+   * Reads the content of the rotate file by selecting the winner file based on the timestamp of the
+   * data inside the files. It reads the content of both files and selects the one with the latest
+   * timestamp as the winner. If a file is incomplete or does not exist, it logs the error and moves
+   * on to the next file. It returns the content of the winner file as a byte array. If none of the
+   * files have valid data, it returns null.
+   * @return a byte array containing the data from the winner file, or null if no valid data is
+   *         found.
+   *         found.
+   * @throws IOException if an error occurs while reading the files.
+   */
+  public byte[] read() throws IOException {
+    HBaseProtos.RotateFileData[] datas = new HBaseProtos.RotateFileData[2];
+    for (int i = 0; i < 2; i++) {
+      try {
+        datas[i] = read(files[i]);
+      } catch (FileNotFoundException e) {
+        LOG.debug("file {} does not exist", files[i], e);
+      } catch (EOFException e) {
+        LOG.debug("file {} is incomplete", files[i], e);
+      }
+    }
+    int winnerIndex = select(datas);
+    nextFile = 1 - winnerIndex;
+    if (datas[winnerIndex] != null) {
+      prevTimestamp = datas[winnerIndex].getTimestamp();
+      return datas[winnerIndex].getData().toByteArray();
+    } else {
+      return null;
+    }
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/RotateFile.java|.*/src/test/.*")
+  static void write(FileSystem fs, Path file, long timestamp, byte[] data) throws IOException {
+    HBaseProtos.RotateFileData proto = HBaseProtos.RotateFileData.newBuilder()
+      .setTimestamp(timestamp).setData(ByteString.copyFrom(data)).build();
+    byte[] protoData = proto.toByteArray();
+    CRC32 crc32 = new CRC32();
+    crc32.update(protoData);
+    int checksum = (int) crc32.getValue();
+    // 4 bytes length, 8 bytes timestamp, 4 bytes checksum at the end
+    try (FSDataOutputStream out = fs.create(file, true)) {
+      out.writeInt(protoData.length);
+      out.write(protoData);
+      out.writeInt(checksum);
+    }
+  }
+
+  /**
+   * Writes the given data to the next file in the rotation, with a timestamp calculated based on
+   * the previous timestamp and the current time to make sure it is greater than the previous
+   * timestamp. The method also deletes the previous file, which is no longer needed.
+   * <p/>
+   * Notice that, for a newly created {@link RotateFile} instance, you need to call {@link #read()}
+   * first to initialize the nextFile index, before calling this method.
+   * @param data the data to be written to the file
+   * @throws IOException if an I/O error occurs while writing the data to the file
+   */
+  public void write(byte[] data) throws IOException {
+    if (data.length > maxFileSize) {
+      throw new IOException(
+        "Data size " + data.length + " is greater than max allowed size " + maxFileSize);
+    }
+    long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
+    write(fs, files[nextFile], timestamp, data);
+    prevTimestamp = timestamp;
+    nextFile = 1 - nextFile;
+    try {
+      fs.delete(files[nextFile], false);
+    } catch (IOException e) {
+      // we will create new file with overwrite = true, so not a big deal here, only for speed up
+      // loading as we do not need to read this file when loading
+      LOG.debug("Failed to delete old file {}, ignoring the exception", files[nextFile], e);
+    }
+  }
+
+  /**
+   * Deletes the two files used for rotating data. If any of the files cannot be deleted, an
+   * IOException is thrown.
+   * @throws IOException if there is an error deleting either file
+   */
+  public void delete() throws IOException {
+    Path next = files[nextFile];
+    // delete next file first, and then the current file, so when failing to delete, we can still
+    // read the correct data
+    if (fs.exists(next) && !fs.delete(next, false)) {
+      throw new IOException("Can not delete " + next);
+    }
+    Path current = files[1 - nextFile];
+    if (fs.exists(current) && !fs.delete(current, false)) {
+      throw new IOException("Can not delete " + current);
+    }
+  }
+}
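
For context, a minimal usage sketch of the RotateFile API added above (hypothetical caller, not
part of the patch); per the javadoc, read() must be called once to initialize the rotation index
before write():

    FileSystem fs = FileSystem.get(conf);          // conf assumed to be an existing Configuration
    RotateFile rf = new RotateFile(fs, new Path("/hbase/peers/1"), "peer_config", 16 * 1024 * 1024);
    byte[] current = rf.read();                    // null if neither rotated file exists yet
    rf.write(Bytes.toBytes("new-value"));          // writes the older slot, then drops the stale sibling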
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
index fb74df39473..be516cc317c 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
@@ -17,13 +17,24 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.BuilderStyleTest;
@@ -43,6 +54,8 @@ public class TestReplicationPeerConfig {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestReplicationPeerConfig.class);
 
+  private static final Configuration CONF = HBaseConfiguration.create();
+
   private static final String NAMESPACE_REPLICATE = "replicate";
   private static final String NAMESPACE_OTHER = "other";
   private static final TableName TABLE_A = TableName.valueOf(NAMESPACE_REPLICATE, "testA");
@@ -246,4 +259,124 @@ public class TestReplicationPeerConfig {
     assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY1));
     assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2));
   }
+
+  private static final Random RNG = new Random(); // Seed may be set with Random#setSeed
+
+  private Set<String> randNamespaces(Random rand) {
+    return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
+      .collect(toSet());
+  }
+
+  private Map<TableName, List<String>> randTableCFs(Random rand) {
+    int size = rand.nextInt(5);
+    Map<TableName, List<String>> map = new HashMap<>();
+    for (int i = 0; i < size; i++) {
+      TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
+      List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
+        .limit(rand.nextInt(5)).collect(toList());
+      map.put(tn, cfs);
+    }
+    return map;
+  }
+
+  private ReplicationPeerConfig getConfig(int seed) {
+    RNG.setSeed(seed);
+    return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong()))
+      .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong()))
+      .setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG))
+      .setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG))
+      .setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean())
+      .setBandwidth(RNG.nextInt(1000)).build();
+  }
+
+  @Test
+  public void testBaseReplicationPeerConfig() throws ReplicationException {
+    String customPeerConfigKey = "hbase.xxx.custom_config";
+    String customPeerConfigValue = "test";
+    String customPeerConfigUpdatedValue = "testUpdated";
+
+    String customPeerConfigSecondKey = "hbase.xxx.custom_second_config";
+    String customPeerConfigSecondValue = "testSecond";
+    String customPeerConfigSecondUpdatedValue = "testSecondUpdated";
+
+    ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
+
+    // custom config not present
+    assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
+
+    Configuration conf = new Configuration(CONF);
+    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
+      customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";")
+        .concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue));
+
+    ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil
+      .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
+
+    // validates base configs are present in replicationPeerConfig
+    assertEquals(customPeerConfigValue,
+      updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
+    assertEquals(customPeerConfigSecondValue,
+      updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigSecondKey));
+
+    // validates base configs get updated values even if config already present
+    conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
+    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
+      customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";")
+        .concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue));
+
+    ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil
+      .updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);
+
+    assertEquals(customPeerConfigUpdatedValue,
+      replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigKey));
+    assertEquals(customPeerConfigSecondUpdatedValue,
+      replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigSecondKey));
+  }
+
+  @Test
+  public void testBaseReplicationRemovePeerConfig() throws ReplicationException {
+    String customPeerConfigKey = "hbase.xxx.custom_config";
+    String customPeerConfigValue = "test";
+    ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
+
+    // custom config not present
+    assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
+
+    Configuration conf = new Configuration(CONF);
+    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
+      customPeerConfigKey.concat("=").concat(customPeerConfigValue));
+
+    ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil
+      .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
+
+    // validates base configs are present in replicationPeerConfig
+    assertEquals(customPeerConfigValue,
+      updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
+
+    conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
+    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
+      customPeerConfigKey.concat("=").concat(""));
+
+    ReplicationPeerConfig replicationPeerConfigRemoved = ReplicationPeerConfigUtil
+      .updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);
+
+    assertNull(replicationPeerConfigRemoved.getConfiguration().get(customPeerConfigKey));
+  }
+
+  @Test
+  public void testBaseReplicationRemovePeerConfigWithNoExistingConfig()
+    throws ReplicationException {
+    String customPeerConfigKey = "hbase.xxx.custom_config";
+    ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
+
+    // custom config not present
+    assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
+    Configuration conf = new Configuration(CONF);
+    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
+      customPeerConfigKey.concat("=").concat(""));
+
+    ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil
+      .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
+    assertNull(updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
+  }
 }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRotateFile.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRotateFile.java
new file mode 100644
index 00000000000..c229290e8e5
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRotateFile.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
+
+@Category({ MiscTests.class, SmallTests.class })
+public class TestRotateFile {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRotateFile.class);
+
+  private static HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
+
+  private static FileSystem FS;
+
+  private Path dir;
+
+  private RotateFile rotateFile;
+
+  @Rule
+  public final TestName name = new TestName();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws IOException {
+    FS = FileSystem.get(UTIL.getConfiguration());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() {
+    UTIL.cleanupTestDir();
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    dir = UTIL.getDataTestDir(name.getMethodName());
+    if (!FS.mkdirs(dir)) {
+      throw new IOException("Can not create dir " + dir);
+    }
+    rotateFile = new RotateFile(FS, dir, name.getMethodName(), 1024);
+    assertNull(rotateFile.read());
+  }
+
+  @Test
+  public void testSimpleReadWrite() throws IOException {
+    for (int i = 0; i < 10; i++) {
+      rotateFile.write(Bytes.toBytes(i));
+      assertEquals(i, Bytes.toInt(rotateFile.read()));
+    }
+    rotateFile.delete();
+    assertNull(rotateFile.read());
+  }
+
+  @Test
+  public void testCompareTimestamp() throws IOException {
+    long now = EnvironmentEdgeManager.currentTime();
+    rotateFile.write(Bytes.toBytes(10));
+    Path file = FS.listStatus(dir)[0].getPath();
+    rotateFile.write(Bytes.toBytes(100));
+
+    // put a fake file with a smaller timestamp there
+    RotateFile.write(FS, file, now - 1, Bytes.toBytes(10));
+    assertEquals(100, Bytes.toInt(rotateFile.read()));
+
+    // put a fake file with a greater timestamp there
+    RotateFile.write(FS, file, EnvironmentEdgeManager.currentTime() + 100, Bytes.toBytes(10));
+    assertEquals(10, Bytes.toInt(rotateFile.read()));
+  }
+
+  @Test
+  public void testMaxFileSize() throws IOException {
+    assertThrows(IOException.class, () -> rotateFile.write(new byte[1025]));
+    // put a file greater than max file size
+    rotateFile.write(Bytes.toBytes(10));
+    Path file = FS.listStatus(dir)[0].getPath();
+    RotateFile.write(FS, file, EnvironmentEdgeManager.currentTime(), new byte[1025]);
+    assertThrows(IOException.class, () -> rotateFile.read());
+  }
+
+  @Test
+  public void testNotEnoughData() throws IOException {
+    rotateFile.write(Bytes.toBytes(10));
+    assertEquals(10, Bytes.toInt(rotateFile.read()));
+    // remove the last byte
+    Path file = FS.listStatus(dir)[0].getPath();
+    byte[] data;
+    try (FSDataInputStream in = FS.open(file)) {
+      data = ByteStreams.toByteArray(in);
+    }
+    try (FSDataOutputStream out = FS.create(file, true)) {
+      out.write(data, 0, data.length - 1);
+    }
+    // should hit EOF so read nothing
+    assertNull(rotateFile.read());
+  }
+
+  @Test
+  public void testChecksumMismatch() throws IOException {
+    rotateFile.write(Bytes.toBytes(10));
+    assertEquals(10, Bytes.toInt(rotateFile.read()));
+    // mess up one byte
+    Path file = FS.listStatus(dir)[0].getPath();
+    byte[] data;
+    try (FSDataInputStream in = FS.open(file)) {
+      data = ByteStreams.toByteArray(in);
+    }
+    data[4]++;
+    try (FSDataOutputStream out = FS.create(file, true)) {
+      out.write(data, 0, data.length);
+    }
+    // should get checksum mismatch
+    IOException error = assertThrows(IOException.class, () -> rotateFile.read());
+    assertThat(error.getMessage(), startsWith("Checksum mismatch"));
+  }
+}
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 6a854b97239..1e268c1858b 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -347,7 +347,7 @@ public class VerifyReplication extends Configured implements Tool {
         }
       });
       ReplicationPeerStorage storage =
-        ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
+        ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), localZKW, conf);
       ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
       return Pair.newPair(peerConfig,
         ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf));
diff --git a/hbase-protocol-shaded/src/main/protobuf/HBase.proto b/hbase-protocol-shaded/src/main/protobuf/HBase.proto
index e4dde33a96f..0fd3d667d4d 100644
--- a/hbase-protocol-shaded/src/main/protobuf/HBase.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/HBase.proto
@@ -284,3 +284,8 @@ message LogEntry {
   required string log_class_name = 1;
   required bytes log_message = 2;
 }
+
+message RotateFileData {
+  required int64 timestamp = 1;
+  required bytes data = 2;
+}
diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml
index dad93578609..2527434e97b 100644
--- a/hbase-replication/pom.xml
+++ b/hbase-replication/pom.xml
@@ -103,6 +103,11 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>jcl-over-slf4j</artifactId>
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/FSReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/FSReplicationPeerStorage.java
new file mode 100644
index 00000000000..8bbe21c4a46
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/FSReplicationPeerStorage.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.RotateFile;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A filesystem based replication peer storage. The implementation does not require atomic rename so
+ * you can use it on cloud OSS.
+ * <p/>
+ * FileSystem layout:
+ *
+ * <pre>
+ * hbase
+ *   |
+ *   --peers
+ *       |
+ *       --&lt;peer_id&gt;
+ *           |
+ *           --peer_config
+ *           |
+ *           --disabled
+ *           |
+ *           --sync-rep-state
+ * </pre>
+ *
+ * Notice that, if the peer is enabled, we will not have a disabled file.
+ * <p/>
+ * And for other files, to avoid depending on atomic rename, we will use two files for storing the
+ * content. When loading, we will try to read both the files and load the newer one. And when
+ * writing, we will write to the older file.
+ */
[email protected]
+public class FSReplicationPeerStorage implements ReplicationPeerStorage {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FSReplicationPeerStorage.class);
+
+  public static final String PEERS_DIR = "hbase.replication.peers.directory";
+
+  public static final String PEERS_DIR_DEFAULT = "peers";
+
+  static final String PEER_CONFIG_FILE = "peer_config";
+
+  static final String DISABLED_FILE = "disabled";
+
+  static final String SYNC_REPLICATION_STATE_FILE = "sync-rep-state";
+
+  static final byte[] NONE_STATE_BYTES =
+    SyncReplicationState.toByteArray(SyncReplicationState.NONE);
+
+  private final FileSystem fs;
+
+  private final Path dir;
+
+  public FSReplicationPeerStorage(FileSystem fs, Configuration conf) throws IOException {
+    this.fs = fs;
+    this.dir = new Path(CommonFSUtils.getRootDir(conf), conf.get(PEERS_DIR, PEERS_DIR_DEFAULT));
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/FSReplicationPeerStorage.java|.*/src/test/.*")
+  Path getPeerDir(String peerId) {
+    return new Path(dir, peerId);
+  }
+
+  @Override
+  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled,
+    SyncReplicationState syncReplicationState) throws ReplicationException {
+    Path peerDir = getPeerDir(peerId);
+    try {
+      if (fs.exists(peerDir)) {
+        // check whether this is a valid peer, if so we should fail the add peer operation
+        if (read(fs, peerDir, PEER_CONFIG_FILE) != null) {
+          throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfig=>"
+            + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED")
+            + ", syncReplicationState=" + syncReplicationState + ", peer already exists");
+        }
+      }
+      if (!enabled) {
+        fs.createNewFile(new Path(peerDir, DISABLED_FILE));
+      }
+      write(fs, peerDir, SYNC_REPLICATION_STATE_FILE,
+        SyncReplicationState.toByteArray(syncReplicationState, SyncReplicationState.NONE));
+      // write the peer config data at last, so when loading, if we can not load the peer_config, we
+      // know that this is not a valid peer
+      write(fs, peerDir, PEER_CONFIG_FILE, ReplicationPeerConfigUtil.toByteArray(peerConfig));
+    } catch (IOException e) {
+      throw new ReplicationException(
+        "Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig + ", state="
+          + (enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + syncReplicationState,
+        e);
+    }
+  }
+
+  @Override
+  public void removePeer(String peerId) throws ReplicationException {
+    // delete the peer config first, and then delete the directory
+    // we will consider this is not a valid peer by reading the peer config file
+    Path peerDir = getPeerDir(peerId);
+    try {
+      delete(fs, peerDir, PEER_CONFIG_FILE);
+      if (!fs.delete(peerDir, true)) {
+        throw new IOException("Can not delete " + peerDir);
+      }
+    } catch (IOException e) {
+      throw new ReplicationException("Could not remove peer with id=" + peerId, e);
+    }
+  }
+
+  @Override
+  public void setPeerState(String peerId, boolean enabled) throws ReplicationException {
+    Path disabledFile = new Path(getPeerDir(peerId), DISABLED_FILE);
+    try {
+      if (enabled) {
+        if (fs.exists(disabledFile) && !fs.delete(disabledFile, false)) {
+          throw new IOException("Can not delete " + disabledFile);
+        }
+      } else {
+        if (!fs.exists(disabledFile) && !fs.createNewFile(disabledFile)) {
+          throw new IOException("Can not touch " + disabledFile);
+        }
+      }
+    } catch (IOException e) {
+      throw new ReplicationException(
+        "Unable to change state of the peer with id=" + peerId + " to " + enabled, e);
+    }
+  }
+
+  @Override
+  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
+    throws ReplicationException {
+    Path peerDir = getPeerDir(peerId);
+    try {
+      write(fs, peerDir, PEER_CONFIG_FILE, ReplicationPeerConfigUtil.toByteArray(peerConfig));
+    } catch (IOException e) {
+      throw new ReplicationException(
+        "There was a problem trying to save changes to the " + "replication peer " + peerId, e);
+    }
+  }
+
+  @Override
+  public List<String> listPeerIds() throws ReplicationException {
+    try {
+      FileStatus[] statuses = fs.listStatus(dir);
+      if (statuses == null || statuses.length == 0) {
+        return Collections.emptyList();
+      }
+      List<String> peerIds = new ArrayList<>();
+      for (FileStatus status : statuses) {
+        String peerId = status.getPath().getName();
+        Path peerDir = getPeerDir(peerId);
+        // confirm that this is a valid peer
+        byte[] peerConfigData = read(fs, peerDir, PEER_CONFIG_FILE);
+        if (peerConfigData != null) {
+          peerIds.add(peerId);
+        }
+      }
+      return Collections.unmodifiableList(peerIds);
+    } catch (FileNotFoundException e) {
+      LOG.debug("Peer directory does not exist yet", e);
+      return Collections.emptyList();
+    } catch (IOException e) {
+      throw new ReplicationException("Cannot get the list of peers", e);
+    }
+  }
+
+  @Override
+  public boolean isPeerEnabled(String peerId) throws ReplicationException {
+    Path disabledFile = new Path(getPeerDir(peerId), DISABLED_FILE);
+    try {
+      return !fs.exists(disabledFile);
+    } catch (IOException e) {
+      throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e);
+    }
+  }
+
+  @Override
+  public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException {
+    Path peerDir = getPeerDir(peerId);
+    byte[] data;
+    try {
+      data = read(fs, peerDir, PEER_CONFIG_FILE);
+    } catch (IOException e) {
+      throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e);
+    }
+    if (data == null || data.length == 0) {
+      throw new ReplicationException(
+        "Replication peer config data shouldn't be empty, peerId=" + peerId);
+    }
+    try {
+      return ReplicationPeerConfigUtil.parsePeerFrom(data);
+    } catch (DeserializationException e) {
+      throw new ReplicationException(
+        "Failed to parse replication peer config for peer with id=" + peerId, e);
+    }
+  }
+
+  private Pair<SyncReplicationState, SyncReplicationState> getStateAndNewState(String peerId)
+    throws IOException {
+    Path peerDir = getPeerDir(peerId);
+    if (!fs.exists(peerDir)) {
+      throw new IOException("peer does not exist");
+    }
+    byte[] data = read(fs, peerDir, SYNC_REPLICATION_STATE_FILE);
+    if (data == null) {
+      // should be a peer from previous version, set the sync replication state for it.
+      write(fs, peerDir, SYNC_REPLICATION_STATE_FILE,
+        SyncReplicationState.toByteArray(SyncReplicationState.NONE, SyncReplicationState.NONE));
+      return Pair.newPair(SyncReplicationState.NONE, SyncReplicationState.NONE);
+    } else {
+      return SyncReplicationState.parseStateAndNewStateFrom(data);
+    }
+  }
+
+  @Override
+  public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState newState)
+    throws ReplicationException {
+    Path peerDir = getPeerDir(peerId);
+    try {
+      Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
+        getStateAndNewState(peerId);
+      write(fs, peerDir, SYNC_REPLICATION_STATE_FILE,
+        SyncReplicationState.toByteArray(stateAndNewState.getFirst(), newState));
+    } catch (IOException e) {
+      throw new ReplicationException(
+        "Unable to set the new sync replication state for peer with id=" + peerId + ", newState="
+          + newState,
+        e);
+    }
+  }
+
+  @Override
+  public void transitPeerSyncReplicationState(String peerId) throws ReplicationException {
+    Path peerDir = getPeerDir(peerId);
+    try {
+      Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
+        getStateAndNewState(peerId);
+      write(fs, peerDir, SYNC_REPLICATION_STATE_FILE,
+        SyncReplicationState.toByteArray(stateAndNewState.getSecond(), SyncReplicationState.NONE));
+    } catch (IOException e) {
+      throw new ReplicationException(
+        "Error transiting sync replication state for peer with id=" + peerId, e);
+    }
+  }
+
+  @Override
+  public SyncReplicationState getPeerSyncReplicationState(String peerId)
+    throws ReplicationException {
+    try {
+      return getStateAndNewState(peerId).getFirst();
+    } catch (IOException e) {
+      throw new ReplicationException(
+        "Error getting sync replication state for peer with id=" + peerId, e);
+    }
+  }
+
+  @Override
+  public SyncReplicationState getPeerNewSyncReplicationState(String peerId)
+    throws ReplicationException {
+    try {
+      return getStateAndNewState(peerId).getSecond();
+    } catch (IOException e) {
+      throw new ReplicationException(
+        "Error getting new sync replication state for peer with id=" + peerId, e);
+    }
+  }
+
+  // 16 MB is big enough for our usage here
+  private static final long MAX_FILE_SIZE = 16 * 1024 * 1024;
+
+  private static byte[] read(FileSystem fs, Path dir, String name) throws IOException {
+    RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE);
+    return file.read();
+  }
+
+  private static void write(FileSystem fs, Path dir, String name, byte[] data) throws IOException {
+    RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE);
+    // to initialize the nextFile index
+    file.read();
+    file.write(data);
+  }
+
+  private static void delete(FileSystem fs, Path dir, String name) throws IOException {
+    RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE);
+    // to initialize the nextFile index
+    file.read();
+    file.delete();
+  }
+}
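
As an illustration (not in the patch itself), combining the layout from the class comment with the
RotateFile "-0"/"-1" naming, an enabled peer "1" under the default "peers" directory would be
materialized roughly as:

    <hbase.rootdir>/peers/1/peer_config-0       (or peer_config-1, whichever holds the newer data)
    <hbase.rootdir>/peers/1/sync-rep-state-0    (likewise rotated between -0 and -1)

with a plain "disabled" marker file appearing only while the peer is disabled.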
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index 6dba30a34c0..9047bbf29ca 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.replication;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -32,7 +33,8 @@ public final class ReplicationFactory {
   private ReplicationFactory() {
   }
 
-  public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf) {
-    return new ReplicationPeers(zk, conf);
+  public static ReplicationPeers getReplicationPeers(FileSystem fs, ZKWatcher zk,
+    Configuration conf) {
+    return new ReplicationPeers(fs, zk, conf);
   }
 }
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageType.java
similarity index 66%
copy from hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
copy to hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageType.java
index 6dba30a34c0..1b110c3a6a5 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageType.java
@@ -17,22 +17,24 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * A factory class for instantiating replication objects that deal with replication state.
+ * Specify the implementations for {@link ReplicationPeerStorage}.
  */
 @InterfaceAudience.Private
-public final class ReplicationFactory {
+public enum ReplicationPeerStorageType {
 
-  public static final String REPLICATION_TRACKER_IMPL = "hbase.replication.tracker.impl";
+  FILESYSTEM(FSReplicationPeerStorage.class),
+  ZOOKEEPER(ZKReplicationPeerStorage.class);
 
-  private ReplicationFactory() {
+  private final Class<? extends ReplicationPeerStorage> clazz;
+
+  private ReplicationPeerStorageType(Class<? extends ReplicationPeerStorage> clazz) {
+    this.clazz = clazz;
   }
 
-  public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf) {
-    return new ReplicationPeers(zk, conf);
+  public Class<? extends ReplicationPeerStorage> getClazz() {
+    return clazz;
   }
 }
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 70344c07bdc..a8f4a5efa54 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -23,6 +23,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -40,10 +41,10 @@ public class ReplicationPeers {
   private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
   private final ReplicationPeerStorage peerStorage;
 
-  ReplicationPeers(ZKWatcher zookeeper, Configuration conf) {
+  ReplicationPeers(FileSystem fs, ZKWatcher zookeeper, Configuration conf) {
     this.conf = conf;
     this.peerCache = new ConcurrentHashMap<>();
-    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
+    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
   }
 
   public Configuration getConf() {
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
index 1080b2125c7..0124dbdd113 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
@@ -17,26 +17,60 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import java.lang.reflect.Constructor;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Used to create replication storage(peer, queue) classes.
- * <p>
- * For now we only have zk based implementation.
  */
 @InterfaceAudience.Private
 public final class ReplicationStorageFactory {
 
+  public static final String REPLICATION_PEER_STORAGE_IMPL = "hbase.replication.peer.storage.impl";
+
+  // must default to zookeeper here, otherwise a user upgrading from an old version without changing
+  // the config file would lose all the replication peer data.
+  public static final ReplicationPeerStorageType DEFAULT_REPLICATION_PEER_STORAGE_IMPL =
+    ReplicationPeerStorageType.ZOOKEEPER;
+
   private ReplicationStorageFactory() {
   }
 
+  private static Class<? extends ReplicationPeerStorage>
+    getReplicationPeerStorageClass(Configuration conf) {
+    try {
+      ReplicationPeerStorageType type = ReplicationPeerStorageType.valueOf(
+        conf.get(REPLICATION_PEER_STORAGE_IMPL, DEFAULT_REPLICATION_PEER_STORAGE_IMPL.name())
+          .toUpperCase());
+      return type.getClazz();
+    } catch (IllegalArgumentException e) {
+      return conf.getClass(REPLICATION_PEER_STORAGE_IMPL,
+        DEFAULT_REPLICATION_PEER_STORAGE_IMPL.getClazz(), ReplicationPeerStorage.class);
+    }
+  }
+
   /**
    * Create a new {@link ReplicationPeerStorage}.
    */
-  public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, Configuration conf) {
-    return new ZKReplicationPeerStorage(zk, conf);
+  public static ReplicationPeerStorage getReplicationPeerStorage(FileSystem fs, ZKWatcher zk,
+    Configuration conf) {
+    Class<? extends ReplicationPeerStorage> clazz = getReplicationPeerStorageClass(conf);
+    for (Constructor<?> c : clazz.getConstructors()) {
+      if (c.getParameterCount() != 2) {
+        continue;
+      }
+      if (c.getParameterTypes()[0].isAssignableFrom(FileSystem.class)) {
+        return ReflectionUtils.newInstance(clazz, fs, conf);
+      } else if (c.getParameterTypes()[0].isAssignableFrom(ZKWatcher.class)) {
+        return ReflectionUtils.newInstance(clazz, zk, conf);
+      }
+    }
+    throw new IllegalArgumentException(
+      "Can not create replication peer storage with type " + clazz);
   }
 
   /**
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java
new file mode 100644
index 00000000000..018883290c1
--- /dev/null
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.hbase.TableName;
+import org.junit.Test;
+
+public abstract class ReplicationPeerStorageTestBase {
+
+  // Seed may be set with Random#setSeed
+  private static final Random RNG = new Random();
+
+  protected static ReplicationPeerStorage STORAGE;
+
+  private Set<String> randNamespaces(Random rand) {
+    return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
+      .collect(toSet());
+  }
+
+  private Map<TableName, List<String>> randTableCFs(Random rand) {
+    int size = rand.nextInt(5);
+    Map<TableName, List<String>> map = new HashMap<>();
+    for (int i = 0; i < size; i++) {
+      TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
+      List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
+        .limit(rand.nextInt(5)).collect(toList());
+      map.put(tn, cfs);
+    }
+    return map;
+  }
+
+  private ReplicationPeerConfig getConfig(int seed) {
+    RNG.setSeed(seed);
+    return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong()))
+      .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong()))
+      .setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG))
+      .setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG))
+      .setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean())
+      .setBandwidth(RNG.nextInt(1000)).build();
+  }
+
+  private void assertSetEquals(Set<String> expected, Set<String> actual) {
+    if (expected == null || expected.size() == 0) {
+      assertTrue(actual == null || actual.size() == 0);
+      return;
+    }
+    assertEquals(expected.size(), actual.size());
+    expected.forEach(s -> assertTrue(actual.contains(s)));
+  }
+
+  private void assertMapEquals(Map<TableName, List<String>> expected,
+    Map<TableName, List<String>> actual) {
+    if (expected == null || expected.size() == 0) {
+      assertTrue(actual == null || actual.size() == 0);
+      return;
+    }
+    assertEquals(expected.size(), actual.size());
+    expected.forEach((expectedTn, expectedCFs) -> {
+      List<String> actualCFs = actual.get(expectedTn);
+      if (expectedCFs == null || expectedCFs.size() == 0) {
+        assertTrue(actual.containsKey(expectedTn));
+        assertTrue(actualCFs == null || actualCFs.size() == 0);
+      } else {
+        assertNotNull(actualCFs);
+        assertEquals(expectedCFs.size(), actualCFs.size());
+        for (Iterator<String> expectedIt = expectedCFs.iterator(),
+            actualIt = actualCFs.iterator(); expectedIt.hasNext();) {
+          assertEquals(expectedIt.next(), actualIt.next());
+        }
+      }
+    });
+  }
+
+  private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) {
+    assertEquals(expected.getClusterKey(), actual.getClusterKey());
+    assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl());
+    assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
+    assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces());
+    assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
+    assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
+    assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
+    assertEquals(expected.getBandwidth(), actual.getBandwidth());
+  }
+
+  @Test
+  public void test() throws ReplicationException {
+    int peerCount = 10;
+    for (int i = 0; i < peerCount; i++) {
+      STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0,
+        SyncReplicationState.valueOf(i % 4));
+    }
+    List<String> peerIds = STORAGE.listPeerIds();
+    assertEquals(peerCount, peerIds.size());
+    for (String peerId : peerIds) {
+      int seed = Integer.parseInt(peerId);
+      assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId));
+    }
+    for (int i = 0; i < peerCount; i++) {
+      STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
+    }
+    for (String peerId : peerIds) {
+      int seed = Integer.parseInt(peerId);
+      assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId));
+    }
+    for (int i = 0; i < peerCount; i++) {
+      assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
+    }
+    for (int i = 0; i < peerCount; i++) {
+      STORAGE.setPeerState(Integer.toString(i), i % 2 != 0);
+    }
+    for (int i = 0; i < peerCount; i++) {
+      assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i)));
+    }
+    for (int i = 0; i < peerCount; i++) {
+      assertEquals(SyncReplicationState.valueOf(i % 4),
+        STORAGE.getPeerSyncReplicationState(Integer.toString(i)));
+    }
+    String toRemove = Integer.toString(peerCount / 2);
+    STORAGE.removePeer(toRemove);
+    peerIds = STORAGE.listPeerIds();
+    assertEquals(peerCount - 1, peerIds.size());
+    assertFalse(peerIds.contains(toRemove));
+
+    try {
+      STORAGE.getPeerConfig(toRemove);
+      fail("Should throw a ReplicationException when getting peer config of a removed peer");
+    } catch (ReplicationException e) {
+    }
+  }
+
+  protected abstract void removePeerSyncRelicationState(String peerId) throws Exception;
+
+  protected abstract void assertPeerSyncReplicationStateCreate(String peerId) throws Exception;
+
+  @Test
+  public void testNoSyncReplicationState() throws Exception {
+    // This could happen for a peer created before we introduce sync replication.
+    String peerId = "testNoSyncReplicationState";
+    assertThrows("Should throw a ReplicationException when getting state of inexist peer",
+      ReplicationException.class, () -> STORAGE.getPeerSyncReplicationState(peerId));
+    assertThrows("Should throw a ReplicationException when getting state of inexist peer",
+      ReplicationException.class, () -> STORAGE.getPeerNewSyncReplicationState(peerId));
+
+    STORAGE.addPeer(peerId, getConfig(0), true, SyncReplicationState.NONE);
+    // delete the sync replication state node to simulate an old-format peer
+    removePeerSyncReplicationState(peerId);
+    // should not throw an exception as the peer exists
+    assertEquals(SyncReplicationState.NONE, STORAGE.getPeerSyncReplicationState(peerId));
+    assertEquals(SyncReplicationState.NONE, STORAGE.getPeerNewSyncReplicationState(peerId));
+    // make sure we recreate the sync replication state for the old-format peer
+    assertPeerSyncReplicationStateCreate(peerId);
+  }
+
+  protected abstract void assertPeerNameControlException(ReplicationException e);
+
+  @Test
+  public void testPeerNameControl() throws Exception {
+    String clusterKey = "key";
+    STORAGE.addPeer("6", 
ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), true,
+      SyncReplicationState.NONE);
+
+    try {
+      ReplicationException e = assertThrows(ReplicationException.class,
+        () -> STORAGE.addPeer("6",
+          ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), true,
+          SyncReplicationState.NONE));
+      assertPeerNameControlException(e);
+    } finally {
+      // clean up
+      STORAGE.removePeer("6");
+    }
+  }
+}
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestFSReplicationPeerStorage.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestFSReplicationPeerStorage.java
new file mode 100644
index 00000000000..f99f529c9f0
--- /dev/null
+++ 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestFSReplicationPeerStorage.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.endsWith;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.RotateFile;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestFSReplicationPeerStorage extends ReplicationPeerStorageTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestFSReplicationPeerStorage.class);
+
+  private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
+
+  private static FileSystem FS;
+
+  private static Path DIR;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    DIR = UTIL.getDataTestDir("test_fs_peer_storage");
+    CommonFSUtils.setRootDir(UTIL.getConfiguration(), DIR);
+    FS = FileSystem.get(UTIL.getConfiguration());
+    STORAGE = new FSReplicationPeerStorage(FS, UTIL.getConfiguration());
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    UTIL.cleanupTestDir();
+  }
+
+  @Override
+  protected void removePeerSyncReplicationState(String peerId) throws Exception {
+    FSReplicationPeerStorage storage = (FSReplicationPeerStorage) STORAGE;
+    Path peerDir = storage.getPeerDir(peerId);
+    RotateFile file =
+      new RotateFile(FS, peerDir, FSReplicationPeerStorage.SYNC_REPLICATION_STATE_FILE, 1024);
+    file.read();
+    file.delete();
+  }
+
+  @Override
+  protected void assertPeerSyncReplicationStateCreate(String peerId) throws Exception {
+    FSReplicationPeerStorage storage = (FSReplicationPeerStorage) STORAGE;
+    Path peerDir = storage.getPeerDir(peerId);
+    RotateFile file =
+      new RotateFile(FS, peerDir, FSReplicationPeerStorage.SYNC_REPLICATION_STATE_FILE, 1024);
+    assertNotNull(file.read());
+  }
+
+  @Override
+  protected void assertPeerNameControlException(ReplicationException e) {
+    assertThat(e.getMessage(), endsWith("peer already exists"));
+  }
+}
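
For reference, a minimal usage sketch of the new FSReplicationPeerStorage (not part of this
patch). It relies only on the constructor and methods exercised by the tests above; the peer
id and cluster key are made-up illustrative values, and it assumes hbase.rootdir points at a
writable location, as the test setUp arranges via CommonFSUtils.setRootDir.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.FSReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.SyncReplicationState;

public class FsPeerStorageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    // Construct the storage directly, the same way the test setUp above does.
    FSReplicationPeerStorage storage = new FSReplicationPeerStorage(fs, conf);
    // "1" and the cluster key below are hypothetical values used only for illustration.
    ReplicationPeerConfig peerConfig =
      ReplicationPeerConfig.newBuilder().setClusterKey("zk1:2181:/hbase").build();
    storage.addPeer("1", peerConfig, true, SyncReplicationState.NONE);
    System.out.println(storage.listPeerIds());
  }
}
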
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index 311e7e337f9..d2540987906 100644
--- 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.ClusterId;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseZKTestingUtil;
@@ -77,10 +78,11 @@ public class TestReplicationStateZKImpl extends 
TestReplicationStateBasic {
   }
 
   @Before
-  public void setUp() {
+  public void setUp() throws IOException {
     zkTimeoutCount = 0;
     rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
-    rp = ReplicationFactory.getReplicationPeers(zkw, conf);
+    rp =
+      ReplicationFactory.getReplicationPeers(FileSystem.get(utility.getConfiguration()), zkw, conf);
     OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
   }
 
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
index 7bc47918992..12262461f74 100644
--- 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
+++ 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
@@ -17,52 +17,30 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toSet;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.stream.Stream;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseZKTestingUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.zookeeper.KeeperException;
-import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category({ ReplicationTests.class, MediumTests.class })
-public class TestZKReplicationPeerStorage {
+public class TestZKReplicationPeerStorage extends ReplicationPeerStorageTestBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class);
 
   private static final HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil();
-  private static final Random RNG = new Random(); // Seed may be set with 
Random#setSeed
-  private static ZKReplicationPeerStorage STORAGE;
 
   @BeforeClass
   public static void setUp() throws Exception {
@@ -75,264 +53,24 @@ public class TestZKReplicationPeerStorage {
     UTIL.shutdownMiniZKCluster();
   }
 
-  @After
-  public void cleanCustomConfigurations() {
-    
UTIL.getConfiguration().unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
+  @Override
+  protected void removePeerSyncReplicationState(String peerId) throws Exception {
+    ZKReplicationPeerStorage storage = (ZKReplicationPeerStorage) STORAGE;
+    ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), storage.getSyncReplicationStateNode(peerId));
+    ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), storage.getNewSyncReplicationStateNode(peerId));
   }
 
-  private Set<String> randNamespaces(Random rand) {
-    return Stream.generate(() -> 
Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
-      .collect(toSet());
-  }
-
-  private Map<TableName, List<String>> randTableCFs(Random rand) {
-    int size = rand.nextInt(5);
-    Map<TableName, List<String>> map = new HashMap<>();
-    for (int i = 0; i < size; i++) {
-      TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
-      List<String> cfs = Stream.generate(() -> 
Long.toHexString(rand.nextLong()))
-        .limit(rand.nextInt(5)).collect(toList());
-      map.put(tn, cfs);
-    }
-    return map;
-  }
-
-  private ReplicationPeerConfig getConfig(int seed) {
-    RNG.setSeed(seed);
-    return 
ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong()))
-      .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong()))
-      
.setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG))
-      
.setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG))
-      
.setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean())
-      .setBandwidth(RNG.nextInt(1000)).build();
-  }
-
-  private void assertSetEquals(Set<String> expected, Set<String> actual) {
-    if (expected == null || expected.size() == 0) {
-      assertTrue(actual == null || actual.size() == 0);
-      return;
-    }
-    assertEquals(expected.size(), actual.size());
-    expected.forEach(s -> assertTrue(actual.contains(s)));
-  }
-
-  private void assertMapEquals(Map<TableName, List<String>> expected,
-    Map<TableName, List<String>> actual) {
-    if (expected == null || expected.size() == 0) {
-      assertTrue(actual == null || actual.size() == 0);
-      return;
-    }
-    assertEquals(expected.size(), actual.size());
-    expected.forEach((expectedTn, expectedCFs) -> {
-      List<String> actualCFs = actual.get(expectedTn);
-      if (expectedCFs == null || expectedCFs.size() == 0) {
-        assertTrue(actual.containsKey(expectedTn));
-        assertTrue(actualCFs == null || actualCFs.size() == 0);
-      } else {
-        assertNotNull(actualCFs);
-        assertEquals(expectedCFs.size(), actualCFs.size());
-        for (Iterator<String> expectedIt = expectedCFs.iterator(),
-            actualIt = actualCFs.iterator(); expectedIt.hasNext();) {
-          assertEquals(expectedIt.next(), actualIt.next());
-        }
-      }
-    });
-  }
-
-  private void assertConfigEquals(ReplicationPeerConfig expected, 
ReplicationPeerConfig actual) {
-    assertEquals(expected.getClusterKey(), actual.getClusterKey());
-    assertEquals(expected.getReplicationEndpointImpl(), 
actual.getReplicationEndpointImpl());
-    assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
-    assertSetEquals(expected.getExcludeNamespaces(), 
actual.getExcludeNamespaces());
-    assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
-    assertMapEquals(expected.getExcludeTableCFsMap(), 
actual.getExcludeTableCFsMap());
-    assertEquals(expected.replicateAllUserTables(), 
actual.replicateAllUserTables());
-    assertEquals(expected.getBandwidth(), actual.getBandwidth());
-  }
-
-  @Test
-  public void test() throws ReplicationException {
-    int peerCount = 10;
-    for (int i = 0; i < peerCount; i++) {
-      STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0,
-        SyncReplicationState.valueOf(i % 4));
-    }
-    List<String> peerIds = STORAGE.listPeerIds();
-    assertEquals(peerCount, peerIds.size());
-    for (String peerId : peerIds) {
-      int seed = Integer.parseInt(peerId);
-      assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId));
-    }
-    for (int i = 0; i < peerCount; i++) {
-      STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
-    }
-    for (String peerId : peerIds) {
-      int seed = Integer.parseInt(peerId);
-      assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId));
-    }
-    for (int i = 0; i < peerCount; i++) {
-      assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
-    }
-    for (int i = 0; i < peerCount; i++) {
-      STORAGE.setPeerState(Integer.toString(i), i % 2 != 0);
-    }
-    for (int i = 0; i < peerCount; i++) {
-      assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i)));
-    }
-    for (int i = 0; i < peerCount; i++) {
-      assertEquals(SyncReplicationState.valueOf(i % 4),
-        STORAGE.getPeerSyncReplicationState(Integer.toString(i)));
-    }
-    String toRemove = Integer.toString(peerCount / 2);
-    STORAGE.removePeer(toRemove);
-    peerIds = STORAGE.listPeerIds();
-    assertEquals(peerCount - 1, peerIds.size());
-    assertFalse(peerIds.contains(toRemove));
-
-    try {
-      STORAGE.getPeerConfig(toRemove);
-      fail("Should throw a ReplicationException when getting peer config of a 
removed peer");
-    } catch (ReplicationException e) {
-    }
-  }
-
-  @Test
-  public void testNoSyncReplicationState()
-    throws ReplicationException, KeeperException, IOException {
-    // This could happen for a peer created before we introduce sync 
replication.
-    String peerId = "testNoSyncReplicationState";
-    try {
-      STORAGE.getPeerSyncReplicationState(peerId);
-      fail("Should throw a ReplicationException when getting state of inexist 
peer");
-    } catch (ReplicationException e) {
-      // expected
-    }
-    try {
-      STORAGE.getPeerNewSyncReplicationState(peerId);
-      fail("Should throw a ReplicationException when getting state of inexist 
peer");
-    } catch (ReplicationException e) {
-      // expected
-    }
-    STORAGE.addPeer(peerId, getConfig(0), true, SyncReplicationState.NONE);
-    // delete the sync replication state node to simulate
-    ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), 
STORAGE.getSyncReplicationStateNode(peerId));
-    ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), 
STORAGE.getNewSyncReplicationStateNode(peerId));
-    // should not throw exception as the peer exists
-    assertEquals(SyncReplicationState.NONE, 
STORAGE.getPeerSyncReplicationState(peerId));
-    assertEquals(SyncReplicationState.NONE, 
STORAGE.getPeerNewSyncReplicationState(peerId));
-    // make sure we create the node for the old format peer
+  @Override
+  protected void assertPeerSyncReplicationStateCreate(String peerId) throws Exception {
+    ZKReplicationPeerStorage storage = (ZKReplicationPeerStorage) STORAGE;
     assertNotEquals(-1,
-      ZKUtil.checkExists(UTIL.getZooKeeperWatcher(), STORAGE.getSyncReplicationStateNode(peerId)));
+      ZKUtil.checkExists(UTIL.getZooKeeperWatcher(), storage.getSyncReplicationStateNode(peerId)));
     assertNotEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(),
-      STORAGE.getNewSyncReplicationStateNode(peerId)));
-  }
-
-  @Test
-  public void testBaseReplicationPeerConfig() throws ReplicationException {
-    String customPeerConfigKey = "hbase.xxx.custom_config";
-    String customPeerConfigValue = "test";
-    String customPeerConfigUpdatedValue = "testUpdated";
-
-    String customPeerConfigSecondKey = "hbase.xxx.custom_second_config";
-    String customPeerConfigSecondValue = "testSecond";
-    String customPeerConfigSecondUpdatedValue = "testSecondUpdated";
-
-    ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
-
-    // custom config not present
-    
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey),
 null);
-
-    Configuration conf = UTIL.getConfiguration();
-    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
-      customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";")
-        
.concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue));
-
-    ReplicationPeerConfig updatedReplicationPeerConfig = 
ReplicationPeerConfigUtil
-      .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
-
-    // validates base configs are present in replicationPeerConfig
-    assertEquals(customPeerConfigValue,
-      
updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
-    assertEquals(customPeerConfigSecondValue,
-      
updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigSecondKey));
-
-    // validates base configs get updated values even if config already present
-    conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
-    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
-      
customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";")
-        
.concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue));
-
-    ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = 
ReplicationPeerConfigUtil
-      .updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);
-
-    assertEquals(customPeerConfigUpdatedValue,
-      
replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigKey));
-    assertEquals(customPeerConfigSecondUpdatedValue,
-      
replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigSecondKey));
-  }
-
-  @Test
-  public void testBaseReplicationRemovePeerConfig() throws 
ReplicationException {
-    String customPeerConfigKey = "hbase.xxx.custom_config";
-    String customPeerConfigValue = "test";
-    ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
-
-    // custom config not present
-    
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey),
 null);
-
-    Configuration conf = UTIL.getConfiguration();
-    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
-      customPeerConfigKey.concat("=").concat(customPeerConfigValue));
-
-    ReplicationPeerConfig updatedReplicationPeerConfig = 
ReplicationPeerConfigUtil
-      .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
-
-    // validates base configs are present in replicationPeerConfig
-    assertEquals(customPeerConfigValue,
-      
updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
-
-    conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
-    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
-      customPeerConfigKey.concat("=").concat(""));
-
-    ReplicationPeerConfig replicationPeerConfigRemoved = 
ReplicationPeerConfigUtil
-      .updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);
-
-    
assertNull(replicationPeerConfigRemoved.getConfiguration().get(customPeerConfigKey));
+      storage.getNewSyncReplicationStateNode(peerId)));
   }
 
-  @Test
-  public void testBaseReplicationRemovePeerConfigWithNoExistingConfig()
-    throws ReplicationException {
-    String customPeerConfigKey = "hbase.xxx.custom_config";
-    ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
-
-    // custom config not present
-    
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey),
 null);
-    Configuration conf = UTIL.getConfiguration();
-    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
-      customPeerConfigKey.concat("=").concat(""));
-
-    ReplicationPeerConfig updatedReplicationPeerConfig = 
ReplicationPeerConfigUtil
-      .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
-    
assertNull(updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
-  }
-
-  @Test
-  public void testPeerNameControl() throws Exception {
-    String clusterKey = "key";
-    STORAGE.addPeer("6", 
ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), true,
-      SyncReplicationState.NONE);
-
-    try {
-      STORAGE.addPeer("6", 
ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(),
-        true, SyncReplicationState.NONE);
-      fail();
-    } catch (ReplicationException e) {
-      assertThat(e.getCause(), 
instanceOf(KeeperException.NodeExistsException.class));
-    } finally {
-      // clean up
-      STORAGE.removePeer("6");
-    }
+  @Override
+  protected void assertPeerNameControlException(ReplicationException e) {
+    assertThat(e.getCause(), instanceOf(KeeperException.NodeExistsException.class));
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 58a5d7cae47..d0fd34848a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -783,7 +783,8 @@ public class HMaster extends 
HBaseServerBase<MasterRpcServices> implements Maste
     }
     this.rsGroupInfoManager = RSGroupInfoManager.create(this);
 
-    this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, 
conf, clusterId);
+    this.replicationPeerManager =
+      ReplicationPeerManager.create(fileSystemManager.getFileSystem(), zooKeeper, conf, clusterId);
 
     this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, 
this.serverManager);
     this.drainingServerTracker.start();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 06cf559d492..30164f29671 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -33,6 +33,7 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -559,10 +560,10 @@ public class ReplicationPeerManager {
     return queueStorage;
   }
 
-  public static ReplicationPeerManager create(ZKWatcher zk, Configuration 
conf, String clusterId)
-    throws ReplicationException {
+  public static ReplicationPeerManager create(FileSystem fs, ZKWatcher zk, Configuration conf,
+    String clusterId) throws ReplicationException {
     ReplicationPeerStorage peerStorage =
-      ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
+      ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
     ConcurrentMap<String, ReplicationPeerDescription> peers = new 
ConcurrentHashMap<>();
     for (String peerId : peerStorage.listPeerIds()) {
       ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index ea28a20c56b..84b98ed3c93 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -96,8 +96,8 @@ public class Replication implements ReplicationSourceService {
     try {
       this.queueStorage =
         
ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), 
conf);
-      this.replicationPeers =
-        ReplicationFactory.getReplicationPeers(server.getZooKeeper(), 
this.conf);
+      this.replicationPeers = ReplicationFactory.getReplicationPeers(server.getFileSystem(),
+        server.getZooKeeper(), this.conf);
       this.replicationPeers.init();
     } catch (Exception e) {
       throw new IOException("Failed replication handler create", e);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index ab8d9fcde6a..7e10fd786a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -2557,7 +2557,7 @@ public class HBaseFsck extends Configured implements 
Closeable {
     return hbi;
   }
 
-  private void checkAndFixReplication() throws ReplicationException {
+  private void checkAndFixReplication() throws ReplicationException, IOException {
     ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, 
errors);
     checker.checkUnDeletedQueues();
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
index 234daef85b5..7e7a46573b8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.util.hbck;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -24,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
@@ -53,8 +55,10 @@ public class ReplicationChecker {
   private final ReplicationPeerStorage peerStorage;
   private final ReplicationQueueStorage queueStorage;
 
-  public ReplicationChecker(Configuration conf, ZKWatcher zkw, 
HbckErrorReporter errorReporter) {
-    this.peerStorage = 
ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf);
+  public ReplicationChecker(Configuration conf, ZKWatcher zkw, HbckErrorReporter errorReporter)
+    throws IOException {
+    this.peerStorage =
+      ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), zkw, conf);
     this.queueStorage = 
ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
     this.errorReporter = errorReporter;
   }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index 899d3eb4722..87d21e583dd 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -91,7 +91,8 @@ public class TestReplicationHFileCleaner {
     server = new DummyServer();
     conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
     HMaster.decorateMasterConfiguration(conf);
-    rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf);
+    rp =
+      ReplicationFactory.getReplicationPeers(server.getFileSystem(), server.getZooKeeper(), conf);
     rp.init();
     rq = 
ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), 
conf);
     fs = FileSystem.get(conf);
@@ -236,7 +237,17 @@ public class TestReplicationHFileCleaner {
       try {
         return new ZKWatcher(getConfiguration(), "dummy server", this);
       } catch (IOException e) {
-        e.printStackTrace();
+        LOG.error("Can not get ZKWatcher", e);
+      }
+      return null;
+    }
+
+    @Override
+    public FileSystem getFileSystem() {
+      try {
+        return TEST_UTIL.getTestFileSystem();
+      } catch (IOException e) {
+        LOG.error("Can not get FileSystem", e);
       }
       return null;
     }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index e82d69826d8..7f5df02ecfc 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -232,8 +232,8 @@ public class SyncReplicationTestBase {
 
   protected final void verifyRemovedPeer(String peerId, Path remoteWALDir, 
HBaseTestingUtil utility)
     throws Exception {
-    ReplicationPeerStorage rps = ReplicationStorageFactory
-      .getReplicationPeerStorage(utility.getZooKeeperWatcher(), 
utility.getConfiguration());
+    ReplicationPeerStorage rps = ReplicationStorageFactory.getReplicationPeerStorage(
+      utility.getTestFileSystem(), utility.getZooKeeperWatcher(), utility.getConfiguration());
     try {
       rps.getPeerSyncReplicationState(peerId);
       fail("Should throw exception when get the sync replication state of a 
removed peer.");
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithFSPeerStorage.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithFSPeerStorage.java
new file mode 100644
index 00000000000..6f5c6c20d8d
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithFSPeerStorage.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestReplicationWithFSPeerStorage extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationWithFSPeerStorage.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // enable file system based peer storage
+    UTIL1.getConfiguration().set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
+      ReplicationPeerStorageType.FILESYSTEM.name().toLowerCase());
+    UTIL2.getConfiguration().set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
+      ReplicationPeerStorageType.FILESYSTEM.name().toLowerCase());
+    TestReplicationBase.setUpBeforeClass();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    cleanUp();
+  }
+
+  /**
+   * Add a row, check it's replicated, delete it, check it's gone
+   */
+  @Test
+  public void testSimplePutDelete() throws Exception {
+    runSimplePutDeleteTest();
+  }
+
+  /**
+   * Try a small batch upload using the write buffer, check it's replicated
+   */
+  @Test
+  public void testSmallBatch() throws Exception {
+    runSmallBatchTest();
+  }
+}
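
As a companion to the test above, a minimal sketch (not part of this patch) of how a caller
might opt into the filesystem-based peer storage, using the configuration key and the new
factory signature that appear elsewhere in this diff. The helper class and method names here
are illustrative only.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorageType;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;

public final class FsPeerStorageConfigSketch {

  private FsPeerStorageConfigSketch() {
  }

  // Switch the peer storage implementation to "filesystem" and obtain it through the
  // factory, which after this change also takes a FileSystem.
  public static ReplicationPeerStorage createFsBackedPeerStorage(Configuration conf, ZKWatcher zk)
    throws IOException {
    conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
      ReplicationPeerStorageType.FILESYSTEM.name().toLowerCase());
    return ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), zk, conf);
  }
}
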
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index dd989293ff5..c48dbc39a03 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -388,8 +388,8 @@ public abstract class TestReplicationSourceManager {
       rq.addWAL(server.getServerName(), "1", file);
     }
     Server s1 = new DummyServer("dummyserver1.example.org");
-    ReplicationPeers rp1 =
-      ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), 
s1.getConfiguration());
+    ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getFileSystem(),
+      s1.getZooKeeper(), s1.getConfiguration());
     rp1.init();
     manager.claimQueue(server.getServerName(), "1");
     assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
@@ -857,6 +857,11 @@ public abstract class TestReplicationSourceManager {
       return zkw;
     }
 
+    @Override
+    public FileSystem getFileSystem() {
+      return fs;
+    }
+
     @Override
     public Connection getConnection() {
       return null;
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java
index dc8fb849633..e44e00d2d37 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java
@@ -61,8 +61,8 @@ public class TestHBaseFsckReplication {
 
   @Test
   public void test() throws Exception {
-    ReplicationPeerStorage peerStorage = ReplicationStorageFactory
-      .getReplicationPeerStorage(UTIL.getZooKeeperWatcher(), 
UTIL.getConfiguration());
+    ReplicationPeerStorage peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(
+      UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
     ReplicationQueueStorage queueStorage = ReplicationStorageFactory
       .getReplicationQueueStorage(UTIL.getZooKeeperWatcher(), 
UTIL.getConfiguration());
 

