This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957_rebase
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 7da752635aa3d2d6055a1a1e6f1c3bd07cb86adb
Author: vinayak hegde <vinayakph...@gmail.com>
AuthorDate: Sat May 31 00:07:44 2025 +0530

    HBASE-29133: Implement "pitr" Command for Point-in-Time Restore (#6717)
    
    Signed-off-by: Andor Molnar <an...@apache.org>
    Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org>
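    
    Illustrative invocation (an editorial sketch based on the options this
    patch adds; not part of the original commit message). Assuming continuous
    backup is enabled and the WAL backup directory (the
    CONF_CONTINUOUS_BACKUP_WAL_DIR setting) is configured:
    
      hbase pitr -t table1,table2 -td 1748630000000
    
    Here -td/--to-datetime is the target timestamp (seconds or milliseconds;
    it defaults to the replication checkpoint when omitted) and
    -bp/--backup-path optionally points at a custom backup location; the set
    (-s) and table (-t) options remain mutually exclusive, as in the existing
    restore command.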
---
 bin/hbase                                          |  17 +
 ...storeDriver.java => AbstractRestoreDriver.java} | 130 +++-----
 .../apache/hadoop/hbase/backup/BackupAdmin.java    |   7 +
 .../hbase/backup/BackupRestoreConstants.java       |  10 +
 .../hadoop/hbase/backup/HBackupFileSystem.java     |  40 +++
 .../hbase/backup/PointInTimeRestoreDriver.java     | 137 ++++++++
 .../hbase/backup/PointInTimeRestoreRequest.java    | 111 +++++++
 .../apache/hadoop/hbase/backup/RestoreDriver.java  | 218 +------------
 .../hadoop/hbase/backup/impl/BackupAdminImpl.java  | 360 ++++++++++++++++++++-
 .../hbase/backup/impl/BackupSystemTable.java       |  86 +++++
 .../hbase/backup/impl/FullTableBackupClient.java   |  12 +-
 .../ContinuousBackupReplicationEndpoint.java       |  33 ++
 .../hadoop/hbase/backup/util/BackupUtils.java      | 162 ++++++++++
 .../apache/hadoop/hbase/backup/TestBackupBase.java |   8 +
 .../hbase/backup/TestPointInTimeRestore.java       | 277 ++++++++++++++++
 15 files changed, 1310 insertions(+), 298 deletions(-)

diff --git a/bin/hbase b/bin/hbase
index d8d9b6ec5b2..d0343dc85a3 100755
--- a/bin/hbase
+++ b/bin/hbase
@@ -102,6 +102,7 @@ show_usage() {
   echo "  version          Print the version"
   echo "  backup           Backup tables for recovery"
   echo "  restore          Restore tables from existing backup image"
+  echo "  pitr             Restore tables to a specific point in time using 
backup and WAL replay"
   echo "  completebulkload Run BulkLoadHFiles tool"
   echo "  regionsplitter   Run RegionSplitter tool"
   echo "  rowcounter       Run RowCounter tool"
@@ -639,6 +640,22 @@ elif [ "$COMMAND" = "restore" ] ; then
       fi
     done
   fi
+elif [ "$COMMAND" = "pitr" ] ; then
+  CLASS='org.apache.hadoop.hbase.backup.PointInTimeRestoreDriver'
+  if [ -n "${shaded_jar}" ] ; then
+    for f in "${HBASE_HOME}"/lib/hbase-backup*.jar; do
+      if [ -f "${f}" ]; then
+        CLASSPATH="${CLASSPATH}:${f}"
+        break
+      fi
+    done
+    for f in "${HBASE_HOME}"/lib/commons-lang3*.jar; do
+      if [ -f "${f}" ]; then
+        CLASSPATH="${CLASSPATH}:${f}"
+        break
+      fi
+    done
+  fi
 elif [ "$COMMAND" = "upgrade" ] ; then
   echo "This command was used to upgrade to HBase 0.96, it was removed in 
HBase 2.0.0."
   echo "Please follow the documentation at 
http://hbase.apache.org/book.html#upgrading.";
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/AbstractRestoreDriver.java
similarity index 68%
copy from hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java
copy to hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/AbstractRestoreDriver.java
index 38b767ecf67..8e053a73a6b 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/AbstractRestoreDriver.java
@@ -33,15 +33,10 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_RESTORE_DESC;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.List;
 import java.util.Objects;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
 import org.apache.hadoop.hbase.backup.impl.BackupManager;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
@@ -49,8 +44,6 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.logging.Log4jUtils;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.util.ToolRunner;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,66 +51,60 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
 
-/**
- * Command-line entry point for restore operation
- */
 @InterfaceAudience.Private
-public class RestoreDriver extends AbstractHBaseTool {
-  private static final Logger LOG = LoggerFactory.getLogger(RestoreDriver.class);
-  private CommandLine cmd;
-
-  private static final String USAGE_STRING =
-    "Usage: hbase restore <backup_path> <backup_id> [options]\n"
-      + "  backup_path     Path to a backup destination root\n"
-      + "  backup_id       Backup image ID to restore\n"
-      + "  table(s)        Comma-separated list of tables to restore\n";
+public abstract class AbstractRestoreDriver extends AbstractHBaseTool {
+  protected static final Logger LOG = LoggerFactory.getLogger(AbstractRestoreDriver.class);
+  protected CommandLine cmd;
 
-  private static final String USAGE_FOOTER = "";
+  protected static final String USAGE_FOOTER = "";
 
-  protected RestoreDriver() throws IOException {
+  protected AbstractRestoreDriver() {
     init();
   }
 
   protected void init() {
-    // disable irrelevant loggers to avoid it mess up command output
     Log4jUtils.disableZkAndClientLoggers();
   }
 
+  protected abstract int executeRestore(boolean check, TableName[] fromTables, TableName[] toTables,
+    boolean isOverwrite);
+
   private int parseAndRun() throws IOException {
-    // Check if backup is enabled
     if (!BackupManager.isBackupEnabled(getConf())) {
       System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
       return -1;
     }
 
-    // enable debug logging
     if (cmd.hasOption(OPTION_DEBUG)) {
       Log4jUtils.setLogLevel("org.apache.hadoop.hbase.backup", "DEBUG");
     }
 
-    // whether to overwrite to existing table if any, false by default
     boolean overwrite = cmd.hasOption(OPTION_OVERWRITE);
     if (overwrite) {
-      LOG.debug("Found -overwrite option in restore command, "
-        + "will overwrite to existing table if any in the restore target");
+      LOG.debug("Found overwrite option (-{}) in restore command, "
+        + "will overwrite to existing table if any in the restore target", 
OPTION_OVERWRITE);
     }
 
-    // whether to only check the dependencies, false by default
     boolean check = cmd.hasOption(OPTION_CHECK);
     if (check) {
       LOG.debug(
-        "Found -check option in restore command, " + "will check and verify 
the dependencies");
+        "Found check option (-{}) in restore command, will check and verify 
the dependencies",
+        OPTION_CHECK);
     }
 
     if (cmd.hasOption(OPTION_SET) && cmd.hasOption(OPTION_TABLE)) {
-      System.err.println(
-        "Options -s and -t are mutaully exclusive," + " you can not specify 
both of them.");
+      System.err.printf(
+        "Set name (-%s) and table list (-%s) are mutually exclusive, you can 
not specify both "
+          + "of them.%n",
+        OPTION_SET, OPTION_TABLE);
       printToolUsage();
       return -1;
     }
 
     if (!cmd.hasOption(OPTION_SET) && !cmd.hasOption(OPTION_TABLE)) {
-      System.err.println("You have to specify either set name or table list to 
restore");
+      System.err.printf(
+        "You have to specify either set name (-%s) or table list (-%s) to " + 
"restore%n",
+        OPTION_SET, OPTION_TABLE);
       printToolUsage();
       return -1;
     }
@@ -128,20 +115,13 @@ public class RestoreDriver extends AbstractHBaseTool {
       getConf().set("mapreduce.job.queuename", queueName);
     }
 
-    // parse main restore command options
-    String[] remainArgs = cmd.getArgs();
-    if (remainArgs.length != 2) {
-      printToolUsage();
-      return -1;
-    }
-
-    String backupRootDir = remainArgs[0];
-    String backupId = remainArgs[1];
     String tables;
-    String tableMapping =
-      cmd.hasOption(OPTION_TABLE_MAPPING) ? cmd.getOptionValue(OPTION_TABLE_MAPPING) : null;
-    try (final Connection conn = ConnectionFactory.createConnection(conf);
-      BackupAdmin client = new BackupAdminImpl(conn)) {
+    TableName[] sTableArray;
+    TableName[] tTableArray;
+
+    String tableMapping = cmd.getOptionValue(OPTION_TABLE_MAPPING);
+
+    try (final Connection conn = ConnectionFactory.createConnection(conf)) {
       // Check backup set
       if (cmd.hasOption(OPTION_SET)) {
         String setName = cmd.getOptionValue(OPTION_SET);
@@ -162,8 +142,8 @@ public class RestoreDriver extends AbstractHBaseTool {
         tables = cmd.getOptionValue(OPTION_TABLE);
       }
 
-      TableName[] sTableArray = BackupUtils.parseTableNames(tables);
-      TableName[] tTableArray = BackupUtils.parseTableNames(tableMapping);
+      sTableArray = BackupUtils.parseTableNames(tables);
+      tTableArray = BackupUtils.parseTableNames(tableMapping);
 
       if (
         sTableArray != null && tTableArray != null && (sTableArray.length != tTableArray.length)
@@ -172,31 +152,13 @@ public class RestoreDriver extends AbstractHBaseTool {
         printToolUsage();
         return -4;
       }
-
-      client.restore(BackupUtils.createRestoreRequest(backupRootDir, backupId, check, sTableArray,
-        tTableArray, overwrite));
-    } catch (Exception e) {
-      LOG.error("Error while running restore backup", e);
-      return -5;
     }
-    return 0;
-  }
 
-  private String getTablesForSet(Connection conn, String name) throws IOException {
-    try (final BackupSystemTable table = new BackupSystemTable(conn)) {
-      List<TableName> tables = table.describeBackupSet(name);
-
-      if (tables == null) {
-        return null;
-      }
-
-      return StringUtils.join(tables, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND);
-    }
+    return executeRestore(check, sTableArray, tTableArray, overwrite);
   }
 
   @Override
   protected void addOptions() {
-    // define supported options
     addOptNoArg(OPTION_OVERWRITE, OPTION_OVERWRITE_DESC);
     addOptNoArg(OPTION_CHECK, OPTION_CHECK_DESC);
     addOptNoArg(OPTION_DEBUG, OPTION_DEBUG_DESC);
@@ -216,26 +178,14 @@ public class RestoreDriver extends AbstractHBaseTool {
     return parseAndRun();
   }
 
-  public static void main(String[] args) throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    Path hbasedir = CommonFSUtils.getRootDir(conf);
-    URI defaultFs = hbasedir.getFileSystem(conf).getUri();
-    CommonFSUtils.setFsDefault(conf, new Path(defaultFs));
-    int ret = ToolRunner.run(conf, new RestoreDriver(), args);
-    System.exit(ret);
-  }
-
   @Override
   public int run(String[] args) {
     Objects.requireNonNull(conf, "Tool configuration is not initialized");
 
-    CommandLine cmd;
     try {
-      // parse the command line arguments
       cmd = parseArgs(args);
-      cmdLineArgs = args;
     } catch (Exception e) {
-      System.out.println("Error when parsing command-line arguments: " + 
e.getMessage());
+      System.out.println("Error parsing command-line arguments: " + 
e.getMessage());
       printToolUsage();
       return EXIT_FAILURE;
     }
@@ -247,18 +197,16 @@ public class RestoreDriver extends AbstractHBaseTool {
 
     processOptions(cmd);
 
-    int ret = EXIT_FAILURE;
     try {
-      ret = doWork();
+      return doWork();
     } catch (Exception e) {
-      LOG.error("Error running command-line tool", e);
+      LOG.error("Error running restore tool", e);
       return EXIT_FAILURE;
     }
-    return ret;
   }
 
   protected void printToolUsage() {
-    System.out.println(USAGE_STRING);
+    System.out.println(getUsageString());
     HelpFormatter helpFormatter = new HelpFormatter();
     helpFormatter.setLeftPadding(2);
     helpFormatter.setDescPadding(8);
@@ -267,4 +215,18 @@ public class RestoreDriver extends AbstractHBaseTool {
     helpFormatter.printHelp(" ", null, options, USAGE_FOOTER);
     System.out.println(BackupRestoreConstants.VERIFY_BACKUP);
   }
+
+  protected abstract String getUsageString();
+
+  private String getTablesForSet(Connection conn, String name) throws IOException {
+    try (final BackupSystemTable table = new BackupSystemTable(conn)) {
+      List<TableName> tables = table.describeBackupSet(name);
+
+      if (tables == null) {
+        return null;
+      }
+
+      return StringUtils.join(tables, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND);
+    }
+  }
 }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java
index 25055fd5e8e..7fdf1d9bfbf 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java
@@ -51,6 +51,13 @@ public interface BackupAdmin extends Closeable {
    */
   void restore(RestoreRequest request) throws IOException;
 
+  /**
+   * Restore the tables to a specific point in time
+   * @param request Point in Time restore request
+   * @throws IOException exception
+   */
+  void pointInTimeRestore(PointInTimeRestoreRequest request) throws IOException;
+
   /**
    * Describe backup image command
    * @param backupId backup id
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java
index 3df67ac1aef..be6e4c2686d 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java
@@ -109,6 +109,16 @@ public interface BackupRestoreConstants {
   String OPTION_FORCE_DELETE_DESC =
     "Flag to forcefully delete the backup, even if it may be required for 
Point-in-Time Restore";
 
+  String OPTION_TO_DATETIME = "td";
+  String LONG_OPTION_TO_DATETIME = "to-datetime";
+  String OPTION_TO_DATETIME_DESC = "Target date and time up to which data 
should be restored";
+
+  String OPTION_PITR_BACKUP_PATH = "bp";
+  String LONG_OPTION_PITR_BACKUP_PATH = "backup-path";
+  String OPTION_PITR_BACKUP_PATH_DESC =
+    "Specifies a custom backup location for Point-In-Time Recovery (PITR). "
+      + "If provided, this location will be used exclusively instead of 
deriving the path from the system table.";
+
   String JOB_NAME_CONF_KEY = "mapreduce.job.name";
 
   String BACKUP_CONFIG_STRING = BackupRestoreConstants.BACKUP_ENABLE_KEY + "=true\n"
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
index d5fd9aaf4c3..0ad5b3f0e05 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
@@ -17,13 +17,21 @@
  */
 package org.apache.hadoop.hbase.backup;
 
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUPID_PREFIX;
+
 import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -135,4 +143,36 @@ public final class HBackupFileSystem {
       new BackupManifest(conf, getManifestPath(conf, backupRootPath, backupId));
     return manifest;
   }
+
+  public static List<BackupImage> getAllBackupImages(Configuration conf, Path backupRootPath)
+    throws IOException {
+    FileSystem fs = FileSystem.get(backupRootPath.toUri(), conf);
+    RemoteIterator<LocatedFileStatus> it = fs.listLocatedStatus(backupRootPath);
+
+    List<BackupImage> images = new ArrayList<>();
+
+    while (it.hasNext()) {
+      LocatedFileStatus lfs = it.next();
+      if (!lfs.isDirectory()) {
+        continue;
+      }
+
+      String backupId = lfs.getPath().getName();
+      try {
+        BackupManifest manifest = getManifest(conf, backupRootPath, backupId);
+        images.add(manifest.getBackupImage());
+      } catch (IOException e) {
+        LOG.error("Cannot load backup manifest from: " + lfs.getPath(), e);
+      }
+    }
+
+    // Sort images by timestamp in descending order
+    images.sort(Comparator.comparingLong(m -> -getTimestamp(m.getBackupId())));
+
+    return images;
+  }
+
+  private static long getTimestamp(String backupId) {
+    return Long.parseLong(backupId.substring(BACKUPID_PREFIX.length()));
+  }
 }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java
new file mode 100644
index 00000000000..abdf52f1430
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.LONG_OPTION_PITR_BACKUP_PATH;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.LONG_OPTION_TO_DATETIME;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_PITR_BACKUP_PATH;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_PITR_BACKUP_PATH_DESC;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME_DESC;
+
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
+
+/**
+ * Command-line entry point for the point-in-time restore operation
+ */
+@InterfaceAudience.Private
+public class PointInTimeRestoreDriver extends AbstractRestoreDriver {
+  private static final String USAGE_STRING = """
+      Usage: hbase pitr [options]
+        <backup_path>   Backup Path to use for Point in Time Restore
+        table(s)        Comma-separated list of tables to restore
+      """;
+
+  @Override
+  protected int executeRestore(boolean check, TableName[] fromTables, TableName[] toTables,
+    boolean isOverwrite) {
+    String walBackupDir = getConf().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    if (Strings.isNullOrEmpty(walBackupDir)) {
+      System.err.printf(
+        "Point-in-Time Restore requires the WAL backup directory (%s) to 
replay logs after full and incremental backups. "
+          + "Set this property if you need Point-in-Time Restore. Otherwise, 
use the normal restore process with the appropriate backup ID.%n",
+        CONF_CONTINUOUS_BACKUP_WAL_DIR);
+      return -1;
+    }
+
+    String[] remainArgs = cmd.getArgs();
+    if (remainArgs.length != 0) {
+      printToolUsage();
+      return -1;
+    }
+
+    String backupRootDir = cmd.getOptionValue(OPTION_PITR_BACKUP_PATH);
+
+    try (final Connection conn = ConnectionFactory.createConnection(conf);
+      BackupAdmin client = new BackupAdminImpl(conn)) {
+      // Get the replication checkpoint (last known safe point for Continuous Backup)
+      long replicationCheckpoint = BackupUtils.getReplicationCheckpoint(conn);
+      long endTime = replicationCheckpoint;
+
+      if (cmd.hasOption(OPTION_TO_DATETIME)) {
+        String time = cmd.getOptionValue(OPTION_TO_DATETIME);
+        try {
+          endTime = Long.parseLong(time);
+          // Convert seconds to milliseconds if input is in seconds
+          if (endTime < 10_000_000_000L) {
+            endTime *= 1000;
+          }
+        } catch (NumberFormatException e) {
+          System.out.println("ERROR: Invalid timestamp format for 
--to-datetime: " + time);
+          printToolUsage();
+          return -5;
+        }
+      }
+
+      // Ensure the requested restore time does not exceed the replication checkpoint
+      if (endTime > replicationCheckpoint) {
+        LOG.error(
+          "ERROR: Requested restore time ({}) exceeds the last known safe 
replication checkpoint ({}). "
+            + "Please choose a time before this checkpoint to ensure data 
consistency.",
+          endTime, replicationCheckpoint);
+        return -5;
+      }
+
+      PointInTimeRestoreRequest pointInTimeRestoreRequest = new PointInTimeRestoreRequest.Builder()
+        .withBackupRootDir(backupRootDir).withCheck(check).withFromTables(fromTables)
+        .withToTables(toTables).withOverwrite(isOverwrite).withToDateTime(endTime).build();
+
+      client.pointInTimeRestore(pointInTimeRestoreRequest);
+    } catch (Exception e) {
+      LOG.error("Error while running restore backup", e);
+      return -5;
+    }
+    return 0;
+  }
+
+  @Override
+  protected void addOptions() {
+    super.addOptions();
+    addOptWithArg(OPTION_TO_DATETIME, LONG_OPTION_TO_DATETIME, OPTION_TO_DATETIME_DESC);
+    addOptWithArg(OPTION_PITR_BACKUP_PATH, LONG_OPTION_PITR_BACKUP_PATH,
+      OPTION_PITR_BACKUP_PATH_DESC);
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    Path rootDir = CommonFSUtils.getRootDir(conf);
+    URI defaultFs = rootDir.getFileSystem(conf).getUri();
+    CommonFSUtils.setFsDefault(conf, new Path(defaultFs));
+    int ret = ToolRunner.run(conf, new PointInTimeRestoreDriver(), args);
+    System.exit(ret);
+  }
+
+  @Override
+  protected String getUsageString() {
+    return USAGE_STRING;
+  }
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java
new file mode 100644
index 00000000000..f2462a1cfd1
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * POJO class for Point In Time Restore request
+ */
+@InterfaceAudience.Private
+public final class PointInTimeRestoreRequest {
+
+  private final String backupRootDir;
+  private final boolean check;
+  private final TableName[] fromTables;
+  private final TableName[] toTables;
+  private final boolean overwrite;
+  private final long toDateTime;
+
+  private PointInTimeRestoreRequest(Builder builder) {
+    this.backupRootDir = builder.backupRootDir;
+    this.check = builder.check;
+    this.fromTables = builder.fromTables;
+    this.toTables = builder.toTables;
+    this.overwrite = builder.overwrite;
+    this.toDateTime = builder.toDateTime;
+  }
+
+  public String getBackupRootDir() {
+    return backupRootDir;
+  }
+
+  public boolean isCheck() {
+    return check;
+  }
+
+  public TableName[] getFromTables() {
+    return fromTables;
+  }
+
+  public TableName[] getToTables() {
+    return toTables;
+  }
+
+  public boolean isOverwrite() {
+    return overwrite;
+  }
+
+  public long getToDateTime() {
+    return toDateTime;
+  }
+
+  public static class Builder {
+    private String backupRootDir;
+    private boolean check = false;
+    private TableName[] fromTables;
+    private TableName[] toTables;
+    private boolean overwrite = false;
+    private long toDateTime;
+
+    public Builder withBackupRootDir(String backupRootDir) {
+      this.backupRootDir = backupRootDir;
+      return this;
+    }
+
+    public Builder withCheck(boolean check) {
+      this.check = check;
+      return this;
+    }
+
+    public Builder withFromTables(TableName[] fromTables) {
+      this.fromTables = fromTables;
+      return this;
+    }
+
+    public Builder withToTables(TableName[] toTables) {
+      this.toTables = toTables;
+      return this;
+    }
+
+    public Builder withOverwrite(boolean overwrite) {
+      this.overwrite = overwrite;
+      return this;
+    }
+
+    public Builder withToDateTime(long dateTime) {
+      this.toDateTime = dateTime;
+      return this;
+    }
+
+    public PointInTimeRestoreRequest build() {
+      return new PointInTimeRestoreRequest(this);
+    }
+  }
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java
index 38b767ecf67..efc4b0df9a0 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java
@@ -17,117 +17,34 @@
  */
 package org.apache.hadoop.hbase.backup;
 
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK_DESC;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE_DESC;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET_RESTORE_DESC;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_LIST_DESC;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING_DESC;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_RESTORE_DESC;
-
-import java.io.IOException;
 import java.net.URI;
-import java.util.List;
-import java.util.Objects;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
-import org.apache.hadoop.hbase.backup.impl.BackupManager;
-import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.logging.Log4jUtils;
-import org.apache.hadoop.hbase.util.AbstractHBaseTool;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
 
 /**
  * Command-line entry point for restore operation
  */
 @InterfaceAudience.Private
-public class RestoreDriver extends AbstractHBaseTool {
-  private static final Logger LOG = LoggerFactory.getLogger(RestoreDriver.class);
-  private CommandLine cmd;
-
-  private static final String USAGE_STRING =
-    "Usage: hbase restore <backup_path> <backup_id> [options]\n"
-      + "  backup_path     Path to a backup destination root\n"
-      + "  backup_id       Backup image ID to restore\n"
-      + "  table(s)        Comma-separated list of tables to restore\n";
-
-  private static final String USAGE_FOOTER = "";
-
-  protected RestoreDriver() throws IOException {
-    init();
-  }
-
-  protected void init() {
-    // disable irrelevant loggers to avoid it mess up command output
-    Log4jUtils.disableZkAndClientLoggers();
-  }
-
-  private int parseAndRun() throws IOException {
-    // Check if backup is enabled
-    if (!BackupManager.isBackupEnabled(getConf())) {
-      System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
-      return -1;
-    }
-
-    // enable debug logging
-    if (cmd.hasOption(OPTION_DEBUG)) {
-      Log4jUtils.setLogLevel("org.apache.hadoop.hbase.backup", "DEBUG");
-    }
-
-    // whether to overwrite to existing table if any, false by default
-    boolean overwrite = cmd.hasOption(OPTION_OVERWRITE);
-    if (overwrite) {
-      LOG.debug("Found -overwrite option in restore command, "
-        + "will overwrite to existing table if any in the restore target");
-    }
-
-    // whether to only check the dependencies, false by default
-    boolean check = cmd.hasOption(OPTION_CHECK);
-    if (check) {
-      LOG.debug(
-        "Found -check option in restore command, " + "will check and verify 
the dependencies");
-    }
-
-    if (cmd.hasOption(OPTION_SET) && cmd.hasOption(OPTION_TABLE)) {
-      System.err.println(
-        "Options -s and -t are mutaully exclusive," + " you can not specify 
both of them.");
-      printToolUsage();
-      return -1;
-    }
-
-    if (!cmd.hasOption(OPTION_SET) && !cmd.hasOption(OPTION_TABLE)) {
-      System.err.println("You have to specify either set name or table list to 
restore");
-      printToolUsage();
-      return -1;
-    }
-
-    if (cmd.hasOption(OPTION_YARN_QUEUE_NAME)) {
-      String queueName = cmd.getOptionValue(OPTION_YARN_QUEUE_NAME);
-      // Set MR job queuename to configuration
-      getConf().set("mapreduce.job.queuename", queueName);
-    }
+public class RestoreDriver extends AbstractRestoreDriver {
+  private static final String USAGE_STRING = """
+      Usage: hbase restore <backup_path> <backup_id> [options]
+        backup_path     Path to a backup destination root
+        backup_id       Backup image ID to restore
+        table(s)        Comma-separated list of tables to restore
+      """;
 
+  @Override
+  protected int executeRestore(boolean check, TableName[] fromTables, TableName[] toTables,
+    boolean isOverwrite) {
     // parse main restore command options
     String[] remainArgs = cmd.getArgs();
     if (remainArgs.length != 2) {
@@ -137,44 +54,11 @@ public class RestoreDriver extends AbstractHBaseTool {
 
     String backupRootDir = remainArgs[0];
     String backupId = remainArgs[1];
-    String tables;
-    String tableMapping =
-      cmd.hasOption(OPTION_TABLE_MAPPING) ? cmd.getOptionValue(OPTION_TABLE_MAPPING) : null;
+
     try (final Connection conn = ConnectionFactory.createConnection(conf);
       BackupAdmin client = new BackupAdminImpl(conn)) {
-      // Check backup set
-      if (cmd.hasOption(OPTION_SET)) {
-        String setName = cmd.getOptionValue(OPTION_SET);
-        try {
-          tables = getTablesForSet(conn, setName);
-        } catch (IOException e) {
-          System.out.println("ERROR: " + e.getMessage() + " for setName=" + 
setName);
-          printToolUsage();
-          return -2;
-        }
-        if (tables == null) {
-          System.out
-            .println("ERROR: Backup set '" + setName + "' is either empty or 
does not exist");
-          printToolUsage();
-          return -3;
-        }
-      } else {
-        tables = cmd.getOptionValue(OPTION_TABLE);
-      }
-
-      TableName[] sTableArray = BackupUtils.parseTableNames(tables);
-      TableName[] tTableArray = BackupUtils.parseTableNames(tableMapping);
-
-      if (
-        sTableArray != null && tTableArray != null && (sTableArray.length != tTableArray.length)
-      ) {
-        System.out.println("ERROR: table mapping mismatch: " + tables + " : " 
+ tableMapping);
-        printToolUsage();
-        return -4;
-      }
-
-      client.restore(BackupUtils.createRestoreRequest(backupRootDir, backupId, check, sTableArray,
-        tTableArray, overwrite));
+      client.restore(BackupUtils.createRestoreRequest(backupRootDir, backupId, check, fromTables,
+        toTables, isOverwrite));
     } catch (Exception e) {
       LOG.error("Error while running restore backup", e);
       return -5;
@@ -182,40 +66,6 @@ public class RestoreDriver extends AbstractHBaseTool {
     return 0;
   }
 
-  private String getTablesForSet(Connection conn, String name) throws IOException {
-    try (final BackupSystemTable table = new BackupSystemTable(conn)) {
-      List<TableName> tables = table.describeBackupSet(name);
-
-      if (tables == null) {
-        return null;
-      }
-
-      return StringUtils.join(tables, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND);
-    }
-  }
-
-  @Override
-  protected void addOptions() {
-    // define supported options
-    addOptNoArg(OPTION_OVERWRITE, OPTION_OVERWRITE_DESC);
-    addOptNoArg(OPTION_CHECK, OPTION_CHECK_DESC);
-    addOptNoArg(OPTION_DEBUG, OPTION_DEBUG_DESC);
-    addOptWithArg(OPTION_SET, OPTION_SET_RESTORE_DESC);
-    addOptWithArg(OPTION_TABLE, OPTION_TABLE_LIST_DESC);
-    addOptWithArg(OPTION_TABLE_MAPPING, OPTION_TABLE_MAPPING_DESC);
-    addOptWithArg(OPTION_YARN_QUEUE_NAME, OPTION_YARN_QUEUE_NAME_RESTORE_DESC);
-  }
-
-  @Override
-  protected void processOptions(CommandLine cmd) {
-    this.cmd = cmd;
-  }
-
-  @Override
-  protected int doWork() throws Exception {
-    return parseAndRun();
-  }
-
   public static void main(String[] args) throws Exception {
     Configuration conf = HBaseConfiguration.create();
     Path hbasedir = CommonFSUtils.getRootDir(conf);
@@ -226,45 +76,7 @@ public class RestoreDriver extends AbstractHBaseTool {
   }
 
   @Override
-  public int run(String[] args) {
-    Objects.requireNonNull(conf, "Tool configuration is not initialized");
-
-    CommandLine cmd;
-    try {
-      // parse the command line arguments
-      cmd = parseArgs(args);
-      cmdLineArgs = args;
-    } catch (Exception e) {
-      System.out.println("Error when parsing command-line arguments: " + 
e.getMessage());
-      printToolUsage();
-      return EXIT_FAILURE;
-    }
-
-    if (cmd.hasOption(SHORT_HELP_OPTION) || cmd.hasOption(LONG_HELP_OPTION)) {
-      printToolUsage();
-      return EXIT_FAILURE;
-    }
-
-    processOptions(cmd);
-
-    int ret = EXIT_FAILURE;
-    try {
-      ret = doWork();
-    } catch (Exception e) {
-      LOG.error("Error running command-line tool", e);
-      return EXIT_FAILURE;
-    }
-    return ret;
-  }
-
-  protected void printToolUsage() {
-    System.out.println(USAGE_STRING);
-    HelpFormatter helpFormatter = new HelpFormatter();
-    helpFormatter.setLeftPadding(2);
-    helpFormatter.setDescPadding(8);
-    helpFormatter.setWidth(100);
-    helpFormatter.setSyntaxPrefix("Options:");
-    helpFormatter.printHelp(" ", null, options, USAGE_FOOTER);
-    System.out.println(BackupRestoreConstants.VERIFY_BACKUP);
+  protected String getUsageString() {
+    return USAGE_STRING;
   }
 }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index 1e745c69cda..e94389d6938 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -17,18 +17,32 @@
  */
 package org.apache.hadoop.hbase.backup.impl;
 
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+
 import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupAdmin;
 import org.apache.hadoop.hbase.backup.BackupClientFactory;
@@ -40,12 +54,16 @@ import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
 import org.apache.hadoop.hbase.backup.RestoreRequest;
 import org.apache.hadoop.hbase.backup.util.BackupSet;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.Tool;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -490,15 +508,8 @@ public class BackupAdminImpl implements BackupAdmin {
   @Override
   public void restore(RestoreRequest request) throws IOException {
     if (request.isCheck()) {
-      // check and load backup image manifest for the tables
-      Path rootPath = new Path(request.getBackupRootDir());
-      String backupId = request.getBackupId();
-      TableName[] sTableArray = request.getFromTables();
-      BackupManifest manifest =
-        HBackupFileSystem.getManifest(conn.getConfiguration(), rootPath, backupId);
-
-      // Check and validate the backup image and its dependencies
-      if (BackupUtils.validate(Arrays.asList(sTableArray), manifest, conn.getConfiguration())) {
+      boolean isValid = validateRequest(request);
+      if (isValid) {
         LOG.info(CHECK_OK);
       } else {
         LOG.error(CHECK_FAILED);
@@ -509,6 +520,337 @@ public class BackupAdminImpl implements BackupAdmin {
     new RestoreTablesClient(conn, request).execute();
   }
 
+  private boolean validateRequest(RestoreRequest request) throws IOException {
+    // check and load backup image manifest for the tables
+    Path rootPath = new Path(request.getBackupRootDir());
+    String backupId = request.getBackupId();
+    TableName[] sTableArray = request.getFromTables();
+    BackupManifest manifest =
+      HBackupFileSystem.getManifest(conn.getConfiguration(), rootPath, backupId);
+
+    // Validate the backup image and its dependencies
+    return BackupUtils.validate(Arrays.asList(sTableArray), manifest, conn.getConfiguration());
+  }
+
+  @Override
+  public void pointInTimeRestore(PointInTimeRestoreRequest request) throws IOException {
+    if (request.getBackupRootDir() == null) {
+      defaultPointInTimeRestore(request);
+    } else {
+      // TODO: special case, not supported at the moment
+      throw new IOException("Custom backup location for Point-In-Time Recovery 
Not supported!");
+    }
+    LOG.info("Successfully completed Point In Time Restore for all tables.");
+  }
+
+  /**
+   * Performs a default Point-In-Time Restore (PITR) by restoring the latest valid backup and
+   * replaying the WALs to bring the table to the desired state. PITR requires: 1. A valid backup
+   * available before the end time. 2. Write-Ahead Logs (WALs) covering the remaining duration up to
+   * the end time.
+   * @param request PointInTimeRestoreRequest containing restore parameters.
+   * @throws IOException If no valid backup or WALs are found, or if an error occurs during
+   *                     restoration.
+   */
+  private void defaultPointInTimeRestore(PointInTimeRestoreRequest request) throws IOException {
+    long endTime = request.getToDateTime();
+    validateRequestToTime(endTime);
+
+    TableName[] sTableArray = request.getFromTables();
+    TableName[] tTableArray = resolveTargetTables(sTableArray, request.getToTables());
+
+    // Validate PITR requirements
+    validatePitr(endTime, sTableArray, tTableArray);
+
+    // If only validation is required, log and return
+    if (request.isCheck()) {
+      LOG.info("PITR can be successfully executed");
+      return;
+    }
+
+    // Execute PITR process
+    try (BackupSystemTable table = new BackupSystemTable(conn)) {
+      Map<TableName, Long> continuousBackupTables = table.getContinuousBackupTableSet();
+      List<BackupInfo> backupInfos = table.getBackupInfos(BackupState.COMPLETE);
+
+      for (int i = 0; i < sTableArray.length; i++) {
+        restoreTableWithWalReplay(sTableArray[i], tTableArray[i], endTime, continuousBackupTables,
+          backupInfos, request);
+      }
+    }
+  }
+
+  /**
+   * Validates whether the requested end time falls within the allowed PITR recovery window.
+   * @param endTime The target recovery time.
+   * @throws IOException If the requested recovery time is outside the allowed window.
+   */
+  private void validateRequestToTime(long endTime) throws IOException {
+    long pitrWindowDays = conn.getConfiguration().getLong(CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS,
+      DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS);
+    long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
+    long pitrMaxStartTime = currentTime - TimeUnit.DAYS.toMillis(pitrWindowDays);
+
+    if (endTime < pitrMaxStartTime) {
+      String errorMsg = String.format(
+        "Requested recovery time (%d) is out of the allowed PITR window (last 
%d days).", endTime,
+        pitrWindowDays);
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+
+    if (endTime > currentTime) {
+      String errorMsg = String.format(
+        "Requested recovery time (%d) is in the future. Current time: %d.", 
endTime, currentTime);
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+  }
+
+  /**
+   * Resolves the target table array. If null or empty, defaults to the source table array.
+   */
+  private TableName[] resolveTargetTables(TableName[] sourceTables, TableName[] targetTables) {
+    return (targetTables == null || targetTables.length == 0) ? sourceTables : targetTables;
+  }
+
+  /**
+   * Validates whether Point-In-Time Recovery (PITR) is possible for the given tables at the
+   * specified time.
+   * <p>
+   * PITR requires:
+   * <ul>
+   * <li>Continuous backup to be enabled for the source tables.</li>
+   * <li>A valid backup image and corresponding WALs to be available.</li>
+   * </ul>
+   * @param endTime     The target recovery time.
+   * @param sTableArray The source tables to restore.
+   * @param tTableArray The target tables where the restore will be performed.
+   * @throws IOException If PITR is not possible due to missing continuous backup or backup images.
+   */
+  private void validatePitr(long endTime, TableName[] sTableArray, TableName[] tTableArray)
+    throws IOException {
+    try (BackupSystemTable table = new BackupSystemTable(conn)) {
+      // Retrieve the set of tables with continuous backup enabled
+      Map<TableName, Long> continuousBackupTables = table.getContinuousBackupTableSet();
+
+      // Ensure all source tables have continuous backup enabled
+      validateContinuousBackup(sTableArray, continuousBackupTables);
+
+      // Fetch completed backup information
+      List<BackupInfo> backupInfos = table.getBackupInfos(BackupState.COMPLETE);
+
+      // Ensure a valid backup and WALs exist for PITR
+      validateBackupAvailability(sTableArray, tTableArray, endTime, continuousBackupTables,
+        backupInfos);
+    }
+  }
+
+  /**
+   * Ensures that all source tables have continuous backup enabled.
+   */
+  private void validateContinuousBackup(TableName[] tables,
+    Map<TableName, Long> continuousBackupTables) throws IOException {
+    List<TableName> missingTables =
+      Arrays.stream(tables).filter(table -> !continuousBackupTables.containsKey(table)).toList();
+
+    if (!missingTables.isEmpty()) {
+      String errorMsg = "Continuous Backup is not enabled for the following 
tables: "
+        + 
missingTables.stream().map(TableName::getNameAsString).collect(Collectors.joining(",
 "));
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+  }
+
+  /**
+   * Ensures that a valid backup and corresponding WALs exist for PITR for each source table. PITR
+   * requires: 1. A valid backup available before the end time. 2. Write-Ahead Logs (WALs) covering
+   * the remaining duration up to the end time.
+   */
+  private void validateBackupAvailability(TableName[] sTableArray, TableName[] tTableArray,
+    long endTime, Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos)
+    throws IOException {
+    for (int i = 0; i < sTableArray.length; i++) {
+      if (
+        !canPerformPitr(sTableArray[i], tTableArray[i], endTime, continuousBackupTables,
+          backupInfos)
+      ) {
+        String errorMsg = "Could not find a valid backup and WALs for PITR for 
table: "
+          + sTableArray[i].getNameAsString();
+        LOG.error(errorMsg);
+        throw new IOException(errorMsg);
+      }
+    }
+  }
+
+  /**
+   * Checks whether PITR can be performed for a given source-target table pair.
+   */
+  private boolean canPerformPitr(TableName stableName, TableName tTableName, long endTime,
+    Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos) {
+    return getValidBackupInfo(stableName, tTableName, endTime, continuousBackupTables, backupInfos)
+        != null;
+  }
+
+  /**
+   * Finds a valid backup for PITR that meets the required conditions.
+   */
+  private BackupInfo getValidBackupInfo(TableName sTableName, TableName tTablename, long endTime,
+    Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos) {
+    for (BackupInfo info : backupInfos) {
+      if (isValidBackupForPitr(info, sTableName, endTime, continuousBackupTables)) {
+
+        RestoreRequest restoreRequest =
+          BackupUtils.createRestoreRequest(info.getBackupRootDir(), info.getBackupId(), true,
+            new TableName[] { sTableName }, new TableName[] { tTablename }, false);
+
+        try {
+          if (validateRequest(restoreRequest)) {
+            return info;
+          }
+        } catch (IOException e) {
+          LOG.warn("Exception occurred while testing the backup : {} for 
restore ", info, e);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Determines if the given backup is valid for PITR.
+   * <p>
+   * A backup is valid if:
+   * <ul>
+   * <li>It contains the source table.</li>
+   * <li>It was completed before the end time.</li>
+   * <li>The start timestamp of the backup is after the continuous backup start time for the
+   * table.</li>
+   * </ul>
+   * @param info                   Backup information object.
+   * @param tableName              Table to check.
+   * @param endTime                The target recovery time.
+   * @param continuousBackupTables Map of tables with continuous backup enabled.
+   * @return true if the backup is valid for PITR, false otherwise.
+   */
+  private boolean isValidBackupForPitr(BackupInfo info, TableName tableName, long endTime,
+    Map<TableName, Long> continuousBackupTables) {
+    return info.getTableNames().contains(tableName) && info.getCompleteTs() <= endTime
+      && continuousBackupTables.getOrDefault(tableName, 0L) <= info.getStartTs();
+  }
+
+  /**
+   * Restores a table from a valid backup and replays WALs to reach the desired PITR state.
+   */
+  private void restoreTableWithWalReplay(TableName sourceTable, TableName targetTable, long endTime,
+    Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos,
+    PointInTimeRestoreRequest request) throws IOException {
+    BackupInfo backupInfo =
+      getValidBackupInfo(sourceTable, targetTable, endTime, continuousBackupTables, backupInfos);
+    if (backupInfo == null) {
+      String errorMsg = "Could not find a valid backup and WALs for PITR for 
table: "
+        + sourceTable.getNameAsString();
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+
+    RestoreRequest restoreRequest = BackupUtils.createRestoreRequest(backupInfo.getBackupRootDir(),
+      backupInfo.getBackupId(), false, new TableName[] { sourceTable },
+      new TableName[] { targetTable }, request.isOverwrite());
+
+    restore(restoreRequest);
+    replayWal(sourceTable, targetTable, backupInfo.getStartTs(), endTime);
+  }
+
+  /**
+   * Replays WALs to bring the table to the desired state.
+   */
+  private void replayWal(TableName sourceTable, TableName targetTable, long startTime, long endTime)
+    throws IOException {
+    String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    Path walDirPath = new Path(walBackupDir);
+    LOG.info(
+      "Starting WAL replay for source: {}, target: {}, time range: {} - {}, 
WAL backup dir: {}",
+      sourceTable, targetTable, startTime, endTime, walDirPath);
+
+    List<String> validDirs =
+      getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
+    if (validDirs.isEmpty()) {
+      LOG.warn("No valid WAL directories found for range {} - {}. Skipping WAL 
replay.", startTime,
+        endTime);
+      return;
+    }
+
+    executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime);
+  }
+
+  /**
+   * Fetches valid WAL directories based on the given time range.
+   */
+  private List<String> getValidWalDirs(Configuration conf, Path walBackupDir, long startTime,
+    long endTime) throws IOException {
+    FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf);
+    FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
+
+    List<String> validDirs = new ArrayList<>();
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+    for (FileStatus dayDir : dayDirs) {
+      if (!dayDir.isDirectory()) {
+        continue; // Skip files, only process directories
+      }
+
+      String dirName = dayDir.getPath().getName();
+      try {
+        Date dirDate = dateFormat.parse(dirName);
+        long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
+        long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
+
+        // Check if this day's WAL files overlap with the required time range
+        if (dirEndTime >= startTime && dirStartTime <= endTime) {
+          validDirs.add(dayDir.getPath().toString());
+        }
+      } catch (ParseException e) {
+        LOG.warn("Skipping invalid directory name: " + dirName, e);
+      }
+    }
+    return validDirs;
+  }
+
+  /**
+   * Executes WAL replay using WALPlayer.
+   */
+  private void executeWalReplay(List<String> walDirs, TableName sourceTable, TableName targetTable,
+    long startTime, long endTime) throws IOException {
+    Tool walPlayer = initializeWalPlayer(startTime, endTime);
+    String[] args =
+      { String.join(",", walDirs), sourceTable.getNameAsString(), 
targetTable.getNameAsString() };
+
+    try {
+      LOG.info("Executing WALPlayer with args: {}", Arrays.toString(args));
+      int exitCode = walPlayer.run(args);
+      if (exitCode == 0) {
+        LOG.info("WAL replay completed successfully for {}", targetTable);
+      } else {
+        throw new IOException("WAL replay failed with exit code: " + exitCode);
+      }
+    } catch (Exception e) {
+      LOG.error("Error during WAL replay for {}: {}", targetTable, 
e.getMessage(), e);
+      throw new IOException("Exception during WAL replay", e);
+    }
+  }
+
+  /**
+   * Initializes and configures WALPlayer.
+   */
+  private Tool initializeWalPlayer(long startTime, long endTime) {
+    Configuration conf = HBaseConfiguration.create(conn.getConfiguration());
+    conf.setLong(WALInputFormat.START_TIME_KEY, startTime);
+    conf.setLong(WALInputFormat.END_TIME_KEY, endTime);
+    Tool walPlayer = new WALPlayer();
+    walPlayer.setConf(conf);
+    return walPlayer;
+  }
+
   @Override
   public String backupTables(BackupRequest request) throws IOException {
     BackupType type = request.getBackupType();
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index fa01cd88345..b72858fa578 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -170,6 +170,12 @@ public final class BackupSystemTable implements Closeable {
 
   private final static String INCR_BACKUP_SET = "incrbackupset:";
   private final static String CONTINUOUS_BACKUP_SET = "continuousbackupset";
+  /**
+   * Row key identifier for storing the last replicated WAL timestamp in the backup system table for
+   * continuous backup.
+   */
+  private static final String CONTINUOUS_BACKUP_REPLICATION_TIMESTAMP_ROW =
+    "continuous_backup_last_replicated";
   private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
   private final static String RS_LOG_TS_PREFIX = "rslogts:";
 
@@ -1003,6 +1009,86 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
+  /**
+   * Updates the latest replicated WAL timestamp for a region server in the backup system table.
+   * This is used to track the replication checkpoint for continuous backup and PITR (Point-in-Time
+   * Restore).
+   * @param serverName the server for which the latest WAL timestamp is being recorded
+   * @param timestamp  the timestamp (in milliseconds) of the last WAL entry replicated
+   * @throws IOException if an error occurs while writing to the backup system table
+   */
+  public void updateBackupCheckpointTimestamp(ServerName serverName, long timestamp)
+    throws IOException {
+
+    HBaseProtos.ServerName.Builder serverProto =
+      HBaseProtos.ServerName.newBuilder().setHostName(serverName.getHostname())
+        .setPort(serverName.getPort()).setStartCode(serverName.getStartCode());
+
+    try (Table table = connection.getTable(tableName)) {
+      Put put = createPutForBackupCheckpoint(serverProto.build().toByteArray(), timestamp);
+      if (!put.isEmpty()) {
+        table.put(put);
+      }
+    }
+  }
+
+  /**
+   * Retrieves the latest replicated WAL timestamps for all region servers from the backup system
+   * table. This is used to track the replication checkpoint state for continuous backup and PITR
+   * (Point-in-Time Restore).
+   * @return a map where the key is {@link ServerName} and the value is the latest replicated WAL
+   *         timestamp in milliseconds
+   * @throws IOException if an error occurs while reading from the backup system table
+   */
+  public Map<ServerName, Long> getBackupCheckpointTimestamps() throws IOException {
+    LOG.trace("Fetching latest backup checkpoint timestamps for all region servers.");
+
+    Map<ServerName, Long> checkpointMap = new HashMap<>();
+
+    byte[] rowKey = rowkey(CONTINUOUS_BACKUP_REPLICATION_TIMESTAMP_ROW);
+    Get get = new Get(rowKey);
+    get.addFamily(BackupSystemTable.META_FAMILY);
+
+    try (Table table = connection.getTable(tableName)) {
+      Result result = table.get(get);
+
+      if (result.isEmpty()) {
+        LOG.debug("No checkpoint timestamps found in backup system table.");
+        return checkpointMap;
+      }
+
+      List<Cell> cells = result.listCells();
+      for (Cell cell : cells) {
+        try {
+          HBaseProtos.ServerName protoServer =
+            HBaseProtos.ServerName.parseFrom(CellUtil.cloneQualifier(cell));
+          ServerName serverName = ServerName.valueOf(protoServer.getHostName(),
+            protoServer.getPort(), protoServer.getStartCode());
+
+          long timestamp = Bytes.toLong(CellUtil.cloneValue(cell));
+          checkpointMap.put(serverName, timestamp);
+        } catch (IllegalArgumentException e) {
+          LOG.warn("Failed to parse server name or timestamp from cell: {}", 
cell, e);
+        }
+      }
+    }
+
+    return checkpointMap;
+  }
+
+  /**
+   * Constructs a {@link Put} operation to update the last replicated WAL timestamp for a given
+   * server in the backup system table.
+   * @param serverNameBytes the serialized server name as bytes
+   * @param timestamp       the WAL entry timestamp to store
+   * @return a {@link Put} object ready to be written to the system table
+   */
+  private Put createPutForBackupCheckpoint(byte[] serverNameBytes, long timestamp) {
+    Put put = new Put(rowkey(CONTINUOUS_BACKUP_REPLICATION_TIMESTAMP_ROW));
+    put.addColumn(BackupSystemTable.META_FAMILY, serverNameBytes, Bytes.toBytes(timestamp));
+    return put;
+  }
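
Taken together, the methods above keep a single row keyed by "continuous_backup_last_replicated" whose column qualifiers are serialized ServerName protos and whose values are millisecond timestamps. A minimal usage sketch against an open Connection (the server name and timestamps are placeholders):

  import java.io.IOException;
  import java.util.Map;
  import org.apache.hadoop.hbase.ServerName;
  import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
  import org.apache.hadoop.hbase.client.Connection;

  final class CheckpointRowSketch {
    static void writeAndReadCheckpoint(Connection conn) throws IOException {
      try (BackupSystemTable systemTable = new BackupSystemTable(conn)) {
        ServerName rs = ServerName.valueOf("rs1.example.com", 16020, 1748500000000L); // placeholder
        systemTable.updateBackupCheckpointTimestamp(rs, 1748600123456L);              // placeholder ts
        // Read back the per-server checkpoints written by the replication endpoint.
        Map<ServerName, Long> checkpoints = systemTable.getBackupCheckpointTimestamps();
        checkpoints.forEach((server, ts) -> System.out.println(server + " -> " + ts));
      }
    }
  }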
+
   /**
    * Deletes incremental backup set for a backup destination
    * @param backupRoot backup root
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
index 623494577be..42341cb2186 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
@@ -166,10 +166,18 @@ public class FullTableBackupClient extends TableBackupClient {
   private void handleContinuousBackup(Admin admin) throws IOException {
     backupInfo.setPhase(BackupInfo.BackupPhase.SETUP_WAL_REPLICATION);
     long startTimestamp = startContinuousWALBackup(admin);
+    backupManager.addContinuousBackupTableSet(backupInfo.getTables(), startTimestamp);
 
-    performBackupSnapshots(admin);
+    // Updating the start time of this backup to reflect the actual beginning of the full backup.
+    // So far, we have only set up continuous WAL replication, but the full backup has not yet
+    // started.
+    // Setting the correct start time is crucial for Point-In-Time Recovery (PITR).
+    // When selecting a backup for PITR, we must ensure that the backup started **on or after** the
+    // starting time of the WALs. If WAL streaming began later, we couldn't guarantee that WALs
+    // exist for the entire period between the backup's start time and the desired PITR timestamp.
+    backupInfo.setStartTs(startTimestamp);
 
-    backupManager.addContinuousBackupTableSet(backupInfo.getTables(), startTimestamp);
+    performBackupSnapshots(admin);
 
     // set overall backup status: complete. Here we make sure to complete the backup.
     // After this checkpoint, even if entering cancel process, will let the backup finished
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
index c973af8102e..34fcd76bf9c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
@@ -36,6 +36,9 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
@@ -82,6 +85,8 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
   private String peerId;
   private ScheduledExecutorService flushExecutor;
 
+  private long latestWALEntryTimestamp = -1L;
+
   public static final long ONE_DAY_IN_MILLISECONDS = TimeUnit.DAYS.toMillis(1);
   public static final String WAL_FILE_PREFIX = "wal_file.";
   public static final String DATE_FORMAT = "yyyy-MM-dd";
@@ -174,6 +179,13 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
       }
     }
     walWriters.clear();
+
+    // All received WAL entries have been flushed and persisted successfully.
+    // At this point, it's safe to record the latest replicated timestamp,
+    // as we are guaranteed that all entries up to that timestamp are durably stored.
+    // This checkpoint is essential for enabling consistent Point-in-Time Restore (PITR).
+    updateLastReplicatedTimestampForContinuousBackup();
+
     LOG.info("{} WAL writers flushed and cleared", Utils.logPeerId(peerId));
   }
 
@@ -218,6 +230,12 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
         backupWalEntries(entry.getKey(), entry.getValue());
       }
 
+      // Capture the timestamp of the last WAL entry processed. This is used as the replication
+      // checkpoint so that point-in-time restores know the latest consistent time up to which
+      // replication has occurred.
+      latestWALEntryTimestamp = entries.get(entries.size() - 1).getKey().getWriteTime();
+
       if (isAnyWriterFull()) {
         LOG.debug("{} Some WAL writers reached max size, triggering flush",
           Utils.logPeerId(peerId));
@@ -237,6 +255,21 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
     }
   }
 
+  /**
+   * Persists the latest replicated WAL entry timestamp in the backup system table. This checkpoint
+   * is critical for Continuous Backup and Point-in-Time Restore (PITR) to ensure restore operations
+   * only go up to a known safe point. The value is stored per region server using its ServerName as
+   * the key.
+   * @throws IOException if the checkpoint update fails
+   */
+  private void updateLastReplicatedTimestampForContinuousBackup() throws IOException {
+    try (final Connection conn = ConnectionFactory.createConnection(conf);
+      BackupSystemTable backupSystemTable = new BackupSystemTable(conn)) {
+      backupSystemTable.updateBackupCheckpointTimestamp(replicationSource.getServerWALsBelongTo(),
+        latestWALEntryTimestamp);
+    }
+  }
+
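
The ordering in the two hunks above is what makes the checkpoint safe: replicate() only remembers the newest write time it has seen, and that value is persisted after every staged WAL writer has been flushed, so the stored timestamp never runs ahead of durable data. A stripped-down sketch of that pattern (names hypothetical, persistence abstracted behind a callback):

  import java.util.function.LongConsumer;

  final class FlushThenCheckpointSketch {
    private long latestSeenTs = -1L;

    // Called per replicated batch: track progress, but do not persist yet.
    void onBatchReplicated(long newestWriteTimeInBatch) {
      latestSeenTs = Math.max(latestSeenTs, newestWriteTimeInBatch);
    }

    // Called only after all buffered WAL data has been durably flushed.
    void onAllWritersFlushed(LongConsumer persistCheckpoint) {
      if (latestSeenTs >= 0) {
        persistCheckpoint.accept(latestSeenTs);
      }
    }
  }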
   private Map<Long, List<WAL.Entry>> groupEntriesByDay(List<WAL.Entry> entries) {
     return entries.stream().collect(
       Collectors.groupingBy(entry -> (entry.getKey().getWriteTime() / ONE_DAY_IN_MILLISECONDS)
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index 15159ed73e4..34fa82ef45f 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hbase.backup.util;
 
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
+import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
+import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URLDecoder;
@@ -49,11 +53,17 @@ import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.RestoreRequest;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -770,4 +780,156 @@ public final class BackupUtils {
     return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp;
   }
 
+  /**
+   * Calculates the replication checkpoint timestamp used for continuous backup.
+   * <p>
+   * A replication checkpoint is the earliest timestamp across all region servers such that every
+   * WAL entry before that point is known to be replicated to the target system. This is essential
+   * for features like Point-in-Time Restore (PITR) and incremental backups, where we want to
+   * confidently restore data to a consistent state without missing updates.
+   * <p>
+   * The checkpoint is calculated using a combination of:
+   * <ul>
+   * <li>The start timestamps of WAL files currently being replicated for each server.</li>
+   * <li>The latest successfully replicated timestamp recorded by the replication marker chore.</li>
+   * </ul>
+   * <p>
+   * We combine these two sources to handle the following challenges:
+   * <ul>
+   * <li><b>Stale WAL start times:</b> If replication traffic is low or WALs are long-lived, the
+   * replication offset may point to the same WAL for a long time, resulting in stale timestamps
+   * that underestimate progress. This could delay PITR unnecessarily.</li>
+   * <li><b>Limitations of marker-only tracking:</b> The replication marker chore stores the last
+   * successfully replicated timestamp per region server in a system table. However, this data may
+   * become stale if the server goes offline or region ownership changes. For example, if a region
+   * initially belonged to rs1 and was later moved to rs4 due to re-balancing, rs1’s marker would
+   * persist even though it no longer holds any regions. Relying solely on these stale markers could
+   * lead to incorrect or outdated checkpoints.</li>
+   * </ul>
+   * <p>
+   * To handle these limitations, the method:
+   * <ol>
+   * <li>Verifies that the continuous backup peer exists to ensure replication is enabled.</li>
+   * <li>Retrieves WAL replication queue information for the peer, collecting WAL start times per
+   * region server. This gives us a lower bound for replication progress.</li>
+   * <li>Reads the marker chore's replicated timestamps from the backup system table.</li>
+   * <li>For servers found in both sources, if the marker timestamp is more recent than the WAL's
+   * start timestamp, we use the marker (since replication has progressed beyond the WAL).</li>
+   * <li>We discard marker entries for region servers that are not present in WAL queues, assuming
+   * those servers are no longer relevant (e.g., decommissioned or reassigned).</li>
+   * <li>The checkpoint is the minimum of all chosen timestamps — i.e., the slowest replicating
+   * region server.</li>
+   * <li>Finally, we persist the updated marker information to include any newly participating
+   * region servers.</li>
+   * </ol>
+   * <p>
+   * Note: If the replication marker chore is disabled, we fall back to using only the WAL start
+   * times. This ensures correctness but may lead to conservative checkpoint estimates during idle
+   * periods.
+   * @param conn the HBase connection
+   * @return the calculated replication checkpoint timestamp
+   * @throws IOException if reading replication queues or updating the backup system table fails
+   */
+  public static long getReplicationCheckpoint(Connection conn) throws IOException {
+    Configuration conf = conn.getConfiguration();
+    long checkpoint = EnvironmentEdgeManager.getDelegate().currentTime();
+
+    // Step 1: Ensure the continuous backup replication peer exists
+    if (!continuousBackupReplicationPeerExists(conn.getAdmin())) {
+      String msg = "Replication peer '" + CONTINUOUS_BACKUP_REPLICATION_PEER
+        + "' not found. Continuous backup not enabled.";
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+
+    // Step 2: Get all replication queues for the continuous backup peer
+    ReplicationQueueStorage queueStorage =
+      ReplicationStorageFactory.getReplicationQueueStorage(conn, conf);
+
+    List<ReplicationQueueId> queueIds;
+    try {
+      queueIds = queueStorage.listAllQueueIds(CONTINUOUS_BACKUP_REPLICATION_PEER);
+    } catch (ReplicationException e) {
+      String msg = "Failed to retrieve replication queue IDs for peer '"
+        + CONTINUOUS_BACKUP_REPLICATION_PEER + "'";
+      LOG.error(msg, e);
+      throw new IOException(msg, e);
+    }
+
+    if (queueIds.isEmpty()) {
+      String msg = "Replication peer '" + CONTINUOUS_BACKUP_REPLICATION_PEER + 
"' has no queues. "
+        + "This may indicate that continuous backup replication is not 
initialized correctly.";
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+
+    // Step 3: Build a map of ServerName -> WAL start timestamp (lowest seen per server)
+    Map<ServerName, Long> serverToCheckpoint = new HashMap<>();
+    for (ReplicationQueueId queueId : queueIds) {
+      Map<String, ReplicationGroupOffset> offsets;
+      try {
+        offsets = queueStorage.getOffsets(queueId);
+      } catch (ReplicationException e) {
+        String msg = "Failed to fetch WAL offsets for replication queue: " + 
queueId;
+        LOG.error(msg, e);
+        throw new IOException(msg, e);
+      }
+
+      for (ReplicationGroupOffset offset : offsets.values()) {
+        String walFile = offset.getWal();
+        long ts = AbstractFSWALProvider.getTimestamp(walFile); // WAL creation time
+        ServerName server = queueId.getServerName();
+        // Store the minimum timestamp per server (ts - 1 to avoid edge boundary issues)
+        serverToCheckpoint.merge(server, ts - 1, Math::min);
+      }
+    }
+
+    // Step 4: If replication markers are enabled, overlay fresher timestamps from backup system
+    // table
+    boolean replicationMarkerEnabled =
+      conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
+    if (replicationMarkerEnabled) {
+      try (BackupSystemTable backupSystemTable = new BackupSystemTable(conn)) {
+        Map<ServerName, Long> markerTimestamps = backupSystemTable.getBackupCheckpointTimestamps();
+
+        for (Map.Entry<ServerName, Long> entry : markerTimestamps.entrySet()) {
+          ServerName server = entry.getKey();
+          long markerTs = entry.getValue();
+
+          // If marker timestamp is newer, override
+          if (serverToCheckpoint.containsKey(server)) {
+            long current = serverToCheckpoint.get(server);
+            if (markerTs > current) {
+              serverToCheckpoint.put(server, markerTs);
+            }
+          } else {
+            // This server is no longer active (e.g., RS moved or removed); skip
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Skipping replication marker timestamp for inactive server: {}", server);
+            }
+          }
+        }
+
+        // Step 5: Persist current server timestamps into backup system table
+        for (Map.Entry<ServerName, Long> entry : serverToCheckpoint.entrySet()) {
+          backupSystemTable.updateBackupCheckpointTimestamp(entry.getKey(), entry.getValue());
+        }
+      }
+    } else {
+      LOG.warn(
+        "Replication marker chore is disabled. Using WAL-based timestamps only 
for checkpoint calculation.");
+    }
+
+    // Step 6: Calculate final checkpoint as minimum timestamp across all active servers
+    for (long ts : serverToCheckpoint.values()) {
+      checkpoint = Math.min(checkpoint, ts);
+    }
+
+    return checkpoint;
+  }
+
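
To make the aggregation above concrete, here is a small worked sketch on made-up numbers: WAL offsets give a floor per active server, a fresher marker timestamp overrides that floor, markers for servers without a queue are dropped, and the checkpoint is the minimum of what remains (the slowest replicating server):

  import java.util.HashMap;
  import java.util.Map;

  final class CheckpointMathSketch {
    public static void main(String[] args) {
      // WAL-derived floors (ts - 1) per server that still owns a replication queue.
      Map<String, Long> perServer = new HashMap<>();
      perServer.put("rs1", 1_000L);
      perServer.put("rs2", 5_000L);

      // Marker-chore timestamps; rs3 has no queue any more, so its entry is ignored.
      Map<String, Long> markers = Map.of("rs1", 4_000L, "rs2", 3_000L, "rs3", 9_000L);
      markers.forEach((server, markerTs) -> {
        Long floor = perServer.get(server);
        if (floor != null && markerTs > floor) {
          perServer.put(server, markerTs); // marker is fresher than the WAL floor
        }
      });

      // min(4000, 5000) = 4000: rs1's marker advanced it, rs2's stale marker did not.
      long checkpoint = perServer.values().stream().mapToLong(Long::longValue).min().getAsLong();
      System.out.println("checkpoint = " + checkpoint);
    }
  }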
+  private static boolean continuousBackupReplicationPeerExists(Admin admin) throws IOException {
+    return admin.listReplicationPeers().stream()
+      .anyMatch(peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER));
+  }
 }
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index b5f58508441..e5c78d04854 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hbase.backup;
 
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -296,6 +300,10 @@ public class TestBackupBase {
     BACKUP_ROOT_DIR = Path.SEPARATOR + "backupUT";
     BACKUP_REMOTE_ROOT_DIR = Path.SEPARATOR + "backupUT";
 
+    conf1.set(CONF_BACKUP_MAX_WAL_SIZE, "10240");
+    conf1.set(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10");
+    conf1.set(CONF_STAGED_WAL_FLUSH_INTERVAL, "10");
+
     if (secure) {
       // set the always on security provider
       UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(),
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
new file mode 100644
index 00000000000..fb37977c4ee
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(LargeTests.class)
+public class TestPointInTimeRestore extends TestBackupBase {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestPointInTimeRestore.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestPointInTimeRestore.class);
+
+  private static final String backupWalDirName = "TestPointInTimeRestoreWalDir";
+  private static final int WAIT_FOR_REPLICATION_MS = 30_000;
+  static Path backupWalDir;
+  static FileSystem fs;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    Path root = TEST_UTIL.getDataTestDirOnTestFS();
+    backupWalDir = new Path(root, backupWalDirName);
+    fs = FileSystem.get(conf1);
+    fs.mkdirs(backupWalDir);
+    conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString());
+
+    setUpBackups();
+  }
+
+  /**
+   * Sets up multiple backups at different timestamps by: 1. Adjusting the system time to simulate
+   * past backup points. 2. Loading data into tables to create meaningful snapshots. 3. Running full
+   * backups with or without continuous backup enabled. 4. Ensuring replication is complete before
+   * proceeding.
+   */
+  private static void setUpBackups() throws Exception {
+    // Simulate a backup taken 20 days ago
+    EnvironmentEdgeManager
+      .injectEdge(() -> System.currentTimeMillis() - 20 * ONE_DAY_IN_MILLISECONDS);
+    loadRandomData(table1, 1000); // Insert initial data into table1
+
+    // Perform a full backup for table1 with continuous backup enabled
+    String[] args = buildBackupArgs("full", new TableName[] { table1 }, true);
+    int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+    assertEquals("Backup should succeed", 0, ret);
+
+    // Move time forward to simulate 15 days ago
+    EnvironmentEdgeManager
+      .injectEdge(() -> System.currentTimeMillis() - 15 * ONE_DAY_IN_MILLISECONDS);
+    loadRandomData(table1, 1000); // Add more data to table1
+    loadRandomData(table2, 500); // Insert data into table2
+
+    waitForReplication(); // Ensure replication is complete
+
+    // Perform a full backup for table2 with continuous backup enabled
+    args = buildBackupArgs("full", new TableName[] { table2 }, true);
+    ret = ToolRunner.run(conf1, new BackupDriver(), args);
+    assertEquals("Backup should succeed", 0, ret);
+
+    // Move time forward to simulate 10 days ago
+    EnvironmentEdgeManager
+      .injectEdge(() -> System.currentTimeMillis() - 10 * ONE_DAY_IN_MILLISECONDS);
+    loadRandomData(table2, 500); // Add more data to table2
+    loadRandomData(table3, 500); // Insert data into table3
+
+    // Perform a full backup for table3 and table4 (without continuous backup)
+    args = buildBackupArgs("full", new TableName[] { table3, table4 }, false);
+    ret = ToolRunner.run(conf1, new BackupDriver(), args);
+    assertEquals("Backup should succeed", 0, ret);
+
+    waitForReplication(); // Ensure replication is complete before concluding setup
+
+    // Reset time mocking to avoid affecting other tests
+    EnvironmentEdgeManager.reset();
+  }
+
+  @AfterClass
+  public static void setupAfterClass() throws IOException {
+    Path root = TEST_UTIL.getDataTestDirOnTestFS();
+    Path backupWalDir = new Path(root, backupWalDirName);
+    FileSystem fs = FileSystem.get(conf1);
+
+    if (fs.exists(backupWalDir)) {
+      fs.delete(backupWalDir, true);
+    }
+
+    conf1.unset(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+  }
+
+  /**
+   * Verifies that PITR (Point-in-Time Restore) fails when the requested restore time is either in
+   * the future or outside the allowed retention window.
+   */
+  @Test
+  public void testPITR_FailsOutsideWindow() throws Exception {
+    // Case 1: Requested restore time is in the future (should fail)
+    String[] args = buildPITRArgs(new TableName[] { table1 },
+      new TableName[] { TableName.valueOf("restoredTable1") },
+      EnvironmentEdgeManager.currentTime() + ONE_DAY_IN_MILLISECONDS);
+
+    int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+    assertNotEquals("Restore should fail since the requested restore time is 
in the future", 0,
+      ret);
+
+    // Case 2: Requested restore time is too old (beyond the retention window, should fail)
+    args = buildPITRArgs(new TableName[] { table1 },
+      new TableName[] { TableName.valueOf("restoredTable1") },
+      EnvironmentEdgeManager.currentTime() - 40 * ONE_DAY_IN_MILLISECONDS);
+
+    ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+    assertNotEquals(
+      "Restore should fail since the requested restore time is outside the 
retention window", 0,
+      ret);
+  }
+
+  /**
+   * Ensures that PITR fails when attempting to restore tables where continuous backup was not
+   * enabled.
+   */
+  @Test
+  public void testPointInTimeRestore_ContinuousBackupNotEnabledTables() throws Exception {
+    String[] args = buildPITRArgs(new TableName[] { table3 },
+      new TableName[] { TableName.valueOf("restoredTable1") },
+      EnvironmentEdgeManager.currentTime() - 10 * ONE_DAY_IN_MILLISECONDS);
+
+    int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+    assertNotEquals("Restore should fail since continuous backup is not 
enabled for the table", 0,
+      ret);
+  }
+
+  /**
+   * Ensures that PITR fails when trying to restore from a point before continuous backup started.
+   */
+  @Test
+  public void testPointInTimeRestore_TablesWithNoProperBackupOrWals() throws Exception {
+    String[] args = buildPITRArgs(new TableName[] { table2 },
+      new TableName[] { TableName.valueOf("restoredTable1") },
+      EnvironmentEdgeManager.currentTime() - 16 * ONE_DAY_IN_MILLISECONDS);
+
+    int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+    assertNotEquals(
+      "Restore should fail since the requested restore point is before the 
start of continuous backup",
+      0, ret);
+  }
+
+  /**
+   * Verifies that PITR successfully restores data for a single table.
+   */
+  @Test
+  public void testPointInTimeRestore_SuccessfulRestoreForOneTable() throws Exception {
+    TableName restoredTable = TableName.valueOf("restoredTable");
+
+    // Perform restore operation
+    String[] args = buildPITRArgs(new TableName[] { table1 }, new TableName[] { restoredTable },
+      EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS);
+
+    int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+    assertEquals("Restore should succeed", 0, ret);
+
+    // Validate that the restored table contains the same number of rows as the original table
+    assertEquals("Restored table should have the same row count as the original",
+      getRowCount(table1), getRowCount(restoredTable));
+  }
+
+  /**
+   * Verifies that PITR successfully restores multiple tables at once.
+   */
+  @Test
+  public void testPointInTimeRestore_SuccessfulRestoreForMultipleTables() throws Exception {
+    TableName restoredTable1 = TableName.valueOf("restoredTable1");
+    TableName restoredTable2 = TableName.valueOf("restoredTable2");
+
+    // Perform restore operation for multiple tables
+    String[] args = buildPITRArgs(new TableName[] { table1, table2 },
+      new TableName[] { restoredTable1, restoredTable2 },
+      EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS);
+
+    int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+    assertEquals("Restore should succeed", 0, ret);
+
+    // Validate that the restored tables contain the same number of rows as the originals
+    assertEquals("Restored table1 should have the same row count as the original",
+      getRowCount(table1), getRowCount(restoredTable1));
+    assertEquals("Restored table2 should have the same row count as the original",
+      getRowCount(table2), getRowCount(restoredTable2));
+  }
+
+  private String[] buildPITRArgs(TableName[] sourceTables, TableName[] targetTables, long endTime) {
+    String sourceTableNames =
+      Arrays.stream(sourceTables).map(TableName::getNameAsString).collect(Collectors.joining(","));
+
+    String targetTableNames =
+      Arrays.stream(targetTables).map(TableName::getNameAsString).collect(Collectors.joining(","));
+
+    return new String[] { "-" + OPTION_TABLE, sourceTableNames, "-" + OPTION_TABLE_MAPPING,
+      targetTableNames, "-" + OPTION_TO_DATETIME, String.valueOf(endTime) };
+  }
+
+  private static String[] buildBackupArgs(String backupType, TableName[] tables,
+    boolean continuousEnabled) {
+    String tableNames =
+      Arrays.stream(tables).map(TableName::getNameAsString).collect(Collectors.joining(","));
+
+    if (continuousEnabled) {
+      return new String[] { "create", backupType, BACKUP_ROOT_DIR, "-" + OPTION_TABLE, tableNames,
+        "-" + OPTION_ENABLE_CONTINUOUS_BACKUP };
+    } else {
+      return new String[] { "create", backupType, BACKUP_ROOT_DIR, "-" + OPTION_TABLE, tableNames };
+    }
+  }
+
+  private static void loadRandomData(TableName tableName, int totalRows) throws IOException {
+    int rowSize = 32;
+    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+      TEST_UTIL.loadRandomRows(table, famName, rowSize, totalRows);
+    }
+  }
+
+  private static void waitForReplication() {
+    LOG.info("Waiting for replication to complete for {} ms", 
WAIT_FOR_REPLICATION_MS);
+    try {
+      Thread.sleep(WAIT_FOR_REPLICATION_MS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Thread was interrupted while waiting", e);
+    }
+  }
+
+  private int getRowCount(TableName tableName) throws IOException {
+    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+      return HBaseTestingUtil.countRows(table);
+    }
+  }
+}

