This is an automated email from the ASF dual-hosted git repository. andor pushed a commit to branch HBASE-28957 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 7e25a4684e05f30d80966492b4156e44201c92d1 Author: vinayak hegde <vinayakph...@gmail.com> AuthorDate: Wed Jul 16 22:43:59 2025 +0530 HBASE-29445 Add Option to Specify Custom Backup Location in PITR (#7153) Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org> --- .../backup/impl/AbstractPitrRestoreHandler.java | 403 +++++++++++++++++++++ .../hadoop/hbase/backup/impl/BackupAdminImpl.java | 348 +----------------- .../hbase/backup/impl/BackupImageAdapter.java | 60 +++ .../hbase/backup/impl/BackupInfoAdapter.java | 60 +++ .../CustomBackupLocationPitrRestoreHandler.java | 57 +++ .../backup/impl/DefaultPitrRestoreHandler.java | 55 +++ .../hbase/backup/impl/PitrBackupMetadata.java | 50 +++ .../apache/hadoop/hbase/backup/PITRTestUtil.java | 107 ++++++ .../hbase/backup/TestPointInTimeRestore.java | 117 ++---- ...TestPointInTimeRestoreWithCustomBackupPath.java | 121 +++++++ 10 files changed, 962 insertions(+), 416 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java new file mode 100644 index 00000000000..b2edce6b0fd --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
+import org.apache.hadoop.hbase.backup.RestoreRequest;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.Tool;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for handling Point-In-Time Restore (PITR).
+ * <p>
+ * Defines the common PITR algorithm using the Template Method Pattern. Subclasses provide the
+ * metadata source (e.g., backup system table or a custom backup location).
+ * <p>
+ * The PITR flow includes:
+ * <ul>
+ * <li>Validating recovery time within the PITR window</li>
+ * <li>Checking for continuous backup and valid backup availability</li>
+ * <li>Restoring the backup</li>
+ * <li>Replaying WALs to bring tables to the target state</li>
+ * </ul>
+ * <p>
+ * All validation, including the selection of the backup image for every table and the resolution
+ * of the WAL backup directory, happens before any table is restored. Requests that are known to be
+ * unsatisfiable are therefore rejected before any target table is modified.
+ * <p>
+ * Subclasses must implement {@link #getBackupMetadata(PointInTimeRestoreRequest)} to supply the
+ * list of completed backups.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractPitrRestoreHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractPitrRestoreHandler.class);
+
+  protected final Connection conn;
+  protected final BackupAdminImpl backupAdmin;
+  protected final PointInTimeRestoreRequest request;
+
+  AbstractPitrRestoreHandler(Connection conn, PointInTimeRestoreRequest request) {
+    this.conn = conn;
+    this.backupAdmin = new BackupAdminImpl(conn);
+    this.request = request;
+  }
+
+  /**
+   * Validates the PITR request and performs the restore if valid. This is the main entry point for
+   * the PITR process and should be called by clients.
+   * <p>
+   * Every source/target table pair is validated, and the backup image it will be restored from is
+   * selected, before any table is restored. When {@link PointInTimeRestoreRequest#isCheck()} is
+   * set, only the validation is performed.
+   * @throws IOException if the request is invalid, PITR is not possible for one of the tables, or
+   *                     the restore / WAL replay fails
+   */
+  public final void validateAndRestore() throws IOException {
+    long endTime = request.getToDateTime();
+    validateRequestToTime(endTime);
+
+    TableName[] sourceTableArray = request.getFromTables();
+    TableName[] targetTableArray = resolveTargetTables(sourceTableArray, request.getToTables());
+    validateTableMapping(sourceTableArray, targetTableArray);
+
+    // Validate PITR requirements and remember the backup chosen for every table, so the backup
+    // lookup (which may load manifests from the file system) is done only once per table.
+    PitrBackupMetadata[] selectedBackups =
+      validatePitr(endTime, sourceTableArray, targetTableArray);
+
+    // Resolve the WAL location before restoring anything: without it the WAL replay that follows
+    // each restore would fail, leaving the target table at the backup's state instead of endTime.
+    Path walBackupDir = getWalBackupDir();
+
+    // If only validation is required, log and return
+    if (request.isCheck()) {
+      LOG.info("PITR can be successfully executed");
+      return;
+    }
+
+    // Execute PITR process
+    for (int i = 0; i < sourceTableArray.length; i++) {
+      restoreTableWithWalReplay(sourceTableArray[i], targetTableArray[i], endTime,
+        selectedBackups[i], walBackupDir);
+    }
+  }
+
+  /**
+   * Validates whether the requested end time falls within the allowed PITR recovery window.
+   * @param endTime The target recovery time.
+   * @throws IOException If the requested recovery time is outside the allowed window.
+   */
+  private void validateRequestToTime(long endTime) throws IOException {
+    long pitrWindowDays = conn.getConfiguration().getLong(CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS,
+      DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS);
+    long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
+    long pitrMaxStartTime = currentTime - TimeUnit.DAYS.toMillis(pitrWindowDays);
+
+    if (endTime < pitrMaxStartTime) {
+      String errorMsg = String.format(
+        "Requested recovery time (%d) is out of the allowed PITR window (last %d days).", endTime,
+        pitrWindowDays);
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+
+    if (endTime > currentTime) {
+      String errorMsg = String.format(
+        "Requested recovery time (%d) is in the future. Current time: %d.", endTime, currentTime);
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+  }
+
+  /**
+   * Resolves the target table array. If null or empty, defaults to the source table array.
+   */
+  private TableName[] resolveTargetTables(TableName[] sourceTables, TableName[] targetTables) {
+    return (targetTables == null || targetTables.length == 0) ? sourceTables : targetTables;
+  }
+
+  /**
+   * Ensures every source table is paired with exactly one target table. Without this check a
+   * shorter target list fails with an {@link ArrayIndexOutOfBoundsException} and a longer one has
+   * its extra entries silently ignored.
+   * @throws IOException if the source and target table arrays differ in length
+   */
+  private void validateTableMapping(TableName[] sourceTables, TableName[] targetTables)
+    throws IOException {
+    if (sourceTables.length != targetTables.length) {
+      String errorMsg = String.format(
+        "Number of source tables (%d) does not match number of target tables (%d).",
+        sourceTables.length, targetTables.length);
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+  }
+
+  /**
+   * Validates whether Point-In-Time Recovery (PITR) is possible for the given tables at the
+   * specified time and selects the backup each table will be restored from.
+   * <p>
+   * PITR requires:
+   * <ul>
+   * <li>Continuous backup to be enabled for the source tables.</li>
+   * <li>A valid backup image and corresponding WALs to be available.</li>
+   * </ul>
+   * @param endTime     The target recovery time.
+   * @param sTableArray The source tables to restore.
+   * @param tTableArray The target tables where the restore will be performed.
+   * @return the selected backup for each table, index-aligned with {@code sTableArray}
+   * @throws IOException If PITR is not possible due to missing continuous backup or backup images.
+   */
+  private PitrBackupMetadata[] validatePitr(long endTime, TableName[] sTableArray,
+    TableName[] tTableArray) throws IOException {
+    // Retrieve the set of tables with continuous backup enabled
+    Map<TableName, Long> continuousBackupTables;
+    try (BackupSystemTable table = new BackupSystemTable(conn)) {
+      continuousBackupTables = table.getContinuousBackupTableSet();
+    }
+
+    // Ensure all source tables have continuous backup enabled
+    validateContinuousBackup(sTableArray, continuousBackupTables);
+
+    // Fetch completed backup information, newest start time first: any qualifying backup yields
+    // a correct PITR, and the newest one needs the shortest WAL replay.
+    List<PitrBackupMetadata> backupMetadataList = new ArrayList<>(getBackupMetadata(request));
+    backupMetadataList.sort(Comparator.comparingLong(PitrBackupMetadata::getStartTs).reversed());
+
+    // Ensure a valid backup and WALs exist for PITR
+    return selectBackupsForPitr(sTableArray, tTableArray, endTime, continuousBackupTables,
+      backupMetadataList);
+  }
+
+  /**
+   * Ensures that all source tables have continuous backup enabled.
+   */
+  private void validateContinuousBackup(TableName[] tables,
+    Map<TableName, Long> continuousBackupTables) throws IOException {
+    List<TableName> missingTables =
+      Arrays.stream(tables).filter(table -> !continuousBackupTables.containsKey(table)).toList();
+
+    if (!missingTables.isEmpty()) {
+      String errorMsg = "Continuous Backup is not enabled for the following tables: "
+        + missingTables.stream().map(TableName::getNameAsString).collect(Collectors.joining(", "));
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+  }
+
+  /**
+   * Selects, for each source table, a backup from which PITR can start. PITR requires: 1. A valid
+   * backup available before the end time. 2. Write-Ahead Logs (WALs) covering the remaining
+   * duration up to the end time.
+   * @return the selected backup for each table, index-aligned with {@code sTableArray}
+   * @throws IOException if no valid backup exists for one of the tables
+   */
+  private PitrBackupMetadata[] selectBackupsForPitr(TableName[] sTableArray,
+    TableName[] tTableArray, long endTime, Map<TableName, Long> continuousBackupTables,
+    List<PitrBackupMetadata> backups) throws IOException {
+    PitrBackupMetadata[] selected = new PitrBackupMetadata[sTableArray.length];
+    for (int i = 0; i < sTableArray.length; i++) {
+      selected[i] =
+        getValidBackup(sTableArray[i], tTableArray[i], endTime, continuousBackupTables, backups);
+      if (selected[i] == null) {
+        String errorMsg = String.format(
+          "PITR failed: No valid backup/WALs found for source table %s (target: %s) before time %d",
+          sTableArray[i].getNameAsString(), tTableArray[i].getNameAsString(), endTime);
+        LOG.error(errorMsg);
+        throw new IOException(errorMsg);
+      }
+    }
+    return selected;
+  }
+
+  /**
+   * Finds and returns the first entry of {@code backups} (in list order) that can be used to
+   * restore the given source table up to the specified end time. A backup is considered valid if:
+   * <ul>
+   * <li>It contains the source table</li>
+   * <li>It was completed before the requested end time</li>
+   * <li>Its start time is after the table's continuous backup start time</li>
+   * <li>It passes the restore request validation</li>
+   * </ul>
+   * @return the matching backup, or {@code null} if none qualifies
+   */
+  private PitrBackupMetadata getValidBackup(TableName sTableName, TableName tTablename,
+    long endTime, Map<TableName, Long> continuousBackupTables, List<PitrBackupMetadata> backups) {
+    for (PitrBackupMetadata backup : backups) {
+      if (isValidBackupForPitr(backup, sTableName, endTime, continuousBackupTables)) {
+
+        RestoreRequest restoreRequest =
+          BackupUtils.createRestoreRequest(backup.getRootDir(), backup.getBackupId(), true,
+            new TableName[] { sTableName }, new TableName[] { tTablename }, false);
+
+        try {
+          if (backupAdmin.validateRequest(restoreRequest)) {
+            return backup;
+          }
+        } catch (IOException e) {
+          // Best effort: an unreadable candidate is skipped so another backup can still qualify.
+          LOG.warn("Exception occurred while testing the backup : {} for restore ",
+            backup.getBackupId(), e);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Determines if the given backup is valid for PITR.
+   * <p>
+   * A backup is valid if:
+   * <ul>
+   * <li>It contains the source table.</li>
+   * <li>It was completed before the end time.</li>
+   * <li>The start timestamp of the backup is after the continuous backup start time for the
+   * table.</li>
+   * </ul>
+   * @param backupMetadata         Backup information object.
+   * @param tableName              Table to check.
+   * @param endTime                The target recovery time.
+   * @param continuousBackupTables Map of tables with continuous backup enabled.
+   * @return true if the backup is valid for PITR, false otherwise.
+   */
+  private boolean isValidBackupForPitr(PitrBackupMetadata backupMetadata, TableName tableName,
+    long endTime, Map<TableName, Long> continuousBackupTables) {
+    return backupMetadata.getTableNames().contains(tableName)
+      && backupMetadata.getCompleteTs() <= endTime
+      && continuousBackupTables.getOrDefault(tableName, 0L) <= backupMetadata.getStartTs();
+  }
+
+  /**
+   * Restores the table from the previously selected backup and replays WALs from the backup start
+   * time to the requested end time.
+   * @param backupMetadata the backup selected for this table during validation
+   * @param walBackupDir   root directory of the continuous-backup WALs
+   * @throws IOException if the restore or the WAL replay fails
+   */
+  private void restoreTableWithWalReplay(TableName sourceTable, TableName targetTable, long endTime,
+    PitrBackupMetadata backupMetadata, Path walBackupDir) throws IOException {
+    RestoreRequest restoreRequest = BackupUtils.createRestoreRequest(backupMetadata.getRootDir(),
+      backupMetadata.getBackupId(), false, new TableName[] { sourceTable },
+      new TableName[] { targetTable }, request.isOverwrite());
+
+    backupAdmin.restore(restoreRequest);
+    replayWal(sourceTable, targetTable, backupMetadata.getStartTs(), endTime, walBackupDir);
+  }
+
+  /**
+   * Returns the configured continuous-backup WAL directory.
+   * @throws IOException if {@code CONF_CONTINUOUS_BACKUP_WAL_DIR} is not set; otherwise
+   *                     {@code new Path(null)} would fail later with an unchecked exception
+   */
+  private Path getWalBackupDir() throws IOException {
+    String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    if (walBackupDir == null || walBackupDir.isBlank()) {
+      String errorMsg =
+        "Continuous backup WAL directory is not configured: " + CONF_CONTINUOUS_BACKUP_WAL_DIR;
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+    return new Path(walBackupDir);
+  }
+
+  /**
+   * Replays WALs to bring the table to the desired state.
+   */
+  private void replayWal(TableName sourceTable, TableName targetTable, long startTime, long endTime,
+    Path walDirPath) throws IOException {
+    LOG.info(
+      "Starting WAL replay for source: {}, target: {}, time range: {} - {}, WAL backup dir: {}",
+      sourceTable, targetTable, startTime, endTime, walDirPath);
+
+    List<String> validDirs =
+      getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
+    if (validDirs.isEmpty()) {
+      LOG.warn("No valid WAL directories found for range {} - {}. Skipping WAL replay.", startTime,
+        endTime);
+      return;
+    }
+
+    executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime);
+  }
+
+  /**
+   * Fetches valid WAL directories based on the given time range.
+   * <p>
+   * Each child of {@code <walBackupDir>/WALS_DIR} is expected to be named with
+   * {@code DATE_FORMAT}; non-directories and unparsable names are skipped.
+   */
+  private List<String> getValidWalDirs(Configuration conf, Path walBackupDir, long startTime,
+    long endTime) throws IOException {
+    FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf);
+    FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
+
+    List<String> validDirs = new ArrayList<>();
+    // NOTE(review): parses in the JVM's default time zone; this assumes the day directories were
+    // named in the same zone by the continuous backup endpoint -- confirm.
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+    for (FileStatus dayDir : dayDirs) {
+      if (!dayDir.isDirectory()) {
+        continue; // Skip files, only process directories
+      }
+
+      String dirName = dayDir.getPath().getName();
+      try {
+        Date dirDate = dateFormat.parse(dirName);
+        long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
+        long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
+
+        // Check if this day's WAL files overlap with the required time range
+        if (dirEndTime >= startTime && dirStartTime <= endTime) {
+          validDirs.add(dayDir.getPath().toString());
+        }
+      } catch (ParseException e) {
+        LOG.warn("Skipping invalid directory name: {}", dirName, e);
+      }
+    }
+    return validDirs;
+  }
+
+  /**
+   * Executes WAL replay using WALPlayer.
+   * @throws InterruptedIOException if the replay is interrupted (the interrupt flag is restored)
+   * @throws IOException            if WALPlayer fails or exits with a non-zero code
+   */
+  private void executeWalReplay(List<String> walDirs, TableName sourceTable, TableName targetTable,
+    long startTime, long endTime) throws IOException {
+    Tool walPlayer = initializeWalPlayer(startTime, endTime);
+    String[] args =
+      { String.join(",", walDirs), sourceTable.getNameAsString(), targetTable.getNameAsString() };
+
+    LOG.info("Executing WALPlayer with args: {}", Arrays.toString(args));
+    int exitCode;
+    try {
+      exitCode = walPlayer.run(args);
+    } catch (InterruptedException e) {
+      // Restore the interrupt so callers further up the stack can observe it.
+      Thread.currentThread().interrupt();
+      InterruptedIOException iioe =
+        new InterruptedIOException("WAL replay interrupted for " + targetTable);
+      iioe.initCause(e);
+      throw iioe;
+    } catch (IOException e) {
+      LOG.error("Error during WAL replay for {}: {}", targetTable, e.getMessage(), e);
+      throw e;
+    } catch (Exception e) {
+      // Tool.run declares a bare Exception; convert anything else to the IOException callers expect.
+      LOG.error("Error during WAL replay for {}: {}", targetTable, e.getMessage(), e);
+      throw new IOException("Exception during WAL replay", e);
+    }
+
+    // Checked outside the try block so this failure is not caught and re-wrapped above.
+    if (exitCode != 0) {
+      String errorMsg = "WAL replay failed with exit code: " + exitCode;
+      LOG.error("{} (target table: {})", errorMsg, targetTable);
+      throw new IOException(errorMsg);
+    }
+    LOG.info("WAL replay completed successfully for {}", targetTable);
+  }
+
+  /**
+   * Initializes and configures WALPlayer.
+   */
+  private Tool initializeWalPlayer(long startTime, long endTime) {
+    Configuration conf = HBaseConfiguration.create(conn.getConfiguration());
+    conf.setLong(WALInputFormat.START_TIME_KEY, startTime);
+    conf.setLong(WALInputFormat.END_TIME_KEY, endTime);
+    conf.setBoolean(IGNORE_EMPTY_FILES, true);
+    Tool walPlayer = new WALPlayer();
+    walPlayer.setConf(conf);
+    return walPlayer;
+  }
+
+  /**
+   * Returns the completed backups that may serve as the starting point of the PITR. The order of
+   * the returned list does not matter; this handler prefers the backup with the latest start time.
+   * Must not return {@code null}.
+   * @param request the PITR request being served
+   * @return the candidate backups
+   * @throws IOException if the backup metadata cannot be read
+   */
+  protected abstract List<PitrBackupMetadata> getBackupMetadata(PointInTimeRestoreRequest request)
+    throws IOException;
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java index e82d9804f9d..75a2a6343a5 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java @@ -17,33 +17,18 @@ */ package org.apache.hadoop.hbase.backup.impl; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS; -import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS; -import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR; -import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT; -import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; -import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES; - import java.io.IOException; -import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupAdmin; import org.apache.hadoop.hbase.backup.BackupClientFactory; @@ -61,10 +46,7 @@ import org.apache.hadoop.hbase.backup.util.BackupSet; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.mapreduce.WALInputFormat; -import org.apache.hadoop.hbase.mapreduce.WALPlayer; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.util.Tool; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -521,7 +503,7 @@ public class BackupAdminImpl implements BackupAdmin { new 
RestoreTablesClient(conn, request).execute(); } - private boolean validateRequest(RestoreRequest request) throws IOException { + public boolean validateRequest(RestoreRequest request) throws IOException { // check and load backup image manifest for the tables Path rootPath = new Path(request.getBackupRootDir()); String backupId = request.getBackupId(); @@ -533,324 +515,28 @@ public class BackupAdminImpl implements BackupAdmin { return BackupUtils.validate(Arrays.asList(sTableArray), manifest, conn.getConfiguration()); } - @Override - public void pointInTimeRestore(PointInTimeRestoreRequest request) throws IOException { - if (request.getBackupRootDir() == null) { - defaultPointInTimeRestore(request); - } else { - // TODO: special case, not supported at the moment - throw new IOException("Custom backup location for Point-In-Time Recovery Not supported!"); - } - LOG.info("Successfully completed Point In Time Restore for all tables."); - } - - /** - * Performs a default Point-In-Time Restore (PITR) by restoring the latest valid backup and - * replaying the WALs to bring the table to the desired state. PITR requires: 1. A valid backup - * available before the end time. 2. Write-Ahead Logs (WALs) covering the remaining duration up to - * the end time. - * @param request PointInTimeRestoreRequest containing restore parameters. - * @throws IOException If no valid backup or WALs are found, or if an error occurs during - * restoration. 
- */ - private void defaultPointInTimeRestore(PointInTimeRestoreRequest request) throws IOException { - long endTime = request.getToDateTime(); - validateRequestToTime(endTime); - - TableName[] sTableArray = request.getFromTables(); - TableName[] tTableArray = resolveTargetTables(sTableArray, request.getToTables()); - - // Validate PITR requirements - validatePitr(endTime, sTableArray, tTableArray); - - // If only validation is required, log and return - if (request.isCheck()) { - LOG.info("PITR can be successfully executed"); - return; - } - - // Execute PITR process - try (BackupSystemTable table = new BackupSystemTable(conn)) { - Map<TableName, Long> continuousBackupTables = table.getContinuousBackupTableSet(); - List<BackupInfo> backupInfos = table.getBackupInfos(BackupState.COMPLETE); - - for (int i = 0; i < sTableArray.length; i++) { - restoreTableWithWalReplay(sTableArray[i], tTableArray[i], endTime, continuousBackupTables, - backupInfos, request); - } - } - } - - /** - * Validates whether the requested end time falls within the allowed PITR recovery window. - * @param endTime The target recovery time. - * @throws IOException If the requested recovery time is outside the allowed window. - */ - private void validateRequestToTime(long endTime) throws IOException { - long pitrWindowDays = conn.getConfiguration().getLong(CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS, - DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS); - long currentTime = EnvironmentEdgeManager.getDelegate().currentTime(); - long pitrMaxStartTime = currentTime - TimeUnit.DAYS.toMillis(pitrWindowDays); - - if (endTime < pitrMaxStartTime) { - String errorMsg = String.format( - "Requested recovery time (%d) is out of the allowed PITR window (last %d days).", endTime, - pitrWindowDays); - LOG.error(errorMsg); - throw new IOException(errorMsg); - } - - if (endTime > currentTime) { - String errorMsg = String.format( - "Requested recovery time (%d) is in the future. 
Current time: %d.", endTime, currentTime); - LOG.error(errorMsg); - throw new IOException(errorMsg); - } - } - - /** - * Resolves the target table array. If null or empty, defaults to the source table array. - */ - private TableName[] resolveTargetTables(TableName[] sourceTables, TableName[] targetTables) { - return (targetTables == null || targetTables.length == 0) ? sourceTables : targetTables; - } - /** - * Validates whether Point-In-Time Recovery (PITR) is possible for the given tables at the - * specified time. + * Initiates Point-In-Time Restore (PITR) for the given request. * <p> - * PITR requires: - * <ul> - * <li>Continuous backup to be enabled for the source tables.</li> - * <li>A valid backup image and corresponding WALs to be available.</li> - * </ul> - * @param endTime The target recovery time. - * @param sTableArray The source tables to restore. - * @param tTableArray The target tables where the restore will be performed. - * @throws IOException If PITR is not possible due to missing continuous backup or backup images. - */ - private void validatePitr(long endTime, TableName[] sTableArray, TableName[] tTableArray) - throws IOException { - try (BackupSystemTable table = new BackupSystemTable(conn)) { - // Retrieve the set of tables with continuous backup enabled - Map<TableName, Long> continuousBackupTables = table.getContinuousBackupTableSet(); - - // Ensure all source tables have continuous backup enabled - validateContinuousBackup(sTableArray, continuousBackupTables); - - // Fetch completed backup information - List<BackupInfo> backupInfos = table.getBackupInfos(BackupState.COMPLETE); - - // Ensure a valid backup and WALs exist for PITR - validateBackupAvailability(sTableArray, tTableArray, endTime, continuousBackupTables, - backupInfos); - } - } - - /** - * Ensures that all source tables have continuous backup enabled. 
- */ - private void validateContinuousBackup(TableName[] tables, - Map<TableName, Long> continuousBackupTables) throws IOException { - List<TableName> missingTables = - Arrays.stream(tables).filter(table -> !continuousBackupTables.containsKey(table)).toList(); - - if (!missingTables.isEmpty()) { - String errorMsg = "Continuous Backup is not enabled for the following tables: " - + missingTables.stream().map(TableName::getNameAsString).collect(Collectors.joining(", ")); - LOG.error(errorMsg); - throw new IOException(errorMsg); - } - } - - /** - * Ensures that a valid backup and corresponding WALs exist for PITR for each source table. PITR - * requires: 1. A valid backup available before the end time. 2. Write-Ahead Logs (WALs) covering - * the remaining duration up to the end time. + * If {@code backupRootDir} is specified in the request, performs PITR using metadata from the + * provided custom backup location. Otherwise, defaults to using metadata from the backup system + * table. + * @param request PointInTimeRestoreRequest containing PITR parameters. + * @throws IOException if validation fails or restore cannot be completed. */ - private void validateBackupAvailability(TableName[] sTableArray, TableName[] tTableArray, - long endTime, Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos) - throws IOException { - for (int i = 0; i < sTableArray.length; i++) { - if ( - !canPerformPitr(sTableArray[i], tTableArray[i], endTime, continuousBackupTables, - backupInfos) - ) { - String errorMsg = "Could not find a valid backup and WALs for PITR for table: " - + sTableArray[i].getNameAsString(); - LOG.error(errorMsg); - throw new IOException(errorMsg); - } - } - } - - /** - * Checks whether PITR can be performed for a given source-target table pair. 
- */ - private boolean canPerformPitr(TableName stableName, TableName tTableName, long endTime, - Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos) { - return getValidBackupInfo(stableName, tTableName, endTime, continuousBackupTables, backupInfos) - != null; - } - - /** - * Finds a valid backup for PITR that meets the required conditions. - */ - private BackupInfo getValidBackupInfo(TableName sTableName, TableName tTablename, long endTime, - Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos) { - for (BackupInfo info : backupInfos) { - if (isValidBackupForPitr(info, sTableName, endTime, continuousBackupTables)) { - - RestoreRequest restoreRequest = - BackupUtils.createRestoreRequest(info.getBackupRootDir(), info.getBackupId(), true, - new TableName[] { sTableName }, new TableName[] { tTablename }, false); - - try { - if (validateRequest(restoreRequest)) { - return info; - } - } catch (IOException e) { - LOG.warn("Exception occurred while testing the backup : {} for restore ", info, e); - } - } - } - return null; - } - - /** - * Determines if the given backup is valid for PITR. - * <p> - * A backup is valid if: - * <ul> - * <li>It contains the source table.</li> - * <li>It was completed before the end time.</li> - * <li>The start timestamp of the backup is after the continuous backup start time for the - * table.</li> - * </ul> - * @param info Backup information object. - * @param tableName Table to check. - * @param endTime The target recovery time. - * @param continuousBackupTables Map of tables with continuous backup enabled. - * @return true if the backup is valid for PITR, false otherwise. 
- */ - private boolean isValidBackupForPitr(BackupInfo info, TableName tableName, long endTime, - Map<TableName, Long> continuousBackupTables) { - return info.getTableNames().contains(tableName) && info.getCompleteTs() <= endTime - && continuousBackupTables.getOrDefault(tableName, 0L) <= info.getStartTs(); - } - - /** - * Restores a table from a valid backup and replays WALs to reach the desired PITR state. - */ - private void restoreTableWithWalReplay(TableName sourceTable, TableName targetTable, long endTime, - Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos, - PointInTimeRestoreRequest request) throws IOException { - BackupInfo backupInfo = - getValidBackupInfo(sourceTable, targetTable, endTime, continuousBackupTables, backupInfos); - if (backupInfo == null) { - String errorMsg = "Could not find a valid backup and WALs for PITR for table: " - + sourceTable.getNameAsString(); - LOG.error(errorMsg); - throw new IOException(errorMsg); - } - - RestoreRequest restoreRequest = BackupUtils.createRestoreRequest(backupInfo.getBackupRootDir(), - backupInfo.getBackupId(), false, new TableName[] { sourceTable }, - new TableName[] { targetTable }, request.isOverwrite()); - - restore(restoreRequest); - replayWal(sourceTable, targetTable, backupInfo.getStartTs(), endTime); - } - - /** - * Replays WALs to bring the table to the desired state. - */ - private void replayWal(TableName sourceTable, TableName targetTable, long startTime, long endTime) - throws IOException { - String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR); - Path walDirPath = new Path(walBackupDir); - LOG.info( - "Starting WAL replay for source: {}, target: {}, time range: {} - {}, WAL backup dir: {}", - sourceTable, targetTable, startTime, endTime, walDirPath); - - List<String> validDirs = - getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime); - if (validDirs.isEmpty()) { - LOG.warn("No valid WAL directories found for range {} - {}. 
Skipping WAL replay.", startTime, - endTime); - return; - } - - executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime); - } - - /** - * Fetches valid WAL directories based on the given time range. - */ - private List<String> getValidWalDirs(Configuration conf, Path walBackupDir, long startTime, - long endTime) throws IOException { - FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf); - FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR)); - - List<String> validDirs = new ArrayList<>(); - SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); - - for (FileStatus dayDir : dayDirs) { - if (!dayDir.isDirectory()) { - continue; // Skip files, only process directories - } - - String dirName = dayDir.getPath().getName(); - try { - Date dirDate = dateFormat.parse(dirName); - long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00) - long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59) - - // Check if this day's WAL files overlap with the required time range - if (dirEndTime >= startTime && dirStartTime <= endTime) { - validDirs.add(dayDir.getPath().toString()); - } - } catch (ParseException e) { - LOG.warn("Skipping invalid directory name: " + dirName, e); - } - } - return validDirs; - } - - /** - * Executes WAL replay using WALPlayer. 
- */ - private void executeWalReplay(List<String> walDirs, TableName sourceTable, TableName targetTable, - long startTime, long endTime) throws IOException { - Tool walPlayer = initializeWalPlayer(startTime, endTime); - String[] args = - { String.join(",", walDirs), sourceTable.getNameAsString(), targetTable.getNameAsString() }; + @Override + public void pointInTimeRestore(PointInTimeRestoreRequest request) throws IOException { + AbstractPitrRestoreHandler handler; - try { - LOG.info("Executing WALPlayer with args: {}", Arrays.toString(args)); - int exitCode = walPlayer.run(args); - if (exitCode == 0) { - LOG.info("WAL replay completed successfully for {}", targetTable); - } else { - throw new IOException("WAL replay failed with exit code: " + exitCode); - } - } catch (Exception e) { - LOG.error("Error during WAL replay for {}: {}", targetTable, e.getMessage(), e); - throw new IOException("Exception during WAL replay", e); + // Choose the appropriate handler based on whether a custom backup location is provided + if (request.getBackupRootDir() == null) { + handler = new DefaultPitrRestoreHandler(conn, request); + } else { + handler = new CustomBackupLocationPitrRestoreHandler(conn, request); } - } + handler.validateAndRestore(); - /** - * Initializes and configures WALPlayer. 
- */ - private Tool initializeWalPlayer(long startTime, long endTime) { - Configuration conf = HBaseConfiguration.create(conn.getConfiguration()); - conf.setLong(WALInputFormat.START_TIME_KEY, startTime); - conf.setLong(WALInputFormat.END_TIME_KEY, endTime); - conf.setBoolean(IGNORE_EMPTY_FILES, true); - Tool walPlayer = new WALPlayer(); - walPlayer.setConf(conf); - return walPlayer; + LOG.info("Successfully completed Point In Time Restore for all tables."); } @Override diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java new file mode 100644 index 00000000000..8b785a0f050 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.impl; + +import java.util.List; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Adapter that wraps a {@link BackupImage} to expose it as {@link PitrBackupMetadata}. 
+ */ +@InterfaceAudience.Private +public class BackupImageAdapter implements PitrBackupMetadata { + private final BackupImage image; + + public BackupImageAdapter(BackupImage image) { + this.image = image; + } + + @Override + public List<TableName> getTableNames() { + return image.getTableNames(); + } + + @Override + public long getStartTs() { + return image.getStartTs(); + } + + @Override + public long getCompleteTs() { + return image.getCompleteTs(); + } + + @Override + public String getBackupId() { + return image.getBackupId(); + } + + @Override + public String getRootDir() { + return image.getRootDir(); + } +} diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java new file mode 100644 index 00000000000..967fae551cb --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.impl; + +import java.util.List; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Adapter that wraps a {@link BackupInfo} to expose it as {@link PitrBackupMetadata}. + */ +@InterfaceAudience.Private +public class BackupInfoAdapter implements PitrBackupMetadata { + private final BackupInfo info; + + public BackupInfoAdapter(BackupInfo info) { + this.info = info; + } + + @Override + public List<TableName> getTableNames() { + return info.getTableNames(); + } + + @Override + public long getStartTs() { + return info.getStartTs(); + } + + @Override + public long getCompleteTs() { + return info.getCompleteTs(); + } + + @Override + public String getBackupId() { + return info.getBackupId(); + } + + @Override + public String getRootDir() { + return info.getBackupRootDir(); + } +} diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/CustomBackupLocationPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/CustomBackupLocationPitrRestoreHandler.java new file mode 100644 index 00000000000..1657b68d023 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/CustomBackupLocationPitrRestoreHandler.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.impl; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * PITR restore handler that retrieves backup metadata from a custom backup root directory. + * <p> + * This implementation is used when the PITR request specifies a custom backup location via + * {@code backupRootDir}. + */ +@InterfaceAudience.Private +public class CustomBackupLocationPitrRestoreHandler extends AbstractPitrRestoreHandler { + + public CustomBackupLocationPitrRestoreHandler(Connection conn, + PointInTimeRestoreRequest request) { + super(conn, request); + } + + /** + * Retrieves completed backup entries from the given custom backup root directory and converts + * them into {@link PitrBackupMetadata} using {@link BackupImageAdapter}. 
+ * @param request the PITR request + * @return list of completed backup metadata entries from the custom location + * @throws IOException if reading from the custom backup directory fails + */ + @Override + protected List<PitrBackupMetadata> getBackupMetadata(PointInTimeRestoreRequest request) + throws IOException { + return HBackupFileSystem + .getAllBackupImages(conn.getConfiguration(), new Path(request.getBackupRootDir())).stream() + .map(BackupImageAdapter::new).collect(Collectors.toList()); + } +} diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/DefaultPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/DefaultPitrRestoreHandler.java new file mode 100644 index 00000000000..c6844ba96bd --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/DefaultPitrRestoreHandler.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.impl; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Default PITR restore handler that retrieves backup metadata from the system table. + * <p> + * This implementation is used when no custom backup root directory is specified in the request. + */ +@InterfaceAudience.Private +public class DefaultPitrRestoreHandler extends AbstractPitrRestoreHandler { + + public DefaultPitrRestoreHandler(Connection conn, PointInTimeRestoreRequest request) { + super(conn, request); + } + + /** + * Retrieves completed backup entries from the BackupSystemTable and converts them into + * {@link PitrBackupMetadata} using {@link BackupInfoAdapter}. + * @param request the PITR request + * @return list of completed backup metadata entries + * @throws IOException if reading from the backup system table fails + */ + @Override + protected List<PitrBackupMetadata> getBackupMetadata(PointInTimeRestoreRequest request) + throws IOException { + try (BackupSystemTable table = new BackupSystemTable(conn)) { + return table.getBackupInfos(BackupInfo.BackupState.COMPLETE).stream() + .map(BackupInfoAdapter::new).collect(Collectors.toList()); + } + } +} diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java new file mode 100644 index 00000000000..dc135ce79c0 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.impl; + +import java.util.List; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * A unified abstraction over backup metadata used during Point-In-Time Restore (PITR). + * <p> + * This interface allows the PITR algorithm to operate uniformly over different types of backup + * metadata sources, such as {@link BackupInfo} (system table) and {@link BackupImage} (custom + * backup location), without knowing their specific implementations. 
+ */ +@InterfaceAudience.Private +public interface PitrBackupMetadata { + + /** Returns List of table names included in the backup */ + List<TableName> getTableNames(); + + /** Returns Start timestamp of the backup */ + long getStartTs(); + + /** Returns Completion timestamp of the backup */ + long getCompleteTs(); + + /** Returns Unique identifier for the backup */ + String getBackupId(); + + /** Returns Root directory where the backup is stored */ + String getRootDir(); +} diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java new file mode 100644 index 00000000000..ae26cf96050 --- /dev/null +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup; + +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_PITR_BACKUP_PATH; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public final class PITRTestUtil { + private static final Logger LOG = LoggerFactory.getLogger(PITRTestUtil.class); + private static final int DEFAULT_WAIT_FOR_REPLICATION_MS = 30_000; + + private PITRTestUtil() { + // Utility class + } + + public static String[] buildPITRArgs(TableName[] sourceTables, TableName[] targetTables, + long endTime, String backupRootDir) { + String sourceTableNames = + Arrays.stream(sourceTables).map(TableName::getNameAsString).collect(Collectors.joining(",")); + String targetTableNames = + Arrays.stream(targetTables).map(TableName::getNameAsString).collect(Collectors.joining(",")); + + List<String> args = new ArrayList<>(); + args.add("-" + OPTION_TABLE); + args.add(sourceTableNames); + args.add("-" + OPTION_TABLE_MAPPING); + args.add(targetTableNames); + args.add("-" + OPTION_TO_DATETIME); + args.add(String.valueOf(endTime)); + + if (backupRootDir != null) { + args.add("-" + OPTION_PITR_BACKUP_PATH); + args.add(backupRootDir); + } + + return args.toArray(new String[0]); + } + + public static String[] 
buildBackupArgs(String backupType, TableName[] tables, + boolean continuousEnabled, String backupRootDir) { + String tableNames = + Arrays.stream(tables).map(TableName::getNameAsString).collect(Collectors.joining(",")); + + List<String> args = new ArrayList<>( + Arrays.asList("create", backupType, backupRootDir, "-" + OPTION_TABLE, tableNames)); + + if (continuousEnabled) { + args.add("-" + OPTION_ENABLE_CONTINUOUS_BACKUP); + } + + return args.toArray(new String[0]); + } + + public static void loadRandomData(HBaseTestingUtil testUtil, TableName tableName, byte[] family, + int totalRows) throws IOException { + try (Table table = testUtil.getConnection().getTable(tableName)) { + testUtil.loadRandomRows(table, family, 32, totalRows); + } + } + + public static void waitForReplication() { + try { + LOG.info("Waiting for replication to complete for {} ms", DEFAULT_WAIT_FOR_REPLICATION_MS); + Thread.sleep(DEFAULT_WAIT_FOR_REPLICATION_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for replication", e); + } + } + + public static int getRowCount(HBaseTestingUtil testUtil, TableName tableName) throws IOException { + try (Table table = testUtil.getConnection().getTable(tableName)) { + return HBaseTestingUtil.countRows(table); + } + } +} diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java index fb37977c4ee..a1ce9c97a68 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java @@ -18,23 +18,15 @@ package org.apache.hadoop.hbase.backup; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; -import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import java.io.IOException; -import java.util.Arrays; -import java.util.stream.Collectors; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.ToolRunner; @@ -43,8 +35,6 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @Category(LargeTests.class) public class TestPointInTimeRestore extends TestBackupBase { @@ -52,10 +42,7 @@ public class TestPointInTimeRestore extends TestBackupBase { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestPointInTimeRestore.class); - private static final Logger LOG = LoggerFactory.getLogger(TestPointInTimeRestore.class); - private static final String backupWalDirName = "TestPointInTimeRestoreWalDir"; - private static final int WAIT_FOR_REPLICATION_MS = 30_000; static Path backupWalDir; static FileSystem fs; @@ -80,38 +67,41 @@ public class TestPointInTimeRestore extends TestBackupBase { // Simulate a backup taken 20 days ago EnvironmentEdgeManager 
.injectEdge(() -> System.currentTimeMillis() - 20 * ONE_DAY_IN_MILLISECONDS); - loadRandomData(table1, 1000); // Insert initial data into table1 + PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Insert initial data into + // table1 // Perform a full backup for table1 with continuous backup enabled - String[] args = buildBackupArgs("full", new TableName[] { table1 }, true); + String[] args = + PITRTestUtil.buildBackupArgs("full", new TableName[] { table1 }, true, BACKUP_ROOT_DIR); int ret = ToolRunner.run(conf1, new BackupDriver(), args); assertEquals("Backup should succeed", 0, ret); // Move time forward to simulate 15 days ago EnvironmentEdgeManager .injectEdge(() -> System.currentTimeMillis() - 15 * ONE_DAY_IN_MILLISECONDS); - loadRandomData(table1, 1000); // Add more data to table1 - loadRandomData(table2, 500); // Insert data into table2 + PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Add more data to table1 + PITRTestUtil.loadRandomData(TEST_UTIL, table2, famName, 500); // Insert data into table2 - waitForReplication(); // Ensure replication is complete + PITRTestUtil.waitForReplication(); // Ensure replication is complete // Perform a full backup for table2 with continuous backup enabled - args = buildBackupArgs("full", new TableName[] { table2 }, true); + args = PITRTestUtil.buildBackupArgs("full", new TableName[] { table2 }, true, BACKUP_ROOT_DIR); ret = ToolRunner.run(conf1, new BackupDriver(), args); assertEquals("Backup should succeed", 0, ret); // Move time forward to simulate 10 days ago EnvironmentEdgeManager .injectEdge(() -> System.currentTimeMillis() - 10 * ONE_DAY_IN_MILLISECONDS); - loadRandomData(table2, 500); // Add more data to table2 - loadRandomData(table3, 500); // Insert data into table3 + PITRTestUtil.loadRandomData(TEST_UTIL, table2, famName, 500); // Add more data to table2 + PITRTestUtil.loadRandomData(TEST_UTIL, table3, famName, 500); // Insert data into table3 // Perform a full backup for table3 
and table4 (without continuous backup) - args = buildBackupArgs("full", new TableName[] { table3, table4 }, false); + args = PITRTestUtil.buildBackupArgs("full", new TableName[] { table3, table4 }, false, + BACKUP_ROOT_DIR); ret = ToolRunner.run(conf1, new BackupDriver(), args); assertEquals("Backup should succeed", 0, ret); - waitForReplication(); // Ensure replication is complete before concluding setup + PITRTestUtil.waitForReplication(); // Ensure replication is complete before concluding setup // Reset time mocking to avoid affecting other tests EnvironmentEdgeManager.reset(); @@ -137,18 +127,18 @@ public class TestPointInTimeRestore extends TestBackupBase { @Test public void testPITR_FailsOutsideWindow() throws Exception { // Case 1: Requested restore time is in the future (should fail) - String[] args = buildPITRArgs(new TableName[] { table1 }, + String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { table1 }, new TableName[] { TableName.valueOf("restoredTable1") }, - EnvironmentEdgeManager.currentTime() + ONE_DAY_IN_MILLISECONDS); + EnvironmentEdgeManager.currentTime() + ONE_DAY_IN_MILLISECONDS, null); int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args); assertNotEquals("Restore should fail since the requested restore time is in the future", 0, ret); // Case 2: Requested restore time is too old (beyond the retention window, should fail) - args = buildPITRArgs(new TableName[] { table1 }, + args = PITRTestUtil.buildPITRArgs(new TableName[] { table1 }, new TableName[] { TableName.valueOf("restoredTable1") }, - EnvironmentEdgeManager.currentTime() - 40 * ONE_DAY_IN_MILLISECONDS); + EnvironmentEdgeManager.currentTime() - 40 * ONE_DAY_IN_MILLISECONDS, null); ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args); assertNotEquals( @@ -162,9 +152,9 @@ public class TestPointInTimeRestore extends TestBackupBase { */ @Test public void testPointInTimeRestore_ContinuousBackupNotEnabledTables() throws Exception { - String[] args = 
buildPITRArgs(new TableName[] { table3 }, + String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { table3 }, new TableName[] { TableName.valueOf("restoredTable1") }, - EnvironmentEdgeManager.currentTime() - 10 * ONE_DAY_IN_MILLISECONDS); + EnvironmentEdgeManager.currentTime() - 10 * ONE_DAY_IN_MILLISECONDS, null); int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args); assertNotEquals("Restore should fail since continuous backup is not enabled for the table", 0, @@ -176,9 +166,9 @@ public class TestPointInTimeRestore extends TestBackupBase { */ @Test public void testPointInTimeRestore_TablesWithNoProperBackupOrWals() throws Exception { - String[] args = buildPITRArgs(new TableName[] { table2 }, + String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { table2 }, new TableName[] { TableName.valueOf("restoredTable1") }, - EnvironmentEdgeManager.currentTime() - 16 * ONE_DAY_IN_MILLISECONDS); + EnvironmentEdgeManager.currentTime() - 16 * ONE_DAY_IN_MILLISECONDS, null); int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args); assertNotEquals( @@ -194,15 +184,17 @@ public class TestPointInTimeRestore extends TestBackupBase { TableName restoredTable = TableName.valueOf("restoredTable"); // Perform restore operation - String[] args = buildPITRArgs(new TableName[] { table1 }, new TableName[] { restoredTable }, - EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS); + String[] args = + PITRTestUtil.buildPITRArgs(new TableName[] { table1 }, new TableName[] { restoredTable }, + EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS, null); int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args); assertEquals("Restore should succeed", 0, ret); // Validate that the restored table contains the same number of rows as the original table assertEquals("Restored table should have the same row count as the original", - getRowCount(table1), getRowCount(restoredTable)); + PITRTestUtil.getRowCount(TEST_UTIL, 
table1), + PITRTestUtil.getRowCount(TEST_UTIL, restoredTable)); } /** @@ -214,64 +206,19 @@ public class TestPointInTimeRestore extends TestBackupBase { TableName restoredTable2 = TableName.valueOf("restoredTable2"); // Perform restore operation for multiple tables - String[] args = buildPITRArgs(new TableName[] { table1, table2 }, + String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { table1, table2 }, new TableName[] { restoredTable1, restoredTable2 }, - EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS); + EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS, null); int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args); assertEquals("Restore should succeed", 0, ret); // Validate that the restored tables contain the same number of rows as the originals assertEquals("Restored table1 should have the same row count as the original", - getRowCount(table1), getRowCount(restoredTable1)); + PITRTestUtil.getRowCount(TEST_UTIL, table1), + PITRTestUtil.getRowCount(TEST_UTIL, restoredTable1)); assertEquals("Restored table2 should have the same row count as the original", - getRowCount(table2), getRowCount(restoredTable2)); - } - - private String[] buildPITRArgs(TableName[] sourceTables, TableName[] targetTables, long endTime) { - String sourceTableNames = - Arrays.stream(sourceTables).map(TableName::getNameAsString).collect(Collectors.joining(",")); - - String targetTableNames = - Arrays.stream(targetTables).map(TableName::getNameAsString).collect(Collectors.joining(",")); - - return new String[] { "-" + OPTION_TABLE, sourceTableNames, "-" + OPTION_TABLE_MAPPING, - targetTableNames, "-" + OPTION_TO_DATETIME, String.valueOf(endTime) }; - } - - private static String[] buildBackupArgs(String backupType, TableName[] tables, - boolean continuousEnabled) { - String tableNames = - Arrays.stream(tables).map(TableName::getNameAsString).collect(Collectors.joining(",")); - - if (continuousEnabled) { - return new String[] { 
"create", backupType, BACKUP_ROOT_DIR, "-" + OPTION_TABLE, tableNames, - "-" + OPTION_ENABLE_CONTINUOUS_BACKUP }; - } else { - return new String[] { "create", backupType, BACKUP_ROOT_DIR, "-" + OPTION_TABLE, tableNames }; - } - } - - private static void loadRandomData(TableName tableName, int totalRows) throws IOException { - int rowSize = 32; - try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { - TEST_UTIL.loadRandomRows(table, famName, rowSize, totalRows); - } - } - - private static void waitForReplication() { - LOG.info("Waiting for replication to complete for {} ms", WAIT_FOR_REPLICATION_MS); - try { - Thread.sleep(WAIT_FOR_REPLICATION_MS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Thread was interrupted while waiting", e); - } - } - - private int getRowCount(TableName tableName) throws IOException { - try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { - return HBaseTestingUtil.countRows(table); - } + PITRTestUtil.getRowCount(TEST_UTIL, table2), + PITRTestUtil.getRowCount(TEST_UTIL, restoredTable2)); } } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestoreWithCustomBackupPath.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestoreWithCustomBackupPath.java new file mode 100644 index 00000000000..78c5ac94ba0 --- /dev/null +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestoreWithCustomBackupPath.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestPointInTimeRestoreWithCustomBackupPath extends TestBackupBase { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestPointInTimeRestoreWithCustomBackupPath.class); + + private static final String backupWalDirName = "TestCustomBackupWalDir"; + private static final String customBackupDirName = "CustomBackupRoot"; + + private static Path backupWalDir; + private static Path customBackupDir; + private static FileSystem fs; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + Path root = TEST_UTIL.getDataTestDirOnTestFS(); + 
backupWalDir = new Path(root, backupWalDirName); + customBackupDir = new Path(root, customBackupDirName); + + fs = FileSystem.get(conf1); + fs.mkdirs(backupWalDir); + fs.mkdirs(customBackupDir); + + conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString()); + + createAndCopyBackupData(); + } + + private static void createAndCopyBackupData() throws Exception { + // Simulate time 10 days ago + EnvironmentEdgeManager + .injectEdge(() -> System.currentTimeMillis() - 10 * ONE_DAY_IN_MILLISECONDS); + PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); + + // Perform backup with continuous backup enabled + String[] args = + PITRTestUtil.buildBackupArgs("full", new TableName[] { table1 }, true, BACKUP_ROOT_DIR); + int ret = ToolRunner.run(conf1, new BackupDriver(), args); + assertEquals("Backup should succeed", 0, ret); + + PITRTestUtil.waitForReplication(); + + // Copy the contents of BACKUP_ROOT_DIR to the new customBackupDir + Path defaultBackupDir = new Path(BACKUP_ROOT_DIR); + for (FileStatus status : fs.listStatus(defaultBackupDir)) { + Path dst = new Path(customBackupDir, status.getPath().getName()); + FileUtil.copy(fs, status, fs, dst, true, false, conf1); + } + + EnvironmentEdgeManager.reset(); + } + + @AfterClass + public static void cleanupAfterClass() throws IOException { + if (fs.exists(backupWalDir)) { + fs.delete(backupWalDir, true); + } + if (fs.exists(customBackupDir)) { + fs.delete(customBackupDir, true); + } + + conf1.unset(CONF_CONTINUOUS_BACKUP_WAL_DIR); + } + + @Test + public void testPITR_FromCustomBackupRootDir() throws Exception { + TableName restoredTable = TableName.valueOf("restoredTableCustomPath"); + + long restoreTime = EnvironmentEdgeManager.currentTime() - 2 * ONE_DAY_IN_MILLISECONDS; + + String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { table1 }, + new TableName[] { restoredTable }, restoreTime, customBackupDir.toString()); + + int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args); + 
assertEquals("PITR should succeed with custom backup root dir", 0, ret); + + // Validate that the restored table has same row count + assertEquals("Restored table should match row count", + PITRTestUtil.getRowCount(TEST_UTIL, table1), + PITRTestUtil.getRowCount(TEST_UTIL, restoredTable)); + } +}