This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push: new 04a4ac5ae9d HBASE-27728 Implement a tool to migrate replication peer data between different storage implementation (#5179) 04a4ac5ae9d is described below commit 04a4ac5ae9d8ef79bfb5dfcdc1a9d5ce27cfba67 Author: Duo Zhang <zhang...@apache.org> AuthorDate: Mon Apr 17 22:13:19 2023 +0800 HBASE-27728 Implement a tool to migrate replication peer data between different storage implementation (#5179) Signed-off-by: Liangjun He <heliang...@apache.org> Signed-off-by: Wellington Chevreuil <wchevre...@apache.org> (cherry picked from commit 31c4aea48c3d4bc7585784dd65375edbb4e61a8a) Conflicts: hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java --- bin/hbase | 3 + .../replication/ReplicationPeerConfigTestUtil.java | 84 +++------------ .../replication/TestReplicationPeerConfig.java | 35 +------ hbase-replication/pom.xml | 6 ++ .../hbase/replication/CopyReplicationPeers.java | 114 +++++++++++++++++++++ .../ReplicationPeerStorageTestBase.java | 85 +-------------- .../replication/TestCopyReplicationPeers.java | 92 +++++++++++++++++ 7 files changed, 231 insertions(+), 188 deletions(-) diff --git a/bin/hbase b/bin/hbase index 31547b1ab51..e786571d623 100755 --- a/bin/hbase +++ b/bin/hbase @@ -109,6 +109,7 @@ show_usage() { echo " pre-upgrade Run Pre-Upgrade validator tool" echo " hbtop Run HBTop tool" echo " credential Run the Hadoop Credential Shell" + echo " copyreppeers Run CopyReplicationPeers tool" echo " CLASSNAME Run the class named CLASSNAME" } @@ -769,6 +770,8 @@ elif [ "$COMMAND" = "hbtop" ] ; then HBASE_OPTS="${HBASE_OPTS} ${HBASE_HBTOP_OPTS}" elif [ "$COMMAND" = "credential" ] ; then CLASS='org.apache.hadoop.security.alias.CredentialShell' +elif [ "$COMMAND" = "copyreppeers" ] ; then + 
CLASS='org.apache.hadoop.hbase.replication.CopyReplicationPeers' else CLASS=$COMMAND if [[ "$CLASS" =~ .*IntegrationTest.* ]] ; then diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java similarity index 59% copy from hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java copy to hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java index bd31b2958c4..634f4626da5 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java @@ -20,11 +20,8 @@ package org.apache.hadoop.hbase.replication; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.util.HashMap; import java.util.Iterator; @@ -34,21 +31,24 @@ import java.util.Random; import java.util.Set; import java.util.stream.Stream; import org.apache.hadoop.hbase.TableName; -import org.junit.Test; -public abstract class ReplicationPeerStorageTestBase { +/** + * A helper tool for generating random {@link ReplicationPeerConfig} and do assertion. 
+ */ +public final class ReplicationPeerConfigTestUtil { // Seed may be set with Random#setSeed private static final Random RNG = new Random(); - protected static ReplicationPeerStorage STORAGE; + private ReplicationPeerConfigTestUtil() { + } - private Set<String> randNamespaces(Random rand) { + private static Set<String> randNamespaces(Random rand) { return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) .collect(toSet()); } - private Map<TableName, List<String>> randTableCFs(Random rand) { + private static Map<TableName, List<String>> randTableCFs(Random rand) { int size = rand.nextInt(5); Map<TableName, List<String>> map = new HashMap<>(); for (int i = 0; i < size; i++) { @@ -60,7 +60,7 @@ public abstract class ReplicationPeerStorageTestBase { return map; } - private ReplicationPeerConfig getConfig(int seed) { + public static ReplicationPeerConfig getConfig(int seed) { RNG.setSeed(seed); return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong())) .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong())) @@ -69,7 +69,7 @@ public abstract class ReplicationPeerStorageTestBase { .setReplicateAllUserTables(RNG.nextBoolean()).setBandwidth(RNG.nextInt(1000)).build(); } - private void assertSetEquals(Set<String> expected, Set<String> actual) { + private static void assertSetEquals(Set<String> expected, Set<String> actual) { if (expected == null || expected.size() == 0) { assertTrue(actual == null || actual.size() == 0); return; @@ -78,7 +78,7 @@ public abstract class ReplicationPeerStorageTestBase { expected.forEach(s -> assertTrue(actual.contains(s))); } - private void assertMapEquals(Map<TableName, List<String>> expected, + private static void assertMapEquals(Map<TableName, List<String>> expected, Map<TableName, List<String>> actual) { if (expected == null || expected.size() == 0) { assertTrue(actual == null || actual.size() == 0); @@ -101,7 +101,8 @@ public abstract class ReplicationPeerStorageTestBase 
{ }); } - private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) { + public static void assertConfigEquals(ReplicationPeerConfig expected, + ReplicationPeerConfig actual) { assertEquals(expected.getClusterKey(), actual.getClusterKey()); assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl()); assertSetEquals(expected.getNamespaces(), actual.getNamespaces()); @@ -111,63 +112,4 @@ public abstract class ReplicationPeerStorageTestBase { assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); assertEquals(expected.getBandwidth(), actual.getBandwidth()); } - - @Test - public void test() throws ReplicationException { - int peerCount = 10; - for (int i = 0; i < peerCount; i++) { - STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0); - } - List<String> peerIds = STORAGE.listPeerIds(); - assertEquals(peerCount, peerIds.size()); - for (String peerId : peerIds) { - int seed = Integer.parseInt(peerId); - assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId)); - } - for (int i = 0; i < peerCount; i++) { - STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1)); - } - for (String peerId : peerIds) { - int seed = Integer.parseInt(peerId); - assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId)); - } - for (int i = 0; i < peerCount; i++) { - assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i))); - } - for (int i = 0; i < peerCount; i++) { - STORAGE.setPeerState(Integer.toString(i), i % 2 != 0); - } - for (int i = 0; i < peerCount; i++) { - assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i))); - } - String toRemove = Integer.toString(peerCount / 2); - STORAGE.removePeer(toRemove); - peerIds = STORAGE.listPeerIds(); - assertEquals(peerCount - 1, peerIds.size()); - assertFalse(peerIds.contains(toRemove)); - - try { - STORAGE.getPeerConfig(toRemove); - fail("Should throw a ReplicationException when getting peer 
config of a removed peer"); - } catch (ReplicationException e) { - } - } - - protected abstract void assertPeerNameControlException(ReplicationException e); - - @Test - public void testPeerNameControl() throws Exception { - String clusterKey = "key"; - STORAGE.addPeer("6", ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), - true); - - try { - ReplicationException e = assertThrows(ReplicationException.class, () -> STORAGE.addPeer("6", - ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), true)); - assertPeerNameControlException(e); - } finally { - // clean up - STORAGE.removePeer("6"); - } - } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java index 30d6c1d41d9..bdac27067b0 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java @@ -17,19 +17,14 @@ */ package org.apache.hadoop.hbase.replication; -import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toSet; +import static org.apache.hadoop.hbase.replication.ReplicationPeerConfigTestUtil.getConfig; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -260,34 +255,6 @@ public class TestReplicationPeerConfig { assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2)); } - private static final Random RNG = new Random(); // Seed 
may be set with Random#setSeed - - private Set<String> randNamespaces(Random rand) { - return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) - .collect(toSet()); - } - - private Map<TableName, List<String>> randTableCFs(Random rand) { - int size = rand.nextInt(5); - Map<TableName, List<String>> map = new HashMap<>(); - for (int i = 0; i < size; i++) { - TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong())); - List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong())) - .limit(rand.nextInt(5)).collect(toList()); - map.put(tn, cfs); - } - return map; - } - - private ReplicationPeerConfig getConfig(int seed) { - RNG.setSeed(seed); - return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong())) - .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong())) - .setNamespaces(randNamespaces(RNG)).setExcludeNamespaces(randNamespaces(RNG)) - .setTableCFsMap(randTableCFs(RNG)).setExcludeTableCFsMap(randTableCFs(RNG)) - .setReplicateAllUserTables(RNG.nextBoolean()).setBandwidth(RNG.nextInt(1000)).build(); - } - @Test public void testBaseReplicationPeerConfig() throws ReplicationException { String customPeerConfigKey = "hbase.xxx.custom_config"; diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml index 4b1a5e61093..3c0d2174931 100644 --- a/hbase-replication/pom.xml +++ b/hbase-replication/pom.xml @@ -74,6 +74,12 @@ <type>test-jar</type> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-zookeeper</artifactId> diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/CopyReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/CopyReplicationPeers.java new file mode 100644 index 00000000000..13276ab492c --- 
/dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/CopyReplicationPeers.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A tool for copying replication peer data across different replication peer storages. + * <p/> + * Notice that we will not delete the replication peer data from the source storage, as this tool + * can also be used by online migration. See HBASE-27110 for the whole design. 
+ */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) +public class CopyReplicationPeers extends Configured implements Tool { + + private static final Logger LOG = LoggerFactory.getLogger(CopyReplicationPeers.class); + + public static final String NAME = "copyreppeers"; + + public CopyReplicationPeers(Configuration conf) { + super(conf); + } + + private ReplicationPeerStorage create(String type, FileSystem fs, ZKWatcher zk) { + Configuration conf = new Configuration(getConf()); + conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, type); + return ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf); + } + + private ZKWatcher createZKWatcher() throws IOException { + return new ZKWatcher(getConf(), getClass().getSimpleName(), new Abortable() { + + private volatile boolean aborted; + + @Override + public boolean isAborted() { + return aborted; + } + + @Override + public void abort(String why, Throwable e) { + aborted = true; + LOG.error(why, e); + System.exit(1); + } + }); + } + + private void migrate(ReplicationPeerStorage src, ReplicationPeerStorage dst) + throws ReplicationException { + LOG.info("Start migrating from {} to {}", src.getClass().getSimpleName(), + dst.getClass().getSimpleName()); + for (String peerId : src.listPeerIds()) { + LOG.info("Going to migrate {}", peerId); + ReplicationPeerConfig peerConfig = src.getPeerConfig(peerId); + boolean enabled = src.isPeerEnabled(peerId); + dst.addPeer(peerId, peerConfig, enabled); + LOG.info("Migrated peer {}, peerConfig = '{}', enabled = {}", peerId, peerConfig, enabled); + } + } + + @Override + public int run(String[] args) throws Exception { + if (args.length != 2) { + System.err.println("Usage: bin/hbase " + NAME + + " <SRC_REPLICATION_PEER_STORAGE> <DST_REPLICATION_PEER_STORAGE>"); + System.err.println("The possible values for replication storage type:"); + for (ReplicationPeerStorageType type : ReplicationPeerStorageType.values()) { + System.err.println(" " + 
type.name().toLowerCase()); + } + return -1; + } + FileSystem fs = FileSystem.get(getConf()); + try (ZKWatcher zk = createZKWatcher()) { + ReplicationPeerStorage src = create(args[0], fs, zk); + ReplicationPeerStorage dst = create(args[1], fs, zk); + migrate(src, dst); + } + return 0; + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + int ret = ToolRunner.run(conf, new CopyReplicationPeers(conf), args); + System.exit(ret); + } +} diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java index bd31b2958c4..e8191d66344 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java @@ -17,101 +17,20 @@ */ package org.apache.hadoop.hbase.replication; -import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toSet; +import static org.apache.hadoop.hbase.replication.ReplicationPeerConfigTestUtil.assertConfigEquals; +import static org.apache.hadoop.hbase.replication.ReplicationPeerConfigTestUtil.getConfig; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.stream.Stream; -import org.apache.hadoop.hbase.TableName; import org.junit.Test; public abstract class ReplicationPeerStorageTestBase { - // Seed may be set with Random#setSeed - private static final Random RNG = new 
Random(); - protected static ReplicationPeerStorage STORAGE; - private Set<String> randNamespaces(Random rand) { - return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) - .collect(toSet()); - } - - private Map<TableName, List<String>> randTableCFs(Random rand) { - int size = rand.nextInt(5); - Map<TableName, List<String>> map = new HashMap<>(); - for (int i = 0; i < size; i++) { - TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong())); - List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong())) - .limit(rand.nextInt(5)).collect(toList()); - map.put(tn, cfs); - } - return map; - } - - private ReplicationPeerConfig getConfig(int seed) { - RNG.setSeed(seed); - return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong())) - .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong())) - .setNamespaces(randNamespaces(RNG)).setExcludeNamespaces(randNamespaces(RNG)) - .setTableCFsMap(randTableCFs(RNG)).setExcludeTableCFsMap(randTableCFs(RNG)) - .setReplicateAllUserTables(RNG.nextBoolean()).setBandwidth(RNG.nextInt(1000)).build(); - } - - private void assertSetEquals(Set<String> expected, Set<String> actual) { - if (expected == null || expected.size() == 0) { - assertTrue(actual == null || actual.size() == 0); - return; - } - assertEquals(expected.size(), actual.size()); - expected.forEach(s -> assertTrue(actual.contains(s))); - } - - private void assertMapEquals(Map<TableName, List<String>> expected, - Map<TableName, List<String>> actual) { - if (expected == null || expected.size() == 0) { - assertTrue(actual == null || actual.size() == 0); - return; - } - assertEquals(expected.size(), actual.size()); - expected.forEach((expectedTn, expectedCFs) -> { - List<String> actualCFs = actual.get(expectedTn); - if (expectedCFs == null || expectedCFs.size() == 0) { - assertTrue(actual.containsKey(expectedTn)); - assertTrue(actualCFs == null || actualCFs.size() == 0); - } else { - 
assertNotNull(actualCFs); - assertEquals(expectedCFs.size(), actualCFs.size()); - for (Iterator<String> expectedIt = expectedCFs.iterator(), - actualIt = actualCFs.iterator(); expectedIt.hasNext();) { - assertEquals(expectedIt.next(), actualIt.next()); - } - } - }); - } - - private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) { - assertEquals(expected.getClusterKey(), actual.getClusterKey()); - assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl()); - assertSetEquals(expected.getNamespaces(), actual.getNamespaces()); - assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces()); - assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap()); - assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); - assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); - assertEquals(expected.getBandwidth(), actual.getBandwidth()); - } - @Test public void test() throws ReplicationException { int peerCount = 10; diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestCopyReplicationPeers.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestCopyReplicationPeers.java new file mode 100644 index 00000000000..fdde24852fa --- /dev/null +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestCopyReplicationPeers.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.apache.hadoop.hbase.replication.ReplicationPeerConfigTestUtil.assertConfigEquals; +import static org.apache.hadoop.hbase.replication.ReplicationPeerConfigTestUtil.getConfig; +import static org.junit.Assert.assertEquals; + +import java.util.List; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestCopyReplicationPeers { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCopyReplicationPeers.class); + + private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); + + private static FileSystem FS; + + private static Path DIR; + + private static ReplicationPeerStorage SRC; + + private static ReplicationPeerStorage DST; + + @BeforeClass + public static void setUp() throws Exception { + DIR = UTIL.getDataTestDir("test_peer_migration"); + CommonFSUtils.setRootDir(UTIL.getConfiguration(), DIR); + FS = 
FileSystem.get(UTIL.getConfiguration()); + UTIL.startMiniZKCluster(); + SRC = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); + DST = new FSReplicationPeerStorage(FS, UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniZKCluster(); + UTIL.cleanupTestDir(); + } + + @Test + public void testMigrate() throws Exception { + // invalid args + assertEquals(-1, + ToolRunner.run(new CopyReplicationPeers(UTIL.getConfiguration()), new String[] {})); + int peerCount = 10; + for (int i = 0; i < peerCount; i++) { + SRC.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0); + } + // migrate + assertEquals(0, ToolRunner.run(new CopyReplicationPeers(UTIL.getConfiguration()), + new String[] { SRC.getClass().getName(), DST.getClass().getName() })); + // verify the replication peer data in dst storage + List<String> peerIds = DST.listPeerIds(); + assertEquals(peerCount, peerIds.size()); + for (String peerId : peerIds) { + int seed = Integer.parseInt(peerId); + assertConfigEquals(getConfig(seed), DST.getPeerConfig(peerId)); + } + } +}