This is an automated email from the ASF dual-hosted git repository.
taklwu pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-28957 by this push:
new 5fc58af0e2a HBASE-29133: Implement "pitr" Command for Point-in-Time Restore (#6717)
5fc58af0e2a is described below
commit 5fc58af0e2a081ef804544631081c5beb7797879
Author: vinayak hegde <[email protected]>
AuthorDate: Sat May 31 00:07:44 2025 +0530
HBASE-29133: Implement "pitr" Command for Point-in-Time Restore (#6717)
Signed-off-by: Andor Molnar <[email protected]>
Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
---
bin/hbase | 17 +
...storeDriver.java => AbstractRestoreDriver.java} | 130 +++-----
.../apache/hadoop/hbase/backup/BackupAdmin.java | 7 +
.../hbase/backup/BackupRestoreConstants.java | 10 +
.../hadoop/hbase/backup/HBackupFileSystem.java | 40 +++
.../hbase/backup/PointInTimeRestoreDriver.java | 137 ++++++++
.../hbase/backup/PointInTimeRestoreRequest.java | 111 +++++++
.../apache/hadoop/hbase/backup/RestoreDriver.java | 218 +------------
.../hadoop/hbase/backup/impl/BackupAdminImpl.java | 360 ++++++++++++++++++++-
.../hbase/backup/impl/BackupSystemTable.java | 86 +++++
.../hbase/backup/impl/FullTableBackupClient.java | 12 +-
.../ContinuousBackupReplicationEndpoint.java | 33 ++
.../hadoop/hbase/backup/util/BackupUtils.java | 162 ++++++++++
.../apache/hadoop/hbase/backup/TestBackupBase.java | 8 +
.../hbase/backup/TestPointInTimeRestore.java | 277 ++++++++++++++++
15 files changed, 1310 insertions(+), 298 deletions(-)
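For context, a minimal sketch (not part of this change) of how the new point-in-time restore API introduced below could be driven from client code; the table names and target timestamp are illustrative only. The CLI equivalent added in bin/hbase is the new "pitr" command (with -t for the table list and -td/--to-datetime for the target time).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupAdmin;
import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class PitrSketch {
  public static void main(String[] args) throws Exception {
    // Assumes backup and continuous WAL backup are already configured for the source table.
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        BackupAdmin admin = new BackupAdminImpl(conn)) {
      PointInTimeRestoreRequest request = new PointInTimeRestoreRequest.Builder()
        .withFromTables(new TableName[] { TableName.valueOf("demo_table") }) // illustrative name
        .withToTables(new TableName[] { TableName.valueOf("demo_table_restored") })
        .withToDateTime(1748600000000L) // illustrative epoch millis within the PITR window
        .withOverwrite(false)
        .build();
      admin.pointInTimeRestore(request);
    }
  }
}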
diff --git a/bin/hbase b/bin/hbase
index d8d9b6ec5b2..d0343dc85a3 100755
--- a/bin/hbase
+++ b/bin/hbase
@@ -102,6 +102,7 @@ show_usage() {
echo " version Print the version"
echo " backup Backup tables for recovery"
echo " restore Restore tables from existing backup image"
+ echo " pitr Restore tables to a specific point in time using
backup and WAL replay"
echo " completebulkload Run BulkLoadHFiles tool"
echo " regionsplitter Run RegionSplitter tool"
echo " rowcounter Run RowCounter tool"
@@ -639,6 +640,22 @@ elif [ "$COMMAND" = "restore" ] ; then
fi
done
fi
+elif [ "$COMMAND" = "pitr" ] ; then
+ CLASS='org.apache.hadoop.hbase.backup.PointInTimeRestoreDriver'
+ if [ -n "${shaded_jar}" ] ; then
+ for f in "${HBASE_HOME}"/lib/hbase-backup*.jar; do
+ if [ -f "${f}" ]; then
+ CLASSPATH="${CLASSPATH}:${f}"
+ break
+ fi
+ done
+ for f in "${HBASE_HOME}"/lib/commons-lang3*.jar; do
+ if [ -f "${f}" ]; then
+ CLASSPATH="${CLASSPATH}:${f}"
+ break
+ fi
+ done
+ fi
elif [ "$COMMAND" = "upgrade" ] ; then
echo "This command was used to upgrade to HBase 0.96, it was removed in
HBase 2.0.0."
echo "Please follow the documentation at
http://hbase.apache.org/book.html#upgrading."
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/AbstractRestoreDriver.java
similarity index 68%
copy from hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java
copy to hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/AbstractRestoreDriver.java
index 38b767ecf67..8e053a73a6b 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/AbstractRestoreDriver.java
@@ -33,15 +33,10 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_RESTORE_DESC;
import java.io.IOException;
-import java.net.URI;
import java.util.List;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
@@ -49,8 +44,6 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.logging.Log4jUtils;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,66 +51,60 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
-/**
- * Command-line entry point for restore operation
- */
@InterfaceAudience.Private
-public class RestoreDriver extends AbstractHBaseTool {
- private static final Logger LOG = LoggerFactory.getLogger(RestoreDriver.class);
- private CommandLine cmd;
-
- private static final String USAGE_STRING =
- "Usage: hbase restore <backup_path> <backup_id> [options]\n"
- + " backup_path Path to a backup destination root\n"
- + " backup_id Backup image ID to restore\n"
- + " table(s) Comma-separated list of tables to restore\n";
+public abstract class AbstractRestoreDriver extends AbstractHBaseTool {
+ protected static final Logger LOG = LoggerFactory.getLogger(AbstractRestoreDriver.class);
+ protected CommandLine cmd;
- private static final String USAGE_FOOTER = "";
+ protected static final String USAGE_FOOTER = "";
- protected RestoreDriver() throws IOException {
+ protected AbstractRestoreDriver() {
init();
}
protected void init() {
- // disable irrelevant loggers to avoid it mess up command output
Log4jUtils.disableZkAndClientLoggers();
}
+ protected abstract int executeRestore(boolean check, TableName[] fromTables, TableName[] toTables,
+ boolean isOverwrite);
+
private int parseAndRun() throws IOException {
- // Check if backup is enabled
if (!BackupManager.isBackupEnabled(getConf())) {
System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
return -1;
}
- // enable debug logging
if (cmd.hasOption(OPTION_DEBUG)) {
Log4jUtils.setLogLevel("org.apache.hadoop.hbase.backup", "DEBUG");
}
- // whether to overwrite to existing table if any, false by default
boolean overwrite = cmd.hasOption(OPTION_OVERWRITE);
if (overwrite) {
- LOG.debug("Found -overwrite option in restore command, "
- + "will overwrite to existing table if any in the restore target");
+ LOG.debug("Found overwrite option (-{}) in restore command, "
+ + "will overwrite to existing table if any in the restore target",
OPTION_OVERWRITE);
}
- // whether to only check the dependencies, false by default
boolean check = cmd.hasOption(OPTION_CHECK);
if (check) {
LOG.debug(
- "Found -check option in restore command, " + "will check and verify
the dependencies");
+ "Found check option (-{}) in restore command, will check and verify
the dependencies",
+ OPTION_CHECK);
}
if (cmd.hasOption(OPTION_SET) && cmd.hasOption(OPTION_TABLE)) {
- System.err.println(
- "Options -s and -t are mutaully exclusive," + " you can not specify
both of them.");
+ System.err.printf(
+ "Set name (-%s) and table list (-%s) are mutually exclusive, you can
not specify both "
+ + "of them.%n",
+ OPTION_SET, OPTION_TABLE);
printToolUsage();
return -1;
}
if (!cmd.hasOption(OPTION_SET) && !cmd.hasOption(OPTION_TABLE)) {
- System.err.println("You have to specify either set name or table list to
restore");
+ System.err.printf(
+ "You have to specify either set name (-%s) or table list (-%s) to " +
"restore%n",
+ OPTION_SET, OPTION_TABLE);
printToolUsage();
return -1;
}
@@ -128,20 +115,13 @@ public class RestoreDriver extends AbstractHBaseTool {
getConf().set("mapreduce.job.queuename", queueName);
}
- // parse main restore command options
- String[] remainArgs = cmd.getArgs();
- if (remainArgs.length != 2) {
- printToolUsage();
- return -1;
- }
-
- String backupRootDir = remainArgs[0];
- String backupId = remainArgs[1];
String tables;
- String tableMapping =
- cmd.hasOption(OPTION_TABLE_MAPPING) ? cmd.getOptionValue(OPTION_TABLE_MAPPING) : null;
- try (final Connection conn = ConnectionFactory.createConnection(conf);
- BackupAdmin client = new BackupAdminImpl(conn)) {
+ TableName[] sTableArray;
+ TableName[] tTableArray;
+
+ String tableMapping = cmd.getOptionValue(OPTION_TABLE_MAPPING);
+
+ try (final Connection conn = ConnectionFactory.createConnection(conf)) {
// Check backup set
if (cmd.hasOption(OPTION_SET)) {
String setName = cmd.getOptionValue(OPTION_SET);
@@ -162,8 +142,8 @@ public class RestoreDriver extends AbstractHBaseTool {
tables = cmd.getOptionValue(OPTION_TABLE);
}
- TableName[] sTableArray = BackupUtils.parseTableNames(tables);
- TableName[] tTableArray = BackupUtils.parseTableNames(tableMapping);
+ sTableArray = BackupUtils.parseTableNames(tables);
+ tTableArray = BackupUtils.parseTableNames(tableMapping);
if (
sTableArray != null && tTableArray != null && (sTableArray.length != tTableArray.length)
@@ -172,31 +152,13 @@ public class RestoreDriver extends AbstractHBaseTool {
printToolUsage();
return -4;
}
-
- client.restore(BackupUtils.createRestoreRequest(backupRootDir, backupId, check, sTableArray,
- tTableArray, overwrite));
- } catch (Exception e) {
- LOG.error("Error while running restore backup", e);
- return -5;
}
- return 0;
- }
- private String getTablesForSet(Connection conn, String name) throws IOException {
- try (final BackupSystemTable table = new BackupSystemTable(conn)) {
- List<TableName> tables = table.describeBackupSet(name);
-
- if (tables == null) {
- return null;
- }
-
- return StringUtils.join(tables, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND);
- }
+ return executeRestore(check, sTableArray, tTableArray, overwrite);
}
@Override
protected void addOptions() {
- // define supported options
addOptNoArg(OPTION_OVERWRITE, OPTION_OVERWRITE_DESC);
addOptNoArg(OPTION_CHECK, OPTION_CHECK_DESC);
addOptNoArg(OPTION_DEBUG, OPTION_DEBUG_DESC);
@@ -216,26 +178,14 @@ public class RestoreDriver extends AbstractHBaseTool {
return parseAndRun();
}
- public static void main(String[] args) throws Exception {
- Configuration conf = HBaseConfiguration.create();
- Path hbasedir = CommonFSUtils.getRootDir(conf);
- URI defaultFs = hbasedir.getFileSystem(conf).getUri();
- CommonFSUtils.setFsDefault(conf, new Path(defaultFs));
- int ret = ToolRunner.run(conf, new RestoreDriver(), args);
- System.exit(ret);
- }
-
@Override
public int run(String[] args) {
Objects.requireNonNull(conf, "Tool configuration is not initialized");
- CommandLine cmd;
try {
- // parse the command line arguments
cmd = parseArgs(args);
- cmdLineArgs = args;
} catch (Exception e) {
- System.out.println("Error when parsing command-line arguments: " +
e.getMessage());
+ System.out.println("Error parsing command-line arguments: " +
e.getMessage());
printToolUsage();
return EXIT_FAILURE;
}
@@ -247,18 +197,16 @@ public class RestoreDriver extends AbstractHBaseTool {
processOptions(cmd);
- int ret = EXIT_FAILURE;
try {
- ret = doWork();
+ return doWork();
} catch (Exception e) {
- LOG.error("Error running command-line tool", e);
+ LOG.error("Error running restore tool", e);
return EXIT_FAILURE;
}
- return ret;
}
protected void printToolUsage() {
- System.out.println(USAGE_STRING);
+ System.out.println(getUsageString());
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.setLeftPadding(2);
helpFormatter.setDescPadding(8);
@@ -267,4 +215,18 @@ public class RestoreDriver extends AbstractHBaseTool {
helpFormatter.printHelp(" ", null, options, USAGE_FOOTER);
System.out.println(BackupRestoreConstants.VERIFY_BACKUP);
}
+
+ protected abstract String getUsageString();
+
+ private String getTablesForSet(Connection conn, String name) throws IOException {
+ try (final BackupSystemTable table = new BackupSystemTable(conn)) {
+ List<TableName> tables = table.describeBackupSet(name);
+
+ if (tables == null) {
+ return null;
+ }
+
+ return StringUtils.join(tables, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND);
+ }
+ }
}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java
index 25055fd5e8e..7fdf1d9bfbf 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java
@@ -51,6 +51,13 @@ public interface BackupAdmin extends Closeable {
*/
void restore(RestoreRequest request) throws IOException;
+ /**
+ * Restore the tables to specific time
+ * @param request Point in Time restore request
+ * @throws IOException exception
+ */
+ void pointInTimeRestore(PointInTimeRestoreRequest request) throws IOException;
+
/**
* Describe backup image command
* @param backupId backup id
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java
index 7989a5be8d8..e38b044d71d 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java
@@ -106,6 +106,16 @@ public interface BackupRestoreConstants {
String OPTION_FORCE_DELETE_DESC =
"Flag to forcefully delete the backup, even if it may be required for
Point-in-Time Restore";
+ String OPTION_TO_DATETIME = "td";
+ String LONG_OPTION_TO_DATETIME = "to-datetime";
+ String OPTION_TO_DATETIME_DESC = "Target date and time up to which data should be restored";
+
+ String OPTION_PITR_BACKUP_PATH = "bp";
+ String LONG_OPTION_PITR_BACKUP_PATH = "backup-path";
+ String OPTION_PITR_BACKUP_PATH_DESC =
+ "Specifies a custom backup location for Point-In-Time Recovery (PITR). "
+ + "If provided, this location will be used exclusively instead of
deriving the path from the system table.";
+
String JOB_NAME_CONF_KEY = "mapreduce.job.name";
String BACKUP_CONFIG_STRING =
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
index d5fd9aaf4c3..0ad5b3f0e05 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
@@ -17,13 +17,21 @@
*/
package org.apache.hadoop.hbase.backup;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUPID_PREFIX;
+
import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -135,4 +143,36 @@ public final class HBackupFileSystem {
new BackupManifest(conf, getManifestPath(conf, backupRootPath, backupId));
return manifest;
}
+
+ public static List<BackupImage> getAllBackupImages(Configuration conf, Path backupRootPath)
+ throws IOException {
+ FileSystem fs = FileSystem.get(backupRootPath.toUri(), conf);
+ RemoteIterator<LocatedFileStatus> it = fs.listLocatedStatus(backupRootPath);
+
+ List<BackupImage> images = new ArrayList<>();
+
+ while (it.hasNext()) {
+ LocatedFileStatus lfs = it.next();
+ if (!lfs.isDirectory()) {
+ continue;
+ }
+
+ String backupId = lfs.getPath().getName();
+ try {
+ BackupManifest manifest = getManifest(conf, backupRootPath, backupId);
+ images.add(manifest.getBackupImage());
+ } catch (IOException e) {
+ LOG.error("Cannot load backup manifest from: " + lfs.getPath(), e);
+ }
+ }
+
+ // Sort images by timestamp in descending order
+ images.sort(Comparator.comparingLong(m -> -getTimestamp(m.getBackupId())));
+
+ return images;
+ }
+
+ private static long getTimestamp(String backupId) {
+ return Long.parseLong(backupId.substring(BACKUPID_PREFIX.length()));
+ }
}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java
new file mode 100644
index 00000000000..abdf52f1430
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.LONG_OPTION_PITR_BACKUP_PATH;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.LONG_OPTION_TO_DATETIME;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_PITR_BACKUP_PATH;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_PITR_BACKUP_PATH_DESC;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME_DESC;
+
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
+
+/**
+ * Command-line entry point for restore operation
+ */
+@InterfaceAudience.Private
+public class PointInTimeRestoreDriver extends AbstractRestoreDriver {
+ private static final String USAGE_STRING = """
+ Usage: hbase pitr [options]
+ <backup_path> Backup Path to use for Point in Time Restore
+ table(s) Comma-separated list of tables to restore
+ """;
+
+ @Override
+ protected int executeRestore(boolean check, TableName[] fromTables, TableName[] toTables,
+ boolean isOverwrite) {
+ String walBackupDir = getConf().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ if (Strings.isNullOrEmpty(walBackupDir)) {
+ System.err.printf(
+ "Point-in-Time Restore requires the WAL backup directory (%s) to
replay logs after full and incremental backups. "
+ + "Set this property if you need Point-in-Time Restore. Otherwise,
use the normal restore process with the appropriate backup ID.%n",
+ CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ return -1;
+ }
+
+ String[] remainArgs = cmd.getArgs();
+ if (remainArgs.length != 0) {
+ printToolUsage();
+ return -1;
+ }
+
+ String backupRootDir = cmd.getOptionValue(OPTION_PITR_BACKUP_PATH);
+
+ try (final Connection conn = ConnectionFactory.createConnection(conf);
+ BackupAdmin client = new BackupAdminImpl(conn)) {
+ // Get the replication checkpoint (last known safe point for Continuous Backup)
+ long replicationCheckpoint = BackupUtils.getReplicationCheckpoint(conn);
+ long endTime = replicationCheckpoint;
+
+ if (cmd.hasOption(OPTION_TO_DATETIME)) {
+ String time = cmd.getOptionValue(OPTION_TO_DATETIME);
+ try {
+ endTime = Long.parseLong(time);
+ // Convert seconds to milliseconds if input is in seconds
+ if (endTime < 10_000_000_000L) {
+ endTime *= 1000;
+ }
+ } catch (NumberFormatException e) {
+ System.out.println("ERROR: Invalid timestamp format for
--to-datetime: " + time);
+ printToolUsage();
+ return -5;
+ }
+ }
+
+ // Ensure the requested restore time does not exceed the replication checkpoint
+ if (endTime > replicationCheckpoint) {
+ LOG.error(
+ "ERROR: Requested restore time ({}) exceeds the last known safe
replication checkpoint ({}). "
+ + "Please choose a time before this checkpoint to ensure data
consistency.",
+ endTime, replicationCheckpoint);
+ return -5;
+ }
+
+ PointInTimeRestoreRequest pointInTimeRestoreRequest = new PointInTimeRestoreRequest.Builder()
+ .withBackupRootDir(backupRootDir).withCheck(check).withFromTables(fromTables)
+ .withToTables(toTables).withOverwrite(isOverwrite).withToDateTime(endTime).build();
+
+ client.pointInTimeRestore(pointInTimeRestoreRequest);
+ } catch (Exception e) {
+ LOG.error("Error while running restore backup", e);
+ return -5;
+ }
+ return 0;
+ }
+
+ @Override
+ protected void addOptions() {
+ super.addOptions();
+ addOptWithArg(OPTION_TO_DATETIME, LONG_OPTION_TO_DATETIME, OPTION_TO_DATETIME_DESC);
+ addOptWithArg(OPTION_PITR_BACKUP_PATH, LONG_OPTION_PITR_BACKUP_PATH,
+ OPTION_PITR_BACKUP_PATH_DESC);
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ Path rootDir = CommonFSUtils.getRootDir(conf);
+ URI defaultFs = rootDir.getFileSystem(conf).getUri();
+ CommonFSUtils.setFsDefault(conf, new Path(defaultFs));
+ int ret = ToolRunner.run(conf, new PointInTimeRestoreDriver(), args);
+ System.exit(ret);
+ }
+
+ @Override
+ protected String getUsageString() {
+ return USAGE_STRING;
+ }
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java
new file mode 100644
index 00000000000..f2462a1cfd1
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * POJO class for Point In Time Restore request
+ */
+@InterfaceAudience.Private
+public final class PointInTimeRestoreRequest {
+
+ private final String backupRootDir;
+ private final boolean check;
+ private final TableName[] fromTables;
+ private final TableName[] toTables;
+ private final boolean overwrite;
+ private final long toDateTime;
+
+ private PointInTimeRestoreRequest(Builder builder) {
+ this.backupRootDir = builder.backupRootDir;
+ this.check = builder.check;
+ this.fromTables = builder.fromTables;
+ this.toTables = builder.toTables;
+ this.overwrite = builder.overwrite;
+ this.toDateTime = builder.toDateTime;
+ }
+
+ public String getBackupRootDir() {
+ return backupRootDir;
+ }
+
+ public boolean isCheck() {
+ return check;
+ }
+
+ public TableName[] getFromTables() {
+ return fromTables;
+ }
+
+ public TableName[] getToTables() {
+ return toTables;
+ }
+
+ public boolean isOverwrite() {
+ return overwrite;
+ }
+
+ public long getToDateTime() {
+ return toDateTime;
+ }
+
+ public static class Builder {
+ private String backupRootDir;
+ private boolean check = false;
+ private TableName[] fromTables;
+ private TableName[] toTables;
+ private boolean overwrite = false;
+ private long toDateTime;
+
+ public Builder withBackupRootDir(String backupRootDir) {
+ this.backupRootDir = backupRootDir;
+ return this;
+ }
+
+ public Builder withCheck(boolean check) {
+ this.check = check;
+ return this;
+ }
+
+ public Builder withFromTables(TableName[] fromTables) {
+ this.fromTables = fromTables;
+ return this;
+ }
+
+ public Builder withToTables(TableName[] toTables) {
+ this.toTables = toTables;
+ return this;
+ }
+
+ public Builder withOverwrite(boolean overwrite) {
+ this.overwrite = overwrite;
+ return this;
+ }
+
+ public Builder withToDateTime(long dateTime) {
+ this.toDateTime = dateTime;
+ return this;
+ }
+
+ public PointInTimeRestoreRequest build() {
+ return new PointInTimeRestoreRequest(this);
+ }
+ }
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java
index 38b767ecf67..efc4b0df9a0 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java
@@ -17,117 +17,34 @@
*/
package org.apache.hadoop.hbase.backup;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK_DESC;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE_DESC;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET_RESTORE_DESC;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_LIST_DESC;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING_DESC;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_RESTORE_DESC;
-
-import java.io.IOException;
import java.net.URI;
-import java.util.List;
-import java.util.Objects;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
-import org.apache.hadoop.hbase.backup.impl.BackupManager;
-import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.logging.Log4jUtils;
-import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
/**
* Command-line entry point for restore operation
*/
@InterfaceAudience.Private
-public class RestoreDriver extends AbstractHBaseTool {
- private static final Logger LOG = LoggerFactory.getLogger(RestoreDriver.class);
- private CommandLine cmd;
-
- private static final String USAGE_STRING =
- "Usage: hbase restore <backup_path> <backup_id> [options]\n"
- + " backup_path Path to a backup destination root\n"
- + " backup_id Backup image ID to restore\n"
- + " table(s) Comma-separated list of tables to restore\n";
-
- private static final String USAGE_FOOTER = "";
-
- protected RestoreDriver() throws IOException {
- init();
- }
-
- protected void init() {
- // disable irrelevant loggers to avoid it mess up command output
- Log4jUtils.disableZkAndClientLoggers();
- }
-
- private int parseAndRun() throws IOException {
- // Check if backup is enabled
- if (!BackupManager.isBackupEnabled(getConf())) {
- System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
- return -1;
- }
-
- // enable debug logging
- if (cmd.hasOption(OPTION_DEBUG)) {
- Log4jUtils.setLogLevel("org.apache.hadoop.hbase.backup", "DEBUG");
- }
-
- // whether to overwrite to existing table if any, false by default
- boolean overwrite = cmd.hasOption(OPTION_OVERWRITE);
- if (overwrite) {
- LOG.debug("Found -overwrite option in restore command, "
- + "will overwrite to existing table if any in the restore target");
- }
-
- // whether to only check the dependencies, false by default
- boolean check = cmd.hasOption(OPTION_CHECK);
- if (check) {
- LOG.debug(
- "Found -check option in restore command, " + "will check and verify
the dependencies");
- }
-
- if (cmd.hasOption(OPTION_SET) && cmd.hasOption(OPTION_TABLE)) {
- System.err.println(
- "Options -s and -t are mutaully exclusive," + " you can not specify
both of them.");
- printToolUsage();
- return -1;
- }
-
- if (!cmd.hasOption(OPTION_SET) && !cmd.hasOption(OPTION_TABLE)) {
- System.err.println("You have to specify either set name or table list to
restore");
- printToolUsage();
- return -1;
- }
-
- if (cmd.hasOption(OPTION_YARN_QUEUE_NAME)) {
- String queueName = cmd.getOptionValue(OPTION_YARN_QUEUE_NAME);
- // Set MR job queuename to configuration
- getConf().set("mapreduce.job.queuename", queueName);
- }
+public class RestoreDriver extends AbstractRestoreDriver {
+ private static final String USAGE_STRING = """
+ Usage: hbase restore <backup_path> <backup_id> [options]
+ backup_path Path to a backup destination root
+ backup_id Backup image ID to restore
+ table(s) Comma-separated list of tables to restore
+ """;
+ @Override
+ protected int executeRestore(boolean check, TableName[] fromTables, TableName[] toTables,
+ boolean isOverwrite) {
// parse main restore command options
String[] remainArgs = cmd.getArgs();
if (remainArgs.length != 2) {
@@ -137,44 +54,11 @@ public class RestoreDriver extends AbstractHBaseTool {
String backupRootDir = remainArgs[0];
String backupId = remainArgs[1];
- String tables;
- String tableMapping =
- cmd.hasOption(OPTION_TABLE_MAPPING) ? cmd.getOptionValue(OPTION_TABLE_MAPPING) : null;
+
try (final Connection conn = ConnectionFactory.createConnection(conf);
BackupAdmin client = new BackupAdminImpl(conn)) {
- // Check backup set
- if (cmd.hasOption(OPTION_SET)) {
- String setName = cmd.getOptionValue(OPTION_SET);
- try {
- tables = getTablesForSet(conn, setName);
- } catch (IOException e) {
- System.out.println("ERROR: " + e.getMessage() + " for setName=" +
setName);
- printToolUsage();
- return -2;
- }
- if (tables == null) {
- System.out
- .println("ERROR: Backup set '" + setName + "' is either empty or
does not exist");
- printToolUsage();
- return -3;
- }
- } else {
- tables = cmd.getOptionValue(OPTION_TABLE);
- }
-
- TableName[] sTableArray = BackupUtils.parseTableNames(tables);
- TableName[] tTableArray = BackupUtils.parseTableNames(tableMapping);
-
- if (
- sTableArray != null && tTableArray != null && (sTableArray.length != tTableArray.length)
- ) {
- System.out.println("ERROR: table mapping mismatch: " + tables + " : " + tableMapping);
- printToolUsage();
- return -4;
- }
-
- client.restore(BackupUtils.createRestoreRequest(backupRootDir, backupId, check, sTableArray,
- tTableArray, overwrite));
+ client.restore(BackupUtils.createRestoreRequest(backupRootDir, backupId, check, fromTables,
+ toTables, isOverwrite));
} catch (Exception e) {
LOG.error("Error while running restore backup", e);
return -5;
@@ -182,40 +66,6 @@ public class RestoreDriver extends AbstractHBaseTool {
return 0;
}
- private String getTablesForSet(Connection conn, String name) throws IOException {
- try (final BackupSystemTable table = new BackupSystemTable(conn)) {
- List<TableName> tables = table.describeBackupSet(name);
-
- if (tables == null) {
- return null;
- }
-
- return StringUtils.join(tables, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND);
- }
- }
-
- @Override
- protected void addOptions() {
- // define supported options
- addOptNoArg(OPTION_OVERWRITE, OPTION_OVERWRITE_DESC);
- addOptNoArg(OPTION_CHECK, OPTION_CHECK_DESC);
- addOptNoArg(OPTION_DEBUG, OPTION_DEBUG_DESC);
- addOptWithArg(OPTION_SET, OPTION_SET_RESTORE_DESC);
- addOptWithArg(OPTION_TABLE, OPTION_TABLE_LIST_DESC);
- addOptWithArg(OPTION_TABLE_MAPPING, OPTION_TABLE_MAPPING_DESC);
- addOptWithArg(OPTION_YARN_QUEUE_NAME, OPTION_YARN_QUEUE_NAME_RESTORE_DESC);
- }
-
- @Override
- protected void processOptions(CommandLine cmd) {
- this.cmd = cmd;
- }
-
- @Override
- protected int doWork() throws Exception {
- return parseAndRun();
- }
-
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
Path hbasedir = CommonFSUtils.getRootDir(conf);
@@ -226,45 +76,7 @@ public class RestoreDriver extends AbstractHBaseTool {
}
@Override
- public int run(String[] args) {
- Objects.requireNonNull(conf, "Tool configuration is not initialized");
-
- CommandLine cmd;
- try {
- // parse the command line arguments
- cmd = parseArgs(args);
- cmdLineArgs = args;
- } catch (Exception e) {
- System.out.println("Error when parsing command-line arguments: " +
e.getMessage());
- printToolUsage();
- return EXIT_FAILURE;
- }
-
- if (cmd.hasOption(SHORT_HELP_OPTION) || cmd.hasOption(LONG_HELP_OPTION)) {
- printToolUsage();
- return EXIT_FAILURE;
- }
-
- processOptions(cmd);
-
- int ret = EXIT_FAILURE;
- try {
- ret = doWork();
- } catch (Exception e) {
- LOG.error("Error running command-line tool", e);
- return EXIT_FAILURE;
- }
- return ret;
- }
-
- protected void printToolUsage() {
- System.out.println(USAGE_STRING);
- HelpFormatter helpFormatter = new HelpFormatter();
- helpFormatter.setLeftPadding(2);
- helpFormatter.setDescPadding(8);
- helpFormatter.setWidth(100);
- helpFormatter.setSyntaxPrefix("Options:");
- helpFormatter.printHelp(" ", null, options, USAGE_FOOTER);
- System.out.println(BackupRestoreConstants.VERIFY_BACKUP);
+ protected String getUsageString() {
+ return USAGE_STRING;
}
}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index 1e745c69cda..e94389d6938 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -17,18 +17,32 @@
*/
package org.apache.hadoop.hbase.backup.impl;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+
import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupAdmin;
import org.apache.hadoop.hbase.backup.BackupClientFactory;
@@ -40,12 +54,16 @@ import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.util.BackupSet;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -490,15 +508,8 @@ public class BackupAdminImpl implements BackupAdmin {
@Override
public void restore(RestoreRequest request) throws IOException {
if (request.isCheck()) {
- // check and load backup image manifest for the tables
- Path rootPath = new Path(request.getBackupRootDir());
- String backupId = request.getBackupId();
- TableName[] sTableArray = request.getFromTables();
- BackupManifest manifest =
- HBackupFileSystem.getManifest(conn.getConfiguration(), rootPath, backupId);
-
- // Check and validate the backup image and its dependencies
- if (BackupUtils.validate(Arrays.asList(sTableArray), manifest, conn.getConfiguration())) {
+ boolean isValid = validateRequest(request);
+ if (isValid) {
LOG.info(CHECK_OK);
} else {
LOG.error(CHECK_FAILED);
@@ -509,6 +520,337 @@ public class BackupAdminImpl implements BackupAdmin {
new RestoreTablesClient(conn, request).execute();
}
+ private boolean validateRequest(RestoreRequest request) throws IOException {
+ // check and load backup image manifest for the tables
+ Path rootPath = new Path(request.getBackupRootDir());
+ String backupId = request.getBackupId();
+ TableName[] sTableArray = request.getFromTables();
+ BackupManifest manifest =
+ HBackupFileSystem.getManifest(conn.getConfiguration(), rootPath, backupId);
+
+ // Validate the backup image and its dependencies
+ return BackupUtils.validate(Arrays.asList(sTableArray), manifest, conn.getConfiguration());
+ }
+
+ @Override
+ public void pointInTimeRestore(PointInTimeRestoreRequest request) throws IOException {
+ if (request.getBackupRootDir() == null) {
+ defaultPointInTimeRestore(request);
+ } else {
+ // TODO: special case, not supported at the moment
+ throw new IOException("Custom backup location for Point-In-Time Recovery
Not supported!");
+ }
+ LOG.info("Successfully completed Point In Time Restore for all tables.");
+ }
+
+ /**
+ * Performs a default Point-In-Time Restore (PITR) by restoring the latest valid backup and
+ * replaying the WALs to bring the table to the desired state. PITR requires: 1. A valid backup
+ * available before the end time. 2. Write-Ahead Logs (WALs) covering the remaining duration up to
+ * the end time.
+ * @param request PointInTimeRestoreRequest containing restore parameters.
+ * @throws IOException If no valid backup or WALs are found, or if an error occurs during
+ * restoration.
+ */
+ private void defaultPointInTimeRestore(PointInTimeRestoreRequest request) throws IOException {
+ long endTime = request.getToDateTime();
+ validateRequestToTime(endTime);
+
+ TableName[] sTableArray = request.getFromTables();
+ TableName[] tTableArray = resolveTargetTables(sTableArray, request.getToTables());
+
+ // Validate PITR requirements
+ validatePitr(endTime, sTableArray, tTableArray);
+
+ // If only validation is required, log and return
+ if (request.isCheck()) {
+ LOG.info("PITR can be successfully executed");
+ return;
+ }
+
+ // Execute PITR process
+ try (BackupSystemTable table = new BackupSystemTable(conn)) {
+ Map<TableName, Long> continuousBackupTables = table.getContinuousBackupTableSet();
+ List<BackupInfo> backupInfos = table.getBackupInfos(BackupState.COMPLETE);
+
+ for (int i = 0; i < sTableArray.length; i++) {
+ restoreTableWithWalReplay(sTableArray[i], tTableArray[i], endTime, continuousBackupTables,
+ backupInfos, request);
+ }
+ }
+ }
+
+ /**
+ * Validates whether the requested end time falls within the allowed PITR recovery window.
+ * @param endTime The target recovery time.
+ * @throws IOException If the requested recovery time is outside the allowed window.
+ */
+ private void validateRequestToTime(long endTime) throws IOException {
+ long pitrWindowDays = conn.getConfiguration().getLong(CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS,
+ DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS);
+ long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
+ long pitrMaxStartTime = currentTime - TimeUnit.DAYS.toMillis(pitrWindowDays);
+
+ if (endTime < pitrMaxStartTime) {
+ String errorMsg = String.format(
+ "Requested recovery time (%d) is out of the allowed PITR window (last
%d days).", endTime,
+ pitrWindowDays);
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
+ }
+
+ if (endTime > currentTime) {
+ String errorMsg = String.format(
+ "Requested recovery time (%d) is in the future. Current time: %d.",
endTime, currentTime);
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
+ }
+ }
+
+ /**
+ * Resolves the target table array. If null or empty, defaults to the source table array.
+ */
+ private TableName[] resolveTargetTables(TableName[] sourceTables, TableName[] targetTables) {
+ return (targetTables == null || targetTables.length == 0) ? sourceTables : targetTables;
+ }
+
+ /**
+ * Validates whether Point-In-Time Recovery (PITR) is possible for the given tables at the
+ * specified time.
+ * <p>
+ * PITR requires:
+ * <ul>
+ * <li>Continuous backup to be enabled for the source tables.</li>
+ * <li>A valid backup image and corresponding WALs to be available.</li>
+ * </ul>
+ * @param endTime The target recovery time.
+ * @param sTableArray The source tables to restore.
+ * @param tTableArray The target tables where the restore will be performed.
+ * @throws IOException If PITR is not possible due to missing continuous backup or backup images.
+ */
+ private void validatePitr(long endTime, TableName[] sTableArray, TableName[] tTableArray)
+ throws IOException {
+ try (BackupSystemTable table = new BackupSystemTable(conn)) {
+ // Retrieve the set of tables with continuous backup enabled
+ Map<TableName, Long> continuousBackupTables = table.getContinuousBackupTableSet();
+
+ // Ensure all source tables have continuous backup enabled
+ validateContinuousBackup(sTableArray, continuousBackupTables);
+
+ // Fetch completed backup information
+ List<BackupInfo> backupInfos = table.getBackupInfos(BackupState.COMPLETE);
+
+ // Ensure a valid backup and WALs exist for PITR
+ validateBackupAvailability(sTableArray, tTableArray, endTime, continuousBackupTables,
+ backupInfos);
+ }
+ }
+
+ /**
+ * Ensures that all source tables have continuous backup enabled.
+ */
+ private void validateContinuousBackup(TableName[] tables,
+ Map<TableName, Long> continuousBackupTables) throws IOException {
+ List<TableName> missingTables =
+ Arrays.stream(tables).filter(table -> !continuousBackupTables.containsKey(table)).toList();
+
+ if (!missingTables.isEmpty()) {
+ String errorMsg = "Continuous Backup is not enabled for the following
tables: "
+ +
missingTables.stream().map(TableName::getNameAsString).collect(Collectors.joining(",
"));
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
+ }
+ }
+
+ /**
+ * Ensures that a valid backup and corresponding WALs exist for PITR for each source table. PITR
+ * requires: 1. A valid backup available before the end time. 2. Write-Ahead Logs (WALs) covering
+ * the remaining duration up to the end time.
+ */
+ private void validateBackupAvailability(TableName[] sTableArray, TableName[] tTableArray,
+ long endTime, Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos)
+ throws IOException {
+ for (int i = 0; i < sTableArray.length; i++) {
+ if (
+ !canPerformPitr(sTableArray[i], tTableArray[i], endTime, continuousBackupTables,
+ backupInfos)
+ ) {
+ String errorMsg = "Could not find a valid backup and WALs for PITR for
table: "
+ + sTableArray[i].getNameAsString();
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
+ }
+ }
+ }
+
+ /**
+ * Checks whether PITR can be performed for a given source-target table pair.
+ */
+ private boolean canPerformPitr(TableName stableName, TableName tTableName, long endTime,
+ Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos) {
+ return getValidBackupInfo(stableName, tTableName, endTime, continuousBackupTables, backupInfos)
+ != null;
+ }
+
+ /**
+ * Finds a valid backup for PITR that meets the required conditions.
+ */
+ private BackupInfo getValidBackupInfo(TableName sTableName, TableName tTablename, long endTime,
+ Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos) {
+ for (BackupInfo info : backupInfos) {
+ if (isValidBackupForPitr(info, sTableName, endTime, continuousBackupTables)) {
+
+ RestoreRequest restoreRequest =
+ BackupUtils.createRestoreRequest(info.getBackupRootDir(), info.getBackupId(), true,
+ new TableName[] { sTableName }, new TableName[] { tTablename }, false);
+
+ try {
+ if (validateRequest(restoreRequest)) {
+ return info;
+ }
+ } catch (IOException e) {
+ LOG.warn("Exception occurred while testing the backup : {} for
restore ", info, e);
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Determines if the given backup is valid for PITR.
+ * <p>
+ * A backup is valid if:
+ * <ul>
+ * <li>It contains the source table.</li>
+ * <li>It was completed before the end time.</li>
+ * <li>The start timestamp of the backup is after the continuous backup start time for the
+ * table.</li>
+ * </ul>
+ * @param info Backup information object.
+ * @param tableName Table to check.
+ * @param endTime The target recovery time.
+ * @param continuousBackupTables Map of tables with continuous backup enabled.
+ * @return true if the backup is valid for PITR, false otherwise.
+ */
+ private boolean isValidBackupForPitr(BackupInfo info, TableName tableName, long endTime,
+ Map<TableName, Long> continuousBackupTables) {
+ return info.getTableNames().contains(tableName) && info.getCompleteTs() <= endTime
+ && continuousBackupTables.getOrDefault(tableName, 0L) <= info.getStartTs();
+ }
+
+ /**
+ * Restores a table from a valid backup and replays WALs to reach the desired PITR state.
+ */
+ private void restoreTableWithWalReplay(TableName sourceTable, TableName targetTable, long endTime,
+ Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos,
+ PointInTimeRestoreRequest request) throws IOException {
+ BackupInfo backupInfo =
+ getValidBackupInfo(sourceTable, targetTable, endTime, continuousBackupTables, backupInfos);
+ if (backupInfo == null) {
+ String errorMsg = "Could not find a valid backup and WALs for PITR for
table: "
+ + sourceTable.getNameAsString();
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
+ }
+
+ RestoreRequest restoreRequest = BackupUtils.createRestoreRequest(backupInfo.getBackupRootDir(),
+ backupInfo.getBackupId(), false, new TableName[] { sourceTable },
+ new TableName[] { targetTable }, request.isOverwrite());
+
+ restore(restoreRequest);
+ replayWal(sourceTable, targetTable, backupInfo.getStartTs(), endTime);
+ }
+
+ /**
+ * Replays WALs to bring the table to the desired state.
+ */
+ private void replayWal(TableName sourceTable, TableName targetTable, long startTime, long endTime)
+ throws IOException {
+ String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ Path walDirPath = new Path(walBackupDir);
+ LOG.info(
+ "Starting WAL replay for source: {}, target: {}, time range: {} - {},
WAL backup dir: {}",
+ sourceTable, targetTable, startTime, endTime, walDirPath);
+
+ List<String> validDirs =
+ getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
+ if (validDirs.isEmpty()) {
+ LOG.warn("No valid WAL directories found for range {} - {}. Skipping WAL
replay.", startTime,
+ endTime);
+ return;
+ }
+
+ executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime);
+ }
+
+ /**
+ * Fetches valid WAL directories based on the given time range.
+ */
+ private List<String> getValidWalDirs(Configuration conf, Path walBackupDir, long startTime,
+ long endTime) throws IOException {
+ FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf);
+ FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
+
+ List<String> validDirs = new ArrayList<>();
+ SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+ for (FileStatus dayDir : dayDirs) {
+ if (!dayDir.isDirectory()) {
+ continue; // Skip files, only process directories
+ }
+
+ String dirName = dayDir.getPath().getName();
+ try {
+ Date dirDate = dateFormat.parse(dirName);
+ long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
+ long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
+
+ // Check if this day's WAL files overlap with the required time range
+ if (dirEndTime >= startTime && dirStartTime <= endTime) {
+ validDirs.add(dayDir.getPath().toString());
+ }
+ } catch (ParseException e) {
+ LOG.warn("Skipping invalid directory name: " + dirName, e);
+ }
+ }
+ return validDirs;
+ }
+
+ /**
+ * Executes WAL replay using WALPlayer.
+ */
+ private void executeWalReplay(List<String> walDirs, TableName sourceTable, TableName targetTable,
+ long startTime, long endTime) throws IOException {
+ Tool walPlayer = initializeWalPlayer(startTime, endTime);
+ String[] args =
+ { String.join(",", walDirs), sourceTable.getNameAsString(),
targetTable.getNameAsString() };
+
+ try {
+ LOG.info("Executing WALPlayer with args: {}", Arrays.toString(args));
+ int exitCode = walPlayer.run(args);
+ if (exitCode == 0) {
+ LOG.info("WAL replay completed successfully for {}", targetTable);
+ } else {
+ throw new IOException("WAL replay failed with exit code: " + exitCode);
+ }
+ } catch (Exception e) {
+ LOG.error("Error during WAL replay for {}: {}", targetTable,
e.getMessage(), e);
+ throw new IOException("Exception during WAL replay", e);
+ }
+ }
+
+ /**
+ * Initializes and configures WALPlayer.
+ */
+ private Tool initializeWalPlayer(long startTime, long endTime) {
+ Configuration conf = HBaseConfiguration.create(conn.getConfiguration());
+ conf.setLong(WALInputFormat.START_TIME_KEY, startTime);
+ conf.setLong(WALInputFormat.END_TIME_KEY, endTime);
+ Tool walPlayer = new WALPlayer();
+ walPlayer.setConf(conf);
+ return walPlayer;
+ }
+
@Override
public String backupTables(BackupRequest request) throws IOException {
BackupType type = request.getBackupType();
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 9227f577a6a..1bdf6179585 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -168,6 +168,12 @@ public final class BackupSystemTable implements Closeable {
private final static String INCR_BACKUP_SET = "incrbackupset:";
private final static String CONTINUOUS_BACKUP_SET = "continuousbackupset";
+ /**
+ * Row key identifier for storing the last replicated WAL timestamp in the backup system table for
+ * continuous backup.
+ */
+ private static final String CONTINUOUS_BACKUP_REPLICATION_TIMESTAMP_ROW =
+ "continuous_backup_last_replicated";
private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
private final static String RS_LOG_TS_PREFIX = "rslogts:";
@@ -1080,6 +1086,86 @@ public final class BackupSystemTable implements Closeable {
}
}
+ /**
+ * Updates the latest replicated WAL timestamp for a region server in the backup system table.
+ * This is used to track the replication checkpoint for continuous backup and PITR (Point-in-Time
+ * Restore).
+ * @param serverName the server for which the latest WAL timestamp is being recorded
+ * @param timestamp the timestamp (in milliseconds) of the last WAL entry replicated
+ * @throws IOException if an error occurs while writing to the backup system table
+ */
+ public void updateBackupCheckpointTimestamp(ServerName serverName, long timestamp)
+ throws IOException {
+
+ HBaseProtos.ServerName.Builder serverProto =
+ HBaseProtos.ServerName.newBuilder().setHostName(serverName.getHostname())
+ .setPort(serverName.getPort()).setStartCode(serverName.getStartCode());
+
+ try (Table table = connection.getTable(tableName)) {
+ Put put = createPutForBackupCheckpoint(serverProto.build().toByteArray(), timestamp);
+ if (!put.isEmpty()) {
+ table.put(put);
+ }
+ }
+ }
+
+ /**
+ * Retrieves the latest replicated WAL timestamps for all region servers from the backup system
+ * table. This is used to track the replication checkpoint state for continuous backup and PITR
+ * (Point-in-Time Restore).
+ * @return a map where the key is {@link ServerName} and the value is the latest replicated WAL
+ * timestamp in milliseconds
+ * @throws IOException if an error occurs while reading from the backup system table
+ */
+ public Map<ServerName, Long> getBackupCheckpointTimestamps() throws IOException {
+ LOG.trace("Fetching latest backup checkpoint timestamps for all region servers.");
+
+ Map<ServerName, Long> checkpointMap = new HashMap<>();
+
+ byte[] rowKey = rowkey(CONTINUOUS_BACKUP_REPLICATION_TIMESTAMP_ROW);
+ Get get = new Get(rowKey);
+ get.addFamily(BackupSystemTable.META_FAMILY);
+
+ try (Table table = connection.getTable(tableName)) {
+ Result result = table.get(get);
+
+ if (result.isEmpty()) {
+ LOG.debug("No checkpoint timestamps found in backup system table.");
+ return checkpointMap;
+ }
+
+ List<Cell> cells = result.listCells();
+ for (Cell cell : cells) {
+ try {
+ HBaseProtos.ServerName protoServer =
+ HBaseProtos.ServerName.parseFrom(CellUtil.cloneQualifier(cell));
+ ServerName serverName = ServerName.valueOf(protoServer.getHostName(),
+ protoServer.getPort(), protoServer.getStartCode());
+
+ long timestamp = Bytes.toLong(CellUtil.cloneValue(cell));
+ checkpointMap.put(serverName, timestamp);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Failed to parse server name or timestamp from cell: {}",
cell, e);
+ }
+ }
+ }
+
+ return checkpointMap;
+ }
+
+ /**
+ * Constructs a {@link Put} operation to update the last replicated WAL timestamp for a given
+ * server in the backup system table.
+ * @param serverNameBytes the serialized server name as bytes
+ * @param timestamp the WAL entry timestamp to store
+ * @return a {@link Put} object ready to be written to the system table
+ */
+ private Put createPutForBackupCheckpoint(byte[] serverNameBytes, long timestamp) {
+ Put put = new Put(rowkey(CONTINUOUS_BACKUP_REPLICATION_TIMESTAMP_ROW));
+ put.addColumn(BackupSystemTable.META_FAMILY, serverNameBytes, Bytes.toBytes(timestamp));
+ return put;
+ }
+
/**
* Deletes incremental backup set for a backup destination
* @param backupRoot backup root
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
index d71c6ce6b4d..53f2f96b47e 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
@@ -166,10 +166,18 @@ public class FullTableBackupClient extends TableBackupClient {
private void handleContinuousBackup(Admin admin) throws IOException {
backupInfo.setPhase(BackupInfo.BackupPhase.SETUP_WAL_REPLICATION);
long startTimestamp = startContinuousWALBackup(admin);
+ backupManager.addContinuousBackupTableSet(backupInfo.getTables(),
startTimestamp);
- performBackupSnapshots(admin);
+ // Updating the start time of this backup to reflect the actual beginning
of the full backup.
+ // So far, we have only set up continuous WAL replication, but the full
backup has not yet
+ // started.
+ // Setting the correct start time is crucial for Point-in-Time Restore
(PITR).
+ // When selecting a backup for PITR, we must ensure that the backup
started **on or after** the
+ // starting time of the WALs. If WAL streaming began later, we couldn't
guarantee that WALs
+ // exist for the entire period between the backup's start time and the
desired PITR timestamp.
+ backupInfo.setStartTs(startTimestamp);
- backupManager.addContinuousBackupTableSet(backupInfo.getTables(),
startTimestamp);
+ performBackupSnapshots(admin);
// set overall backup status: complete. Here we make sure to complete the
backup.
// After this checkpoint, even if entering cancel process, will let the
backup finished
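
The reordering above encodes an invariant that PITR relies on: a full backup is only usable if it started on or after the point where WAL streaming began, and the requested restore point must not precede the backup. A minimal sketch of that predicate, with made-up names (the real selection logic lives in the backup admin/PITR code, not in this helper):

    // Hypothetical helper illustrating the ordering invariant described above.
    final class PitrEligibility {
      // Every mutation between the backup's start and the restore point must be
      // covered by archived WALs, so the backup cannot start before WAL streaming,
      // and the restore point cannot be earlier than the backup itself.
      static boolean isUsableForPitr(long backupStartTs, long walStreamingStartTs,
          long restorePointTs) {
        return backupStartTs >= walStreamingStartTs && restorePointTs >= backupStartTs;
      }
    }
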
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
index c973af8102e..34fcd76bf9c 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
@@ -36,6 +36,9 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
@@ -82,6 +85,8 @@ public class ContinuousBackupReplicationEndpoint extends
BaseReplicationEndpoint
private String peerId;
private ScheduledExecutorService flushExecutor;
+ private long latestWALEntryTimestamp = -1L;
+
public static final long ONE_DAY_IN_MILLISECONDS = TimeUnit.DAYS.toMillis(1);
public static final String WAL_FILE_PREFIX = "wal_file.";
public static final String DATE_FORMAT = "yyyy-MM-dd";
@@ -174,6 +179,13 @@ public class ContinuousBackupReplicationEndpoint extends
BaseReplicationEndpoint
}
}
walWriters.clear();
+
+ // All received WAL entries have been flushed and persisted successfully.
+ // At this point, it's safe to record the latest replicated timestamp,
+ // as we are guaranteed that all entries up to that timestamp are durably
stored.
+ // This checkpoint is essential for enabling consistent Point-in-Time
Restore (PITR).
+ updateLastReplicatedTimestampForContinuousBackup();
+
LOG.info("{} WAL writers flushed and cleared", Utils.logPeerId(peerId));
}
@@ -218,6 +230,12 @@ public class ContinuousBackupReplicationEndpoint extends
BaseReplicationEndpoint
backupWalEntries(entry.getKey(), entry.getValue());
}
+ // Capture the timestamp of the last WAL entry processed. This is used
as the replication
+ // checkpoint so that point-in-time restores know the latest consistent
time up to which
+ // replication has occurred.
+ latestWALEntryTimestamp = entries.get(entries.size() -
1).getKey().getWriteTime();
+
if (isAnyWriterFull()) {
LOG.debug("{} Some WAL writers reached max size, triggering flush",
Utils.logPeerId(peerId));
@@ -237,6 +255,21 @@ public class ContinuousBackupReplicationEndpoint extends
BaseReplicationEndpoint
}
}
+ /**
+ * Persists the latest replicated WAL entry timestamp in the backup system
table. This checkpoint
+ * is critical for Continuous Backup and Point-in-Time Restore (PITR) to
ensure restore operations
+ * only go up to a known safe point. The value is stored per region server
using its ServerName as
+ * the key.
+ * @throws IOException if the checkpoint update fails
+ */
+ private void updateLastReplicatedTimestampForContinuousBackup() throws
IOException {
+ try (final Connection conn = ConnectionFactory.createConnection(conf);
+ BackupSystemTable backupSystemTable = new BackupSystemTable(conn)) {
+
backupSystemTable.updateBackupCheckpointTimestamp(replicationSource.getServerWALsBelongTo(),
+ latestWALEntryTimestamp);
+ }
+ }
+
private Map<Long, List<WAL.Entry>> groupEntriesByDay(List<WAL.Entry>
entries) {
return entries.stream().collect(
Collectors.groupingBy(entry -> (entry.getKey().getWriteTime() /
ONE_DAY_IN_MILLISECONDS)
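
The flush-then-checkpoint ordering above is what makes the persisted timestamp trustworthy. A toy sketch of that two-phase pattern (illustrative only, not part of the patch; all names below are made up):

    final class TwoPhaseCheckpoint {
      private long lastSeenTs = -1L;     // newest timestamp handed to replicate()
      private long lastDurableTs = -1L;  // newest timestamp known to be durably stored

      void onReplicated(long walEntryWriteTime) {
        lastSeenTs = walEntryWriteTime;  // remembered, but not yet safe to publish
      }

      void onFlushSucceeded() {
        lastDurableTs = lastSeenTs;      // only now is it safe to persist as the checkpoint
      }

      long checkpoint() {
        // A crash between onReplicated() and onFlushSucceeded() never advances this,
        // so PITR never trusts entries that were still sitting in staging buffers.
        return lastDurableTs;
      }
    }
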
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index 15159ed73e4..34fa82ef45f 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.backup.util;
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
+import static
org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
+import static
org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URLDecoder;
@@ -49,11 +53,17 @@ import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -770,4 +780,156 @@ public final class BackupUtils {
return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp;
}
+ /**
+ * Calculates the replication checkpoint timestamp used for continuous
backup.
+ * <p>
+ * A replication checkpoint is the earliest timestamp across all region
servers such that every
+ * WAL entry before that point is known to be replicated to the target
system. This is essential
+ * for features like Point-in-Time Restore (PITR) and incremental backups,
where we want to
+ * confidently restore data to a consistent state without missing updates.
+ * <p>
+ * The checkpoint is calculated using a combination of:
+ * <ul>
+ * <li>The start timestamps of WAL files currently being replicated for each
server.</li>
+ * <li>The latest successfully replicated timestamp recorded by the
replication marker chore.</li>
+ * </ul>
+ * <p>
+ * We combine these two sources to handle the following challenges:
+ * <ul>
+ * <li><b>Stale WAL start times:</b> If replication traffic is low or WALs
are long-lived, the
+ * replication offset may point to the same WAL for a long time, resulting
in stale timestamps
+ * that underestimate progress. This could delay PITR unnecessarily.</li>
+ * <li><b>Limitations of marker-only tracking:</b> The replication marker
chore stores the last
+ * successfully replicated timestamp per region server in a system table.
However, this data may
+ * become stale if the server goes offline or region ownership changes. For
example, if a region
+ * initially belonged to rs1 and was later moved to rs4 due to re-balancing,
rs1’s marker would
+ * persist even though it no longer holds any regions. Relying solely on
these stale markers could
+ * lead to incorrect or outdated checkpoints.</li>
+ * </ul>
+ * <p>
+ * To handle these limitations, the method:
+ * <ol>
+ * <li>Verifies that the continuous backup peer exists to ensure replication
is enabled.</li>
+ * <li>Retrieves WAL replication queue information for the peer, collecting
WAL start times per
+ * region server. This gives us a lower bound for replication progress.</li>
+ * <li>Reads the marker chore's replicated timestamps from the backup system
table.</li>
+ * <li>For servers found in both sources, if the marker timestamp is more
recent than the WAL's
+ * start timestamp, we use the marker (since replication has progressed
beyond the WAL).</li>
+ * <li>We discard marker entries for region servers that are not present in
WAL queues, assuming
+ * those servers are no longer relevant (e.g., decommissioned or
reassigned).</li>
+ * <li>The checkpoint is the minimum of all chosen timestamps — i.e., the
slowest replicating
+ * region server.</li>
+ * <li>Finally, we persist the updated marker information to include any
newly participating
+ * region servers.</li>
+ * </ol>
+ * <p>
+ * Note: If the replication marker chore is disabled, we fall back to using
only the WAL start
+ * times. This ensures correctness but may lead to conservative checkpoint
estimates during idle
+ * periods.
+ * @param conn the HBase connection
+ * @return the calculated replication checkpoint timestamp
+ * @throws IOException if reading replication queues or updating the backup
system table fails
+ */
+ public static long getReplicationCheckpoint(Connection conn) throws
IOException {
+ Configuration conf = conn.getConfiguration();
+ long checkpoint = EnvironmentEdgeManager.getDelegate().currentTime();
+
+ // Step 1: Ensure the continuous backup replication peer exists
+ if (!continuousBackupReplicationPeerExists(conn.getAdmin())) {
+ String msg = "Replication peer '" + CONTINUOUS_BACKUP_REPLICATION_PEER
+ + "' not found. Continuous backup not enabled.";
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+
+ // Step 2: Get all replication queues for the continuous backup peer
+ ReplicationQueueStorage queueStorage =
+ ReplicationStorageFactory.getReplicationQueueStorage(conn, conf);
+
+ List<ReplicationQueueId> queueIds;
+ try {
+ queueIds =
queueStorage.listAllQueueIds(CONTINUOUS_BACKUP_REPLICATION_PEER);
+ } catch (ReplicationException e) {
+ String msg = "Failed to retrieve replication queue IDs for peer '"
+ + CONTINUOUS_BACKUP_REPLICATION_PEER + "'";
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ }
+
+ if (queueIds.isEmpty()) {
+ String msg = "Replication peer '" + CONTINUOUS_BACKUP_REPLICATION_PEER +
"' has no queues. "
+ + "This may indicate that continuous backup replication is not
initialized correctly.";
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+
+ // Step 3: Build a map of ServerName -> WAL start timestamp (lowest seen
per server)
+ Map<ServerName, Long> serverToCheckpoint = new HashMap<>();
+ for (ReplicationQueueId queueId : queueIds) {
+ Map<String, ReplicationGroupOffset> offsets;
+ try {
+ offsets = queueStorage.getOffsets(queueId);
+ } catch (ReplicationException e) {
+ String msg = "Failed to fetch WAL offsets for replication queue: " +
queueId;
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ }
+
+ for (ReplicationGroupOffset offset : offsets.values()) {
+ String walFile = offset.getWal();
+ long ts = AbstractFSWALProvider.getTimestamp(walFile); // WAL creation
time
+ ServerName server = queueId.getServerName();
+ // Store the minimum timestamp per server (ts - 1 to avoid edge
boundary issues)
+ serverToCheckpoint.merge(server, ts - 1, Math::min);
+ }
+ }
+
+ // Step 4: If replication markers are enabled, overlay fresher timestamps
from backup system
+ // table
+ boolean replicationMarkerEnabled =
+ conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY,
REPLICATION_MARKER_ENABLED_DEFAULT);
+ if (replicationMarkerEnabled) {
+ try (BackupSystemTable backupSystemTable = new BackupSystemTable(conn)) {
+ Map<ServerName, Long> markerTimestamps =
backupSystemTable.getBackupCheckpointTimestamps();
+
+ for (Map.Entry<ServerName, Long> entry : markerTimestamps.entrySet()) {
+ ServerName server = entry.getKey();
+ long markerTs = entry.getValue();
+
+ // If marker timestamp is newer, override
+ if (serverToCheckpoint.containsKey(server)) {
+ long current = serverToCheckpoint.get(server);
+ if (markerTs > current) {
+ serverToCheckpoint.put(server, markerTs);
+ }
+ } else {
+ // This server is no longer active (e.g., RS moved or removed);
skip
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping replication marker timestamp for inactive
server: {}", server);
+ }
+ }
+ }
+
+ // Step 5: Persist current server timestamps into backup system table
+ for (Map.Entry<ServerName, Long> entry :
serverToCheckpoint.entrySet()) {
+ backupSystemTable.updateBackupCheckpointTimestamp(entry.getKey(),
entry.getValue());
+ }
+ }
+ } else {
+ LOG.warn(
+ "Replication marker chore is disabled. Using WAL-based timestamps only
for checkpoint calculation.");
+ }
+
+ // Step 6: Calculate final checkpoint as minimum timestamp across all
active servers
+ for (long ts : serverToCheckpoint.values()) {
+ checkpoint = Math.min(checkpoint, ts);
+ }
+
+ return checkpoint;
+ }
+
+ private static boolean continuousBackupReplicationPeerExists(Admin admin)
throws IOException {
+ return admin.listReplicationPeers().stream()
+ .anyMatch(peer ->
peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER));
+ }
}
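
A minimal sketch (not part of the patch) of how a caller might use getReplicationCheckpoint() to validate a requested restore point before attempting PITR; the class RestorePointCheck, the method validateRestorePoint, and the error wording are assumptions for illustration:

    import java.io.IOException;
    import org.apache.hadoop.hbase.backup.util.BackupUtils;
    import org.apache.hadoop.hbase.client.Connection;

    final class RestorePointCheck {
      // Rejects restore points beyond the replication checkpoint, since WAL entries
      // after that time are not yet guaranteed to be durably replicated.
      static void validateRestorePoint(Connection conn, long requestedEndTime) throws IOException {
        long checkpoint = BackupUtils.getReplicationCheckpoint(conn);
        if (requestedEndTime > checkpoint) {
          throw new IOException("Requested restore time " + requestedEndTime
            + " is beyond the replication checkpoint " + checkpoint);
        }
      }
    }
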
diff --git
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index b9a76347440..582c7d1ae54 100644
---
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.backup;
+import static
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE;
+import static
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY;
+import static
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -296,6 +300,10 @@ public class TestBackupBase {
BACKUP_ROOT_DIR = Path.SEPARATOR + "backupUT";
BACKUP_REMOTE_ROOT_DIR = Path.SEPARATOR + "backupUT";
+ conf1.set(CONF_BACKUP_MAX_WAL_SIZE, "10240");
+ conf1.set(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10");
+ conf1.set(CONF_STAGED_WAL_FLUSH_INTERVAL, "10");
+
if (secure) {
// set the always on security provider
UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(),
diff --git
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
new file mode 100644
index 00000000000..fb37977c4ee
--- /dev/null
+++
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE;
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING;
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME;
+import static
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(LargeTests.class)
+public class TestPointInTimeRestore extends TestBackupBase {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestPointInTimeRestore.class);
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TestPointInTimeRestore.class);
+
+ private static final String backupWalDirName =
"TestPointInTimeRestoreWalDir";
+ private static final int WAIT_FOR_REPLICATION_MS = 30_000;
+ static Path backupWalDir;
+ static FileSystem fs;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ Path root = TEST_UTIL.getDataTestDirOnTestFS();
+ backupWalDir = new Path(root, backupWalDirName);
+ fs = FileSystem.get(conf1);
+ fs.mkdirs(backupWalDir);
+ conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString());
+
+ setUpBackups();
+ }
+
+ /**
+ * Sets up multiple backups at different timestamps by:
+ * <ol>
+ * <li>Adjusting the system time to simulate past backup points.</li>
+ * <li>Loading data into tables to create meaningful snapshots.</li>
+ * <li>Running full backups with or without continuous backup enabled.</li>
+ * <li>Ensuring replication is complete before proceeding.</li>
+ * </ol>
+ */
+ private static void setUpBackups() throws Exception {
+ // Simulate a backup taken 20 days ago
+ EnvironmentEdgeManager
+ .injectEdge(() -> System.currentTimeMillis() - 20 *
ONE_DAY_IN_MILLISECONDS);
+ loadRandomData(table1, 1000); // Insert initial data into table1
+
+ // Perform a full backup for table1 with continuous backup enabled
+ String[] args = buildBackupArgs("full", new TableName[] { table1 }, true);
+ int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+ assertEquals("Backup should succeed", 0, ret);
+
+ // Move time forward to simulate 15 days ago
+ EnvironmentEdgeManager
+ .injectEdge(() -> System.currentTimeMillis() - 15 *
ONE_DAY_IN_MILLISECONDS);
+ loadRandomData(table1, 1000); // Add more data to table1
+ loadRandomData(table2, 500); // Insert data into table2
+
+ waitForReplication(); // Ensure replication is complete
+
+ // Perform a full backup for table2 with continuous backup enabled
+ args = buildBackupArgs("full", new TableName[] { table2 }, true);
+ ret = ToolRunner.run(conf1, new BackupDriver(), args);
+ assertEquals("Backup should succeed", 0, ret);
+
+ // Move time forward to simulate 10 days ago
+ EnvironmentEdgeManager
+ .injectEdge(() -> System.currentTimeMillis() - 10 *
ONE_DAY_IN_MILLISECONDS);
+ loadRandomData(table2, 500); // Add more data to table2
+ loadRandomData(table3, 500); // Insert data into table3
+
+ // Perform a full backup for table3 and table4 (without continuous backup)
+ args = buildBackupArgs("full", new TableName[] { table3, table4 }, false);
+ ret = ToolRunner.run(conf1, new BackupDriver(), args);
+ assertEquals("Backup should succeed", 0, ret);
+
+ waitForReplication(); // Ensure replication is complete before concluding
setup
+
+ // Reset time mocking to avoid affecting other tests
+ EnvironmentEdgeManager.reset();
+ }
+
+ @AfterClass
+ public static void setupAfterClass() throws IOException {
+ Path root = TEST_UTIL.getDataTestDirOnTestFS();
+ Path backupWalDir = new Path(root, backupWalDirName);
+ FileSystem fs = FileSystem.get(conf1);
+
+ if (fs.exists(backupWalDir)) {
+ fs.delete(backupWalDir, true);
+ }
+
+ conf1.unset(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ }
+
+ /**
+ * Verifies that PITR (Point-in-Time Restore) fails when the requested
restore time is either in
+ * the future or outside the allowed retention window.
+ */
+ @Test
+ public void testPITR_FailsOutsideWindow() throws Exception {
+ // Case 1: Requested restore time is in the future (should fail)
+ String[] args = buildPITRArgs(new TableName[] { table1 },
+ new TableName[] { TableName.valueOf("restoredTable1") },
+ EnvironmentEdgeManager.currentTime() + ONE_DAY_IN_MILLISECONDS);
+
+ int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+ assertNotEquals("Restore should fail since the requested restore time is
in the future", 0,
+ ret);
+
+ // Case 2: Requested restore time is too old (beyond the retention window,
should fail)
+ args = buildPITRArgs(new TableName[] { table1 },
+ new TableName[] { TableName.valueOf("restoredTable1") },
+ EnvironmentEdgeManager.currentTime() - 40 * ONE_DAY_IN_MILLISECONDS);
+
+ ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+ assertNotEquals(
+ "Restore should fail since the requested restore time is outside the
retention window", 0,
+ ret);
+ }
+
+ /**
+ * Ensures that PITR fails when attempting to restore tables where
continuous backup was not
+ * enabled.
+ */
+ @Test
+ public void testPointInTimeRestore_ContinuousBackupNotEnabledTables() throws
Exception {
+ String[] args = buildPITRArgs(new TableName[] { table3 },
+ new TableName[] { TableName.valueOf("restoredTable1") },
+ EnvironmentEdgeManager.currentTime() - 10 * ONE_DAY_IN_MILLISECONDS);
+
+ int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+ assertNotEquals("Restore should fail since continuous backup is not
enabled for the table", 0,
+ ret);
+ }
+
+ /**
+ * Ensures that PITR fails when trying to restore from a point before
continuous backup started.
+ */
+ @Test
+ public void testPointInTimeRestore_TablesWithNoProperBackupOrWals() throws
Exception {
+ String[] args = buildPITRArgs(new TableName[] { table2 },
+ new TableName[] { TableName.valueOf("restoredTable1") },
+ EnvironmentEdgeManager.currentTime() - 16 * ONE_DAY_IN_MILLISECONDS);
+
+ int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+ assertNotEquals(
+ "Restore should fail since the requested restore point is before the
start of continuous backup",
+ 0, ret);
+ }
+
+ /**
+ * Verifies that PITR successfully restores data for a single table.
+ */
+ @Test
+ public void testPointInTimeRestore_SuccessfulRestoreForOneTable() throws
Exception {
+ TableName restoredTable = TableName.valueOf("restoredTable");
+
+ // Perform restore operation
+ String[] args = buildPITRArgs(new TableName[] { table1 }, new TableName[]
{ restoredTable },
+ EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS);
+
+ int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+ assertEquals("Restore should succeed", 0, ret);
+
+ // Validate that the restored table contains the same number of rows as
the original table
+ assertEquals("Restored table should have the same row count as the
original",
+ getRowCount(table1), getRowCount(restoredTable));
+ }
+
+ /**
+ * Verifies that PITR successfully restores multiple tables at once.
+ */
+ @Test
+ public void testPointInTimeRestore_SuccessfulRestoreForMultipleTables()
throws Exception {
+ TableName restoredTable1 = TableName.valueOf("restoredTable1");
+ TableName restoredTable2 = TableName.valueOf("restoredTable2");
+
+ // Perform restore operation for multiple tables
+ String[] args = buildPITRArgs(new TableName[] { table1, table2 },
+ new TableName[] { restoredTable1, restoredTable2 },
+ EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS);
+
+ int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+ assertEquals("Restore should succeed", 0, ret);
+
+ // Validate that the restored tables contain the same number of rows as
the originals
+ assertEquals("Restored table1 should have the same row count as the
original",
+ getRowCount(table1), getRowCount(restoredTable1));
+ assertEquals("Restored table2 should have the same row count as the
original",
+ getRowCount(table2), getRowCount(restoredTable2));
+ }
+
+ private String[] buildPITRArgs(TableName[] sourceTables, TableName[]
targetTables, long endTime) {
+ String sourceTableNames =
+
Arrays.stream(sourceTables).map(TableName::getNameAsString).collect(Collectors.joining(","));
+
+ String targetTableNames =
+
Arrays.stream(targetTables).map(TableName::getNameAsString).collect(Collectors.joining(","));
+
+ return new String[] { "-" + OPTION_TABLE, sourceTableNames, "-" +
OPTION_TABLE_MAPPING,
+ targetTableNames, "-" + OPTION_TO_DATETIME, String.valueOf(endTime) };
+ }
+
+ private static String[] buildBackupArgs(String backupType, TableName[]
tables,
+ boolean continuousEnabled) {
+ String tableNames =
+
Arrays.stream(tables).map(TableName::getNameAsString).collect(Collectors.joining(","));
+
+ if (continuousEnabled) {
+ return new String[] { "create", backupType, BACKUP_ROOT_DIR, "-" +
OPTION_TABLE, tableNames,
+ "-" + OPTION_ENABLE_CONTINUOUS_BACKUP };
+ } else {
+ return new String[] { "create", backupType, BACKUP_ROOT_DIR, "-" +
OPTION_TABLE, tableNames };
+ }
+ }
+
+ private static void loadRandomData(TableName tableName, int totalRows)
throws IOException {
+ int rowSize = 32;
+ try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+ TEST_UTIL.loadRandomRows(table, famName, rowSize, totalRows);
+ }
+ }
+
+ private static void waitForReplication() {
+ LOG.info("Waiting for replication to complete for {} ms",
WAIT_FOR_REPLICATION_MS);
+ try {
+ Thread.sleep(WAIT_FOR_REPLICATION_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Thread was interrupted while waiting", e);
+ }
+ }
+
+ private int getRowCount(TableName tableName) throws IOException {
+ try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+ return HBaseTestingUtil.countRows(table);
+ }
+ }
+}