This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-27109/table_based_rqs in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 845ad90bca0ed136b980a69b8628c9acd7091ad4 Author: Liangjun He <heliang...@apache.org> AuthorDate: Wed Apr 5 23:37:04 2023 +0800 HBASE-27623 Start a new ReplicationSyncUp after the previous failed (#5150) Signed-off-by: Duo Zhang <zhang...@apache.org> --- .../regionserver/ReplicationSyncUp.java | 46 ++++++++++++++++++++-- .../replication/TestReplicationSyncUpTool.java | 36 +++++++++++++++++ .../replication/TestReplicationSyncUpToolBase.java | 7 +++- 3 files changed, 84 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index f071cf6f1f8..cd6a4d9ac4d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -182,7 +184,7 @@ public class ReplicationSyncUp extends Configured implements Tool { } } - private void writeInfoFile(FileSystem fs) throws IOException { + private void writeInfoFile(FileSystem fs, boolean isForce) throws IOException { // Record the info of this run. Currently only record the time we run the job. We will use this // timestamp to clean up the data for last sequence ids and hfile refs in replication queue // storage. See ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore. @@ -190,11 +192,48 @@ public class ReplicationSyncUp extends Configured implements Tool { new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime()); String json = JsonMapper.writeObjectAsString(info); Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR); - try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), false)) { + try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), isForce)) { out.write(Bytes.toBytes(json)); } } + private static boolean parseOpts(String args[]) { + LinkedList<String> argv = new LinkedList<>(); + argv.addAll(Arrays.asList(args)); + String cmd = null; + while ((cmd = argv.poll()) != null) { + if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) { + printUsageAndExit(null, 0); + } + if (cmd.equals("-f")) { + return true; + } + if (!argv.isEmpty()) { + printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); + } + } + return false; + } + + private static void printUsageAndExit(final String message, final int exitCode) { + printUsage(message); + System.exit(exitCode); + } + + private static void printUsage(final String message) { + if (message != null && message.length() > 0) { + System.err.println(message); + } + System.err.println("Usage: hbase " + ReplicationSyncUp.class.getName() + " \\"); + System.err.println(" <OPTIONS> [-D<property=value>]*"); + System.err.println(); + System.err.println("General Options:"); + System.err.println(" -h|--h|--help Show this help and exit."); + System.err + .println(" -f Start a new ReplicationSyncUp after the previous ReplicationSyncUp failed. " + + "See HBASE-27623 for details."); + } + @Override public int run(String[] args) throws Exception { Abortable abortable = new Abortable() { @@ -217,6 +256,7 @@ public class ReplicationSyncUp extends Configured implements Tool { return abort; } }; + boolean isForce = parseOpts(args); Configuration conf = getConf(); try (ZKWatcher zkw = new ZKWatcher(conf, "syncupReplication" + EnvironmentEdgeManager.currentTime(), abortable, true)) { @@ -226,7 +266,7 @@ public class ReplicationSyncUp extends Configured implements Tool { Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME); System.out.println("Start Replication Server"); - writeInfoFile(fs); + writeInfoFile(fs, isForce); Replication replication = new Replication(); // use offline table replication queue storage getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java index 38225613b9d..66de933832b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java @@ -27,6 +27,8 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -300,4 +302,38 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase { assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200, rowCountHt2TargetAtPeer1); } + + /** + * test "start a new ReplicationSyncUp after the previous failed". See HBASE-27623 for details. + */ + @Test + public void testStartANewSyncUpToolAfterFailed() throws Exception { + // Start syncUpTool for the first time with non-force mode, + // let's assume that this will fail in sync data, + // this does not affect our test results + syncUp(UTIL1); + Path rootDir = CommonFSUtils.getRootDir(UTIL1.getConfiguration()); + Path syncUpInfoDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR); + Path replicationInfoPath = new Path(syncUpInfoDir, ReplicationSyncUp.INFO_FILE); + FileSystem fs = UTIL1.getTestFileSystem(); + assertTrue(fs.exists(replicationInfoPath)); + FileStatus fileStatus1 = fs.getFileStatus(replicationInfoPath); + + // Start syncUpTool for the second time with non-force mode, + // startup will fail because replication info file already exists + try { + syncUp(UTIL1); + } catch (Exception e) { + assertTrue("e should be a FileAlreadyExistsException", + (e instanceof FileAlreadyExistsException)); + } + FileStatus fileStatus2 = fs.getFileStatus(replicationInfoPath); + assertEquals(fileStatus1.getModificationTime(), fileStatus2.getModificationTime()); + + // Start syncUpTool for the third time with force mode, + // startup will success and create a new replication info file + syncUp(UTIL1, new String[] { "-f" }); + FileStatus fileStatus3 = fs.getFileStatus(replicationInfoPath); + assertTrue(fileStatus3.getModificationTime() > fileStatus2.getModificationTime()); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java index 8a28db3b185..44258241058 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java @@ -136,8 +136,11 @@ public abstract class TestReplicationSyncUpToolBase { } final void syncUp(HBaseTestingUtil util) throws Exception { - ToolRunner.run(new Configuration(util.getConfiguration()), new ReplicationSyncUp(), - new String[0]); + syncUp(util, new String[0]); + } + + final void syncUp(HBaseTestingUtil util, String[] args) throws Exception { + ToolRunner.run(new Configuration(util.getConfiguration()), new ReplicationSyncUp(), args); } // Utilities that manager shutdown / restart of source / sink clusters. They take care of