This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new a71105997f2 HBASE-27727 Implement filesystem based Replication peer
storage (#5165)
a71105997f2 is described below
commit a71105997f2c08c74f3713925847cce3d44134bc
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Apr 13 18:58:02 2023 +0800
HBASE-27727 Implement filesystem based Replication peer storage (#5165)
Signed-off-by: Liangjun He <[email protected]>
---
.../hbase/replication/SyncReplicationState.java | 30 ++
.../org/apache/hadoop/hbase/util/RotateFile.java | 197 +++++++++++++
.../replication/TestReplicationPeerConfig.java | 133 +++++++++
.../apache/hadoop/hbase/util/TestRotateFile.java | 155 ++++++++++
.../mapreduce/replication/VerifyReplication.java | 2 +-
.../src/main/protobuf/HBase.proto | 5 +
hbase-replication/pom.xml | 5 +
.../replication/FSReplicationPeerStorage.java | 321 +++++++++++++++++++++
.../hbase/replication/ReplicationFactory.java | 6 +-
...actory.java => ReplicationPeerStorageType.java} | 18 +-
.../hadoop/hbase/replication/ReplicationPeers.java | 5 +-
.../replication/ReplicationStorageFactory.java | 42 ++-
.../ReplicationPeerStorageTestBase.java | 204 +++++++++++++
.../replication/TestFSReplicationPeerStorage.java | 87 ++++++
.../replication/TestReplicationStateZKImpl.java | 6 +-
.../replication/TestZKReplicationPeerStorage.java | 290 +------------------
.../org/apache/hadoop/hbase/master/HMaster.java | 3 +-
.../master/replication/ReplicationPeerManager.java | 7 +-
.../replication/regionserver/Replication.java | 4 +-
.../org/apache/hadoop/hbase/util/HBaseFsck.java | 2 +-
.../hadoop/hbase/util/hbck/ReplicationChecker.java | 8 +-
.../cleaner/TestReplicationHFileCleaner.java | 15 +-
.../hbase/replication/SyncReplicationTestBase.java | 4 +-
.../TestReplicationWithFSPeerStorage.java | 66 +++++
.../regionserver/TestReplicationSourceManager.java | 9 +-
.../hbase/util/TestHBaseFsckReplication.java | 4 +-
26 files changed, 1316 insertions(+), 312 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
index 39bbb20433b..9b7e2126765 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
@@ -17,10 +17,16 @@
*/
package org.apache.hadoop.hbase.replication;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
+import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import
org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -74,4 +80,28 @@ public enum SyncReplicationState {
return
ReplicationPeerConfigUtil.toSyncReplicationState(ReplicationProtos.SyncReplicationState
.parseFrom(Arrays.copyOfRange(bytes, ProtobufUtil.lengthOfPBMagic(),
bytes.length)));
}
+
+ public static byte[] toByteArray(SyncReplicationState state,
SyncReplicationState newState) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try {
+ out.write(ProtobufMagic.PB_MAGIC);
+
ReplicationPeerConfigUtil.toSyncReplicationState(state).writeDelimitedTo(out);
+
ReplicationPeerConfigUtil.toSyncReplicationState(newState).writeDelimitedTo(out);
+ } catch (IOException e) {
+ // should not happen, all in-memory operations
+ throw new AssertionError(e);
+ }
+ return out.toByteArray();
+ }
+
+ public static Pair<SyncReplicationState, SyncReplicationState>
+ parseStateAndNewStateFrom(byte[] bytes) throws IOException {
+ ByteArrayInputStream in = new ByteArrayInputStream(bytes);
+ ByteStreams.skipFully(in, ProtobufMagic.lengthOfPBMagic());
+ SyncReplicationState state = ReplicationPeerConfigUtil
+
.toSyncReplicationState(ReplicationProtos.SyncReplicationState.parseDelimitedFrom(in));
+ SyncReplicationState newState = ReplicationPeerConfigUtil
+
.toSyncReplicationState(ReplicationProtos.SyncReplicationState.parseDelimitedFrom(in));
+ return Pair.newPair(state, newState);
+ }
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/RotateFile.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/RotateFile.java
new file mode 100644
index 00000000000..7256700e9cc
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/RotateFile.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.zip.CRC32;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+/**
+ * A file storage which supports atomic update through two files, i.e.,
rotating. The implementation
+ * does not require atomic rename.
+ */
[email protected]
+public class RotateFile {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RotateFile.class);
+
+ private final FileSystem fs;
+
+ private final long maxFileSize;
+
+ private final Path[] files = new Path[2];
+
+ // this is used to make sure that we do not go backwards
+ private long prevTimestamp = -1;
+
+ private int nextFile = -1;
+
+ /**
+ * Constructs a new RotateFile object with the given parameters.
+ * @param fs the file system to use.
+ * @param dir the directory where the files will be created.
+ * @param name the base name for the files.
+ * @param maxFileSize the maximum size of each file.
+ */
+ public RotateFile(FileSystem fs, Path dir, String name, long maxFileSize) {
+ this.fs = fs;
+ this.maxFileSize = maxFileSize;
+ this.files[0] = new Path(dir, name + "-0");
+ this.files[1] = new Path(dir, name + "-1");
+ }
+
+ private HBaseProtos.RotateFileData read(Path path) throws IOException {
+ byte[] data;
+ int expectedChecksum;
+ try (FSDataInputStream in = fs.open(path)) {
+ int length = in.readInt();
+ if (length <= 0 || length > maxFileSize) {
+ throw new IOException("Invalid file length " + length
+ + ", either less than 0 or greater then max allowed size " +
maxFileSize);
+ }
+ data = new byte[length];
+ in.readFully(data);
+ expectedChecksum = in.readInt();
+ }
+ CRC32 crc32 = new CRC32();
+ crc32.update(data);
+ int calculatedChecksum = (int) crc32.getValue();
+ if (expectedChecksum != calculatedChecksum) {
+ throw new IOException(
+ "Checksum mismatch, expected " + expectedChecksum + ", actual " +
calculatedChecksum);
+ }
+ return HBaseProtos.RotateFileData.parseFrom(data);
+ }
+
+ private int select(HBaseProtos.RotateFileData[] datas) {
+ if (datas[0] == null) {
+ return 1;
+ }
+ if (datas[1] == null) {
+ return 0;
+ }
+ return datas[0].getTimestamp() >= datas[1].getTimestamp() ? 0 : 1;
+ }
+
+ /**
+ * Reads the content of the rotate file by selecting the winner file based
on the timestamp of the
+ * data inside the files. It reads the content of both files and selects the
one with the latest
+ * timestamp as the winner. If a file is incomplete or does not exist, it
logs the error and moves
+ * on to the next file. It returns the content of the winner file as a byte
array. If none of the
+ * files have valid data, it returns null.
+ * @return a byte array containing the data from the winner file, or null if
no valid data is
+ * found.
+ * @throws IOException if reading fails, or a file has an invalid length or checksum.
+ */
+ public byte[] read() throws IOException {
+ HBaseProtos.RotateFileData[] datas = new HBaseProtos.RotateFileData[2];
+ for (int i = 0; i < 2; i++) {
+ try {
+ datas[i] = read(files[i]);
+ } catch (FileNotFoundException e) {
+ LOG.debug("file {} does not exist", files[i], e);
+ } catch (EOFException e) {
+ LOG.debug("file {} is incomplete", files[i], e);
+ }
+ }
+ int winnerIndex = select(datas);
+ nextFile = 1 - winnerIndex;
+ if (datas[winnerIndex] != null) {
+ prevTimestamp = datas[winnerIndex].getTimestamp();
+ return datas[winnerIndex].getData().toByteArray();
+ } else {
+ return null;
+ }
+ }
+
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/RotateFile.java|.*/src/test/.*")
+ static void write(FileSystem fs, Path file, long timestamp, byte[] data)
throws IOException {
+ HBaseProtos.RotateFileData proto = HBaseProtos.RotateFileData.newBuilder()
+ .setTimestamp(timestamp).setData(ByteString.copyFrom(data)).build();
+ byte[] protoData = proto.toByteArray();
+ CRC32 crc32 = new CRC32();
+ crc32.update(protoData);
+ int checksum = (int) crc32.getValue();
+ // 4 bytes length, the serialized RotateFileData proto, 4 bytes checksum at the end
+ try (FSDataOutputStream out = fs.create(file, true)) {
+ out.writeInt(protoData.length);
+ out.write(protoData);
+ out.writeInt(checksum);
+ }
+ }
+
+ /**
+ * Writes the given data to the next file in the rotation, with a timestamp
calculated based on
+ * the previous timestamp and the current time to make sure it is greater
than the previous
+ * timestamp. The method also deletes the previous file, which is no longer
needed.
+ * <p/>
+ * Notice that, for a newly created {@link RotateFile} instance, you need to
call {@link #read()}
+ * first to initialize the nextFile index, before calling this method.
+ * @param data the data to be written to the file
+ * @throws IOException if an I/O error occurs while writing the data to the
file
+ */
+ public void write(byte[] data) throws IOException {
+ if (data.length > maxFileSize) {
+ throw new IOException(
+ "Data size " + data.length + " is greater than max allowed size " +
maxFileSize);
+ }
+ long timestamp = Math.max(prevTimestamp + 1,
EnvironmentEdgeManager.currentTime());
+ write(fs, files[nextFile], timestamp, data);
+ prevTimestamp = timestamp;
+ nextFile = 1 - nextFile;
+ try {
+ fs.delete(files[nextFile], false);
+ } catch (IOException e) {
+ // we will create new file with overwrite = true, so not a big deal
here, only for speed up
+ // loading as we do not need to read this file when loading
+ LOG.debug("Failed to delete old file {}, ignoring the exception",
files[nextFile], e);
+ }
+ }
+
+ /**
+ * Deletes the two files used for rotating data. If any of the files cannot
be deleted, an
+ * IOException is thrown.
+ * @throws IOException if there is an error deleting either file
+ */
+ public void delete() throws IOException {
+ Path next = files[nextFile];
+ // delete next file first, and then the current file, so when failing to
delete, we can still
+ // read the correct data
+ if (fs.exists(next) && !fs.delete(next, false)) {
+ throw new IOException("Can not delete " + next);
+ }
+ Path current = files[1 - nextFile];
+ if (fs.exists(current) && !fs.delete(current, false)) {
+ throw new IOException("Can not delete " + current);
+ }
+ }
+}
diff --git
a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
index fb74df39473..be516cc317c 100644
---
a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
+++
b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
@@ -17,13 +17,24 @@
*/
package org.apache.hadoop.hbase.replication;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.BuilderStyleTest;
@@ -43,6 +54,8 @@ public class TestReplicationPeerConfig {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationPeerConfig.class);
+ private static final Configuration CONF = HBaseConfiguration.create();
+
private static final String NAMESPACE_REPLICATE = "replicate";
private static final String NAMESPACE_OTHER = "other";
private static final TableName TABLE_A =
TableName.valueOf(NAMESPACE_REPLICATE, "testA");
@@ -246,4 +259,124 @@ public class TestReplicationPeerConfig {
assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY1));
assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2));
}
+
+ private static final Random RNG = new Random(); // Seed may be set with
Random#setSeed
+
+ private Set<String> randNamespaces(Random rand) {
+ return Stream.generate(() ->
Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
+ .collect(toSet());
+ }
+
+ private Map<TableName, List<String>> randTableCFs(Random rand) {
+ int size = rand.nextInt(5);
+ Map<TableName, List<String>> map = new HashMap<>();
+ for (int i = 0; i < size; i++) {
+ TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
+ List<String> cfs = Stream.generate(() ->
Long.toHexString(rand.nextLong()))
+ .limit(rand.nextInt(5)).collect(toList());
+ map.put(tn, cfs);
+ }
+ return map;
+ }
+
+ private ReplicationPeerConfig getConfig(int seed) {
+ RNG.setSeed(seed);
+ return
ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong()))
+ .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong()))
+
.setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG))
+
.setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG))
+
.setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean())
+ .setBandwidth(RNG.nextInt(1000)).build();
+ }
+
+ @Test
+ public void testBaseReplicationPeerConfig() throws ReplicationException {
+ String customPeerConfigKey = "hbase.xxx.custom_config";
+ String customPeerConfigValue = "test";
+ String customPeerConfigUpdatedValue = "testUpdated";
+
+ String customPeerConfigSecondKey = "hbase.xxx.custom_second_config";
+ String customPeerConfigSecondValue = "testSecond";
+ String customPeerConfigSecondUpdatedValue = "testSecondUpdated";
+
+ ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
+
+ // custom config not present
+
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey),
null);
+
+ Configuration conf = new Configuration(CONF);
+ conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
+ customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";")
+
.concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue));
+
+ ReplicationPeerConfig updatedReplicationPeerConfig =
ReplicationPeerConfigUtil
+ .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
+
+ // validates base configs are present in replicationPeerConfig
+ assertEquals(customPeerConfigValue,
+
updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
+ assertEquals(customPeerConfigSecondValue,
+
updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigSecondKey));
+
+ // validates base configs get updated values even if config already present
+ conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
+ conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
+
customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";")
+
.concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue));
+
+ ReplicationPeerConfig replicationPeerConfigAfterValueUpdate =
ReplicationPeerConfigUtil
+ .updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);
+
+ assertEquals(customPeerConfigUpdatedValue,
+
replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigKey));
+ assertEquals(customPeerConfigSecondUpdatedValue,
+
replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigSecondKey));
+ }
+
+ @Test
+ public void testBaseReplicationRemovePeerConfig() throws
ReplicationException {
+ String customPeerConfigKey = "hbase.xxx.custom_config";
+ String customPeerConfigValue = "test";
+ ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
+
+ // custom config not present
+
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey),
null);
+
+ Configuration conf = new Configuration(CONF);
+ conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
+ customPeerConfigKey.concat("=").concat(customPeerConfigValue));
+
+ ReplicationPeerConfig updatedReplicationPeerConfig =
ReplicationPeerConfigUtil
+ .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
+
+ // validates base configs are present in replicationPeerConfig
+ assertEquals(customPeerConfigValue,
+
updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
+
+ conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
+ conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
+ customPeerConfigKey.concat("=").concat(""));
+
+ ReplicationPeerConfig replicationPeerConfigRemoved =
ReplicationPeerConfigUtil
+ .updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);
+
+
assertNull(replicationPeerConfigRemoved.getConfiguration().get(customPeerConfigKey));
+ }
+
+ @Test
+ public void testBaseReplicationRemovePeerConfigWithNoExistingConfig()
+ throws ReplicationException {
+ String customPeerConfigKey = "hbase.xxx.custom_config";
+ ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
+
+ // custom config not present
+
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey),
null);
+ Configuration conf = new Configuration(CONF);
+ conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
+ customPeerConfigKey.concat("=").concat(""));
+
+ ReplicationPeerConfig updatedReplicationPeerConfig =
ReplicationPeerConfigUtil
+ .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
+
assertNull(updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
+ }
}
diff --git
a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRotateFile.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRotateFile.java
new file mode 100644
index 00000000000..c229290e8e5
--- /dev/null
+++
b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRotateFile.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
+
+@Category({ MiscTests.class, SmallTests.class })
+public class TestRotateFile {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRotateFile.class);
+
+ private static HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
+
+ private static FileSystem FS;
+
+ private Path dir;
+
+ private RotateFile rotateFile;
+
+ @Rule
+ public final TestName name = new TestName();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws IOException {
+ FS = FileSystem.get(UTIL.getConfiguration());
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ UTIL.cleanupTestDir();
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ dir = UTIL.getDataTestDir(name.getMethodName());
+ if (!FS.mkdirs(dir)) {
+ throw new IOException("Can not create dir " + dir);
+ }
+ rotateFile = new RotateFile(FS, dir, name.getMethodName(), 1024);
+ assertNull(rotateFile.read());
+ }
+
+ @Test
+ public void testSimpleReadWrite() throws IOException {
+ for (int i = 0; i < 10; i++) {
+ rotateFile.write(Bytes.toBytes(i));
+ assertEquals(i, Bytes.toInt(rotateFile.read()));
+ }
+ rotateFile.delete();
+ assertNull(rotateFile.read());
+ }
+
+ @Test
+ public void testCompareTimestamp() throws IOException {
+ long now = EnvironmentEdgeManager.currentTime();
+ rotateFile.write(Bytes.toBytes(10));
+ Path file = FS.listStatus(dir)[0].getPath();
+ rotateFile.write(Bytes.toBytes(100));
+
+ // put a fake file with a smaller timestamp there
+ RotateFile.write(FS, file, now - 1, Bytes.toBytes(10));
+ assertEquals(100, Bytes.toInt(rotateFile.read()));
+
+ // put a fake file with a greater timestamp there
+ RotateFile.write(FS, file, EnvironmentEdgeManager.currentTime() + 100,
Bytes.toBytes(10));
+ assertEquals(10, Bytes.toInt(rotateFile.read()));
+ }
+
+ @Test
+ public void testMaxFileSize() throws IOException {
+ assertThrows(IOException.class, () -> rotateFile.write(new byte[1025]));
+ // put a file greater than max file size
+ rotateFile.write(Bytes.toBytes(10));
+ Path file = FS.listStatus(dir)[0].getPath();
+ RotateFile.write(FS, file, EnvironmentEdgeManager.currentTime(), new
byte[1025]);
+ assertThrows(IOException.class, () -> rotateFile.read());
+ }
+
+ @Test
+ public void testNotEnoughData() throws IOException {
+ rotateFile.write(Bytes.toBytes(10));
+ assertEquals(10, Bytes.toInt(rotateFile.read()));
+ // remove the last byte
+ Path file = FS.listStatus(dir)[0].getPath();
+ byte[] data;
+ try (FSDataInputStream in = FS.open(file)) {
+ data = ByteStreams.toByteArray(in);
+ }
+ try (FSDataOutputStream out = FS.create(file, true)) {
+ out.write(data, 0, data.length - 1);
+ }
+ // should hit EOF so read nothing
+ assertNull(rotateFile.read());
+ }
+
+ @Test
+ public void testChecksumMismatch() throws IOException {
+ rotateFile.write(Bytes.toBytes(10));
+ assertEquals(10, Bytes.toInt(rotateFile.read()));
+ // mess up one byte
+ Path file = FS.listStatus(dir)[0].getPath();
+ byte[] data;
+ try (FSDataInputStream in = FS.open(file)) {
+ data = ByteStreams.toByteArray(in);
+ }
+ data[4]++;
+ try (FSDataOutputStream out = FS.create(file, true)) {
+ out.write(data, 0, data.length);
+ }
+ // should get checksum mismatch
+ IOException error = assertThrows(IOException.class, () ->
rotateFile.read());
+ assertThat(error.getMessage(), startsWith("Checksum mismatch"));
+ }
+}
diff --git
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 6a854b97239..1e268c1858b 100644
---
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -347,7 +347,7 @@ public class VerifyReplication extends Configured
implements Tool {
}
});
ReplicationPeerStorage storage =
- ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
+
ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf),
localZKW, conf);
ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
return Pair.newPair(peerConfig,
ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf));
diff --git a/hbase-protocol-shaded/src/main/protobuf/HBase.proto
b/hbase-protocol-shaded/src/main/protobuf/HBase.proto
index e4dde33a96f..0fd3d667d4d 100644
--- a/hbase-protocol-shaded/src/main/protobuf/HBase.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/HBase.proto
@@ -284,3 +284,8 @@ message LogEntry {
required string log_class_name = 1;
required bytes log_message = 2;
}
+
+message RotateFileData {
+ required int64 timestamp = 1;
+ required bytes data = 2;
+}
diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml
index dad93578609..2527434e97b 100644
--- a/hbase-replication/pom.xml
+++ b/hbase-replication/pom.xml
@@ -103,6 +103,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/FSReplicationPeerStorage.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/FSReplicationPeerStorage.java
new file mode 100644
index 00000000000..8bbe21c4a46
--- /dev/null
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/FSReplicationPeerStorage.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.RotateFile;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A filesystem based replication peer storage. The implementation does not
require atomic rename so
+ * you can use it on cloud OSS.
+ * <p/>
+ * FileSystem layout:
+ *
+ * <pre>
+ * hbase
+ *   |
+ *   --peers
+ *       |
+ *       --<peer_id>
+ *           |
+ *           --peer_config
+ *           |
+ *           --disabled
+ *           |
+ *           --sync-rep-state
+ * </pre>
+ *
+ * Notice that, if the peer is enabled, we will not have a disabled file.
+ * <p/>
+ * And for other files, to avoid depending on atomic rename, we will use two
files for storing the
+ * content. When loading, we will try to read both the files and load the
newer one. And when
+ * writing, we will write to the older file.
+ */
+@InterfaceAudience.Private
+public class FSReplicationPeerStorage implements ReplicationPeerStorage {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FSReplicationPeerStorage.class);
+
+ // Configuration key for the name of the peers directory, resolved against the HBase root dir.
+ public static final String PEERS_DIR = "hbase.replication.peers.directory";
+
+ // Directory name used when PEERS_DIR is not configured.
+ public static final String PEERS_DIR_DEFAULT = "peers";
+
+ // Name of the file holding the serialized peer config; a peer without it is treated as invalid.
+ static final String PEER_CONFIG_FILE = "peer_config";
+
+ // Marker file: its presence means the peer is disabled, its absence means enabled.
+ static final String DISABLED_FILE = "disabled";
+
+ // Name of the file holding the serialized (state, new state) pair for sync replication.
+ static final String SYNC_REPLICATION_STATE_FILE = "sync-rep-state";
+
+ // Serialized form of SyncReplicationState.NONE. Not referenced inside this class.
+ // NOTE(review): presumably kept for tests or other callers in this package - confirm.
+ static final byte[] NONE_STATE_BYTES =
+ SyncReplicationState.toByteArray(SyncReplicationState.NONE);
+
+ // Filesystem used for every peer file operation.
+ private final FileSystem fs;
+
+ // Directory that contains one sub directory per peer.
+ private final Path dir;
+
+ /**
+  * Creates a storage which keeps all peers in a directory below the HBase root directory.
+  * @param fs   filesystem used for every peer file operation
+  * @param conf used to resolve the root directory and the {@link #PEERS_DIR} directory name
+  * @throws IOException if the root directory can not be resolved
+  */
+ public FSReplicationPeerStorage(FileSystem fs, Configuration conf) throws IOException {
+   Path rootDir = CommonFSUtils.getRootDir(conf);
+   String peersDirName = conf.get(PEERS_DIR, PEERS_DIR_DEFAULT);
+   this.fs = fs;
+   this.dir = new Path(rootDir, peersDirName);
+ }
+
+ /** Returns the directory which holds all files of the given peer. */
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+     allowedOnPath = ".*/FSReplicationPeerStorage.java|.*/src/test/.*")
+ Path getPeerDir(String peerId) {
+   Path peerDir = new Path(dir, peerId);
+   return peerDir;
+ }
+
+ /**
+  * Adds a new peer by writing its files below the peer directory.
+  * <p/>
+  * The peer config is written last, so a peer directory without a readable peer config is a
+  * leftover of an unfinished add or remove. Such a directory is reused, and a stale disabled
+  * marker found in it is removed when the new peer is enabled.
+  * @param peerId               id of the peer, also the name of the peer directory
+  * @param peerConfig           configuration to store for the peer
+  * @param enabled              initial state; a disabled peer is marked by the disabled file
+  * @param syncReplicationState initial sync replication state, stored with NONE as new state
+  * @throws ReplicationException if a valid peer with this id already exists, or if a filesystem
+  *                              operation fails
+  */
+ @Override
+ public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled,
+   SyncReplicationState syncReplicationState) throws ReplicationException {
+   String failure = "Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig
+     + ", state=" + (enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState="
+     + syncReplicationState;
+   Path peerDir = getPeerDir(peerId);
+   try {
+     // the directory alone does not make a peer, only a readable peer config does
+     if (fs.exists(peerDir) && read(fs, peerDir, PEER_CONFIG_FILE) != null) {
+       throw new ReplicationException(failure + ", peer already exists");
+     }
+     Path disabledFile = new Path(peerDir, DISABLED_FILE);
+     if (enabled) {
+       // a leftover directory may still carry the marker of the previous peer, and keeping it
+       // would make the new peer read back as disabled
+       if (fs.exists(disabledFile) && !fs.delete(disabledFile, false)) {
+         throw new IOException("Can not delete " + disabledFile);
+       }
+     } else {
+       fs.createNewFile(disabledFile);
+     }
+     write(fs, peerDir, SYNC_REPLICATION_STATE_FILE,
+       SyncReplicationState.toByteArray(syncReplicationState, SyncReplicationState.NONE));
+     // write the peer config data at last, so when loading, if we can not load the peer_config,
+     // we know that this is not a valid peer
+     write(fs, peerDir, PEER_CONFIG_FILE, ReplicationPeerConfigUtil.toByteArray(peerConfig));
+   } catch (IOException e) {
+     throw new ReplicationException(failure, e);
+   }
+ }
+
+ /**
+  * Removes the peer. The peer config is deleted before the directory, because a peer without a
+  * readable peer config is already considered invalid by the other operations.
+  * @throws ReplicationException if the config or the directory can not be deleted
+  */
+ @Override
+ public void removePeer(String peerId) throws ReplicationException {
+   final Path target = getPeerDir(peerId);
+   try {
+     delete(fs, target, PEER_CONFIG_FILE);
+     boolean removed = fs.delete(target, true);
+     if (!removed) {
+       throw new IOException("Can not delete " + target);
+     }
+   } catch (IOException e) {
+     throw new ReplicationException("Could not remove peer with id=" + peerId, e);
+   }
+ }
+
+ /**
+  * Enables or disables the peer by removing or creating the disabled marker file. Nothing is
+  * changed when the marker already matches the requested state.
+  * @throws ReplicationException if the marker can not be checked, deleted or created
+  */
+ @Override
+ public void setPeerState(String peerId, boolean enabled) throws ReplicationException {
+   Path marker = new Path(getPeerDir(peerId), DISABLED_FILE);
+   try {
+     boolean marked = fs.exists(marker);
+     if (enabled && marked) {
+       if (!fs.delete(marker, false)) {
+         throw new IOException("Can not delete " + marker);
+       }
+     } else if (!enabled && !marked) {
+       if (!fs.createNewFile(marker)) {
+         throw new IOException("Can not touch " + marker);
+       }
+     }
+   } catch (IOException e) {
+     throw new ReplicationException(
+       "Unable to change state of the peer with id=" + peerId + " to " + enabled, e);
+   }
+ }
+
+ /**
+  * Replaces the stored config of the peer with the given one.
+  * @throws ReplicationException if the config file can not be written
+  */
+ @Override
+ public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
+   throws ReplicationException {
+   try {
+     byte[] serialized = ReplicationPeerConfigUtil.toByteArray(peerConfig);
+     write(fs, getPeerDir(peerId), PEER_CONFIG_FILE, serialized);
+   } catch (IOException e) {
+     throw new ReplicationException(
+       "There was a problem trying to save changes to the replication peer " + peerId, e);
+   }
+ }
+
+ /**
+  * Lists the ids of all valid peers, i.e. the sub directories of the peers directory which
+  * contain a readable peer config.
+  * @return an unmodifiable list of peer ids, empty if the peers directory is missing or empty
+  * @throws ReplicationException if the directory can not be listed or a config can not be read
+  */
+ @Override
+ public List<String> listPeerIds() throws ReplicationException {
+   try {
+     FileStatus[] children = fs.listStatus(dir);
+     if (children == null || children.length == 0) {
+       return Collections.emptyList();
+     }
+     List<String> validIds = new ArrayList<>();
+     for (FileStatus child : children) {
+       String candidate = child.getPath().getName();
+       // skip directories left behind by an unfinished add or remove
+       if (read(fs, getPeerDir(candidate), PEER_CONFIG_FILE) == null) {
+         continue;
+       }
+       validIds.add(candidate);
+     }
+     return Collections.unmodifiableList(validIds);
+   } catch (FileNotFoundException e) {
+     LOG.debug("Peer directory does not exist yet", e);
+     return Collections.emptyList();
+   } catch (IOException e) {
+     throw new ReplicationException("Cannot get the list of peers", e);
+   }
+ }
+
+ /**
+  * Tells whether the peer is enabled, which is the case when it has no disabled marker file.
+  * @throws ReplicationException if the marker file can not be checked
+  */
+ @Override
+ public boolean isPeerEnabled(String peerId) throws ReplicationException {
+   try {
+     boolean disabled = fs.exists(new Path(getPeerDir(peerId), DISABLED_FILE));
+     return !disabled;
+   } catch (IOException e) {
+     throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e);
+   }
+ }
+
+ /**
+  * Loads and parses the stored config of the peer.
+  * @throws ReplicationException if the config can not be read, is missing or empty, or can not
+  *                              be parsed
+  */
+ @Override
+ public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException {
+   byte[] raw;
+   try {
+     raw = read(fs, getPeerDir(peerId), PEER_CONFIG_FILE);
+   } catch (IOException e) {
+     throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e);
+   }
+   boolean noData = raw == null || raw.length == 0;
+   if (noData) {
+     throw new ReplicationException(
+       "Replication peer config data shouldn't be empty, peerId=" + peerId);
+   }
+   try {
+     return ReplicationPeerConfigUtil.parsePeerFrom(raw);
+   } catch (DeserializationException e) {
+     throw new ReplicationException(
+       "Failed to parse replication peer config for peer with id=" + peerId, e);
+   }
+ }
+
+ /**
+  * Loads the (state, new state) pair of the peer. If the peer directory exists but has no sync
+  * replication state file, a (NONE, NONE) pair is written and returned.
+  * @throws IOException if the peer directory does not exist or a file operation fails
+  */
+ private Pair<SyncReplicationState, SyncReplicationState> getStateAndNewState(String peerId)
+   throws IOException {
+   Path peerDir = getPeerDir(peerId);
+   if (!fs.exists(peerDir)) {
+     throw new IOException("peer does not exists");
+   }
+   byte[] stored = read(fs, peerDir, SYNC_REPLICATION_STATE_FILE);
+   if (stored != null) {
+     return SyncReplicationState.parseStateAndNewStateFrom(stored);
+   }
+   // should be a peer from previous version, set the sync replication state for it.
+   SyncReplicationState none = SyncReplicationState.NONE;
+   write(fs, peerDir, SYNC_REPLICATION_STATE_FILE, SyncReplicationState.toByteArray(none, none));
+   return Pair.newPair(none, none);
+ }
+
+ /**
+  * Stores the given new sync replication state next to the unchanged current state.
+  * @throws ReplicationException if the state can not be loaded or written
+  */
+ @Override
+ public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState newState)
+   throws ReplicationException {
+   try {
+     SyncReplicationState current = getStateAndNewState(peerId).getFirst();
+     byte[] payload = SyncReplicationState.toByteArray(current, newState);
+     write(fs, getPeerDir(peerId), SYNC_REPLICATION_STATE_FILE, payload);
+   } catch (IOException e) {
+     throw new ReplicationException(
+       "Unable to set the new sync replication state for peer with id=" + peerId + ", newState="
+         + newState,
+       e);
+   }
+ }
+
+ /**
+  * Makes the stored new state the current state and resets the new state to NONE.
+  * @throws ReplicationException if the state can not be loaded or written
+  */
+ @Override
+ public void transitPeerSyncReplicationState(String peerId) throws ReplicationException {
+   try {
+     SyncReplicationState pending = getStateAndNewState(peerId).getSecond();
+     byte[] payload = SyncReplicationState.toByteArray(pending, SyncReplicationState.NONE);
+     write(fs, getPeerDir(peerId), SYNC_REPLICATION_STATE_FILE, payload);
+   } catch (IOException e) {
+     throw new ReplicationException(
+       "Error transiting sync replication state for peer with id=" + peerId, e);
+   }
+ }
+
+ /**
+  * Returns the current sync replication state of the peer.
+  * @throws ReplicationException if the peer does not exist or the state can not be loaded
+  */
+ @Override
+ public SyncReplicationState getPeerSyncReplicationState(String peerId)
+   throws ReplicationException {
+   Pair<SyncReplicationState, SyncReplicationState> states;
+   try {
+     states = getStateAndNewState(peerId);
+   } catch (IOException e) {
+     throw new ReplicationException(
+       "Error getting sync replication state for peer with id=" + peerId, e);
+   }
+   return states.getFirst();
+ }
+
+ /**
+  * Returns the new (pending) sync replication state of the peer.
+  * @throws ReplicationException if the peer does not exist or the state can not be loaded
+  */
+ @Override
+ public SyncReplicationState getPeerNewSyncReplicationState(String peerId)
+   throws ReplicationException {
+   Pair<SyncReplicationState, SyncReplicationState> states;
+   try {
+     states = getStateAndNewState(peerId);
+   } catch (IOException e) {
+     throw new ReplicationException(
+       "Error getting new sync replication state for peer with id=" + peerId, e);
+   }
+   return states.getSecond();
+ }
+
+ // Upper bound handed to RotateFile; 16 MB is big enough for our usage here
+ private static final long MAX_FILE_SIZE = 16 * 1024 * 1024;
+
+ /** Reads the content stored under the given name in the given directory via RotateFile. */
+ private static byte[] read(FileSystem fs, Path dir, String name) throws IOException {
+   return new RotateFile(fs, dir, name, MAX_FILE_SIZE).read();
+ }
+
+ /** Stores the data under the given name in the given directory via RotateFile. */
+ private static void write(FileSystem fs, Path dir, String name, byte[] data)
+   throws IOException {
+   RotateFile target = new RotateFile(fs, dir, name, MAX_FILE_SIZE);
+   // the read result is not needed, the call is only here to initialize the nextFile index
+   target.read();
+   target.write(data);
+ }
+
+ /** Deletes the content stored under the given name in the given directory via RotateFile. */
+ private static void delete(FileSystem fs, Path dir, String name) throws IOException {
+   RotateFile target = new RotateFile(fs, dir, name, MAX_FILE_SIZE);
+   // the read result is not needed, the call is only here to initialize the nextFile index
+   target.read();
+   target.delete();
+ }
+}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index 6dba30a34c0..9047bbf29ca 100644
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@@ -32,7 +33,8 @@ public final class ReplicationFactory {
private ReplicationFactory() {
}
- public static ReplicationPeers getReplicationPeers(ZKWatcher zk,
Configuration conf) {
- return new ReplicationPeers(zk, conf);
+ public static ReplicationPeers getReplicationPeers(FileSystem fs, ZKWatcher
zk,
+ Configuration conf) {
+ return new ReplicationPeers(fs, zk, conf);
}
}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageType.java
similarity index 66%
copy from
hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
copy to
hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageType.java
index 6dba30a34c0..1b110c3a6a5 100644
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageType.java
@@ -17,22 +17,24 @@
*/
package org.apache.hadoop.hbase.replication;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * A factory class for instantiating replication objects that deal with
replication state.
+ * Specify the implementations for {@link ReplicationPeerStorage}.
*/
@InterfaceAudience.Private
-public final class ReplicationFactory {
+public enum ReplicationPeerStorageType {
- public static final String REPLICATION_TRACKER_IMPL =
"hbase.replication.tracker.impl";
+ FILESYSTEM(FSReplicationPeerStorage.class),
+ ZOOKEEPER(ZKReplicationPeerStorage.class);
- private ReplicationFactory() {
+ private final Class<? extends ReplicationPeerStorage> clazz;
+
+ private ReplicationPeerStorageType(Class<? extends ReplicationPeerStorage>
clazz) {
+ this.clazz = clazz;
}
- public static ReplicationPeers getReplicationPeers(ZKWatcher zk,
Configuration conf) {
- return new ReplicationPeers(zk, conf);
+ public Class<? extends ReplicationPeerStorage> getClazz() {
+ return clazz;
}
}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 70344c07bdc..a8f4a5efa54 100644
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -23,6 +23,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@@ -40,10 +41,10 @@ public class ReplicationPeers {
private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
private final ReplicationPeerStorage peerStorage;
- ReplicationPeers(ZKWatcher zookeeper, Configuration conf) {
+ ReplicationPeers(FileSystem fs, ZKWatcher zookeeper, Configuration conf) {
this.conf = conf;
this.peerCache = new ConcurrentHashMap<>();
- this.peerStorage =
ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
+ this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs,
zookeeper, conf);
}
public Configuration getConf() {
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
index 1080b2125c7..0124dbdd113 100644
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
@@ -17,26 +17,60 @@
*/
package org.apache.hadoop.hbase.replication;
+import java.lang.reflect.Constructor;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Used to create replication storage(peer, queue) classes.
- * <p>
- * For now we only have zk based implementation.
*/
@InterfaceAudience.Private
public final class ReplicationStorageFactory {
+ public static final String REPLICATION_PEER_STORAGE_IMPL =
"hbase.replication.peer.storage.impl";
+
+ // must use zookeeper here, otherwise when users upgrade from an old version without changing
+ // the config file, they will lose all the replication peer data.
+ public static final ReplicationPeerStorageType
DEFAULT_REPLICATION_PEER_STORAGE_IMPL =
+ ReplicationPeerStorageType.ZOOKEEPER;
+
private ReplicationStorageFactory() {
}
+ /**
+  * Resolves the {@link ReplicationPeerStorage} implementation class from the configuration.
+  * <p/>
+  * The value of {@link #REPLICATION_PEER_STORAGE_IMPL} is first matched, ignoring case, against
+  * the names of {@link ReplicationPeerStorageType}. If it matches none of them it is treated as
+  * a class name. The upper casing uses the root locale so that the match does not depend on the
+  * default locale of the JVM (e.g. the dotted capital I of the Turkish locale).
+  * @param conf configuration to read the implementation from
+  * @return the implementation class, the zookeeper based one if nothing is configured
+  */
+ private static Class<? extends ReplicationPeerStorage>
+   getReplicationPeerStorageClass(Configuration conf) {
+   try {
+     String impl =
+       conf.get(REPLICATION_PEER_STORAGE_IMPL, DEFAULT_REPLICATION_PEER_STORAGE_IMPL.name());
+     ReplicationPeerStorageType type =
+       ReplicationPeerStorageType.valueOf(impl.toUpperCase(java.util.Locale.ROOT));
+     return type.getClazz();
+   } catch (IllegalArgumentException e) {
+     // not one of the predefined types, so the value has to be a class name
+     return conf.getClass(REPLICATION_PEER_STORAGE_IMPL,
+       DEFAULT_REPLICATION_PEER_STORAGE_IMPL.getClazz(), ReplicationPeerStorage.class);
+   }
+ }
+
/**
* Create a new {@link ReplicationPeerStorage}.
*/
- public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk,
Configuration conf) {
- return new ZKReplicationPeerStorage(zk, conf);
+ public static ReplicationPeerStorage getReplicationPeerStorage(FileSystem
fs, ZKWatcher zk,
+ Configuration conf) {
+ Class<? extends ReplicationPeerStorage> clazz =
getReplicationPeerStorageClass(conf);
+ for (Constructor<?> c : clazz.getConstructors()) {
+ if (c.getParameterCount() != 2) {
+ continue;
+ }
+ if (c.getParameterTypes()[0].isAssignableFrom(FileSystem.class)) {
+ return ReflectionUtils.newInstance(clazz, fs, conf);
+ } else if (c.getParameterTypes()[0].isAssignableFrom(ZKWatcher.class)) {
+ return ReflectionUtils.newInstance(clazz, zk, conf);
+ }
+ }
+ throw new IllegalArgumentException(
+ "Can not create replication peer storage with type " + clazz);
}
/**
diff --git
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java
new file mode 100644
index 00000000000..018883290c1
--- /dev/null
+++
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.hbase.TableName;
+import org.junit.Test;
+
+public abstract class ReplicationPeerStorageTestBase {
+
+ // Shared generator; getConfig reseeds it with Random#setSeed on every call, so the same seed
+ // always produces the same config.
+ private static final Random RNG = new Random();
+
+ // The storage under test; has to be assigned by the concrete test class before the tests run.
+ protected static ReplicationPeerStorage STORAGE;
+
+ /** Generates between 0 and 4 random hex strings to be used as namespace names. */
+ private Set<String> randNamespaces(Random rand) {
+   // draw the count first, exactly as the generator is consumed in the chained form
+   int count = rand.nextInt(5);
+   Stream<String> names = Stream.generate(() -> Long.toHexString(rand.nextLong()));
+   return names.limit(count).collect(toSet());
+ }
+
+ /**
+  * Generates a random table to column families map with up to 4 tables, each with up to 4
+  * column families.
+  */
+ private Map<TableName, List<String>> randTableCFs(Random rand) {
+   Map<TableName, List<String>> tableCFs = new HashMap<>();
+   for (int remaining = rand.nextInt(5); remaining > 0; remaining--) {
+     TableName table = TableName.valueOf(Long.toHexString(rand.nextLong()));
+     // the family count is drawn before the family names, as in the chained form
+     int familyCount = rand.nextInt(5);
+     List<String> families = Stream.generate(() -> Long.toHexString(rand.nextLong()))
+       .limit(familyCount).collect(toList());
+     tableCFs.put(table, families);
+   }
+   return tableCFs;
+ }
+
+ /**
+  * Builds a pseudo random peer config which is fully determined by the seed: the shared RNG is
+  * reseeded first and all values are drawn from it in a fixed order, so two calls with the same
+  * seed produce equal configs.
+  */
+ private ReplicationPeerConfig getConfig(int seed) {
+ RNG.setSeed(seed);
+ return
ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong()))
+ .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong()))
+
.setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG))
+
.setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG))
+
.setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean())
+ .setBandwidth(RNG.nextInt(1000)).build();
+ }
+
+ /** Asserts that both sets hold the same elements; null and empty are treated as equal. */
+ private void assertSetEquals(Set<String> expected, Set<String> actual) {
+   boolean expectNothing = expected == null || expected.isEmpty();
+   if (expectNothing) {
+     assertTrue(actual == null || actual.isEmpty());
+     return;
+   }
+   assertEquals(expected.size(), actual.size());
+   for (String element : expected) {
+     assertTrue(actual.contains(element));
+   }
+ }
+
+ /**
+  * Asserts that both maps hold the same tables with the same column families in the same order.
+  * A null and an empty map are treated as equal, and so are a null and an empty family list.
+  */
+ private void assertMapEquals(Map<TableName, List<String>> expected,
+   Map<TableName, List<String>> actual) {
+   boolean expectNothing = expected == null || expected.isEmpty();
+   if (expectNothing) {
+     assertTrue(actual == null || actual.isEmpty());
+     return;
+   }
+   assertEquals(expected.size(), actual.size());
+   for (Map.Entry<TableName, List<String>> entry : expected.entrySet()) {
+     TableName table = entry.getKey();
+     List<String> wantedCFs = entry.getValue();
+     List<String> foundCFs = actual.get(table);
+     if (wantedCFs == null || wantedCFs.isEmpty()) {
+       // the table itself still has to be present
+       assertTrue(actual.containsKey(table));
+       assertTrue(foundCFs == null || foundCFs.isEmpty());
+       continue;
+     }
+     assertNotNull(foundCFs);
+     assertEquals(wantedCFs.size(), foundCFs.size());
+     Iterator<String> foundIt = foundCFs.iterator();
+     for (String wanted : wantedCFs) {
+       assertEquals(wanted, foundIt.next());
+     }
+   }
+ }
+
+ /**
+  * Asserts that the two peer configs agree on cluster key, replication endpoint, namespaces,
+  * excluded namespaces, table CFs, excluded table CFs, the replicate-all flag and bandwidth.
+  * The remote WAL dir, although set by getConfig, is not compared here.
+  */
+ private void assertConfigEquals(ReplicationPeerConfig expected,
ReplicationPeerConfig actual) {
+ assertEquals(expected.getClusterKey(), actual.getClusterKey());
+ assertEquals(expected.getReplicationEndpointImpl(),
actual.getReplicationEndpointImpl());
+ assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
+ assertSetEquals(expected.getExcludeNamespaces(),
actual.getExcludeNamespaces());
+ assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
+ assertMapEquals(expected.getExcludeTableCFsMap(),
actual.getExcludeTableCFsMap());
+ assertEquals(expected.replicateAllUserTables(),
actual.replicateAllUserTables());
+ assertEquals(expected.getBandwidth(), actual.getBandwidth());
+ }
+
+ /**
+  * Walks ten peers through add, list, config update, state change and remove, and checks after
+  * every step that the storage returns what was written.
+  */
+ @Test
+ public void test() throws ReplicationException {
+ int peerCount = 10;
+ // add the peers: even ids enabled, odd ids disabled, sync replication state cycling with i % 4
+ for (int i = 0; i < peerCount; i++) {
+ STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0,
+ SyncReplicationState.valueOf(i % 4));
+ }
+ List<String> peerIds = STORAGE.listPeerIds();
+ assertEquals(peerCount, peerIds.size());
+ // the peer id doubles as the seed of its config, so the expected config can be regenerated
+ for (String peerId : peerIds) {
+ int seed = Integer.parseInt(peerId);
+ assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId));
+ }
+ // replace every config with the one generated from the next seed
+ for (int i = 0; i < peerCount; i++) {
+ STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
+ }
+ for (String peerId : peerIds) {
+ int seed = Integer.parseInt(peerId);
+ assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId));
+ }
+ // the enabled flag must still be the one given to addPeer
+ for (int i = 0; i < peerCount; i++) {
+ assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
+ }
+ // flip the state of every peer and verify it
+ for (int i = 0; i < peerCount; i++) {
+ STORAGE.setPeerState(Integer.toString(i), i % 2 != 0);
+ }
+ for (int i = 0; i < peerCount; i++) {
+ assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i)));
+ }
+ // the sync replication state must be untouched by the operations above
+ for (int i = 0; i < peerCount; i++) {
+ assertEquals(SyncReplicationState.valueOf(i % 4),
+ STORAGE.getPeerSyncReplicationState(Integer.toString(i)));
+ }
+ // remove one peer and make sure it is no longer listed
+ String toRemove = Integer.toString(peerCount / 2);
+ STORAGE.removePeer(toRemove);
+ peerIds = STORAGE.listPeerIds();
+ assertEquals(peerCount - 1, peerIds.size());
+ assertFalse(peerIds.contains(toRemove));
+
+ try {
+ STORAGE.getPeerConfig(toRemove);
+ fail("Should throw a ReplicationException when getting peer config of a
removed peer");
+ } catch (ReplicationException e) {
+ // expected: the config of a removed peer can not be loaded
+ }
+ }
+
+ /**
+  * Removes only the stored sync replication state of the given peer, bypassing the storage API.
+  * Used by testNoSyncReplicationState to simulate a peer without that state.
+  */
+ protected abstract void removePeerSyncRelicationState(String peerId) throws
Exception;
+
+ /**
+  * Asserts that the stored sync replication state of the given peer exists (again). Called by
+  * testNoSyncReplicationState after the state has been read through the storage API.
+  */
+ protected abstract void assertPeerSyncReplicationStateCreate(String peerId)
throws Exception;
+
+ /**
+  * Verifies that reading the sync replication state fails for a peer which does not exist, and
+  * that for an existing peer without stored state NONE is returned and the state is created.
+  */
+ @Test
+ public void testNoSyncReplicationState() throws Exception {
+ // A missing state could happen for a peer created before we introduced sync replication.
+ String peerId = "testNoSyncReplicationState";
+ assertThrows("Should throw a ReplicationException when getting state of
inexist peer",
+ ReplicationException.class, () ->
STORAGE.getPeerSyncReplicationState(peerId));
+ assertThrows("Should throw a ReplicationException when getting state of
inexist peer",
+ ReplicationException.class, () ->
STORAGE.getPeerNewSyncReplicationState(peerId));
+
+ STORAGE.addPeer(peerId, getConfig(0), true, SyncReplicationState.NONE);
+ // delete the stored sync replication state to simulate such an old peer
+ removePeerSyncRelicationState(peerId);
+ // should not throw exception as the peer exists
+ assertEquals(SyncReplicationState.NONE,
STORAGE.getPeerSyncReplicationState(peerId));
+ assertEquals(SyncReplicationState.NONE,
STORAGE.getPeerNewSyncReplicationState(peerId));
+ // make sure the stored state has been created for the old format peer
+ assertPeerSyncReplicationStateCreate(peerId);
+ }
+
+ /**
+  * Checks the exception thrown by testPeerNameControl when a peer id is added a second time.
+  * What exactly is checked is up to the concrete storage test.
+  */
+ protected abstract void assertPeerNameControlException(ReplicationException
e);
+
+ /** Verifies that adding a peer with an id which is already in use is rejected. */
+ @Test
+ public void testPeerNameControl() throws Exception {
+   final String peerId = "6";
+   final ReplicationPeerConfig config =
+     ReplicationPeerConfig.newBuilder().setClusterKey("key").build();
+   STORAGE.addPeer(peerId, config, true, SyncReplicationState.NONE);
+
+   try {
+     ReplicationException e = assertThrows(ReplicationException.class,
+       () -> STORAGE.addPeer(peerId, config, true, SyncReplicationState.NONE));
+     assertPeerNameControlException(e);
+   } finally {
+     // clean up
+     STORAGE.removePeer(peerId);
+   }
+ }
+}
diff --git
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestFSReplicationPeerStorage.java
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestFSReplicationPeerStorage.java
new file mode 100644
index 00000000000..f99f529c9f0
--- /dev/null
+++
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestFSReplicationPeerStorage.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.endsWith;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.RotateFile;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestFSReplicationPeerStorage extends
ReplicationPeerStorageTestBase {
+
+ // Class level test rule bound to this test class.
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestFSReplicationPeerStorage.class);
+
+ // Provides the configuration and the test data directory.
+ private static final HBaseCommonTestingUtil UTIL = new
HBaseCommonTestingUtil();
+
+ // Filesystem obtained from the test configuration, shared with the storage under test.
+ private static FileSystem FS;
+
+ // Test data directory which is used as the HBase root directory.
+ private static Path DIR;
+
+ /**
+  * Points the HBase root directory at a test data directory and creates the filesystem based
+  * storage under test on top of it.
+  */
+ @BeforeClass
+ public static void setUp() throws Exception {
+ DIR = UTIL.getDataTestDir("test_fs_peer_storage");
+ // the root dir has to be set before the storage is created, as the storage resolves its peers
+ // directory against it in the constructor
+ CommonFSUtils.setRootDir(UTIL.getConfiguration(), DIR);
+ FS = FileSystem.get(UTIL.getConfiguration());
+ STORAGE = new FSReplicationPeerStorage(FS, UTIL.getConfiguration());
+ }
+
+ /** Cleans up the test directory once all tests of this class have run. */
+ @AfterClass
+ public static void tearDown() throws IOException {
+ UTIL.cleanupTestDir();
+ }
+
+ /** Deletes the sync replication state file of the peer directly on the filesystem. */
+ @Override
+ protected void removePeerSyncRelicationState(String peerId) throws Exception {
+   Path peerDir = ((FSReplicationPeerStorage) STORAGE).getPeerDir(peerId);
+   RotateFile stateFile =
+     new RotateFile(FS, peerDir, FSReplicationPeerStorage.SYNC_REPLICATION_STATE_FILE, 1024);
+   // same read-then-delete sequence as FSReplicationPeerStorage uses for its own deletes
+   stateFile.read();
+   stateFile.delete();
+ }
+
+ /** Asserts that the sync replication state file of the peer can be read from the filesystem. */
+ @Override
+ protected void assertPeerSyncReplicationStateCreate(String peerId) throws Exception {
+   Path peerDir = ((FSReplicationPeerStorage) STORAGE).getPeerDir(peerId);
+   RotateFile stateFile =
+     new RotateFile(FS, peerDir, FSReplicationPeerStorage.SYNC_REPLICATION_STATE_FILE, 1024);
+   byte[] content = stateFile.read();
+   assertNotNull(content);
+ }
+
+ /** The filesystem storage reports a duplicate peer with a message ending in a fixed suffix. */
+ @Override
+ protected void assertPeerNameControlException(ReplicationException e) {
+   String message = e.getMessage();
+   assertThat(message, endsWith("peer already exists"));
+ }
+}
diff --git
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index 311e7e337f9..d2540987906 100644
---
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtil;
@@ -77,10 +78,11 @@ public class TestReplicationStateZKImpl extends
TestReplicationStateBasic {
}
@Before
- public void setUp() {
+ public void setUp() throws IOException {
zkTimeoutCount = 0;
rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
- rp = ReplicationFactory.getReplicationPeers(zkw, conf);
+ rp =
+
ReplicationFactory.getReplicationPeers(FileSystem.get(utility.getConfiguration()),
zkw, conf);
OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
}
diff --git
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
index 7bc47918992..12262461f74 100644
---
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
+++
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
@@ -17,52 +17,30 @@
*/
package org.apache.hadoop.hbase.replication;
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toSet;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.stream.Stream;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException;
-import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, MediumTests.class })
-public class TestZKReplicationPeerStorage {
+public class TestZKReplicationPeerStorage extends
ReplicationPeerStorageTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class);
private static final HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil();
- private static final Random RNG = new Random(); // Seed may be set with
Random#setSeed
- private static ZKReplicationPeerStorage STORAGE;
@BeforeClass
public static void setUp() throws Exception {
@@ -75,264 +53,24 @@ public class TestZKReplicationPeerStorage {
UTIL.shutdownMiniZKCluster();
}
- @After
- public void cleanCustomConfigurations() {
-
UTIL.getConfiguration().unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
+ @Override
+ protected void removePeerSyncRelicationState(String peerId) throws Exception
{
+ ZKReplicationPeerStorage storage = (ZKReplicationPeerStorage) STORAGE;
+ ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(),
storage.getSyncReplicationStateNode(peerId)); // delete the peer's sync replication state node
+ ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(),
storage.getNewSyncReplicationStateNode(peerId)); // delete the peer's "new" sync replication state node as well
}
- private Set<String> randNamespaces(Random rand) {
- return Stream.generate(() ->
Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
- .collect(toSet());
- }
-
- private Map<TableName, List<String>> randTableCFs(Random rand) {
- int size = rand.nextInt(5);
- Map<TableName, List<String>> map = new HashMap<>();
- for (int i = 0; i < size; i++) {
- TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
- List<String> cfs = Stream.generate(() ->
Long.toHexString(rand.nextLong()))
- .limit(rand.nextInt(5)).collect(toList());
- map.put(tn, cfs);
- }
- return map;
- }
-
- private ReplicationPeerConfig getConfig(int seed) {
- RNG.setSeed(seed);
- return
ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong()))
- .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong()))
-
.setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG))
-
.setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG))
-
.setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean())
- .setBandwidth(RNG.nextInt(1000)).build();
- }
-
- private void assertSetEquals(Set<String> expected, Set<String> actual) {
- if (expected == null || expected.size() == 0) {
- assertTrue(actual == null || actual.size() == 0);
- return;
- }
- assertEquals(expected.size(), actual.size());
- expected.forEach(s -> assertTrue(actual.contains(s)));
- }
-
- private void assertMapEquals(Map<TableName, List<String>> expected,
- Map<TableName, List<String>> actual) {
- if (expected == null || expected.size() == 0) {
- assertTrue(actual == null || actual.size() == 0);
- return;
- }
- assertEquals(expected.size(), actual.size());
- expected.forEach((expectedTn, expectedCFs) -> {
- List<String> actualCFs = actual.get(expectedTn);
- if (expectedCFs == null || expectedCFs.size() == 0) {
- assertTrue(actual.containsKey(expectedTn));
- assertTrue(actualCFs == null || actualCFs.size() == 0);
- } else {
- assertNotNull(actualCFs);
- assertEquals(expectedCFs.size(), actualCFs.size());
- for (Iterator<String> expectedIt = expectedCFs.iterator(),
- actualIt = actualCFs.iterator(); expectedIt.hasNext();) {
- assertEquals(expectedIt.next(), actualIt.next());
- }
- }
- });
- }
-
- private void assertConfigEquals(ReplicationPeerConfig expected,
ReplicationPeerConfig actual) {
- assertEquals(expected.getClusterKey(), actual.getClusterKey());
- assertEquals(expected.getReplicationEndpointImpl(),
actual.getReplicationEndpointImpl());
- assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
- assertSetEquals(expected.getExcludeNamespaces(),
actual.getExcludeNamespaces());
- assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
- assertMapEquals(expected.getExcludeTableCFsMap(),
actual.getExcludeTableCFsMap());
- assertEquals(expected.replicateAllUserTables(),
actual.replicateAllUserTables());
- assertEquals(expected.getBandwidth(), actual.getBandwidth());
- }
-
- @Test
- public void test() throws ReplicationException {
- int peerCount = 10;
- for (int i = 0; i < peerCount; i++) {
- STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0,
- SyncReplicationState.valueOf(i % 4));
- }
- List<String> peerIds = STORAGE.listPeerIds();
- assertEquals(peerCount, peerIds.size());
- for (String peerId : peerIds) {
- int seed = Integer.parseInt(peerId);
- assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId));
- }
- for (int i = 0; i < peerCount; i++) {
- STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
- }
- for (String peerId : peerIds) {
- int seed = Integer.parseInt(peerId);
- assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId));
- }
- for (int i = 0; i < peerCount; i++) {
- assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
- }
- for (int i = 0; i < peerCount; i++) {
- STORAGE.setPeerState(Integer.toString(i), i % 2 != 0);
- }
- for (int i = 0; i < peerCount; i++) {
- assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i)));
- }
- for (int i = 0; i < peerCount; i++) {
- assertEquals(SyncReplicationState.valueOf(i % 4),
- STORAGE.getPeerSyncReplicationState(Integer.toString(i)));
- }
- String toRemove = Integer.toString(peerCount / 2);
- STORAGE.removePeer(toRemove);
- peerIds = STORAGE.listPeerIds();
- assertEquals(peerCount - 1, peerIds.size());
- assertFalse(peerIds.contains(toRemove));
-
- try {
- STORAGE.getPeerConfig(toRemove);
- fail("Should throw a ReplicationException when getting peer config of a
removed peer");
- } catch (ReplicationException e) {
- }
- }
-
- @Test
- public void testNoSyncReplicationState()
- throws ReplicationException, KeeperException, IOException {
- // This could happen for a peer created before we introduce sync
replication.
- String peerId = "testNoSyncReplicationState";
- try {
- STORAGE.getPeerSyncReplicationState(peerId);
- fail("Should throw a ReplicationException when getting state of inexist
peer");
- } catch (ReplicationException e) {
- // expected
- }
- try {
- STORAGE.getPeerNewSyncReplicationState(peerId);
- fail("Should throw a ReplicationException when getting state of inexist
peer");
- } catch (ReplicationException e) {
- // expected
- }
- STORAGE.addPeer(peerId, getConfig(0), true, SyncReplicationState.NONE);
- // delete the sync replication state node to simulate
- ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(),
STORAGE.getSyncReplicationStateNode(peerId));
- ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(),
STORAGE.getNewSyncReplicationStateNode(peerId));
- // should not throw exception as the peer exists
- assertEquals(SyncReplicationState.NONE,
STORAGE.getPeerSyncReplicationState(peerId));
- assertEquals(SyncReplicationState.NONE,
STORAGE.getPeerNewSyncReplicationState(peerId));
- // make sure we create the node for the old format peer
+ @Override
+ protected void assertPeerSyncReplicationStateCreate(String peerId) throws
Exception {
+ ZKReplicationPeerStorage storage = (ZKReplicationPeerStorage) STORAGE;
assertNotEquals(-1,
- ZKUtil.checkExists(UTIL.getZooKeeperWatcher(),
STORAGE.getSyncReplicationStateNode(peerId)));
+ ZKUtil.checkExists(UTIL.getZooKeeperWatcher(),
storage.getSyncReplicationStateNode(peerId)));
assertNotEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(),
- STORAGE.getNewSyncReplicationStateNode(peerId)));
- }
-
- @Test
- public void testBaseReplicationPeerConfig() throws ReplicationException {
- String customPeerConfigKey = "hbase.xxx.custom_config";
- String customPeerConfigValue = "test";
- String customPeerConfigUpdatedValue = "testUpdated";
-
- String customPeerConfigSecondKey = "hbase.xxx.custom_second_config";
- String customPeerConfigSecondValue = "testSecond";
- String customPeerConfigSecondUpdatedValue = "testSecondUpdated";
-
- ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
-
- // custom config not present
-
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey),
null);
-
- Configuration conf = UTIL.getConfiguration();
- conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
- customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";")
-
.concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue));
-
- ReplicationPeerConfig updatedReplicationPeerConfig =
ReplicationPeerConfigUtil
- .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
-
- // validates base configs are present in replicationPeerConfig
- assertEquals(customPeerConfigValue,
-
updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
- assertEquals(customPeerConfigSecondValue,
-
updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigSecondKey));
-
- // validates base configs get updated values even if config already present
- conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
- conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
-
customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";")
-
.concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue));
-
- ReplicationPeerConfig replicationPeerConfigAfterValueUpdate =
ReplicationPeerConfigUtil
- .updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);
-
- assertEquals(customPeerConfigUpdatedValue,
-
replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigKey));
- assertEquals(customPeerConfigSecondUpdatedValue,
-
replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigSecondKey));
- }
-
- @Test
- public void testBaseReplicationRemovePeerConfig() throws
ReplicationException {
- String customPeerConfigKey = "hbase.xxx.custom_config";
- String customPeerConfigValue = "test";
- ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
-
- // custom config not present
-
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey),
null);
-
- Configuration conf = UTIL.getConfiguration();
- conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
- customPeerConfigKey.concat("=").concat(customPeerConfigValue));
-
- ReplicationPeerConfig updatedReplicationPeerConfig =
ReplicationPeerConfigUtil
- .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
-
- // validates base configs are present in replicationPeerConfig
- assertEquals(customPeerConfigValue,
-
updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
-
- conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
- conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
- customPeerConfigKey.concat("=").concat(""));
-
- ReplicationPeerConfig replicationPeerConfigRemoved =
ReplicationPeerConfigUtil
- .updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);
-
-
assertNull(replicationPeerConfigRemoved.getConfiguration().get(customPeerConfigKey));
+ storage.getNewSyncReplicationStateNode(peerId)));
}
- @Test
- public void testBaseReplicationRemovePeerConfigWithNoExistingConfig()
- throws ReplicationException {
- String customPeerConfigKey = "hbase.xxx.custom_config";
- ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
-
- // custom config not present
-
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey),
null);
- Configuration conf = UTIL.getConfiguration();
- conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
- customPeerConfigKey.concat("=").concat(""));
-
- ReplicationPeerConfig updatedReplicationPeerConfig =
ReplicationPeerConfigUtil
- .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
-
assertNull(updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
- }
-
- @Test
- public void testPeerNameControl() throws Exception {
- String clusterKey = "key";
- STORAGE.addPeer("6",
ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), true,
- SyncReplicationState.NONE);
-
- try {
- STORAGE.addPeer("6",
ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(),
- true, SyncReplicationState.NONE);
- fail();
- } catch (ReplicationException e) {
- assertThat(e.getCause(),
instanceOf(KeeperException.NodeExistsException.class));
- } finally {
- // clean up
- STORAGE.removePeer("6");
- }
+ @Override
+ protected void assertPeerNameControlException(ReplicationException e) {
+ assertThat(e.getCause(),
instanceOf(KeeperException.NodeExistsException.class)); // this storage is checked via the cause: a ZooKeeper NodeExistsException
}
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 58a5d7cae47..d0fd34848a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -783,7 +783,8 @@ public class HMaster extends
HBaseServerBase<MasterRpcServices> implements Maste
}
this.rsGroupInfoManager = RSGroupInfoManager.create(this);
- this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper,
conf, clusterId);
+ this.replicationPeerManager =
+ ReplicationPeerManager.create(fileSystemManager.getFileSystem(),
zooKeeper, conf, clusterId);
this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
this.serverManager);
this.drainingServerTracker.start();
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 06cf559d492..30164f29671 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -33,6 +33,7 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -559,10 +560,10 @@ public class ReplicationPeerManager {
return queueStorage;
}
- public static ReplicationPeerManager create(ZKWatcher zk, Configuration
conf, String clusterId)
- throws ReplicationException {
+ public static ReplicationPeerManager create(FileSystem fs, ZKWatcher zk,
Configuration conf,
+ String clusterId) throws ReplicationException {
ReplicationPeerStorage peerStorage =
- ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
+ ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
ConcurrentMap<String, ReplicationPeerDescription> peers = new
ConcurrentHashMap<>();
for (String peerId : peerStorage.listPeerIds()) {
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index ea28a20c56b..84b98ed3c93 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -96,8 +96,8 @@ public class Replication implements ReplicationSourceService {
try {
this.queueStorage =
ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(),
conf);
- this.replicationPeers =
- ReplicationFactory.getReplicationPeers(server.getZooKeeper(),
this.conf);
+ this.replicationPeers =
ReplicationFactory.getReplicationPeers(server.getFileSystem(),
+ server.getZooKeeper(), this.conf);
this.replicationPeers.init();
} catch (Exception e) {
throw new IOException("Failed replication handler create", e);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index ab8d9fcde6a..7e10fd786a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -2557,7 +2557,7 @@ public class HBaseFsck extends Configured implements
Closeable {
return hbi;
}
- private void checkAndFixReplication() throws ReplicationException {
+ private void checkAndFixReplication() throws ReplicationException,
IOException {
ReplicationChecker checker = new ReplicationChecker(getConf(), zkw,
errors);
checker.checkUnDeletedQueues();
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
index 234daef85b5..7e7a46573b8 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.util.hbck;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -24,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
@@ -53,8 +55,10 @@ public class ReplicationChecker {
private final ReplicationPeerStorage peerStorage;
private final ReplicationQueueStorage queueStorage;
- public ReplicationChecker(Configuration conf, ZKWatcher zkw,
HbckErrorReporter errorReporter) {
- this.peerStorage =
ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf);
+ public ReplicationChecker(Configuration conf, ZKWatcher zkw,
HbckErrorReporter errorReporter)
+ throws IOException {
+ this.peerStorage =
+
ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), zkw,
conf);
this.queueStorage =
ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
this.errorReporter = errorReporter;
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index 899d3eb4722..87d21e583dd 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -91,7 +91,8 @@ public class TestReplicationHFileCleaner {
server = new DummyServer();
conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
HMaster.decorateMasterConfiguration(conf);
- rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf);
+ rp =
+ ReplicationFactory.getReplicationPeers(server.getFileSystem(),
server.getZooKeeper(), conf);
rp.init();
rq =
ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(),
conf);
fs = FileSystem.get(conf);
@@ -236,7 +237,17 @@ public class TestReplicationHFileCleaner {
try {
return new ZKWatcher(getConfiguration(), "dummy server", this);
} catch (IOException e) {
- e.printStackTrace();
+ LOG.error("Can not get ZKWatcher", e);
+ }
+ return null;
+ }
+
+ @Override
+ public FileSystem getFileSystem() {
+ try {
+ return TEST_UTIL.getTestFileSystem(); // file system comes from the shared test utility
+ } catch (IOException e) {
+ LOG.error("Can not get FileSystem", e); // failure is only logged; execution falls through to the return null below
}
return null;
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index e82d69826d8..7f5df02ecfc 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -232,8 +232,8 @@ public class SyncReplicationTestBase {
protected final void verifyRemovedPeer(String peerId, Path remoteWALDir,
HBaseTestingUtil utility)
throws Exception {
- ReplicationPeerStorage rps = ReplicationStorageFactory
- .getReplicationPeerStorage(utility.getZooKeeperWatcher(),
utility.getConfiguration());
+ ReplicationPeerStorage rps =
ReplicationStorageFactory.getReplicationPeerStorage(
+ utility.getTestFileSystem(), utility.getZooKeeperWatcher(),
utility.getConfiguration());
try {
rps.getPeerSyncReplicationState(peerId);
fail("Should throw exception when get the sync replication state of a
removed peer.");
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithFSPeerStorage.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithFSPeerStorage.java
new file mode 100644
index 00000000000..6f5c6c20d8d
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithFSPeerStorage.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestReplicationWithFSPeerStorage extends TestReplicationBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationWithFSPeerStorage.class);
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // select the file system based peer storage for both UTIL1 and UTIL2 before the base class setup runs
+
UTIL1.getConfiguration().set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
+ ReplicationPeerStorageType.FILESYSTEM.name().toLowerCase()); // NOTE(review): toLowerCase() without a Locale is locale-sensitive; Locale.ROOT would be safer
+
UTIL2.getConfiguration().set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
+ ReplicationPeerStorageType.FILESYSTEM.name().toLowerCase());
+ TestReplicationBase.setUpBeforeClass();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ cleanUp();
+ }
+
+ /**
+ * Add a row, check it's replicated, delete it, then check it's gone.
+ */
+ @Test
+ public void testSimplePutDelete() throws Exception {
+ runSimplePutDeleteTest();
+ }
+
+ /**
+ * Try a small batch upload using the write buffer, then check it's replicated.
+ */
+ @Test
+ public void testSmallBatch() throws Exception {
+ runSmallBatchTest();
+ }
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index dd989293ff5..c48dbc39a03 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -388,8 +388,8 @@ public abstract class TestReplicationSourceManager {
rq.addWAL(server.getServerName(), "1", file);
}
Server s1 = new DummyServer("dummyserver1.example.org");
- ReplicationPeers rp1 =
- ReplicationFactory.getReplicationPeers(s1.getZooKeeper(),
s1.getConfiguration());
+ ReplicationPeers rp1 =
ReplicationFactory.getReplicationPeers(s1.getFileSystem(),
+ s1.getZooKeeper(), s1.getConfiguration());
rp1.init();
manager.claimQueue(server.getServerName(), "1");
assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
@@ -857,6 +857,11 @@ public abstract class TestReplicationSourceManager {
return zkw;
}
+ @Override
+ public FileSystem getFileSystem() {
+ return fs; // NOTE(review): fs is declared outside this hunk; presumably the test's shared FileSystem - confirm
+ }
+
@Override
public Connection getConnection() {
return null;
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java
index dc8fb849633..e44e00d2d37 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java
@@ -61,8 +61,8 @@ public class TestHBaseFsckReplication {
@Test
public void test() throws Exception {
- ReplicationPeerStorage peerStorage = ReplicationStorageFactory
- .getReplicationPeerStorage(UTIL.getZooKeeperWatcher(),
UTIL.getConfiguration());
+ ReplicationPeerStorage peerStorage =
ReplicationStorageFactory.getReplicationPeerStorage(
+ UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(),
UTIL.getConfiguration());
ReplicationQueueStorage queueStorage = ReplicationStorageFactory
.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(),
UTIL.getConfiguration());