This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957_rebase
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 7da752635aa3d2d6055a1a1e6f1c3bd07cb86adb
Author: vinayak hegde <vinayakph...@gmail.com>
AuthorDate: Sat May 31 00:07:44 2025 +0530

    HBASE-29133: Implement "pitr" Command for Point-in-Time Restore (#6717)

    Signed-off-by: Andor Molnar <an...@apache.org>
    Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org>
---
 bin/hbase                                          |  17 +
 ...storeDriver.java => AbstractRestoreDriver.java} | 130 +++-----
 .../apache/hadoop/hbase/backup/BackupAdmin.java    |   7 +
 .../hbase/backup/BackupRestoreConstants.java       |  10 +
 .../hadoop/hbase/backup/HBackupFileSystem.java     |  40 +++
 .../hbase/backup/PointInTimeRestoreDriver.java     | 137 ++++++++
 .../hbase/backup/PointInTimeRestoreRequest.java    | 111 +++++++
 .../apache/hadoop/hbase/backup/RestoreDriver.java  | 218 +------------
 .../hadoop/hbase/backup/impl/BackupAdminImpl.java  | 360 ++++++++++++++++++++-
 .../hbase/backup/impl/BackupSystemTable.java       |  86 +++++
 .../hbase/backup/impl/FullTableBackupClient.java   |  12 +-
 .../ContinuousBackupReplicationEndpoint.java       |  33 ++
 .../hadoop/hbase/backup/util/BackupUtils.java      | 162 ++++++++++
 .../apache/hadoop/hbase/backup/TestBackupBase.java |   8 +
 .../hbase/backup/TestPointInTimeRestore.java       | 277 ++++++++++++++++
 15 files changed, 1310 insertions(+), 298 deletions(-)

diff --git a/bin/hbase b/bin/hbase
index d8d9b6ec5b2..d0343dc85a3 100755
--- a/bin/hbase
+++ b/bin/hbase
@@ -102,6 +102,7 @@ show_usage() {
   echo "  version          Print the version"
   echo "  backup           Backup tables for recovery"
   echo "  restore          Restore tables from existing backup image"
+  echo "  pitr             Restore tables to a specific point in time using backup and WAL replay"
   echo "  completebulkload Run BulkLoadHFiles tool"
   echo "  regionsplitter   Run RegionSplitter tool"
   echo "  rowcounter       Run RowCounter tool"
@@ -639,6 +640,22 @@ elif [ "$COMMAND" = "restore" ] ; then
       fi
     done
   fi
+elif [ "$COMMAND" = "pitr" ] ; then
+  CLASS='org.apache.hadoop.hbase.backup.PointInTimeRestoreDriver'
+  if [ -n "${shaded_jar}" ] ; then
+    for f in "${HBASE_HOME}"/lib/hbase-backup*.jar; do
+      if [ -f "${f}" ]; then
+        CLASSPATH="${CLASSPATH}:${f}"
+        break
+      fi
+    done
+    for f in "${HBASE_HOME}"/lib/commons-lang3*.jar; do
+      if [ -f "${f}" ]; then
+        CLASSPATH="${CLASSPATH}:${f}"
+        break
+      fi
+    done
+  fi
 elif [ "$COMMAND" = "upgrade" ] ; then
   echo "This command was used to upgrade to HBase 0.96, it was removed in HBase 2.0.0."
   echo "Please follow the documentation at http://hbase.apache.org/book.html#upgrading."
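
For illustration, a possible invocation of the new command wired up above, assuming backup and
continuous backup (including the WAL backup directory) are already configured on the cluster; it
uses the options added in this change (-t for the table list, -td/--to-datetime for the target
time, -o to overwrite), and the table name and timestamp are made-up values:

  # Hypothetical example: restore table 'orders' to a point in time given in epoch milliseconds,
  # overwriting the existing table if it already exists.
  hbase pitr -t orders -td 1748629800000 -o
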
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/AbstractRestoreDriver.java similarity index 68% copy from hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java copy to hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/AbstractRestoreDriver.java index 38b767ecf67..8e053a73a6b 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/AbstractRestoreDriver.java @@ -33,15 +33,10 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_RESTORE_DESC; import java.io.IOException; -import java.net.URI; import java.util.List; import java.util.Objects; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; import org.apache.hadoop.hbase.backup.impl.BackupManager; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.backup.util.BackupUtils; @@ -49,8 +44,6 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.logging.Log4jUtils; import org.apache.hadoop.hbase.util.AbstractHBaseTool; -import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.util.ToolRunner; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,66 +51,60 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; -/** - * Command-line entry point for restore operation - */ @InterfaceAudience.Private -public class RestoreDriver extends AbstractHBaseTool { - private static final Logger LOG = LoggerFactory.getLogger(RestoreDriver.class); - private CommandLine cmd; - - private static final String USAGE_STRING = - "Usage: hbase restore <backup_path> <backup_id> [options]\n" - + " backup_path Path to a backup destination root\n" - + " backup_id Backup image ID to restore\n" - + " table(s) Comma-separated list of tables to restore\n"; +public abstract class AbstractRestoreDriver extends AbstractHBaseTool { + protected static final Logger LOG = LoggerFactory.getLogger(AbstractRestoreDriver.class); + protected CommandLine cmd; - private static final String USAGE_FOOTER = ""; + protected static final String USAGE_FOOTER = ""; - protected RestoreDriver() throws IOException { + protected AbstractRestoreDriver() { init(); } protected void init() { - // disable irrelevant loggers to avoid it mess up command output Log4jUtils.disableZkAndClientLoggers(); } + protected abstract int executeRestore(boolean check, TableName[] fromTables, TableName[] toTables, + boolean isOverwrite); + private int parseAndRun() throws IOException { - // Check if backup is enabled if (!BackupManager.isBackupEnabled(getConf())) { System.err.println(BackupRestoreConstants.ENABLE_BACKUP); return -1; } - // enable debug logging if (cmd.hasOption(OPTION_DEBUG)) { Log4jUtils.setLogLevel("org.apache.hadoop.hbase.backup", "DEBUG"); } - // whether to overwrite to existing table if any, false by 
default boolean overwrite = cmd.hasOption(OPTION_OVERWRITE); if (overwrite) { - LOG.debug("Found -overwrite option in restore command, " - + "will overwrite to existing table if any in the restore target"); + LOG.debug("Found overwrite option (-{}) in restore command, " + + "will overwrite to existing table if any in the restore target", OPTION_OVERWRITE); } - // whether to only check the dependencies, false by default boolean check = cmd.hasOption(OPTION_CHECK); if (check) { LOG.debug( - "Found -check option in restore command, " + "will check and verify the dependencies"); + "Found check option (-{}) in restore command, will check and verify the dependencies", + OPTION_CHECK); } if (cmd.hasOption(OPTION_SET) && cmd.hasOption(OPTION_TABLE)) { - System.err.println( - "Options -s and -t are mutaully exclusive," + " you can not specify both of them."); + System.err.printf( + "Set name (-%s) and table list (-%s) are mutually exclusive, you can not specify both " + + "of them.%n", + OPTION_SET, OPTION_TABLE); printToolUsage(); return -1; } if (!cmd.hasOption(OPTION_SET) && !cmd.hasOption(OPTION_TABLE)) { - System.err.println("You have to specify either set name or table list to restore"); + System.err.printf( + "You have to specify either set name (-%s) or table list (-%s) to " + "restore%n", + OPTION_SET, OPTION_TABLE); printToolUsage(); return -1; } @@ -128,20 +115,13 @@ public class RestoreDriver extends AbstractHBaseTool { getConf().set("mapreduce.job.queuename", queueName); } - // parse main restore command options - String[] remainArgs = cmd.getArgs(); - if (remainArgs.length != 2) { - printToolUsage(); - return -1; - } - - String backupRootDir = remainArgs[0]; - String backupId = remainArgs[1]; String tables; - String tableMapping = - cmd.hasOption(OPTION_TABLE_MAPPING) ? 
cmd.getOptionValue(OPTION_TABLE_MAPPING) : null; - try (final Connection conn = ConnectionFactory.createConnection(conf); - BackupAdmin client = new BackupAdminImpl(conn)) { + TableName[] sTableArray; + TableName[] tTableArray; + + String tableMapping = cmd.getOptionValue(OPTION_TABLE_MAPPING); + + try (final Connection conn = ConnectionFactory.createConnection(conf)) { // Check backup set if (cmd.hasOption(OPTION_SET)) { String setName = cmd.getOptionValue(OPTION_SET); @@ -162,8 +142,8 @@ public class RestoreDriver extends AbstractHBaseTool { tables = cmd.getOptionValue(OPTION_TABLE); } - TableName[] sTableArray = BackupUtils.parseTableNames(tables); - TableName[] tTableArray = BackupUtils.parseTableNames(tableMapping); + sTableArray = BackupUtils.parseTableNames(tables); + tTableArray = BackupUtils.parseTableNames(tableMapping); if ( sTableArray != null && tTableArray != null && (sTableArray.length != tTableArray.length) @@ -172,31 +152,13 @@ public class RestoreDriver extends AbstractHBaseTool { printToolUsage(); return -4; } - - client.restore(BackupUtils.createRestoreRequest(backupRootDir, backupId, check, sTableArray, - tTableArray, overwrite)); - } catch (Exception e) { - LOG.error("Error while running restore backup", e); - return -5; } - return 0; - } - private String getTablesForSet(Connection conn, String name) throws IOException { - try (final BackupSystemTable table = new BackupSystemTable(conn)) { - List<TableName> tables = table.describeBackupSet(name); - - if (tables == null) { - return null; - } - - return StringUtils.join(tables, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND); - } + return executeRestore(check, sTableArray, tTableArray, overwrite); } @Override protected void addOptions() { - // define supported options addOptNoArg(OPTION_OVERWRITE, OPTION_OVERWRITE_DESC); addOptNoArg(OPTION_CHECK, OPTION_CHECK_DESC); addOptNoArg(OPTION_DEBUG, OPTION_DEBUG_DESC); @@ -216,26 +178,14 @@ public class RestoreDriver extends AbstractHBaseTool { return parseAndRun(); } - public static void main(String[] args) throws Exception { - Configuration conf = HBaseConfiguration.create(); - Path hbasedir = CommonFSUtils.getRootDir(conf); - URI defaultFs = hbasedir.getFileSystem(conf).getUri(); - CommonFSUtils.setFsDefault(conf, new Path(defaultFs)); - int ret = ToolRunner.run(conf, new RestoreDriver(), args); - System.exit(ret); - } - @Override public int run(String[] args) { Objects.requireNonNull(conf, "Tool configuration is not initialized"); - CommandLine cmd; try { - // parse the command line arguments cmd = parseArgs(args); - cmdLineArgs = args; } catch (Exception e) { - System.out.println("Error when parsing command-line arguments: " + e.getMessage()); + System.out.println("Error parsing command-line arguments: " + e.getMessage()); printToolUsage(); return EXIT_FAILURE; } @@ -247,18 +197,16 @@ public class RestoreDriver extends AbstractHBaseTool { processOptions(cmd); - int ret = EXIT_FAILURE; try { - ret = doWork(); + return doWork(); } catch (Exception e) { - LOG.error("Error running command-line tool", e); + LOG.error("Error running restore tool", e); return EXIT_FAILURE; } - return ret; } protected void printToolUsage() { - System.out.println(USAGE_STRING); + System.out.println(getUsageString()); HelpFormatter helpFormatter = new HelpFormatter(); helpFormatter.setLeftPadding(2); helpFormatter.setDescPadding(8); @@ -267,4 +215,18 @@ public class RestoreDriver extends AbstractHBaseTool { helpFormatter.printHelp(" ", null, options, USAGE_FOOTER); 
System.out.println(BackupRestoreConstants.VERIFY_BACKUP); } + + protected abstract String getUsageString(); + + private String getTablesForSet(Connection conn, String name) throws IOException { + try (final BackupSystemTable table = new BackupSystemTable(conn)) { + List<TableName> tables = table.describeBackupSet(name); + + if (tables == null) { + return null; + } + + return StringUtils.join(tables, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND); + } + } } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java index 25055fd5e8e..7fdf1d9bfbf 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java @@ -51,6 +51,13 @@ public interface BackupAdmin extends Closeable { */ void restore(RestoreRequest request) throws IOException; + /** + * Restore the tables to specific time + * @param request Point in Time restore request + * @throws IOException exception + */ + void pointInTimeRestore(PointInTimeRestoreRequest request) throws IOException; + /** * Describe backup image command * @param backupId backup id diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java index 3df67ac1aef..be6e4c2686d 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java @@ -109,6 +109,16 @@ public interface BackupRestoreConstants { String OPTION_FORCE_DELETE_DESC = "Flag to forcefully delete the backup, even if it may be required for Point-in-Time Restore"; + String OPTION_TO_DATETIME = "td"; + String LONG_OPTION_TO_DATETIME = "to-datetime"; + String OPTION_TO_DATETIME_DESC = "Target date and time up to which data should be restored"; + + String OPTION_PITR_BACKUP_PATH = "bp"; + String LONG_OPTION_PITR_BACKUP_PATH = "backup-path"; + String OPTION_PITR_BACKUP_PATH_DESC = + "Specifies a custom backup location for Point-In-Time Recovery (PITR). 
" + + "If provided, this location will be used exclusively instead of deriving the path from the system table."; + String JOB_NAME_CONF_KEY = "mapreduce.job.name"; String BACKUP_CONFIG_STRING = BackupRestoreConstants.BACKUP_ENABLE_KEY + "=true\n" diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java index d5fd9aaf4c3..0ad5b3f0e05 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java @@ -17,13 +17,21 @@ */ package org.apache.hadoop.hbase.backup; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUPID_PREFIX; + import com.google.errorprone.annotations.RestrictedApi; import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.impl.BackupManifest; +import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -135,4 +143,36 @@ public final class HBackupFileSystem { new BackupManifest(conf, getManifestPath(conf, backupRootPath, backupId)); return manifest; } + + public static List<BackupImage> getAllBackupImages(Configuration conf, Path backupRootPath) + throws IOException { + FileSystem fs = FileSystem.get(backupRootPath.toUri(), conf); + RemoteIterator<LocatedFileStatus> it = fs.listLocatedStatus(backupRootPath); + + List<BackupImage> images = new ArrayList<>(); + + while (it.hasNext()) { + LocatedFileStatus lfs = it.next(); + if (!lfs.isDirectory()) { + continue; + } + + String backupId = lfs.getPath().getName(); + try { + BackupManifest manifest = getManifest(conf, backupRootPath, backupId); + images.add(manifest.getBackupImage()); + } catch (IOException e) { + LOG.error("Cannot load backup manifest from: " + lfs.getPath(), e); + } + } + + // Sort images by timestamp in descending order + images.sort(Comparator.comparingLong(m -> -getTimestamp(m.getBackupId()))); + + return images; + } + + private static long getTimestamp(String backupId) { + return Long.parseLong(backupId.substring(BACKUPID_PREFIX.length())); + } } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java new file mode 100644 index 00000000000..abdf52f1430 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.LONG_OPTION_PITR_BACKUP_PATH; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.LONG_OPTION_TO_DATETIME; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_PITR_BACKUP_PATH; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_PITR_BACKUP_PATH_DESC; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME_DESC; + +import java.net.URI; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; +import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.util.ToolRunner; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.base.Strings; + +/** + * Command-line entry point for restore operation + */ +@InterfaceAudience.Private +public class PointInTimeRestoreDriver extends AbstractRestoreDriver { + private static final String USAGE_STRING = """ + Usage: hbase pitr [options] + <backup_path> Backup Path to use for Point in Time Restore + table(s) Comma-separated list of tables to restore + """; + + @Override + protected int executeRestore(boolean check, TableName[] fromTables, TableName[] toTables, + boolean isOverwrite) { + String walBackupDir = getConf().get(CONF_CONTINUOUS_BACKUP_WAL_DIR); + if (Strings.isNullOrEmpty(walBackupDir)) { + System.err.printf( + "Point-in-Time Restore requires the WAL backup directory (%s) to replay logs after full and incremental backups. " + + "Set this property if you need Point-in-Time Restore. 
Otherwise, use the normal restore process with the appropriate backup ID.%n", + CONF_CONTINUOUS_BACKUP_WAL_DIR); + return -1; + } + + String[] remainArgs = cmd.getArgs(); + if (remainArgs.length != 0) { + printToolUsage(); + return -1; + } + + String backupRootDir = cmd.getOptionValue(OPTION_PITR_BACKUP_PATH); + + try (final Connection conn = ConnectionFactory.createConnection(conf); + BackupAdmin client = new BackupAdminImpl(conn)) { + // Get the replication checkpoint (last known safe point for Continuous Backup) + long replicationCheckpoint = BackupUtils.getReplicationCheckpoint(conn); + long endTime = replicationCheckpoint; + + if (cmd.hasOption(OPTION_TO_DATETIME)) { + String time = cmd.getOptionValue(OPTION_TO_DATETIME); + try { + endTime = Long.parseLong(time); + // Convert seconds to milliseconds if input is in seconds + if (endTime < 10_000_000_000L) { + endTime *= 1000; + } + } catch (NumberFormatException e) { + System.out.println("ERROR: Invalid timestamp format for --to-datetime: " + time); + printToolUsage(); + return -5; + } + } + + // Ensure the requested restore time does not exceed the replication checkpoint + if (endTime > replicationCheckpoint) { + LOG.error( + "ERROR: Requested restore time ({}) exceeds the last known safe replication checkpoint ({}). " + + "Please choose a time before this checkpoint to ensure data consistency.", + endTime, replicationCheckpoint); + return -5; + } + + PointInTimeRestoreRequest pointInTimeRestoreRequest = new PointInTimeRestoreRequest.Builder() + .withBackupRootDir(backupRootDir).withCheck(check).withFromTables(fromTables) + .withToTables(toTables).withOverwrite(isOverwrite).withToDateTime(endTime).build(); + + client.pointInTimeRestore(pointInTimeRestoreRequest); + } catch (Exception e) { + LOG.error("Error while running restore backup", e); + return -5; + } + return 0; + } + + @Override + protected void addOptions() { + super.addOptions(); + addOptWithArg(OPTION_TO_DATETIME, LONG_OPTION_TO_DATETIME, OPTION_TO_DATETIME_DESC); + addOptWithArg(OPTION_PITR_BACKUP_PATH, LONG_OPTION_PITR_BACKUP_PATH, + OPTION_PITR_BACKUP_PATH_DESC); + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + Path rootDir = CommonFSUtils.getRootDir(conf); + URI defaultFs = rootDir.getFileSystem(conf).getUri(); + CommonFSUtils.setFsDefault(conf, new Path(defaultFs)); + int ret = ToolRunner.run(conf, new PointInTimeRestoreDriver(), args); + System.exit(ret); + } + + @Override + protected String getUsageString() { + return USAGE_STRING; + } +} diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java new file mode 100644 index 00000000000..f2462a1cfd1 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import org.apache.hadoop.hbase.TableName; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * POJO class for Point In Time Restore request + */ +@InterfaceAudience.Private +public final class PointInTimeRestoreRequest { + + private final String backupRootDir; + private final boolean check; + private final TableName[] fromTables; + private final TableName[] toTables; + private final boolean overwrite; + private final long toDateTime; + + private PointInTimeRestoreRequest(Builder builder) { + this.backupRootDir = builder.backupRootDir; + this.check = builder.check; + this.fromTables = builder.fromTables; + this.toTables = builder.toTables; + this.overwrite = builder.overwrite; + this.toDateTime = builder.toDateTime; + } + + public String getBackupRootDir() { + return backupRootDir; + } + + public boolean isCheck() { + return check; + } + + public TableName[] getFromTables() { + return fromTables; + } + + public TableName[] getToTables() { + return toTables; + } + + public boolean isOverwrite() { + return overwrite; + } + + public long getToDateTime() { + return toDateTime; + } + + public static class Builder { + private String backupRootDir; + private boolean check = false; + private TableName[] fromTables; + private TableName[] toTables; + private boolean overwrite = false; + private long toDateTime; + + public Builder withBackupRootDir(String backupRootDir) { + this.backupRootDir = backupRootDir; + return this; + } + + public Builder withCheck(boolean check) { + this.check = check; + return this; + } + + public Builder withFromTables(TableName[] fromTables) { + this.fromTables = fromTables; + return this; + } + + public Builder withToTables(TableName[] toTables) { + this.toTables = toTables; + return this; + } + + public Builder withOverwrite(boolean overwrite) { + this.overwrite = overwrite; + return this; + } + + public Builder withToDateTime(long dateTime) { + this.toDateTime = dateTime; + return this; + } + + public PointInTimeRestoreRequest build() { + return new PointInTimeRestoreRequest(this); + } + } +} diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java index 38b767ecf67..efc4b0df9a0 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java @@ -17,117 +17,34 @@ */ package org.apache.hadoop.hbase.backup; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK_DESC; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE_DESC; -import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET_RESTORE_DESC; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_LIST_DESC; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING_DESC; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_RESTORE_DESC; - -import java.io.IOException; import java.net.URI; -import java.util.List; -import java.util.Objects; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; -import org.apache.hadoop.hbase.backup.impl.BackupManager; -import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.logging.Log4jUtils; -import org.apache.hadoop.hbase.util.AbstractHBaseTool; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; -import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; /** * Command-line entry point for restore operation */ @InterfaceAudience.Private -public class RestoreDriver extends AbstractHBaseTool { - private static final Logger LOG = LoggerFactory.getLogger(RestoreDriver.class); - private CommandLine cmd; - - private static final String USAGE_STRING = - "Usage: hbase restore <backup_path> <backup_id> [options]\n" - + " backup_path Path to a backup destination root\n" - + " backup_id Backup image ID to restore\n" - + " table(s) Comma-separated list of tables to restore\n"; - - private static final String USAGE_FOOTER = ""; - - protected RestoreDriver() throws IOException { - init(); - } - - protected void init() { - // disable irrelevant loggers to avoid it mess up command output - Log4jUtils.disableZkAndClientLoggers(); - } - - private int parseAndRun() throws IOException { - // Check if backup is enabled - if (!BackupManager.isBackupEnabled(getConf())) { - System.err.println(BackupRestoreConstants.ENABLE_BACKUP); - return -1; - } - - // enable debug logging - if (cmd.hasOption(OPTION_DEBUG)) { - Log4jUtils.setLogLevel("org.apache.hadoop.hbase.backup", "DEBUG"); - } - - // whether to overwrite to existing table if any, false by default - boolean overwrite = cmd.hasOption(OPTION_OVERWRITE); - if (overwrite) { - LOG.debug("Found -overwrite option in restore command, " - + "will overwrite to existing table if any in the restore target"); - } - - // whether to only check the dependencies, false by default - boolean check = cmd.hasOption(OPTION_CHECK); - if (check) { - LOG.debug( - "Found -check option in restore command, " + "will check and verify the dependencies"); - } - - if (cmd.hasOption(OPTION_SET) && cmd.hasOption(OPTION_TABLE)) { - 
System.err.println( - "Options -s and -t are mutaully exclusive," + " you can not specify both of them."); - printToolUsage(); - return -1; - } - - if (!cmd.hasOption(OPTION_SET) && !cmd.hasOption(OPTION_TABLE)) { - System.err.println("You have to specify either set name or table list to restore"); - printToolUsage(); - return -1; - } - - if (cmd.hasOption(OPTION_YARN_QUEUE_NAME)) { - String queueName = cmd.getOptionValue(OPTION_YARN_QUEUE_NAME); - // Set MR job queuename to configuration - getConf().set("mapreduce.job.queuename", queueName); - } +public class RestoreDriver extends AbstractRestoreDriver { + private static final String USAGE_STRING = """ + Usage: hbase restore <backup_path> <backup_id> [options] + backup_path Path to a backup destination root + backup_id Backup image ID to restore + table(s) Comma-separated list of tables to restore + """; + @Override + protected int executeRestore(boolean check, TableName[] fromTables, TableName[] toTables, + boolean isOverwrite) { // parse main restore command options String[] remainArgs = cmd.getArgs(); if (remainArgs.length != 2) { @@ -137,44 +54,11 @@ public class RestoreDriver extends AbstractHBaseTool { String backupRootDir = remainArgs[0]; String backupId = remainArgs[1]; - String tables; - String tableMapping = - cmd.hasOption(OPTION_TABLE_MAPPING) ? cmd.getOptionValue(OPTION_TABLE_MAPPING) : null; + try (final Connection conn = ConnectionFactory.createConnection(conf); BackupAdmin client = new BackupAdminImpl(conn)) { - // Check backup set - if (cmd.hasOption(OPTION_SET)) { - String setName = cmd.getOptionValue(OPTION_SET); - try { - tables = getTablesForSet(conn, setName); - } catch (IOException e) { - System.out.println("ERROR: " + e.getMessage() + " for setName=" + setName); - printToolUsage(); - return -2; - } - if (tables == null) { - System.out - .println("ERROR: Backup set '" + setName + "' is either empty or does not exist"); - printToolUsage(); - return -3; - } - } else { - tables = cmd.getOptionValue(OPTION_TABLE); - } - - TableName[] sTableArray = BackupUtils.parseTableNames(tables); - TableName[] tTableArray = BackupUtils.parseTableNames(tableMapping); - - if ( - sTableArray != null && tTableArray != null && (sTableArray.length != tTableArray.length) - ) { - System.out.println("ERROR: table mapping mismatch: " + tables + " : " + tableMapping); - printToolUsage(); - return -4; - } - - client.restore(BackupUtils.createRestoreRequest(backupRootDir, backupId, check, sTableArray, - tTableArray, overwrite)); + client.restore(BackupUtils.createRestoreRequest(backupRootDir, backupId, check, fromTables, + toTables, isOverwrite)); } catch (Exception e) { LOG.error("Error while running restore backup", e); return -5; @@ -182,40 +66,6 @@ public class RestoreDriver extends AbstractHBaseTool { return 0; } - private String getTablesForSet(Connection conn, String name) throws IOException { - try (final BackupSystemTable table = new BackupSystemTable(conn)) { - List<TableName> tables = table.describeBackupSet(name); - - if (tables == null) { - return null; - } - - return StringUtils.join(tables, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND); - } - } - - @Override - protected void addOptions() { - // define supported options - addOptNoArg(OPTION_OVERWRITE, OPTION_OVERWRITE_DESC); - addOptNoArg(OPTION_CHECK, OPTION_CHECK_DESC); - addOptNoArg(OPTION_DEBUG, OPTION_DEBUG_DESC); - addOptWithArg(OPTION_SET, OPTION_SET_RESTORE_DESC); - addOptWithArg(OPTION_TABLE, OPTION_TABLE_LIST_DESC); - addOptWithArg(OPTION_TABLE_MAPPING, 
OPTION_TABLE_MAPPING_DESC); - addOptWithArg(OPTION_YARN_QUEUE_NAME, OPTION_YARN_QUEUE_NAME_RESTORE_DESC); - } - - @Override - protected void processOptions(CommandLine cmd) { - this.cmd = cmd; - } - - @Override - protected int doWork() throws Exception { - return parseAndRun(); - } - public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); Path hbasedir = CommonFSUtils.getRootDir(conf); @@ -226,45 +76,7 @@ public class RestoreDriver extends AbstractHBaseTool { } @Override - public int run(String[] args) { - Objects.requireNonNull(conf, "Tool configuration is not initialized"); - - CommandLine cmd; - try { - // parse the command line arguments - cmd = parseArgs(args); - cmdLineArgs = args; - } catch (Exception e) { - System.out.println("Error when parsing command-line arguments: " + e.getMessage()); - printToolUsage(); - return EXIT_FAILURE; - } - - if (cmd.hasOption(SHORT_HELP_OPTION) || cmd.hasOption(LONG_HELP_OPTION)) { - printToolUsage(); - return EXIT_FAILURE; - } - - processOptions(cmd); - - int ret = EXIT_FAILURE; - try { - ret = doWork(); - } catch (Exception e) { - LOG.error("Error running command-line tool", e); - return EXIT_FAILURE; - } - return ret; - } - - protected void printToolUsage() { - System.out.println(USAGE_STRING); - HelpFormatter helpFormatter = new HelpFormatter(); - helpFormatter.setLeftPadding(2); - helpFormatter.setDescPadding(8); - helpFormatter.setWidth(100); - helpFormatter.setSyntaxPrefix("Options:"); - helpFormatter.printHelp(" ", null, options, USAGE_FOOTER); - System.out.println(BackupRestoreConstants.VERIFY_BACKUP); + protected String getUsageString() { + return USAGE_STRING; } } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java index 1e745c69cda..e94389d6938 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java @@ -17,18 +17,32 @@ */ package org.apache.hadoop.hbase.backup.impl; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS; +import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; + import java.io.IOException; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupAdmin; import 
org.apache.hadoop.hbase.backup.BackupClientFactory; @@ -40,12 +54,16 @@ import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.BackupRestoreFactory; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest; import org.apache.hadoop.hbase.backup.RestoreRequest; import org.apache.hadoop.hbase.backup.util.BackupSet; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.mapreduce.WALInputFormat; +import org.apache.hadoop.hbase.mapreduce.WALPlayer; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.util.Tool; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -490,15 +508,8 @@ public class BackupAdminImpl implements BackupAdmin { @Override public void restore(RestoreRequest request) throws IOException { if (request.isCheck()) { - // check and load backup image manifest for the tables - Path rootPath = new Path(request.getBackupRootDir()); - String backupId = request.getBackupId(); - TableName[] sTableArray = request.getFromTables(); - BackupManifest manifest = - HBackupFileSystem.getManifest(conn.getConfiguration(), rootPath, backupId); - - // Check and validate the backup image and its dependencies - if (BackupUtils.validate(Arrays.asList(sTableArray), manifest, conn.getConfiguration())) { + boolean isValid = validateRequest(request); + if (isValid) { LOG.info(CHECK_OK); } else { LOG.error(CHECK_FAILED); @@ -509,6 +520,337 @@ public class BackupAdminImpl implements BackupAdmin { new RestoreTablesClient(conn, request).execute(); } + private boolean validateRequest(RestoreRequest request) throws IOException { + // check and load backup image manifest for the tables + Path rootPath = new Path(request.getBackupRootDir()); + String backupId = request.getBackupId(); + TableName[] sTableArray = request.getFromTables(); + BackupManifest manifest = + HBackupFileSystem.getManifest(conn.getConfiguration(), rootPath, backupId); + + // Validate the backup image and its dependencies + return BackupUtils.validate(Arrays.asList(sTableArray), manifest, conn.getConfiguration()); + } + + @Override + public void pointInTimeRestore(PointInTimeRestoreRequest request) throws IOException { + if (request.getBackupRootDir() == null) { + defaultPointInTimeRestore(request); + } else { + // TODO: special case, not supported at the moment + throw new IOException("Custom backup location for Point-In-Time Recovery Not supported!"); + } + LOG.info("Successfully completed Point In Time Restore for all tables."); + } + + /** + * Performs a default Point-In-Time Restore (PITR) by restoring the latest valid backup and + * replaying the WALs to bring the table to the desired state. PITR requires: 1. A valid backup + * available before the end time. 2. Write-Ahead Logs (WALs) covering the remaining duration up to + * the end time. + * @param request PointInTimeRestoreRequest containing restore parameters. + * @throws IOException If no valid backup or WALs are found, or if an error occurs during + * restoration. 
+ */ + private void defaultPointInTimeRestore(PointInTimeRestoreRequest request) throws IOException { + long endTime = request.getToDateTime(); + validateRequestToTime(endTime); + + TableName[] sTableArray = request.getFromTables(); + TableName[] tTableArray = resolveTargetTables(sTableArray, request.getToTables()); + + // Validate PITR requirements + validatePitr(endTime, sTableArray, tTableArray); + + // If only validation is required, log and return + if (request.isCheck()) { + LOG.info("PITR can be successfully executed"); + return; + } + + // Execute PITR process + try (BackupSystemTable table = new BackupSystemTable(conn)) { + Map<TableName, Long> continuousBackupTables = table.getContinuousBackupTableSet(); + List<BackupInfo> backupInfos = table.getBackupInfos(BackupState.COMPLETE); + + for (int i = 0; i < sTableArray.length; i++) { + restoreTableWithWalReplay(sTableArray[i], tTableArray[i], endTime, continuousBackupTables, + backupInfos, request); + } + } + } + + /** + * Validates whether the requested end time falls within the allowed PITR recovery window. + * @param endTime The target recovery time. + * @throws IOException If the requested recovery time is outside the allowed window. + */ + private void validateRequestToTime(long endTime) throws IOException { + long pitrWindowDays = conn.getConfiguration().getLong(CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS, + DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS); + long currentTime = EnvironmentEdgeManager.getDelegate().currentTime(); + long pitrMaxStartTime = currentTime - TimeUnit.DAYS.toMillis(pitrWindowDays); + + if (endTime < pitrMaxStartTime) { + String errorMsg = String.format( + "Requested recovery time (%d) is out of the allowed PITR window (last %d days).", endTime, + pitrWindowDays); + LOG.error(errorMsg); + throw new IOException(errorMsg); + } + + if (endTime > currentTime) { + String errorMsg = String.format( + "Requested recovery time (%d) is in the future. Current time: %d.", endTime, currentTime); + LOG.error(errorMsg); + throw new IOException(errorMsg); + } + } + + /** + * Resolves the target table array. If null or empty, defaults to the source table array. + */ + private TableName[] resolveTargetTables(TableName[] sourceTables, TableName[] targetTables) { + return (targetTables == null || targetTables.length == 0) ? sourceTables : targetTables; + } + + /** + * Validates whether Point-In-Time Recovery (PITR) is possible for the given tables at the + * specified time. + * <p> + * PITR requires: + * <ul> + * <li>Continuous backup to be enabled for the source tables.</li> + * <li>A valid backup image and corresponding WALs to be available.</li> + * </ul> + * @param endTime The target recovery time. + * @param sTableArray The source tables to restore. + * @param tTableArray The target tables where the restore will be performed. + * @throws IOException If PITR is not possible due to missing continuous backup or backup images. 
+ */ + private void validatePitr(long endTime, TableName[] sTableArray, TableName[] tTableArray) + throws IOException { + try (BackupSystemTable table = new BackupSystemTable(conn)) { + // Retrieve the set of tables with continuous backup enabled + Map<TableName, Long> continuousBackupTables = table.getContinuousBackupTableSet(); + + // Ensure all source tables have continuous backup enabled + validateContinuousBackup(sTableArray, continuousBackupTables); + + // Fetch completed backup information + List<BackupInfo> backupInfos = table.getBackupInfos(BackupState.COMPLETE); + + // Ensure a valid backup and WALs exist for PITR + validateBackupAvailability(sTableArray, tTableArray, endTime, continuousBackupTables, + backupInfos); + } + } + + /** + * Ensures that all source tables have continuous backup enabled. + */ + private void validateContinuousBackup(TableName[] tables, + Map<TableName, Long> continuousBackupTables) throws IOException { + List<TableName> missingTables = + Arrays.stream(tables).filter(table -> !continuousBackupTables.containsKey(table)).toList(); + + if (!missingTables.isEmpty()) { + String errorMsg = "Continuous Backup is not enabled for the following tables: " + + missingTables.stream().map(TableName::getNameAsString).collect(Collectors.joining(", ")); + LOG.error(errorMsg); + throw new IOException(errorMsg); + } + } + + /** + * Ensures that a valid backup and corresponding WALs exist for PITR for each source table. PITR + * requires: 1. A valid backup available before the end time. 2. Write-Ahead Logs (WALs) covering + * the remaining duration up to the end time. + */ + private void validateBackupAvailability(TableName[] sTableArray, TableName[] tTableArray, + long endTime, Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos) + throws IOException { + for (int i = 0; i < sTableArray.length; i++) { + if ( + !canPerformPitr(sTableArray[i], tTableArray[i], endTime, continuousBackupTables, + backupInfos) + ) { + String errorMsg = "Could not find a valid backup and WALs for PITR for table: " + + sTableArray[i].getNameAsString(); + LOG.error(errorMsg); + throw new IOException(errorMsg); + } + } + } + + /** + * Checks whether PITR can be performed for a given source-target table pair. + */ + private boolean canPerformPitr(TableName stableName, TableName tTableName, long endTime, + Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos) { + return getValidBackupInfo(stableName, tTableName, endTime, continuousBackupTables, backupInfos) + != null; + } + + /** + * Finds a valid backup for PITR that meets the required conditions. + */ + private BackupInfo getValidBackupInfo(TableName sTableName, TableName tTablename, long endTime, + Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos) { + for (BackupInfo info : backupInfos) { + if (isValidBackupForPitr(info, sTableName, endTime, continuousBackupTables)) { + + RestoreRequest restoreRequest = + BackupUtils.createRestoreRequest(info.getBackupRootDir(), info.getBackupId(), true, + new TableName[] { sTableName }, new TableName[] { tTablename }, false); + + try { + if (validateRequest(restoreRequest)) { + return info; + } + } catch (IOException e) { + LOG.warn("Exception occurred while testing the backup : {} for restore ", info, e); + } + } + } + return null; + } + + /** + * Determines if the given backup is valid for PITR. 
+ * <p> + * A backup is valid if: + * <ul> + * <li>It contains the source table.</li> + * <li>It was completed before the end time.</li> + * <li>The start timestamp of the backup is after the continuous backup start time for the + * table.</li> + * </ul> + * @param info Backup information object. + * @param tableName Table to check. + * @param endTime The target recovery time. + * @param continuousBackupTables Map of tables with continuous backup enabled. + * @return true if the backup is valid for PITR, false otherwise. + */ + private boolean isValidBackupForPitr(BackupInfo info, TableName tableName, long endTime, + Map<TableName, Long> continuousBackupTables) { + return info.getTableNames().contains(tableName) && info.getCompleteTs() <= endTime + && continuousBackupTables.getOrDefault(tableName, 0L) <= info.getStartTs(); + } + + /** + * Restores a table from a valid backup and replays WALs to reach the desired PITR state. + */ + private void restoreTableWithWalReplay(TableName sourceTable, TableName targetTable, long endTime, + Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos, + PointInTimeRestoreRequest request) throws IOException { + BackupInfo backupInfo = + getValidBackupInfo(sourceTable, targetTable, endTime, continuousBackupTables, backupInfos); + if (backupInfo == null) { + String errorMsg = "Could not find a valid backup and WALs for PITR for table: " + + sourceTable.getNameAsString(); + LOG.error(errorMsg); + throw new IOException(errorMsg); + } + + RestoreRequest restoreRequest = BackupUtils.createRestoreRequest(backupInfo.getBackupRootDir(), + backupInfo.getBackupId(), false, new TableName[] { sourceTable }, + new TableName[] { targetTable }, request.isOverwrite()); + + restore(restoreRequest); + replayWal(sourceTable, targetTable, backupInfo.getStartTs(), endTime); + } + + /** + * Replays WALs to bring the table to the desired state. + */ + private void replayWal(TableName sourceTable, TableName targetTable, long startTime, long endTime) + throws IOException { + String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR); + Path walDirPath = new Path(walBackupDir); + LOG.info( + "Starting WAL replay for source: {}, target: {}, time range: {} - {}, WAL backup dir: {}", + sourceTable, targetTable, startTime, endTime, walDirPath); + + List<String> validDirs = + getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime); + if (validDirs.isEmpty()) { + LOG.warn("No valid WAL directories found for range {} - {}. Skipping WAL replay.", startTime, + endTime); + return; + } + + executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime); + } + + /** + * Fetches valid WAL directories based on the given time range. 
+ */ + private List<String> getValidWalDirs(Configuration conf, Path walBackupDir, long startTime, + long endTime) throws IOException { + FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf); + FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR)); + + List<String> validDirs = new ArrayList<>(); + SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); + + for (FileStatus dayDir : dayDirs) { + if (!dayDir.isDirectory()) { + continue; // Skip files, only process directories + } + + String dirName = dayDir.getPath().getName(); + try { + Date dirDate = dateFormat.parse(dirName); + long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00) + long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59) + + // Check if this day's WAL files overlap with the required time range + if (dirEndTime >= startTime && dirStartTime <= endTime) { + validDirs.add(dayDir.getPath().toString()); + } + } catch (ParseException e) { + LOG.warn("Skipping invalid directory name: " + dirName, e); + } + } + return validDirs; + } + + /** + * Executes WAL replay using WALPlayer. + */ + private void executeWalReplay(List<String> walDirs, TableName sourceTable, TableName targetTable, + long startTime, long endTime) throws IOException { + Tool walPlayer = initializeWalPlayer(startTime, endTime); + String[] args = + { String.join(",", walDirs), sourceTable.getNameAsString(), targetTable.getNameAsString() }; + + try { + LOG.info("Executing WALPlayer with args: {}", Arrays.toString(args)); + int exitCode = walPlayer.run(args); + if (exitCode == 0) { + LOG.info("WAL replay completed successfully for {}", targetTable); + } else { + throw new IOException("WAL replay failed with exit code: " + exitCode); + } + } catch (Exception e) { + LOG.error("Error during WAL replay for {}: {}", targetTable, e.getMessage(), e); + throw new IOException("Exception during WAL replay", e); + } + } + + /** + * Initializes and configures WALPlayer. + */ + private Tool initializeWalPlayer(long startTime, long endTime) { + Configuration conf = HBaseConfiguration.create(conn.getConfiguration()); + conf.setLong(WALInputFormat.START_TIME_KEY, startTime); + conf.setLong(WALInputFormat.END_TIME_KEY, endTime); + Tool walPlayer = new WALPlayer(); + walPlayer.setConf(conf); + return walPlayer; + } + @Override public String backupTables(BackupRequest request) throws IOException { BackupType type = request.getBackupType(); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java index fa01cd88345..b72858fa578 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -170,6 +170,12 @@ public final class BackupSystemTable implements Closeable { private final static String INCR_BACKUP_SET = "incrbackupset:"; private final static String CONTINUOUS_BACKUP_SET = "continuousbackupset"; + /** + * Row key identifier for storing the last replicated WAL timestamp in the backup system table for + * continuous backup. 
+ */ + private static final String CONTINUOUS_BACKUP_REPLICATION_TIMESTAMP_ROW = + "continuous_backup_last_replicated"; private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; private final static String RS_LOG_TS_PREFIX = "rslogts:"; @@ -1003,6 +1009,86 @@ public final class BackupSystemTable implements Closeable { } } + /** + * Updates the latest replicated WAL timestamp for a region server in the backup system table. + * This is used to track the replication checkpoint for continuous backup and PITR (Point-in-Time + * Restore). + * @param serverName the server for which the latest WAL timestamp is being recorded + * @param timestamp the timestamp (in milliseconds) of the last WAL entry replicated + * @throws IOException if an error occurs while writing to the backup system table + */ + public void updateBackupCheckpointTimestamp(ServerName serverName, long timestamp) + throws IOException { + + HBaseProtos.ServerName.Builder serverProto = + HBaseProtos.ServerName.newBuilder().setHostName(serverName.getHostname()) + .setPort(serverName.getPort()).setStartCode(serverName.getStartCode()); + + try (Table table = connection.getTable(tableName)) { + Put put = createPutForBackupCheckpoint(serverProto.build().toByteArray(), timestamp); + if (!put.isEmpty()) { + table.put(put); + } + } + } + + /** + * Retrieves the latest replicated WAL timestamps for all region servers from the backup system + * table. This is used to track the replication checkpoint state for continuous backup and PITR + * (Point-in-Time Restore). + * @return a map where the key is {@link ServerName} and the value is the latest replicated WAL + * timestamp in milliseconds + * @throws IOException if an error occurs while reading from the backup system table + */ + public Map<ServerName, Long> getBackupCheckpointTimestamps() throws IOException { + LOG.trace("Fetching latest backup checkpoint timestamps for all region servers."); + + Map<ServerName, Long> checkpointMap = new HashMap<>(); + + byte[] rowKey = rowkey(CONTINUOUS_BACKUP_REPLICATION_TIMESTAMP_ROW); + Get get = new Get(rowKey); + get.addFamily(BackupSystemTable.META_FAMILY); + + try (Table table = connection.getTable(tableName)) { + Result result = table.get(get); + + if (result.isEmpty()) { + LOG.debug("No checkpoint timestamps found in backup system table."); + return checkpointMap; + } + + List<Cell> cells = result.listCells(); + for (Cell cell : cells) { + try { + HBaseProtos.ServerName protoServer = + HBaseProtos.ServerName.parseFrom(CellUtil.cloneQualifier(cell)); + ServerName serverName = ServerName.valueOf(protoServer.getHostName(), + protoServer.getPort(), protoServer.getStartCode()); + + long timestamp = Bytes.toLong(CellUtil.cloneValue(cell)); + checkpointMap.put(serverName, timestamp); + } catch (IllegalArgumentException e) { + LOG.warn("Failed to parse server name or timestamp from cell: {}", cell, e); + } + } + } + + return checkpointMap; + } + + /** + * Constructs a {@link Put} operation to update the last replicated WAL timestamp for a given + * server in the backup system table. 
+ * @param serverNameBytes the serialized server name as bytes + * @param timestamp the WAL entry timestamp to store + * @return a {@link Put} object ready to be written to the system table + */ + private Put createPutForBackupCheckpoint(byte[] serverNameBytes, long timestamp) { + Put put = new Put(rowkey(CONTINUOUS_BACKUP_REPLICATION_TIMESTAMP_ROW)); + put.addColumn(BackupSystemTable.META_FAMILY, serverNameBytes, Bytes.toBytes(timestamp)); + return put; + } + /** * Deletes incremental backup set for a backup destination * @param backupRoot backup root diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java index 623494577be..42341cb2186 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java @@ -166,10 +166,18 @@ public class FullTableBackupClient extends TableBackupClient { private void handleContinuousBackup(Admin admin) throws IOException { backupInfo.setPhase(BackupInfo.BackupPhase.SETUP_WAL_REPLICATION); long startTimestamp = startContinuousWALBackup(admin); + backupManager.addContinuousBackupTableSet(backupInfo.getTables(), startTimestamp); - performBackupSnapshots(admin); + // Updating the start time of this backup to reflect the actual beginning of the full backup. + // So far, we have only set up continuous WAL replication, but the full backup has not yet + // started. + // Setting the correct start time is crucial for Point-In-Time Recovery (PITR). + // When selecting a backup for PITR, we must ensure that the backup started **on or after** the + // starting time of the WALs. If WAL streaming began later, we couldn't guarantee that WALs + // exist for the entire period between the backup's start time and the desired PITR timestamp. + backupInfo.setStartTs(startTimestamp); - backupManager.addContinuousBackupTableSet(backupInfo.getTables(), startTimestamp); + performBackupSnapshots(admin); // set overall backup status: complete. Here we make sure to complete the backup. 
// After this checkpoint, even if entering cancel process, will let the backup finished diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java index c973af8102e..34fcd76bf9c 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java @@ -36,6 +36,9 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; @@ -82,6 +85,8 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint private String peerId; private ScheduledExecutorService flushExecutor; + private long latestWALEntryTimestamp = -1L; + public static final long ONE_DAY_IN_MILLISECONDS = TimeUnit.DAYS.toMillis(1); public static final String WAL_FILE_PREFIX = "wal_file."; public static final String DATE_FORMAT = "yyyy-MM-dd"; @@ -174,6 +179,13 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint } } walWriters.clear(); + + // All received WAL entries have been flushed and persisted successfully. + // At this point, it's safe to record the latest replicated timestamp, + // as we are guaranteed that all entries up to that timestamp are durably stored. + // This checkpoint is essential for enabling consistent Point-in-Time Restore (PITR). + updateLastReplicatedTimestampForContinuousBackup(); + LOG.info("{} WAL writers flushed and cleared", Utils.logPeerId(peerId)); } @@ -218,6 +230,12 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint backupWalEntries(entry.getKey(), entry.getValue()); } + // Capture the timestamp of the last WAL entry processed. This is used as the replication + // checkpoint so that point-in-time restores know the latest consistent time up to which + // replication has + // occurred. + latestWALEntryTimestamp = entries.get(entries.size() - 1).getKey().getWriteTime(); + if (isAnyWriterFull()) { LOG.debug("{} Some WAL writers reached max size, triggering flush", Utils.logPeerId(peerId)); @@ -237,6 +255,21 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint } } + /** + * Persists the latest replicated WAL entry timestamp in the backup system table. This checkpoint + * is critical for Continuous Backup and Point-in-Time Restore (PITR) to ensure restore operations + * only go up to a known safe point. The value is stored per region server using its ServerName as + * the key. 
+ * @throws IOException if the checkpoint update fails + */ + private void updateLastReplicatedTimestampForContinuousBackup() throws IOException { + try (final Connection conn = ConnectionFactory.createConnection(conf); + BackupSystemTable backupSystemTable = new BackupSystemTable(conn)) { + backupSystemTable.updateBackupCheckpointTimestamp(replicationSource.getServerWALsBelongTo(), + latestWALEntryTimestamp); + } + } + private Map<Long, List<WAL.Entry>> groupEntriesByDay(List<WAL.Entry> entries) { return entries.stream().collect( Collectors.groupingBy(entry -> (entry.getKey().getWriteTime() / ONE_DAY_IN_MILLISECONDS) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java index 15159ed73e4..34fa82ef45f 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.backup.util; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; + import java.io.FileNotFoundException; import java.io.IOException; import java.net.URLDecoder; @@ -49,11 +53,17 @@ import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.RestoreRequest; import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.master.region.MasterRegionFactory; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; +import org.apache.hadoop.hbase.replication.ReplicationQueueId; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.tool.BulkLoadHFiles; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -770,4 +780,156 @@ public final class BackupUtils { return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp; } + /** + * Calculates the replication checkpoint timestamp used for continuous backup. + * <p> + * A replication checkpoint is the earliest timestamp across all region servers such that every + * WAL entry before that point is known to be replicated to the target system. This is essential + * for features like Point-in-Time Restore (PITR) and incremental backups, where we want to + * confidently restore data to a consistent state without missing updates. 
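Before the detailed rules in the remainder of this javadoc, a small worked fragment may help. The server names and timestamps are hypothetical, and the code mirrors the merge/override/min steps of getReplicationCheckpoint() rather than calling it:

    // Lower bound per server from the WAL each queue is currently replicating
    // (WAL creation timestamp minus 1, as in the implementation).
    ServerName rs1 = ServerName.valueOf("rs1.example.com", 16020, 1L);
    ServerName rs2 = ServerName.valueOf("rs2.example.com", 16020, 1L);
    Map<ServerName, Long> serverToCheckpoint = new HashMap<>();
    serverToCheckpoint.merge(rs1, 100_000L - 1, Math::min);
    serverToCheckpoint.merge(rs2, 150_000L - 1, Math::min);

    // Overlay a fresher marker timestamp where one exists: rs1 has replicated up to 180_000.
    long rs1Marker = 180_000L;
    if (rs1Marker > serverToCheckpoint.get(rs1)) {
      serverToCheckpoint.put(rs1, rs1Marker);
    }

    // The checkpoint is the slowest server: min(180_000, 149_999) = 149_999.
    long checkpoint = Collections.min(serverToCheckpoint.values());

If the marker chore is disabled, only the first block applies, which matches the conservative WAL-only fallback described at the end of this javadoc.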
+ * <p> + * The checkpoint is calculated using a combination of: + * <ul> + * <li>The start timestamps of WAL files currently being replicated for each server.</li> + * <li>The latest successfully replicated timestamp recorded by the replication marker chore.</li> + * </ul> + * <p> + * We combine these two sources to handle the following challenges: + * <ul> + * <li><b>Stale WAL start times:</b> If replication traffic is low or WALs are long-lived, the + * replication offset may point to the same WAL for a long time, resulting in stale timestamps + * that underestimate progress. This could delay PITR unnecessarily.</li> + * <li><b>Limitations of marker-only tracking:</b> The replication marker chore stores the last + * successfully replicated timestamp per region server in a system table. However, this data may + * become stale if the server goes offline or region ownership changes. For example, if a region + * initially belonged to rs1 and was later moved to rs4 due to re-balancing, rs1’s marker would + * persist even though it no longer holds any regions. Relying solely on these stale markers could + * lead to incorrect or outdated checkpoints.</li> + * </ul> + * <p> + * To handle these limitations, the method: + * <ol> + * <li>Verifies that the continuous backup peer exists to ensure replication is enabled.</li> + * <li>Retrieves WAL replication queue information for the peer, collecting WAL start times per + * region server. This gives us a lower bound for replication progress.</li> + * <li>Reads the marker chore's replicated timestamps from the backup system table.</li> + * <li>For servers found in both sources, if the marker timestamp is more recent than the WAL's + * start timestamp, we use the marker (since replication has progressed beyond the WAL).</li> + * <li>We discard marker entries for region servers that are not present in WAL queues, assuming + * those servers are no longer relevant (e.g., decommissioned or reassigned).</li> + * <li>The checkpoint is the minimum of all chosen timestamps — i.e., the slowest replicating + * region server.</li> + * <li>Finally, we persist the updated marker information to include any newly participating + * region servers.</li> + * </ol> + * <p> + * Note: If the replication marker chore is disabled, we fall back to using only the WAL start + * times. This ensures correctness but may lead to conservative checkpoint estimates during idle + * periods. + * @param conn the HBase connection + * @return the calculated replication checkpoint timestamp + * @throws IOException if reading replication queues or updating the backup system table fails + */ + public static long getReplicationCheckpoint(Connection conn) throws IOException { + Configuration conf = conn.getConfiguration(); + long checkpoint = EnvironmentEdgeManager.getDelegate().currentTime(); + + // Step 1: Ensure the continuous backup replication peer exists + if (!continuousBackupReplicationPeerExists(conn.getAdmin())) { + String msg = "Replication peer '" + CONTINUOUS_BACKUP_REPLICATION_PEER + + "' not found. 
Continuous backup not enabled."; + LOG.error(msg); + throw new IOException(msg); + } + + // Step 2: Get all replication queues for the continuous backup peer + ReplicationQueueStorage queueStorage = + ReplicationStorageFactory.getReplicationQueueStorage(conn, conf); + + List<ReplicationQueueId> queueIds; + try { + queueIds = queueStorage.listAllQueueIds(CONTINUOUS_BACKUP_REPLICATION_PEER); + } catch (ReplicationException e) { + String msg = "Failed to retrieve replication queue IDs for peer '" + + CONTINUOUS_BACKUP_REPLICATION_PEER + "'"; + LOG.error(msg, e); + throw new IOException(msg, e); + } + + if (queueIds.isEmpty()) { + String msg = "Replication peer '" + CONTINUOUS_BACKUP_REPLICATION_PEER + "' has no queues. " + + "This may indicate that continuous backup replication is not initialized correctly."; + LOG.error(msg); + throw new IOException(msg); + } + + // Step 3: Build a map of ServerName -> WAL start timestamp (lowest seen per server) + Map<ServerName, Long> serverToCheckpoint = new HashMap<>(); + for (ReplicationQueueId queueId : queueIds) { + Map<String, ReplicationGroupOffset> offsets; + try { + offsets = queueStorage.getOffsets(queueId); + } catch (ReplicationException e) { + String msg = "Failed to fetch WAL offsets for replication queue: " + queueId; + LOG.error(msg, e); + throw new IOException(msg, e); + } + + for (ReplicationGroupOffset offset : offsets.values()) { + String walFile = offset.getWal(); + long ts = AbstractFSWALProvider.getTimestamp(walFile); // WAL creation time + ServerName server = queueId.getServerName(); + // Store the minimum timestamp per server (ts - 1 to avoid edge boundary issues) + serverToCheckpoint.merge(server, ts - 1, Math::min); + } + } + + // Step 4: If replication markers are enabled, overlay fresher timestamps from backup system + // table + boolean replicationMarkerEnabled = + conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT); + if (replicationMarkerEnabled) { + try (BackupSystemTable backupSystemTable = new BackupSystemTable(conn)) { + Map<ServerName, Long> markerTimestamps = backupSystemTable.getBackupCheckpointTimestamps(); + + for (Map.Entry<ServerName, Long> entry : markerTimestamps.entrySet()) { + ServerName server = entry.getKey(); + long markerTs = entry.getValue(); + + // If marker timestamp is newer, override + if (serverToCheckpoint.containsKey(server)) { + long current = serverToCheckpoint.get(server); + if (markerTs > current) { + serverToCheckpoint.put(server, markerTs); + } + } else { + // This server is no longer active (e.g., RS moved or removed); skip + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping replication marker timestamp for inactive server: {}", server); + } + } + } + + // Step 5: Persist current server timestamps into backup system table + for (Map.Entry<ServerName, Long> entry : serverToCheckpoint.entrySet()) { + backupSystemTable.updateBackupCheckpointTimestamp(entry.getKey(), entry.getValue()); + } + } + } else { + LOG.warn( + "Replication marker chore is disabled. 
Using WAL-based timestamps only for checkpoint calculation."); + } + + // Step 6: Calculate final checkpoint as minimum timestamp across all active servers + for (long ts : serverToCheckpoint.values()) { + checkpoint = Math.min(checkpoint, ts); + } + + return checkpoint; + } + + private static boolean continuousBackupReplicationPeerExists(Admin admin) throws IOException { + return admin.listReplicationPeers().stream() + .anyMatch(peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER)); + } } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index b5f58508441..e5c78d04854 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.backup; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -296,6 +300,10 @@ public class TestBackupBase { BACKUP_ROOT_DIR = Path.SEPARATOR + "backupUT"; BACKUP_REMOTE_ROOT_DIR = Path.SEPARATOR + "backupUT"; + conf1.set(CONF_BACKUP_MAX_WAL_SIZE, "10240"); + conf1.set(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10"); + conf1.set(CONF_STAGED_WAL_FLUSH_INTERVAL, "10"); + if (secure) { // set the always on security provider UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(), diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java new file mode 100644 index 00000000000..fb37977c4ee --- /dev/null +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup; + +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import java.io.IOException; +import java.util.Arrays; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category(LargeTests.class) +public class TestPointInTimeRestore extends TestBackupBase { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestPointInTimeRestore.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestPointInTimeRestore.class); + + private static final String backupWalDirName = "TestPointInTimeRestoreWalDir"; + private static final int WAIT_FOR_REPLICATION_MS = 30_000; + static Path backupWalDir; + static FileSystem fs; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + Path root = TEST_UTIL.getDataTestDirOnTestFS(); + backupWalDir = new Path(root, backupWalDirName); + fs = FileSystem.get(conf1); + fs.mkdirs(backupWalDir); + conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString()); + + setUpBackups(); + } + + /** + * Sets up multiple backups at different timestamps by: 1. Adjusting the system time to simulate + * past backup points. 2. Loading data into tables to create meaningful snapshots. 3. Running full + * backups with or without continuous backup enabled. 4. Ensuring replication is complete before + * proceeding. 
+ */ + private static void setUpBackups() throws Exception { + // Simulate a backup taken 20 days ago + EnvironmentEdgeManager + .injectEdge(() -> System.currentTimeMillis() - 20 * ONE_DAY_IN_MILLISECONDS); + loadRandomData(table1, 1000); // Insert initial data into table1 + + // Perform a full backup for table1 with continuous backup enabled + String[] args = buildBackupArgs("full", new TableName[] { table1 }, true); + int ret = ToolRunner.run(conf1, new BackupDriver(), args); + assertEquals("Backup should succeed", 0, ret); + + // Move time forward to simulate 15 days ago + EnvironmentEdgeManager + .injectEdge(() -> System.currentTimeMillis() - 15 * ONE_DAY_IN_MILLISECONDS); + loadRandomData(table1, 1000); // Add more data to table1 + loadRandomData(table2, 500); // Insert data into table2 + + waitForReplication(); // Ensure replication is complete + + // Perform a full backup for table2 with continuous backup enabled + args = buildBackupArgs("full", new TableName[] { table2 }, true); + ret = ToolRunner.run(conf1, new BackupDriver(), args); + assertEquals("Backup should succeed", 0, ret); + + // Move time forward to simulate 10 days ago + EnvironmentEdgeManager + .injectEdge(() -> System.currentTimeMillis() - 10 * ONE_DAY_IN_MILLISECONDS); + loadRandomData(table2, 500); // Add more data to table2 + loadRandomData(table3, 500); // Insert data into table3 + + // Perform a full backup for table3 and table4 (without continuous backup) + args = buildBackupArgs("full", new TableName[] { table3, table4 }, false); + ret = ToolRunner.run(conf1, new BackupDriver(), args); + assertEquals("Backup should succeed", 0, ret); + + waitForReplication(); // Ensure replication is complete before concluding setup + + // Reset time mocking to avoid affecting other tests + EnvironmentEdgeManager.reset(); + } + + @AfterClass + public static void setupAfterClass() throws IOException { + Path root = TEST_UTIL.getDataTestDirOnTestFS(); + Path backupWalDir = new Path(root, backupWalDirName); + FileSystem fs = FileSystem.get(conf1); + + if (fs.exists(backupWalDir)) { + fs.delete(backupWalDir, true); + } + + conf1.unset(CONF_CONTINUOUS_BACKUP_WAL_DIR); + } + + /** + * Verifies that PITR (Point-in-Time Restore) fails when the requested restore time is either in + * the future or outside the allowed retention window. + */ + @Test + public void testPITR_FailsOutsideWindow() throws Exception { + // Case 1: Requested restore time is in the future (should fail) + String[] args = buildPITRArgs(new TableName[] { table1 }, + new TableName[] { TableName.valueOf("restoredTable1") }, + EnvironmentEdgeManager.currentTime() + ONE_DAY_IN_MILLISECONDS); + + int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args); + assertNotEquals("Restore should fail since the requested restore time is in the future", 0, + ret); + + // Case 2: Requested restore time is too old (beyond the retention window, should fail) + args = buildPITRArgs(new TableName[] { table1 }, + new TableName[] { TableName.valueOf("restoredTable1") }, + EnvironmentEdgeManager.currentTime() - 40 * ONE_DAY_IN_MILLISECONDS); + + ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args); + assertNotEquals( + "Restore should fail since the requested restore time is outside the retention window", 0, + ret); + } + + /** + * Ensures that PITR fails when attempting to restore tables where continuous backup was not + * enabled. 
+ */ + @Test + public void testPointInTimeRestore_ContinuousBackupNotEnabledTables() throws Exception { + String[] args = buildPITRArgs(new TableName[] { table3 }, + new TableName[] { TableName.valueOf("restoredTable1") }, + EnvironmentEdgeManager.currentTime() - 10 * ONE_DAY_IN_MILLISECONDS); + + int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args); + assertNotEquals("Restore should fail since continuous backup is not enabled for the table", 0, + ret); + } + + /** + * Ensures that PITR fails when trying to restore from a point before continuous backup started. + */ + @Test + public void testPointInTimeRestore_TablesWithNoProperBackupOrWals() throws Exception { + String[] args = buildPITRArgs(new TableName[] { table2 }, + new TableName[] { TableName.valueOf("restoredTable1") }, + EnvironmentEdgeManager.currentTime() - 16 * ONE_DAY_IN_MILLISECONDS); + + int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args); + assertNotEquals( + "Restore should fail since the requested restore point is before the start of continuous backup", + 0, ret); + } + + /** + * Verifies that PITR successfully restores data for a single table. + */ + @Test + public void testPointInTimeRestore_SuccessfulRestoreForOneTable() throws Exception { + TableName restoredTable = TableName.valueOf("restoredTable"); + + // Perform restore operation + String[] args = buildPITRArgs(new TableName[] { table1 }, new TableName[] { restoredTable }, + EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS); + + int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args); + assertEquals("Restore should succeed", 0, ret); + + // Validate that the restored table contains the same number of rows as the original table + assertEquals("Restored table should have the same row count as the original", + getRowCount(table1), getRowCount(restoredTable)); + } + + /** + * Verifies that PITR successfully restores multiple tables at once. 
+ */ + @Test + public void testPointInTimeRestore_SuccessfulRestoreForMultipleTables() throws Exception { + TableName restoredTable1 = TableName.valueOf("restoredTable1"); + TableName restoredTable2 = TableName.valueOf("restoredTable2"); + + // Perform restore operation for multiple tables + String[] args = buildPITRArgs(new TableName[] { table1, table2 }, + new TableName[] { restoredTable1, restoredTable2 }, + EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS); + + int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args); + assertEquals("Restore should succeed", 0, ret); + + // Validate that the restored tables contain the same number of rows as the originals + assertEquals("Restored table1 should have the same row count as the original", + getRowCount(table1), getRowCount(restoredTable1)); + assertEquals("Restored table2 should have the same row count as the original", + getRowCount(table2), getRowCount(restoredTable2)); + } + + private String[] buildPITRArgs(TableName[] sourceTables, TableName[] targetTables, long endTime) { + String sourceTableNames = + Arrays.stream(sourceTables).map(TableName::getNameAsString).collect(Collectors.joining(",")); + + String targetTableNames = + Arrays.stream(targetTables).map(TableName::getNameAsString).collect(Collectors.joining(",")); + + return new String[] { "-" + OPTION_TABLE, sourceTableNames, "-" + OPTION_TABLE_MAPPING, + targetTableNames, "-" + OPTION_TO_DATETIME, String.valueOf(endTime) }; + } + + private static String[] buildBackupArgs(String backupType, TableName[] tables, + boolean continuousEnabled) { + String tableNames = + Arrays.stream(tables).map(TableName::getNameAsString).collect(Collectors.joining(",")); + + if (continuousEnabled) { + return new String[] { "create", backupType, BACKUP_ROOT_DIR, "-" + OPTION_TABLE, tableNames, + "-" + OPTION_ENABLE_CONTINUOUS_BACKUP }; + } else { + return new String[] { "create", backupType, BACKUP_ROOT_DIR, "-" + OPTION_TABLE, tableNames }; + } + } + + private static void loadRandomData(TableName tableName, int totalRows) throws IOException { + int rowSize = 32; + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + TEST_UTIL.loadRandomRows(table, famName, rowSize, totalRows); + } + } + + private static void waitForReplication() { + LOG.info("Waiting for replication to complete for {} ms", WAIT_FOR_REPLICATION_MS); + try { + Thread.sleep(WAIT_FOR_REPLICATION_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread was interrupted while waiting", e); + } + } + + private int getRowCount(TableName tableName) throws IOException { + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + return HBaseTestingUtil.countRows(table); + } + } +}
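For completeness, the restore path exercised by these tests can be driven the same way outside of the test harness. A minimal sketch, mirroring buildPITRArgs above; the table names and target timestamp are hypothetical, and conf is assumed to be a fully configured client Configuration with backup enabled:

    // Restore ns:orders to its state at targetTimeMillis into ns:orders_restored.
    long targetTimeMillis = 1717000000000L; // epoch millis within the continuous-backup window
    String[] args = new String[] {
      "-" + BackupRestoreConstants.OPTION_TABLE, "ns:orders",
      "-" + BackupRestoreConstants.OPTION_TABLE_MAPPING, "ns:orders_restored",
      "-" + BackupRestoreConstants.OPTION_TO_DATETIME, String.valueOf(targetTimeMillis)
    };
    int ret = ToolRunner.run(conf, new PointInTimeRestoreDriver(), args);
    // ret == 0 indicates the restore completed; non-zero mirrors the failure cases tested above.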