This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 7e25a4684e05f30d80966492b4156e44201c92d1
Author: vinayak hegde <vinayakph...@gmail.com>
AuthorDate: Wed Jul 16 22:43:59 2025 +0530

    HBASE-29445 Add Option to Specify Custom Backup Location in PITR (#7153)
    
    Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org>
---
 .../backup/impl/AbstractPitrRestoreHandler.java    | 403 +++++++++++++++++++++
 .../hadoop/hbase/backup/impl/BackupAdminImpl.java  | 348 +-----------------
 .../hbase/backup/impl/BackupImageAdapter.java      |  60 +++
 .../hbase/backup/impl/BackupInfoAdapter.java       |  60 +++
 .../CustomBackupLocationPitrRestoreHandler.java    |  57 +++
 .../backup/impl/DefaultPitrRestoreHandler.java     |  55 +++
 .../hbase/backup/impl/PitrBackupMetadata.java      |  50 +++
 .../apache/hadoop/hbase/backup/PITRTestUtil.java   | 107 ++++++
 .../hbase/backup/TestPointInTimeRestore.java       | 117 ++----
 ...TestPointInTimeRestoreWithCustomBackupPath.java | 121 +++++++
 10 files changed, 962 insertions(+), 416 deletions(-)
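Usage sketch (illustrative; the configuration object, table names, and path below are
placeholders, not part of this change), based on the test helpers added in this patch:

    // Build PITR driver args. A non-null fourth argument appends
    // "-<OPTION_PITR_BACKUP_PATH> <path>", which routes BackupAdminImpl.pointInTimeRestore
    // to CustomBackupLocationPitrRestoreHandler instead of DefaultPitrRestoreHandler.
    String[] args = PITRTestUtil.buildPITRArgs(
      new TableName[] { TableName.valueOf("t1") },          // source tables (placeholder)
      new TableName[] { TableName.valueOf("t1_restored") }, // target tables (placeholder)
      EnvironmentEdgeManager.currentTime(),                 // restore-to timestamp
      "hdfs://nn/backups/custom");                          // custom backup root (placeholder)
    int ret = ToolRunner.run(conf, new PointInTimeRestoreDriver(), args);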

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
new file mode 100644
index 00000000000..b2edce6b0fd
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
+import org.apache.hadoop.hbase.backup.RestoreRequest;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.Tool;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for handling Point-In-Time Restore (PITR).
+ * <p>
+ * Defines the common PITR algorithm using the Template Method Pattern. Subclasses provide the
+ * metadata source (e.g., backup system table or a custom backup location).
+ * <p>
+ * The PITR flow includes:
+ * <ul>
+ * <li>Validating recovery time within the PITR window</li>
+ * <li>Checking for continuous backup and valid backup availability</li>
+ * <li>Restoring the backup</li>
+ * <li>Replaying WALs to bring tables to the target state</li>
+ * </ul>
+ * <p>
+ * Subclasses must implement {@link #getBackupMetadata(PointInTimeRestoreRequest)} to supply the
+ * list of completed backups.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractPitrRestoreHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractPitrRestoreHandler.class);
+
+  protected final Connection conn;
+  protected final BackupAdminImpl backupAdmin;
+  protected final PointInTimeRestoreRequest request;
+
+  AbstractPitrRestoreHandler(Connection conn, PointInTimeRestoreRequest request) {
+    this.conn = conn;
+    this.backupAdmin = new BackupAdminImpl(conn);
+    this.request = request;
+  }
+
+  /**
+   * Validates the PITR request and performs the restore if valid. This is the main entry point for
+   * the PITR process and should be called by clients.
+   */
+  public final void validateAndRestore() throws IOException {
+    long endTime = request.getToDateTime();
+    validateRequestToTime(endTime);
+
+    TableName[] sourceTableArray = request.getFromTables();
+    TableName[] targetTableArray = resolveTargetTables(sourceTableArray, request.getToTables());
+
+    // Validate PITR requirements
+    validatePitr(endTime, sourceTableArray, targetTableArray);
+
+    // If only validation is required, log and return
+    if (request.isCheck()) {
+      LOG.info("PITR can be successfully executed");
+      return;
+    }
+
+    // Execute PITR process
+    try (BackupSystemTable table = new BackupSystemTable(conn)) {
+      Map<TableName, Long> continuousBackupTables = table.getContinuousBackupTableSet();
+      List<PitrBackupMetadata> backupMetadataList = getBackupMetadata(request);
+
+      for (int i = 0; i < sourceTableArray.length; i++) {
+        restoreTableWithWalReplay(sourceTableArray[i], targetTableArray[i], endTime,
+          continuousBackupTables, backupMetadataList, request);
+      }
+    }
+  }
+
+  /**
+   * Validates whether the requested end time falls within the allowed PITR recovery window.
+   * @param endTime The target recovery time.
+   * @throws IOException If the requested recovery time is outside the allowed window.
+   */
+  private void validateRequestToTime(long endTime) throws IOException {
+    long pitrWindowDays = conn.getConfiguration().getLong(CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS,
+      DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS);
+    long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
+    long pitrMaxStartTime = currentTime - TimeUnit.DAYS.toMillis(pitrWindowDays);
+
+    if (endTime < pitrMaxStartTime) {
+      String errorMsg = String.format(
+        "Requested recovery time (%d) is out of the allowed PITR window (last 
%d days).", endTime,
+        pitrWindowDays);
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+
+    if (endTime > currentTime) {
+      String errorMsg = String.format(
+        "Requested recovery time (%d) is in the future. Current time: %d.", 
endTime, currentTime);
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+  }
+
+  /**
+   * Resolves the target table array. If null or empty, defaults to the source table array.
+   */
+  private TableName[] resolveTargetTables(TableName[] sourceTables, TableName[] targetTables) {
+    return (targetTables == null || targetTables.length == 0) ? sourceTables : targetTables;
+  }
+
+  /**
+   * Validates whether Point-In-Time Recovery (PITR) is possible for the given tables at the
+   * specified time.
+   * <p>
+   * PITR requires:
+   * <ul>
+   * <li>Continuous backup to be enabled for the source tables.</li>
+   * <li>A valid backup image and corresponding WALs to be available.</li>
+   * </ul>
+   * @param endTime     The target recovery time.
+   * @param sTableArray The source tables to restore.
+   * @param tTableArray The target tables where the restore will be performed.
+   * @throws IOException If PITR is not possible due to missing continuous backup or backup images.
+   */
+  private void validatePitr(long endTime, TableName[] sTableArray, TableName[] tTableArray)
+    throws IOException {
+    try (BackupSystemTable table = new BackupSystemTable(conn)) {
+      // Retrieve the set of tables with continuous backup enabled
+      Map<TableName, Long> continuousBackupTables = table.getContinuousBackupTableSet();
+
+      // Ensure all source tables have continuous backup enabled
+      validateContinuousBackup(sTableArray, continuousBackupTables);
+
+      // Fetch completed backup information
+      List<PitrBackupMetadata> backupMetadataList = getBackupMetadata(request);
+
+      // Ensure a valid backup and WALs exist for PITR
+      validateBackupAvailability(sTableArray, tTableArray, endTime, continuousBackupTables,
+        backupMetadataList);
+    }
+  }
+
+  /**
+   * Ensures that all source tables have continuous backup enabled.
+   */
+  private void validateContinuousBackup(TableName[] tables,
+    Map<TableName, Long> continuousBackupTables) throws IOException {
+    List<TableName> missingTables =
+      Arrays.stream(tables).filter(table -> !continuousBackupTables.containsKey(table)).toList();
+
+    if (!missingTables.isEmpty()) {
+      String errorMsg = "Continuous Backup is not enabled for the following 
tables: "
+        + 
missingTables.stream().map(TableName::getNameAsString).collect(Collectors.joining(",
 "));
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+  }
+
+  /**
+   * Ensures that a valid backup and corresponding WALs exist for PITR for each source table. PITR
+   * requires: 1. A valid backup available before the end time. 2. Write-Ahead Logs (WALs) covering
+   * the remaining duration up to the end time.
+   */
+  private void validateBackupAvailability(TableName[] sTableArray, TableName[] tTableArray,
+    long endTime, Map<TableName, Long> continuousBackupTables, List<PitrBackupMetadata> backups)
+    throws IOException {
+    for (int i = 0; i < sTableArray.length; i++) {
+      if (
+        !canPerformPitr(sTableArray[i], tTableArray[i], endTime, continuousBackupTables, backups)
+      ) {
+        String errorMsg = String.format(
+          "PITR failed: No valid backup/WALs found for source table %s 
(target: %s) before time %d",
+          sTableArray[i].getNameAsString(), tTableArray[i].getNameAsString(), 
endTime);
+        LOG.error(errorMsg);
+        throw new IOException(errorMsg);
+      }
+    }
+  }
+
+  /**
+   * Checks whether PITR can be performed for a given source-target table pair.
+   */
+  private boolean canPerformPitr(TableName stableName, TableName tTableName, long endTime,
+    Map<TableName, Long> continuousBackupTables, List<PitrBackupMetadata> backups) {
+    return getValidBackup(stableName, tTableName, endTime, continuousBackupTables, backups) != null;
+  }
+
+  /**
+   * Finds and returns the first valid backup metadata entry that can be used to restore the given
+   * source table up to the specified end time. A backup is considered valid if:
+   * <ul>
+   * <li>It contains the source table</li>
+   * <li>It was completed before the requested end time</li>
+   * <li>Its start time is after the table's continuous backup start time</li>
+   * <li>It passes the restore request validation</li>
+   * </ul>
+   */
+  private PitrBackupMetadata getValidBackup(TableName sTableName, TableName tTablename,
+    long endTime, Map<TableName, Long> continuousBackupTables, List<PitrBackupMetadata> backups) {
+    for (PitrBackupMetadata backup : backups) {
+      if (isValidBackupForPitr(backup, sTableName, endTime, continuousBackupTables)) {
+
+        RestoreRequest restoreRequest =
+          BackupUtils.createRestoreRequest(backup.getRootDir(), backup.getBackupId(), true,
+            new TableName[] { sTableName }, new TableName[] { tTablename }, false);
+
+        try {
+          if (backupAdmin.validateRequest(restoreRequest)) {
+            return backup;
+          }
+        } catch (IOException e) {
+          LOG.warn("Exception occurred while testing the backup : {} for 
restore ",
+            backup.getBackupId(), e);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Determines if the given backup is valid for PITR.
+   * <p>
+   * A backup is valid if:
+   * <ul>
+   * <li>It contains the source table.</li>
+   * <li>It was completed before the end time.</li>
+   * <li>The start timestamp of the backup is after the continuous backup start time for the
+   * table.</li>
+   * </ul>
+   * @param backupMetadata         Backup information object.
+   * @param tableName              Table to check.
+   * @param endTime                The target recovery time.
+   * @param continuousBackupTables Map of tables with continuous backup enabled.
+   * @return true if the backup is valid for PITR, false otherwise.
+   */
+  private boolean isValidBackupForPitr(PitrBackupMetadata backupMetadata, TableName tableName,
+    long endTime, Map<TableName, Long> continuousBackupTables) {
+    return backupMetadata.getTableNames().contains(tableName)
+      && backupMetadata.getCompleteTs() <= endTime
+      && continuousBackupTables.getOrDefault(tableName, 0L) <= backupMetadata.getStartTs();
+  }
+
+  /**
+   * Restores the table using the selected backup and replays WALs from the backup start time to the
+   * requested end time.
+   * @throws IOException if no valid backup is found or WAL replay fails
+   */
+  private void restoreTableWithWalReplay(TableName sourceTable, TableName targetTable, long endTime,
+    Map<TableName, Long> continuousBackupTables, List<PitrBackupMetadata> backupMetadataList,
+    PointInTimeRestoreRequest request) throws IOException {
+    PitrBackupMetadata backupMetadata =
+      getValidBackup(sourceTable, targetTable, endTime, continuousBackupTables, backupMetadataList);
+    if (backupMetadata == null) {
+      String errorMsg = "Could not find a valid backup and WALs for PITR for 
table: "
+        + sourceTable.getNameAsString();
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+
+    RestoreRequest restoreRequest = BackupUtils.createRestoreRequest(backupMetadata.getRootDir(),
+      backupMetadata.getBackupId(), false, new TableName[] { sourceTable },
+      new TableName[] { targetTable }, request.isOverwrite());
+
+    backupAdmin.restore(restoreRequest);
+    replayWal(sourceTable, targetTable, backupMetadata.getStartTs(), endTime);
+  }
+
+  /**
+   * Replays WALs to bring the table to the desired state.
+   */
+  private void replayWal(TableName sourceTable, TableName targetTable, long startTime, long endTime)
+    throws IOException {
+    String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    Path walDirPath = new Path(walBackupDir);
+    LOG.info(
+      "Starting WAL replay for source: {}, target: {}, time range: {} - {}, 
WAL backup dir: {}",
+      sourceTable, targetTable, startTime, endTime, walDirPath);
+
+    List<String> validDirs =
+      getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
+    if (validDirs.isEmpty()) {
+      LOG.warn("No valid WAL directories found for range {} - {}. Skipping WAL 
replay.", startTime,
+        endTime);
+      return;
+    }
+
+    executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime);
+  }
+
+  /**
+   * Fetches valid WAL directories based on the given time range.
+   */
+  private List<String> getValidWalDirs(Configuration conf, Path walBackupDir, long startTime,
+    long endTime) throws IOException {
+    FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf);
+    FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
+
+    List<String> validDirs = new ArrayList<>();
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+    for (FileStatus dayDir : dayDirs) {
+      if (!dayDir.isDirectory()) {
+        continue; // Skip files, only process directories
+      }
+
+      String dirName = dayDir.getPath().getName();
+      try {
+        Date dirDate = dateFormat.parse(dirName);
+        long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
+        long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
+
+        // Check if this day's WAL files overlap with the required time range
+        if (dirEndTime >= startTime && dirStartTime <= endTime) {
+          validDirs.add(dayDir.getPath().toString());
+        }
+      } catch (ParseException e) {
+        LOG.warn("Skipping invalid directory name: " + dirName, e);
+      }
+    }
+    return validDirs;
+  }
+
+  /**
+   * Executes WAL replay using WALPlayer.
+   */
+  private void executeWalReplay(List<String> walDirs, TableName sourceTable, TableName targetTable,
+    long startTime, long endTime) throws IOException {
+    Tool walPlayer = initializeWalPlayer(startTime, endTime);
+    String[] args =
+      { String.join(",", walDirs), sourceTable.getNameAsString(), targetTable.getNameAsString() };
+
+    try {
+      LOG.info("Executing WALPlayer with args: {}", Arrays.toString(args));
+      int exitCode = walPlayer.run(args);
+      if (exitCode == 0) {
+        LOG.info("WAL replay completed successfully for {}", targetTable);
+      } else {
+        throw new IOException("WAL replay failed with exit code: " + exitCode);
+      }
+    } catch (Exception e) {
+      LOG.error("Error during WAL replay for {}: {}", targetTable, 
e.getMessage(), e);
+      throw new IOException("Exception during WAL replay", e);
+    }
+  }
+
+  /**
+   * Initializes and configures WALPlayer.
+   */
+  private Tool initializeWalPlayer(long startTime, long endTime) {
+    Configuration conf = HBaseConfiguration.create(conn.getConfiguration());
+    conf.setLong(WALInputFormat.START_TIME_KEY, startTime);
+    conf.setLong(WALInputFormat.END_TIME_KEY, endTime);
+    conf.setBoolean(IGNORE_EMPTY_FILES, true);
+    Tool walPlayer = new WALPlayer();
+    walPlayer.setConf(conf);
+    return walPlayer;
+  }
+
+  protected abstract List<PitrBackupMetadata> getBackupMetadata(PointInTimeRestoreRequest request)
+    throws IOException;
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index e82d9804f9d..75a2a6343a5 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -17,33 +17,18 @@
  */
 package org.apache.hadoop.hbase.backup.impl;
 
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
-import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
-import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
-import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
-
 import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupAdmin;
 import org.apache.hadoop.hbase.backup.BackupClientFactory;
@@ -61,10 +46,7 @@ import org.apache.hadoop.hbase.backup.util.BackupSet;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
-import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.util.Tool;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -521,7 +503,7 @@ public class BackupAdminImpl implements BackupAdmin {
     new RestoreTablesClient(conn, request).execute();
   }
 
-  private boolean validateRequest(RestoreRequest request) throws IOException {
+  public boolean validateRequest(RestoreRequest request) throws IOException {
     // check and load backup image manifest for the tables
     Path rootPath = new Path(request.getBackupRootDir());
     String backupId = request.getBackupId();
@@ -533,324 +515,28 @@ public class BackupAdminImpl implements BackupAdmin {
     return BackupUtils.validate(Arrays.asList(sTableArray), manifest, conn.getConfiguration());
   }
 
-  @Override
-  public void pointInTimeRestore(PointInTimeRestoreRequest request) throws IOException {
-    if (request.getBackupRootDir() == null) {
-      defaultPointInTimeRestore(request);
-    } else {
-      // TODO: special case, not supported at the moment
-      throw new IOException("Custom backup location for Point-In-Time Recovery 
Not supported!");
-    }
-    LOG.info("Successfully completed Point In Time Restore for all tables.");
-  }
-
-  /**
-   * Performs a default Point-In-Time Restore (PITR) by restoring the latest valid backup and
-   * replaying the WALs to bring the table to the desired state. PITR requires: 1. A valid backup
-   * available before the end time. 2. Write-Ahead Logs (WALs) covering the remaining duration up to
-   * the end time.
-   * @param request PointInTimeRestoreRequest containing restore parameters.
-   * @throws IOException If no valid backup or WALs are found, or if an error occurs during
-   *                     restoration.
-   */
-  private void defaultPointInTimeRestore(PointInTimeRestoreRequest request) throws IOException {
-    long endTime = request.getToDateTime();
-    validateRequestToTime(endTime);
-
-    TableName[] sTableArray = request.getFromTables();
-    TableName[] tTableArray = resolveTargetTables(sTableArray, request.getToTables());
-
-    // Validate PITR requirements
-    validatePitr(endTime, sTableArray, tTableArray);
-
-    // If only validation is required, log and return
-    if (request.isCheck()) {
-      LOG.info("PITR can be successfully executed");
-      return;
-    }
-
-    // Execute PITR process
-    try (BackupSystemTable table = new BackupSystemTable(conn)) {
-      Map<TableName, Long> continuousBackupTables = table.getContinuousBackupTableSet();
-      List<BackupInfo> backupInfos = table.getBackupInfos(BackupState.COMPLETE);
-
-      for (int i = 0; i < sTableArray.length; i++) {
-        restoreTableWithWalReplay(sTableArray[i], tTableArray[i], endTime, continuousBackupTables,
-          backupInfos, request);
-      }
-    }
-  }
-
-  /**
-   * Validates whether the requested end time falls within the allowed PITR recovery window.
-   * @param endTime The target recovery time.
-   * @throws IOException If the requested recovery time is outside the allowed window.
-   */
-  private void validateRequestToTime(long endTime) throws IOException {
-    long pitrWindowDays = conn.getConfiguration().getLong(CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS,
-      DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS);
-    long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
-    long pitrMaxStartTime = currentTime - TimeUnit.DAYS.toMillis(pitrWindowDays);
-
-    if (endTime < pitrMaxStartTime) {
-      String errorMsg = String.format(
-        "Requested recovery time (%d) is out of the allowed PITR window (last 
%d days).", endTime,
-        pitrWindowDays);
-      LOG.error(errorMsg);
-      throw new IOException(errorMsg);
-    }
-
-    if (endTime > currentTime) {
-      String errorMsg = String.format(
-        "Requested recovery time (%d) is in the future. Current time: %d.", 
endTime, currentTime);
-      LOG.error(errorMsg);
-      throw new IOException(errorMsg);
-    }
-  }
-
-  /**
-   * Resolves the target table array. If null or empty, defaults to the source table array.
-   */
-  private TableName[] resolveTargetTables(TableName[] sourceTables, TableName[] targetTables) {
-    return (targetTables == null || targetTables.length == 0) ? sourceTables : targetTables;
-  }
-
   /**
-   * Validates whether Point-In-Time Recovery (PITR) is possible for the given tables at the
-   * specified time.
+   * Initiates Point-In-Time Restore (PITR) for the given request.
    * <p>
-   * PITR requires:
-   * <ul>
-   * <li>Continuous backup to be enabled for the source tables.</li>
-   * <li>A valid backup image and corresponding WALs to be available.</li>
-   * </ul>
-   * @param endTime     The target recovery time.
-   * @param sTableArray The source tables to restore.
-   * @param tTableArray The target tables where the restore will be performed.
-   * @throws IOException If PITR is not possible due to missing continuous backup or backup images.
-   */
-  private void validatePitr(long endTime, TableName[] sTableArray, TableName[] tTableArray)
-    throws IOException {
-    try (BackupSystemTable table = new BackupSystemTable(conn)) {
-      // Retrieve the set of tables with continuous backup enabled
-      Map<TableName, Long> continuousBackupTables = table.getContinuousBackupTableSet();
-
-      // Ensure all source tables have continuous backup enabled
-      validateContinuousBackup(sTableArray, continuousBackupTables);
-
-      // Fetch completed backup information
-      List<BackupInfo> backupInfos = table.getBackupInfos(BackupState.COMPLETE);
-
-      // Ensure a valid backup and WALs exist for PITR
-      validateBackupAvailability(sTableArray, tTableArray, endTime, continuousBackupTables,
-        backupInfos);
-    }
-  }
-
-  /**
-   * Ensures that all source tables have continuous backup enabled.
-   */
-  private void validateContinuousBackup(TableName[] tables,
-    Map<TableName, Long> continuousBackupTables) throws IOException {
-    List<TableName> missingTables =
-      Arrays.stream(tables).filter(table -> !continuousBackupTables.containsKey(table)).toList();
-
-    if (!missingTables.isEmpty()) {
-      String errorMsg = "Continuous Backup is not enabled for the following 
tables: "
-        + 
missingTables.stream().map(TableName::getNameAsString).collect(Collectors.joining(",
 "));
-      LOG.error(errorMsg);
-      throw new IOException(errorMsg);
-    }
-  }
-
-  /**
-   * Ensures that a valid backup and corresponding WALs exist for PITR for each source table. PITR
-   * requires: 1. A valid backup available before the end time. 2. Write-Ahead Logs (WALs) covering
-   * the remaining duration up to the end time.
+   * If {@code backupRootDir} is specified in the request, performs PITR using metadata from the
+   * provided custom backup location. Otherwise, defaults to using metadata from the backup system
+   * table.
+   * @param request PointInTimeRestoreRequest containing PITR parameters.
+   * @throws IOException if validation fails or restore cannot be completed.
    */
-  private void validateBackupAvailability(TableName[] sTableArray, TableName[] tTableArray,
-    long endTime, Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos)
-    throws IOException {
-    for (int i = 0; i < sTableArray.length; i++) {
-      if (
-        !canPerformPitr(sTableArray[i], tTableArray[i], endTime, continuousBackupTables,
-          backupInfos)
-      ) {
-        String errorMsg = "Could not find a valid backup and WALs for PITR for 
table: "
-          + sTableArray[i].getNameAsString();
-        LOG.error(errorMsg);
-        throw new IOException(errorMsg);
-      }
-    }
-  }
-
-  /**
-   * Checks whether PITR can be performed for a given source-target table pair.
-   */
-  private boolean canPerformPitr(TableName stableName, TableName tTableName, long endTime,
-    Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos) {
-    return getValidBackupInfo(stableName, tTableName, endTime, continuousBackupTables, backupInfos)
-        != null;
-  }
-
-  /**
-   * Finds a valid backup for PITR that meets the required conditions.
-   */
-  private BackupInfo getValidBackupInfo(TableName sTableName, TableName tTablename, long endTime,
-    Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos) {
-    for (BackupInfo info : backupInfos) {
-      if (isValidBackupForPitr(info, sTableName, endTime, continuousBackupTables)) {
-
-        RestoreRequest restoreRequest =
-          BackupUtils.createRestoreRequest(info.getBackupRootDir(), info.getBackupId(), true,
-            new TableName[] { sTableName }, new TableName[] { tTablename }, false);
-
-        try {
-          if (validateRequest(restoreRequest)) {
-            return info;
-          }
-        } catch (IOException e) {
-          LOG.warn("Exception occurred while testing the backup : {} for 
restore ", info, e);
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Determines if the given backup is valid for PITR.
-   * <p>
-   * A backup is valid if:
-   * <ul>
-   * <li>It contains the source table.</li>
-   * <li>It was completed before the end time.</li>
-   * <li>The start timestamp of the backup is after the continuous backup start time for the
-   * table.</li>
-   * </ul>
-   * @param info                   Backup information object.
-   * @param tableName              Table to check.
-   * @param endTime                The target recovery time.
-   * @param continuousBackupTables Map of tables with continuous backup enabled.
-   * @return true if the backup is valid for PITR, false otherwise.
-   */
-  private boolean isValidBackupForPitr(BackupInfo info, TableName tableName, long endTime,
-    Map<TableName, Long> continuousBackupTables) {
-    return info.getTableNames().contains(tableName) && info.getCompleteTs() <= endTime
-      && continuousBackupTables.getOrDefault(tableName, 0L) <= info.getStartTs();
-  }
-
-  /**
-   * Restores a table from a valid backup and replays WALs to reach the desired PITR state.
-   */
-  private void restoreTableWithWalReplay(TableName sourceTable, TableName targetTable, long endTime,
-    Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos,
-    PointInTimeRestoreRequest request) throws IOException {
-    BackupInfo backupInfo =
-      getValidBackupInfo(sourceTable, targetTable, endTime, continuousBackupTables, backupInfos);
-    if (backupInfo == null) {
-      String errorMsg = "Could not find a valid backup and WALs for PITR for 
table: "
-        + sourceTable.getNameAsString();
-      LOG.error(errorMsg);
-      throw new IOException(errorMsg);
-    }
-
-    RestoreRequest restoreRequest = BackupUtils.createRestoreRequest(backupInfo.getBackupRootDir(),
-      backupInfo.getBackupId(), false, new TableName[] { sourceTable },
-      new TableName[] { targetTable }, request.isOverwrite());
-
-    restore(restoreRequest);
-    replayWal(sourceTable, targetTable, backupInfo.getStartTs(), endTime);
-  }
-
-  /**
-   * Replays WALs to bring the table to the desired state.
-   */
-  private void replayWal(TableName sourceTable, TableName targetTable, long startTime, long endTime)
-    throws IOException {
-    String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
-    Path walDirPath = new Path(walBackupDir);
-    LOG.info(
-      "Starting WAL replay for source: {}, target: {}, time range: {} - {}, 
WAL backup dir: {}",
-      sourceTable, targetTable, startTime, endTime, walDirPath);
-
-    List<String> validDirs =
-      getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
-    if (validDirs.isEmpty()) {
-      LOG.warn("No valid WAL directories found for range {} - {}. Skipping WAL 
replay.", startTime,
-        endTime);
-      return;
-    }
-
-    executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime);
-  }
-
-  /**
-   * Fetches valid WAL directories based on the given time range.
-   */
-  private List<String> getValidWalDirs(Configuration conf, Path walBackupDir, long startTime,
-    long endTime) throws IOException {
-    FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf);
-    FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
-
-    List<String> validDirs = new ArrayList<>();
-    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
-
-    for (FileStatus dayDir : dayDirs) {
-      if (!dayDir.isDirectory()) {
-        continue; // Skip files, only process directories
-      }
-
-      String dirName = dayDir.getPath().getName();
-      try {
-        Date dirDate = dateFormat.parse(dirName);
-        long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
-        long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
-
-        // Check if this day's WAL files overlap with the required time range
-        if (dirEndTime >= startTime && dirStartTime <= endTime) {
-          validDirs.add(dayDir.getPath().toString());
-        }
-      } catch (ParseException e) {
-        LOG.warn("Skipping invalid directory name: " + dirName, e);
-      }
-    }
-    return validDirs;
-  }
-
-  /**
-   * Executes WAL replay using WALPlayer.
-   */
-  private void executeWalReplay(List<String> walDirs, TableName sourceTable, TableName targetTable,
-    long startTime, long endTime) throws IOException {
-    Tool walPlayer = initializeWalPlayer(startTime, endTime);
-    String[] args =
-      { String.join(",", walDirs), sourceTable.getNameAsString(), targetTable.getNameAsString() };
+  @Override
+  public void pointInTimeRestore(PointInTimeRestoreRequest request) throws IOException {
+    AbstractPitrRestoreHandler handler;
 
-    try {
-      LOG.info("Executing WALPlayer with args: {}", Arrays.toString(args));
-      int exitCode = walPlayer.run(args);
-      if (exitCode == 0) {
-        LOG.info("WAL replay completed successfully for {}", targetTable);
-      } else {
-        throw new IOException("WAL replay failed with exit code: " + exitCode);
-      }
-    } catch (Exception e) {
-      LOG.error("Error during WAL replay for {}: {}", targetTable, 
e.getMessage(), e);
-      throw new IOException("Exception during WAL replay", e);
+    // Choose the appropriate handler based on whether a custom backup location is provided
+    if (request.getBackupRootDir() == null) {
+      handler = new DefaultPitrRestoreHandler(conn, request);
+    } else {
+      handler = new CustomBackupLocationPitrRestoreHandler(conn, request);
     }
-  }
+    handler.validateAndRestore();
 
-  /**
-   * Initializes and configures WALPlayer.
-   */
-  private Tool initializeWalPlayer(long startTime, long endTime) {
-    Configuration conf = HBaseConfiguration.create(conn.getConfiguration());
-    conf.setLong(WALInputFormat.START_TIME_KEY, startTime);
-    conf.setLong(WALInputFormat.END_TIME_KEY, endTime);
-    conf.setBoolean(IGNORE_EMPTY_FILES, true);
-    Tool walPlayer = new WALPlayer();
-    walPlayer.setConf(conf);
-    return walPlayer;
+    LOG.info("Successfully completed Point In Time Restore for all tables.");
   }
 
   @Override
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java
new file mode 100644
index 00000000000..8b785a0f050
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.util.List;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Adapter that wraps a {@link BackupImage} to expose it as {@link PitrBackupMetadata}.
+ */
+@InterfaceAudience.Private
+public class BackupImageAdapter implements PitrBackupMetadata {
+  private final BackupImage image;
+
+  public BackupImageAdapter(BackupImage image) {
+    this.image = image;
+  }
+
+  @Override
+  public List<TableName> getTableNames() {
+    return image.getTableNames();
+  }
+
+  @Override
+  public long getStartTs() {
+    return image.getStartTs();
+  }
+
+  @Override
+  public long getCompleteTs() {
+    return image.getCompleteTs();
+  }
+
+  @Override
+  public String getBackupId() {
+    return image.getBackupId();
+  }
+
+  @Override
+  public String getRootDir() {
+    return image.getRootDir();
+  }
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java
new file mode 100644
index 00000000000..967fae551cb
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.util.List;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Adapter that wraps a {@link BackupInfo} to expose it as {@link PitrBackupMetadata}.
+ */
+@InterfaceAudience.Private
+public class BackupInfoAdapter implements PitrBackupMetadata {
+  private final BackupInfo info;
+
+  public BackupInfoAdapter(BackupInfo info) {
+    this.info = info;
+  }
+
+  @Override
+  public List<TableName> getTableNames() {
+    return info.getTableNames();
+  }
+
+  @Override
+  public long getStartTs() {
+    return info.getStartTs();
+  }
+
+  @Override
+  public long getCompleteTs() {
+    return info.getCompleteTs();
+  }
+
+  @Override
+  public String getBackupId() {
+    return info.getBackupId();
+  }
+
+  @Override
+  public String getRootDir() {
+    return info.getBackupRootDir();
+  }
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/CustomBackupLocationPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/CustomBackupLocationPitrRestoreHandler.java
new file mode 100644
index 00000000000..1657b68d023
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/CustomBackupLocationPitrRestoreHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * PITR restore handler that retrieves backup metadata from a custom backup root directory.
+ * <p>
+ * This implementation is used when the PITR request specifies a custom backup location via
+ * {@code backupRootDir}.
+ */
+@InterfaceAudience.Private
+public class CustomBackupLocationPitrRestoreHandler extends AbstractPitrRestoreHandler {
+
+  public CustomBackupLocationPitrRestoreHandler(Connection conn,
+    PointInTimeRestoreRequest request) {
+    super(conn, request);
+  }
+
+  /**
+   * Retrieves completed backup entries from the given custom backup root directory and converts
+   * them into {@link PitrBackupMetadata} using {@link BackupImageAdapter}.
+   * @param request the PITR request
+   * @return list of completed backup metadata entries from the custom location
+   * @throws IOException if reading from the custom backup directory fails
+   */
+  @Override
+  protected List<PitrBackupMetadata> getBackupMetadata(PointInTimeRestoreRequest request)
+    throws IOException {
+    return HBackupFileSystem
+      .getAllBackupImages(conn.getConfiguration(), new Path(request.getBackupRootDir())).stream()
+      .map(BackupImageAdapter::new).collect(Collectors.toList());
+  }
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/DefaultPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/DefaultPitrRestoreHandler.java
new file mode 100644
index 00000000000..c6844ba96bd
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/DefaultPitrRestoreHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Default PITR restore handler that retrieves backup metadata from the system table.
+ * <p>
+ * This implementation is used when no custom backup root directory is specified in the request.
+ */
+@InterfaceAudience.Private
+public class DefaultPitrRestoreHandler extends AbstractPitrRestoreHandler {
+
+  public DefaultPitrRestoreHandler(Connection conn, PointInTimeRestoreRequest request) {
+    super(conn, request);
+  }
+
+  /**
+   * Retrieves completed backup entries from the BackupSystemTable and converts them into
+   * {@link PitrBackupMetadata} using {@link BackupInfoAdapter}.
+   * @param request the PITR request
+   * @return list of completed backup metadata entries
+   * @throws IOException if reading from the backup system table fails
+   */
+  @Override
+  protected List<PitrBackupMetadata> getBackupMetadata(PointInTimeRestoreRequest request)
+    throws IOException {
+    try (BackupSystemTable table = new BackupSystemTable(conn)) {
+      return table.getBackupInfos(BackupInfo.BackupState.COMPLETE).stream()
+        .map(BackupInfoAdapter::new).collect(Collectors.toList());
+    }
+  }
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java
new file mode 100644
index 00000000000..dc135ce79c0
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.util.List;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A unified abstraction over backup metadata used during Point-In-Time Restore (PITR).
+ * <p>
+ * This interface allows the PITR algorithm to operate uniformly over different types of backup
+ * metadata sources, such as {@link BackupInfo} (system table) and {@link BackupImage} (custom
+ * backup location), without knowing their specific implementations.
+ */
+@InterfaceAudience.Private
+public interface PitrBackupMetadata {
+
+  /** Returns List of table names included in the backup */
+  List<TableName> getTableNames();
+
+  /** Returns Start timestamp of the backup */
+  long getStartTs();
+
+  /** Returns Completion timestamp of the backup */
+  long getCompleteTs();
+
+  /** Returns Unique identifier for the backup */
+  String getBackupId();
+
+  /** Returns Root directory where the backup is stored */
+  String getRootDir();
+}
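Illustration only (not part of this change): because the PITR core depends solely on this
interface, a fixed-value implementation can stand in for either metadata source in unit tests.
A minimal hypothetical sketch:

    // Hypothetical test double; all values are supplied by the caller.
    class StaticPitrBackupMetadata implements PitrBackupMetadata {
      private final List<TableName> tables;
      private final long startTs;
      private final long completeTs;
      private final String backupId;
      private final String rootDir;

      StaticPitrBackupMetadata(List<TableName> tables, long startTs, long completeTs,
        String backupId, String rootDir) {
        this.tables = tables;
        this.startTs = startTs;
        this.completeTs = completeTs;
        this.backupId = backupId;
        this.rootDir = rootDir;
      }

      @Override public List<TableName> getTableNames() { return tables; }
      @Override public long getStartTs() { return startTs; }
      @Override public long getCompleteTs() { return completeTs; }
      @Override public String getBackupId() { return backupId; }
      @Override public String getRootDir() { return rootDir; }
    }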
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java
new file mode 100644
index 00000000000..ae26cf96050
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_PITR_BACKUP_PATH;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public final class PITRTestUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(PITRTestUtil.class);
+  private static final int DEFAULT_WAIT_FOR_REPLICATION_MS = 30_000;
+
+  private PITRTestUtil() {
+    // Utility class
+  }
+
+  public static String[] buildPITRArgs(TableName[] sourceTables, TableName[] targetTables,
+    long endTime, String backupRootDir) {
+    String sourceTableNames =
+      Arrays.stream(sourceTables).map(TableName::getNameAsString).collect(Collectors.joining(","));
+    String targetTableNames =
+      Arrays.stream(targetTables).map(TableName::getNameAsString).collect(Collectors.joining(","));
+
+    List<String> args = new ArrayList<>();
+    args.add("-" + OPTION_TABLE);
+    args.add(sourceTableNames);
+    args.add("-" + OPTION_TABLE_MAPPING);
+    args.add(targetTableNames);
+    args.add("-" + OPTION_TO_DATETIME);
+    args.add(String.valueOf(endTime));
+
+    if (backupRootDir != null) {
+      args.add("-" + OPTION_PITR_BACKUP_PATH);
+      args.add(backupRootDir);
+    }
+
+    return args.toArray(new String[0]);
+  }
+
+  public static String[] buildBackupArgs(String backupType, TableName[] tables,
+    boolean continuousEnabled, String backupRootDir) {
+    String tableNames =
+      Arrays.stream(tables).map(TableName::getNameAsString).collect(Collectors.joining(","));
+
+    List<String> args = new ArrayList<>(
+      Arrays.asList("create", backupType, backupRootDir, "-" + OPTION_TABLE, 
tableNames));
+
+    if (continuousEnabled) {
+      args.add("-" + OPTION_ENABLE_CONTINUOUS_BACKUP);
+    }
+
+    return args.toArray(new String[0]);
+  }
+
+  public static void loadRandomData(HBaseTestingUtil testUtil, TableName tableName, byte[] family,
+    int totalRows) throws IOException {
+    try (Table table = testUtil.getConnection().getTable(tableName)) {
+      testUtil.loadRandomRows(table, family, 32, totalRows);
+    }
+  }
+
+  public static void waitForReplication() {
+    try {
+      LOG.info("Waiting for replication to complete for {} ms", 
DEFAULT_WAIT_FOR_REPLICATION_MS);
+      Thread.sleep(DEFAULT_WAIT_FOR_REPLICATION_MS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while waiting for replication", e);
+    }
+  }
+
+  public static int getRowCount(HBaseTestingUtil testUtil, TableName tableName) throws IOException {
+    try (Table table = testUtil.getConnection().getTable(tableName)) {
+      return HBaseTestingUtil.countRows(table);
+    }
+  }
+}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
index fb37977c4ee..a1ce9c97a68 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
@@ -18,23 +18,15 @@
 package org.apache.hadoop.hbase.backup;
 
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.stream.Collectors;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.ToolRunner;
@@ -43,8 +35,6 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Category(LargeTests.class)
 public class TestPointInTimeRestore extends TestBackupBase {
@@ -52,10 +42,7 @@ public class TestPointInTimeRestore extends TestBackupBase {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestPointInTimeRestore.class);
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestPointInTimeRestore.class);
-
   private static final String backupWalDirName = "TestPointInTimeRestoreWalDir";
-  private static final int WAIT_FOR_REPLICATION_MS = 30_000;
   static Path backupWalDir;
   static FileSystem fs;
 
@@ -80,38 +67,41 @@ public class TestPointInTimeRestore extends TestBackupBase {
     // Simulate a backup taken 20 days ago
     EnvironmentEdgeManager
       .injectEdge(() -> System.currentTimeMillis() - 20 * ONE_DAY_IN_MILLISECONDS);
-    loadRandomData(table1, 1000); // Insert initial data into table1
+    PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Insert initial data into
+                                                                   // table1
 
     // Perform a full backup for table1 with continuous backup enabled
-    String[] args = buildBackupArgs("full", new TableName[] { table1 }, true);
+    String[] args =
+      PITRTestUtil.buildBackupArgs("full", new TableName[] { table1 }, true, BACKUP_ROOT_DIR);
     int ret = ToolRunner.run(conf1, new BackupDriver(), args);
     assertEquals("Backup should succeed", 0, ret);
 
     // Move time forward to simulate 15 days ago
     EnvironmentEdgeManager
       .injectEdge(() -> System.currentTimeMillis() - 15 * ONE_DAY_IN_MILLISECONDS);
-    loadRandomData(table1, 1000); // Add more data to table1
-    loadRandomData(table2, 500); // Insert data into table2
+    PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Add more data to table1
+    PITRTestUtil.loadRandomData(TEST_UTIL, table2, famName, 500); // Insert data into table2
 
-    waitForReplication(); // Ensure replication is complete
+    PITRTestUtil.waitForReplication(); // Ensure replication is complete
 
     // Perform a full backup for table2 with continuous backup enabled
-    args = buildBackupArgs("full", new TableName[] { table2 }, true);
+    args = PITRTestUtil.buildBackupArgs("full", new TableName[] { table2 }, 
true, BACKUP_ROOT_DIR);
     ret = ToolRunner.run(conf1, new BackupDriver(), args);
     assertEquals("Backup should succeed", 0, ret);
 
     // Move time forward to simulate 10 days ago
     EnvironmentEdgeManager
       .injectEdge(() -> System.currentTimeMillis() - 10 * ONE_DAY_IN_MILLISECONDS);
-    loadRandomData(table2, 500); // Add more data to table2
-    loadRandomData(table3, 500); // Insert data into table3
+    PITRTestUtil.loadRandomData(TEST_UTIL, table2, famName, 500); // Add more data to table2
+    PITRTestUtil.loadRandomData(TEST_UTIL, table3, famName, 500); // Insert data into table3
 
     // Perform a full backup for table3 and table4 (without continuous backup)
-    args = buildBackupArgs("full", new TableName[] { table3, table4 }, false);
+    args = PITRTestUtil.buildBackupArgs("full", new TableName[] { table3, 
table4 }, false,
+      BACKUP_ROOT_DIR);
     ret = ToolRunner.run(conf1, new BackupDriver(), args);
     assertEquals("Backup should succeed", 0, ret);
 
-    waitForReplication(); // Ensure replication is complete before concluding setup
+    PITRTestUtil.waitForReplication(); // Ensure replication is complete before concluding setup
 
     // Reset time mocking to avoid affecting other tests
     EnvironmentEdgeManager.reset();
@@ -137,18 +127,18 @@ public class TestPointInTimeRestore extends TestBackupBase {
   @Test
   public void testPITR_FailsOutsideWindow() throws Exception {
     // Case 1: Requested restore time is in the future (should fail)
-    String[] args = buildPITRArgs(new TableName[] { table1 },
+    String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { table1 },
       new TableName[] { TableName.valueOf("restoredTable1") },
-      EnvironmentEdgeManager.currentTime() + ONE_DAY_IN_MILLISECONDS);
+      EnvironmentEdgeManager.currentTime() + ONE_DAY_IN_MILLISECONDS, null);
 
     int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
     assertNotEquals("Restore should fail since the requested restore time is 
in the future", 0,
       ret);
 
     // Case 2: Requested restore time is too old (beyond the retention window, should fail)
-    args = buildPITRArgs(new TableName[] { table1 },
+    args = PITRTestUtil.buildPITRArgs(new TableName[] { table1 },
       new TableName[] { TableName.valueOf("restoredTable1") },
-      EnvironmentEdgeManager.currentTime() - 40 * ONE_DAY_IN_MILLISECONDS);
+      EnvironmentEdgeManager.currentTime() - 40 * ONE_DAY_IN_MILLISECONDS, null);
 
     ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
     assertNotEquals(
@@ -162,9 +152,9 @@ public class TestPointInTimeRestore extends TestBackupBase {
    */
   @Test
   public void testPointInTimeRestore_ContinuousBackupNotEnabledTables() throws Exception {
-    String[] args = buildPITRArgs(new TableName[] { table3 },
+    String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { table3 },
       new TableName[] { TableName.valueOf("restoredTable1") },
-      EnvironmentEdgeManager.currentTime() - 10 * ONE_DAY_IN_MILLISECONDS);
+      EnvironmentEdgeManager.currentTime() - 10 * ONE_DAY_IN_MILLISECONDS, null);
 
     int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
     assertNotEquals("Restore should fail since continuous backup is not 
enabled for the table", 0,
@@ -176,9 +166,9 @@ public class TestPointInTimeRestore extends TestBackupBase {
    */
   @Test
   public void testPointInTimeRestore_TablesWithNoProperBackupOrWals() throws Exception {
-    String[] args = buildPITRArgs(new TableName[] { table2 },
+    String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { table2 },
       new TableName[] { TableName.valueOf("restoredTable1") },
-      EnvironmentEdgeManager.currentTime() - 16 * ONE_DAY_IN_MILLISECONDS);
+      EnvironmentEdgeManager.currentTime() - 16 * ONE_DAY_IN_MILLISECONDS, null);
 
     int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
     assertNotEquals(
@@ -194,15 +184,17 @@ public class TestPointInTimeRestore extends TestBackupBase {
     TableName restoredTable = TableName.valueOf("restoredTable");
 
     // Perform restore operation
-    String[] args = buildPITRArgs(new TableName[] { table1 }, new TableName[] { restoredTable },
-      EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS);
+    String[] args =
+      PITRTestUtil.buildPITRArgs(new TableName[] { table1 }, new TableName[] { restoredTable },
+        EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS, null);
 
     int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
     assertEquals("Restore should succeed", 0, ret);
 
     // Validate that the restored table contains the same number of rows as the original table
     assertEquals("Restored table should have the same row count as the original",
-      getRowCount(table1), getRowCount(restoredTable));
+      PITRTestUtil.getRowCount(TEST_UTIL, table1),
+      PITRTestUtil.getRowCount(TEST_UTIL, restoredTable));
   }
 
   /**
@@ -214,64 +206,19 @@ public class TestPointInTimeRestore extends TestBackupBase {
     TableName restoredTable2 = TableName.valueOf("restoredTable2");
 
     // Perform restore operation for multiple tables
-    String[] args = buildPITRArgs(new TableName[] { table1, table2 },
+    String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { table1, table2 },
       new TableName[] { restoredTable1, restoredTable2 },
-      EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS);
+      EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS, null);
 
     int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
     assertEquals("Restore should succeed", 0, ret);
 
     // Validate that the restored tables contain the same number of rows as the originals
     assertEquals("Restored table1 should have the same row count as the original",
-      getRowCount(table1), getRowCount(restoredTable1));
+      PITRTestUtil.getRowCount(TEST_UTIL, table1),
+      PITRTestUtil.getRowCount(TEST_UTIL, restoredTable1));
     assertEquals("Restored table2 should have the same row count as the 
original",
-      getRowCount(table2), getRowCount(restoredTable2));
-  }
-
-  private String[] buildPITRArgs(TableName[] sourceTables, TableName[] targetTables, long endTime) {
-    String sourceTableNames =
-      Arrays.stream(sourceTables).map(TableName::getNameAsString).collect(Collectors.joining(","));
-
-    String targetTableNames =
-      Arrays.stream(targetTables).map(TableName::getNameAsString).collect(Collectors.joining(","));
-
-    return new String[] { "-" + OPTION_TABLE, sourceTableNames, "-" + OPTION_TABLE_MAPPING,
-      targetTableNames, "-" + OPTION_TO_DATETIME, String.valueOf(endTime) };
-  }
-
-  private static String[] buildBackupArgs(String backupType, TableName[] tables,
-    boolean continuousEnabled) {
-    String tableNames =
-      Arrays.stream(tables).map(TableName::getNameAsString).collect(Collectors.joining(","));
-
-    if (continuousEnabled) {
-      return new String[] { "create", backupType, BACKUP_ROOT_DIR, "-" + OPTION_TABLE, tableNames,
-        "-" + OPTION_ENABLE_CONTINUOUS_BACKUP };
-    } else {
-      return new String[] { "create", backupType, BACKUP_ROOT_DIR, "-" + OPTION_TABLE, tableNames };
-    }
-  }
-
-  private static void loadRandomData(TableName tableName, int totalRows) throws IOException {
-    int rowSize = 32;
-    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
-      TEST_UTIL.loadRandomRows(table, famName, rowSize, totalRows);
-    }
-  }
-
-  private static void waitForReplication() {
-    LOG.info("Waiting for replication to complete for {} ms", 
WAIT_FOR_REPLICATION_MS);
-    try {
-      Thread.sleep(WAIT_FOR_REPLICATION_MS);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException("Thread was interrupted while waiting", e);
-    }
-  }
-
-  private int getRowCount(TableName tableName) throws IOException {
-    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
-      return HBaseTestingUtil.countRows(table);
-    }
+      PITRTestUtil.getRowCount(TEST_UTIL, table2),
+      PITRTestUtil.getRowCount(TEST_UTIL, restoredTable2));
   }
 }
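
The setup above leans on EnvironmentEdgeManager to age backups artificially. A condensed sketch of the pattern (the try/finally is an editorial suggestion, not what the patch does; the patch calls reset() once at the end of setup):

  // Pretend "now" is 20 days in the past so backups and WALs appear that old.
  EnvironmentEdgeManager
    .injectEdge(() -> System.currentTimeMillis() - 20 * ONE_DAY_IN_MILLISECONDS);
  try {
    // ... load data and run BackupDriver under the shifted clock ...
  } finally {
    EnvironmentEdgeManager.reset(); // restore the real clock so later tests are unaffected
  }
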
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestoreWithCustomBackupPath.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestoreWithCustomBackupPath.java
new file mode 100644
index 00000000000..78c5ac94ba0
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestoreWithCustomBackupPath.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestPointInTimeRestoreWithCustomBackupPath extends TestBackupBase {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestPointInTimeRestoreWithCustomBackupPath.class);
+
+  private static final String backupWalDirName = "TestCustomBackupWalDir";
+  private static final String customBackupDirName = "CustomBackupRoot";
+
+  private static Path backupWalDir;
+  private static Path customBackupDir;
+  private static FileSystem fs;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    Path root = TEST_UTIL.getDataTestDirOnTestFS();
+    backupWalDir = new Path(root, backupWalDirName);
+    customBackupDir = new Path(root, customBackupDirName);
+
+    fs = FileSystem.get(conf1);
+    fs.mkdirs(backupWalDir);
+    fs.mkdirs(customBackupDir);
+
+    conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString());
+
+    createAndCopyBackupData();
+  }
+
+  private static void createAndCopyBackupData() throws Exception {
+    // Simulate time 10 days ago
+    EnvironmentEdgeManager
+      .injectEdge(() -> System.currentTimeMillis() - 10 * ONE_DAY_IN_MILLISECONDS);
+    PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000);
+
+    // Perform backup with continuous backup enabled
+    String[] args =
+      PITRTestUtil.buildBackupArgs("full", new TableName[] { table1 }, true, 
BACKUP_ROOT_DIR);
+    int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+    assertEquals("Backup should succeed", 0, ret);
+
+    PITRTestUtil.waitForReplication();
+
+    // Copy the contents of BACKUP_ROOT_DIR to the new customBackupDir
+    Path defaultBackupDir = new Path(BACKUP_ROOT_DIR);
+    for (FileStatus status : fs.listStatus(defaultBackupDir)) {
+      Path dst = new Path(customBackupDir, status.getPath().getName());
+      FileUtil.copy(fs, status, fs, dst, true, false, conf1);
+    }
+
+    EnvironmentEdgeManager.reset();
+  }
+
+  @AfterClass
+  public static void cleanupAfterClass() throws IOException {
+    if (fs.exists(backupWalDir)) {
+      fs.delete(backupWalDir, true);
+    }
+    if (fs.exists(customBackupDir)) {
+      fs.delete(customBackupDir, true);
+    }
+
+    conf1.unset(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+  }
+
+  @Test
+  public void testPITR_FromCustomBackupRootDir() throws Exception {
+    TableName restoredTable = TableName.valueOf("restoredTableCustomPath");
+
+    long restoreTime = EnvironmentEdgeManager.currentTime() - 2 * ONE_DAY_IN_MILLISECONDS;
+
+    String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { table1 },
+      new TableName[] { restoredTable }, restoreTime, customBackupDir.toString());
+
+    int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+    assertEquals("PITR should succeed with custom backup root dir", 0, ret);
+
+    // Validate that the restored table has the same row count as the original
+    assertEquals("Restored table should match row count",
+      PITRTestUtil.getRowCount(TEST_UTIL, table1),
+      PITRTestUtil.getRowCount(TEST_UTIL, restoredTable));
+  }
+}
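
Net effect of the new test: a point-in-time restore driven from a copied backup root succeeds exactly as it does from the default one; the only difference at the call site is the final argument to buildPITRArgs. A sketch under those assumptions (variable names illustrative):

  TableName[] sources = new TableName[] { table1 };
  TableName[] targets = new TableName[] { TableName.valueOf("restored") };
  long endTime = EnvironmentEdgeManager.currentTime() - 2 * ONE_DAY_IN_MILLISECONDS;

  // Default behaviour: null root, backup images are resolved from the original location.
  String[] defaultArgs = PITRTestUtil.buildPITRArgs(sources, targets, endTime, null);

  // HBASE-29445: explicit root, e.g. a copy of the backups on another path.
  String[] customArgs =
    PITRTestUtil.buildPITRArgs(sources, targets, endTime, customBackupDir.toString());
  assertEquals("PITR should succeed with custom backup root dir", 0,
    ToolRunner.run(conf1, new PointInTimeRestoreDriver(), customArgs));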
