This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 416c8a62eabd166c8bd35979f17994fc14cc5062
Author: asolomon <ankitsolo...@gmail.com>
AuthorDate: Sat Jun 21 00:32:14 2025 +0530

    HBASE-28990 Modify Incremental Backup for Continuous Backup (#6788)

    Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org>
    Signed-off-by: Andor Molnár an...@apache.org
    Reviewed by: Kota-SH <shanmukhaharipr...@gmail.com>
    Reviewed by: Vinayak Hegde <vinayakph...@gmail.com>
    Reviewed by: Kevin Geiszler <kevin.j.geisz...@gmail.com>
---
 .../org/apache/hadoop/hbase/backup/BackupInfo.java |  14 ++
 .../hadoop/hbase/backup/impl/BackupAdminImpl.java  |  58 +++--
 .../backup/impl/IncrementalTableBackupClient.java  | 165 ++++++++++---
 .../apache/hadoop/hbase/backup/TestBackupBase.java |  29 ++-
 .../hadoop/hbase/backup/TestBackupDescribe.java    |   1 +
 .../hadoop/hbase/backup/TestContinuousBackup.java  |  15 +-
 .../TestIncrementalBackupWithContinuous.java       | 254 +++++++++++++++++++++
 7 files changed, 472 insertions(+), 64 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
index 862a9cbad10..0997aec19ec 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
@@ -124,6 +124,11 @@ public class BackupInfo implements Comparable<BackupInfo> {
    */
   private long completeTs;
 
+  /**
+   * Committed WAL timestamp for incremental backup
+   */
+  private long incrCommittedWalTs;
+
   /**
    * Total bytes of incremental logs copied
    */
@@ -293,6 +298,14 @@ public class BackupInfo implements Comparable<BackupInfo> {
     this.completeTs = endTs;
   }
 
+  public long getIncrCommittedWalTs() {
+    return incrCommittedWalTs;
+  }
+
+  public void setIncrCommittedWalTs(long timestamp) {
+    this.incrCommittedWalTs = timestamp;
+  }
+
   public long getTotalBytesCopied() {
     return totalBytesCopied;
   }
@@ -549,6 +562,7 @@ public class BackupInfo implements Comparable<BackupInfo> {
     sb.append("{");
     sb.append("ID=" + backupId).append(",");
     sb.append("Type=" + getType()).append(",");
+    sb.append("IsContinuous=" + isContinuousBackupEnabled()).append(",");
     sb.append("Tables=" + getTableListAsString()).append(",");
     sb.append("State=" + getState()).append(",");
     Calendar cal = Calendar.getInstance();
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index e94389d6938..1e91258ba6c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -859,28 +859,47 @@ public class BackupAdminImpl implements BackupAdmin {
String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime(); if (type == BackupType.INCREMENTAL) { - Set<TableName> incrTableSet; - try (BackupSystemTable table = new BackupSystemTable(conn)) { - incrTableSet = table.getIncrementalBackupTableSet(targetRootDir); - } + if (request.isContinuousBackupEnabled()) { + Set<TableName> continuousBackupTableSet; + try (BackupSystemTable table = new BackupSystemTable(conn)) { + continuousBackupTableSet = table.getContinuousBackupTableSet().keySet(); + } + if (continuousBackupTableSet.isEmpty()) { + String msg = "Continuous backup table set contains no tables. " + + "You need to run Continuous backup first " + + (tableList != null ?
"on " + StringUtils.join(tableList, ",") : ""); + throw new IOException(msg); + } + if (!continuousBackupTableSet.containsAll(tableList)) { + String extraTables = StringUtils.join(tableList, ","); + String msg = "Some tables (" + extraTables + ") haven't gone through Continuous backup. " + + "Perform Continuous backup on " + extraTables + " first, then retry the command"; + throw new IOException(msg); + } + } else { + Set<TableName> incrTableSet; + try (BackupSystemTable table = new BackupSystemTable(conn)) { + incrTableSet = table.getIncrementalBackupTableSet(targetRootDir); + } - if (incrTableSet.isEmpty()) { - String msg = - "Incremental backup table set contains no tables. " + "You need to run full backup first " + if (incrTableSet.isEmpty()) { + String msg = "Incremental backup table set contains no tables. " + + "You need to run full backup first " + (tableList != null ? "on " + StringUtils.join(tableList, ",") : ""); - throw new IOException(msg); - } - if (tableList != null) { - tableList.removeAll(incrTableSet); - if (!tableList.isEmpty()) { - String extraTables = StringUtils.join(tableList, ","); - String msg = "Some tables (" + extraTables + ") haven't gone through full backup. " - + "Perform full backup on " + extraTables + " first, " + "then retry the command"; throw new IOException(msg); } + if (tableList != null) { + tableList.removeAll(incrTableSet); + if (!tableList.isEmpty()) { + String extraTables = StringUtils.join(tableList, ","); + String msg = "Some tables (" + extraTables + ") haven't gone through full backup. " + + "Perform full backup on " + extraTables + " first, then retry the command"; + throw new IOException(msg); + } + } + tableList = Lists.newArrayList(incrTableSet); } - tableList = Lists.newArrayList(incrTableSet); } if (tableList != null && !tableList.isEmpty()) { for (TableName table : tableList) { @@ -907,7 +926,12 @@ public class BackupAdminImpl implements BackupAdmin { } } if (nonExistingTableList != null) { - if (type == BackupType.INCREMENTAL) { + // Non-continuous incremental backup is controlled by 'incremental backup table set' + // and not by user provided backup table list. This is an optimization to avoid copying + // the same set of WALs for incremental backups of different tables at different times + // HBASE-14038. 
Since continuous incremental backup and full backup backs-up user provided + // table list, we should inform use about non-existence of input table(s) + if (type == BackupType.INCREMENTAL && !request.isContinuousBackupEnabled()) { // Update incremental backup set tableList = excludeNonExistingTables(tableList, nonExistingTableList); } else { diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index bbc32b2ef8f..4bb04b11261 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -17,18 +17,28 @@ */ package org.apache.hadoop.hbase.backup.impl; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY; +import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TimeZone; import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -48,6 +58,7 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.mapreduce.WALInputFormat; import org.apache.hadoop.hbase.mapreduce.WALPlayer; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.SnapshotManifest; @@ -62,6 +73,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Strings; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -274,9 +286,19 @@ public class IncrementalTableBackupClient extends TableBackupClient { // case PREPARE_INCREMENTAL: beginBackup(backupManager, backupInfo); backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL); - LOG.debug("For incremental backup, current table set is " - + backupManager.getIncrementalBackupTableSet()); - newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap(); + // Non-continuous Backup incremental backup is controlled by 'incremental backup table set' + // and not by user provided backup table list. 
This is an optimization to avoid copying + // the same set of WALs for incremental backups of different tables at different times + // HBASE-14038 + // Continuous-incremental backup backs up user provided table list/set + Set<TableName> currentTableSet; + if (backupInfo.isContinuousBackupEnabled()) { + currentTableSet = backupInfo.getTables(); + } else { + currentTableSet = backupManager.getIncrementalBackupTableSet(); + newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap(); + } + LOG.debug("For incremental backup, the current table set is {}", currentTableSet); } catch (Exception e) { // fail the overall backup and return failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ", @@ -303,21 +325,24 @@ public class IncrementalTableBackupClient extends TableBackupClient { // set overall backup status: complete. Here we make sure to complete the backup. // After this checkpoint, even if entering cancel process, will let the backup finished try { - // Set the previousTimestampMap which is before this current log roll to the manifest. - Map<TableName, Map<String, Long>> previousTimestampMap = backupManager.readLogTimestampMap(); - backupInfo.setIncrTimestampMap(previousTimestampMap); - - // The table list in backupInfo is good for both full backup and incremental backup. - // For incremental backup, it contains the incremental backup table set. - backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps); - - Map<TableName, Map<String, Long>> newTableSetTimestampMap = - backupManager.readLogTimestampMap(); - - backupInfo.setTableSetTimestampMap(newTableSetTimestampMap); - Long newStartCode = - BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap)); - backupManager.writeBackupStartCode(newStartCode); + if (!backupInfo.isContinuousBackupEnabled()) { + // Set the previousTimestampMap which is before this current log roll to the manifest. + Map<TableName, Map<String, Long>> previousTimestampMap = + backupManager.readLogTimestampMap(); + backupInfo.setIncrTimestampMap(previousTimestampMap); + + // The table list in backupInfo is good for both full backup and incremental backup. + // For incremental backup, it contains the incremental backup table set. 
+ backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps); + + Map<TableName, Map<String, Long>> newTableSetTimestampMap = + backupManager.readLogTimestampMap(); + + backupInfo.setTableSetTimestampMap(newTableSetTimestampMap); + Long newStartCode = + BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap)); + backupManager.writeBackupStartCode(newStartCode); + } List<BulkLoad> bulkLoads = handleBulkLoad(backupInfo.getTableNames()); @@ -374,23 +399,88 @@ public class IncrementalTableBackupClient extends TableBackupClient { } protected void convertWALsToHFiles() throws IOException { - // get incremental backup file list and prepare parameters for DistCp - List<String> incrBackupFileList = backupInfo.getIncrBackupFileList(); - // Get list of tables in incremental backup set - Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet(); - // filter missing files out (they have been copied by previous backups) - incrBackupFileList = filterMissingFiles(incrBackupFileList); - List<String> tableList = new ArrayList<String>(); - for (TableName table : tableSet) { - // Check if table exists - if (tableExists(table, conn)) { - tableList.add(table.getNameAsString()); - } else { - LOG.warn("Table " + table + " does not exists. Skipping in WAL converter"); + long previousBackupTs = 0L; + if (backupInfo.isContinuousBackupEnabled()) { + Set<TableName> tableSet = backupInfo.getTables(); + List<BackupInfo> backupInfos = backupManager.getBackupHistory(true); + for (TableName table : tableSet) { + for (BackupInfo backup : backupInfos) { + // find previous backup for this table + if (backup.getTables().contains(table)) { + LOG.info("Found previous backup of type {} with id {} for table {}", backup.getType(), + backup.getBackupId(), table.getNameAsString()); + List<String> walBackupFileList; + if (backup.getType() == BackupType.FULL) { + previousBackupTs = backup.getStartTs(); + } else { + previousBackupTs = backup.getIncrCommittedWalTs(); + } + walBackupFileList = getBackupLogs(previousBackupTs); + walToHFiles(walBackupFileList, Arrays.asList(table.getNameAsString()), + previousBackupTs); + break; + } + } } + } else { + // get incremental backup file list and prepare parameters for DistCp + List<String> incrBackupFileList = backupInfo.getIncrBackupFileList(); + // Get list of tables in incremental backup set + Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet(); + // filter missing files out (they have been copied by previous backups) + incrBackupFileList = filterMissingFiles(incrBackupFileList); + List<String> tableList = new ArrayList<String>(); + for (TableName table : tableSet) { + // Check if table exists + if (tableExists(table, conn)) { + tableList.add(table.getNameAsString()); + } else { + LOG.warn("Table " + table + " does not exists. 
Skipping in WAL converter"); + } + } + walToHFiles(incrBackupFileList, tableList, previousBackupTs); } - walToHFiles(incrBackupFileList, tableList); + } + private List<String> getBackupLogs(long startTs) throws IOException { + // get log files from backup dir + String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR); + if (Strings.isNullOrEmpty(walBackupDir)) { + throw new IOException( + "Incremental backup requires the WAL backup directory " + CONF_CONTINUOUS_BACKUP_WAL_DIR); + } + List<String> resultLogFiles = new ArrayList<>(); + Path walBackupPath = new Path(walBackupDir); + FileSystem backupFs = FileSystem.get(walBackupPath.toUri(), conf); + FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR)); + SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); + dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + + for (FileStatus dayDir : dayDirs) { + if (!dayDir.isDirectory()) { + continue; // Skip files, only process directories + } + + String dirName = dayDir.getPath().getName(); + try { + Date dirDate = dateFormat.parse(dirName); + long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00) + long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59) + + if (dirEndTime >= startTs) { + Path dirPath = dayDir.getPath(); + FileStatus[] logs = backupFs.listStatus(dirPath); + for (FileStatus log : logs) { + String filepath = log.getPath().toString(); + LOG.debug("Found WAL file: {}", filepath); + resultLogFiles.add(filepath); + } + } + } catch (ParseException e) { + LOG.warn("Skipping invalid directory name: " + dirName, e); + } + } + return resultLogFiles; } protected boolean tableExists(TableName table, Connection conn) throws IOException { @@ -399,7 +489,8 @@ public class IncrementalTableBackupClient extends TableBackupClient { } } - protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException { + protected void walToHFiles(List<String> dirPaths, List<String> tableList, long previousBackupTs) + throws IOException { Tool player = new WALPlayer(); // Player reads all files in arbitrary directory structure and creates @@ -413,6 +504,14 @@ public class IncrementalTableBackupClient extends TableBackupClient { conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";"); conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true); conf.set(JOB_NAME_CONF_KEY, jobname); + if (backupInfo.isContinuousBackupEnabled()) { + conf.set(WALInputFormat.START_TIME_KEY, Long.toString(previousBackupTs)); + // committedWALsTs is needed only for Incremental backups with continuous backup + // since these do not depend on log roll ts + long committedWALsTs = BackupUtils.getReplicationCheckpoint(conn); + backupInfo.setIncrCommittedWalTs(committedWALsTs); + conf.set(WALInputFormat.END_TIME_KEY, Long.toString(committedWALsTs)); + } String[] playerArgs = { dirs, StringUtils.join(tableList, ",") }; try { diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index e5c78d04854..684b701daa7 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.backup; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER; import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL; @@ -414,15 +415,31 @@ public class TestBackupBase { return request; } + protected BackupRequest createBackupRequest(BackupType type, List<TableName> tables, String path, + boolean noChecksumVerify, boolean continuousBackupEnabled) { + BackupRequest.Builder builder = new BackupRequest.Builder(); + BackupRequest request = builder.withBackupType(type).withTableList(tables) + .withTargetRootDir(path).withNoChecksumVerify(noChecksumVerify) + .withContinuousBackupEnabled(continuousBackupEnabled).build(); + return request; + } + protected String backupTables(BackupType type, List<TableName> tables, String path) throws IOException { + return backupTables(type, tables, path, false); + } + + protected String backupTables(BackupType type, List<TableName> tables, String path, + boolean isContinuousBackup) throws IOException { Connection conn = null; BackupAdmin badmin = null; String backupId; try { conn = ConnectionFactory.createConnection(conf1); badmin = new BackupAdminImpl(conn); - BackupRequest request = createBackupRequest(type, new ArrayList<>(tables), path); + + BackupRequest request = + createBackupRequest(type, new ArrayList<>(tables), path, false, isContinuousBackup); backupId = badmin.backupTables(request); } finally { if (badmin != null) { @@ -554,4 +571,14 @@ public class TestBackupBase { LOG.debug(Objects.toString(it.next().getPath())); } } + + void deleteContinuousBackupReplicationPeerIfExists(Admin admin) throws IOException { + if ( + admin.listReplicationPeers().stream() + .anyMatch(peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER)) + ) { + admin.disableReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER); + admin.removeReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER); + } + } } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDescribe.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDescribe.java index 7ce039fd666..6084dc730ee 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDescribe.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDescribe.java @@ -94,6 +94,7 @@ public class TestBackupDescribe extends TestBackupBase { System.setOut(new PrintStream(baos)); String[] args = new String[] { "describe", backupId }; + // Run backup int ret = ToolRunner.run(conf1, new BackupDriver(), args); assertTrue(ret == 0); diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java index fe44ebf420d..0cc34ed63eb 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java @@ -259,8 +259,7 @@ public class TestContinuousBackup extends TestBackupBase { } } - private String[] buildBackupArgs(String backupType, TableName[] tables, - boolean continuousEnabled) { + String[] buildBackupArgs(String backupType, TableName[] tables, boolean continuousEnabled) { String tableNames = Arrays.stream(tables).map(TableName::getNameAsString).collect(Collectors.joining(",")); @@ -272,7 
+271,7 @@ public class TestContinuousBackup extends TestBackupBase { } } - private BackupManifest getLatestBackupManifest(List<BackupInfo> backups) throws IOException { + BackupManifest getLatestBackupManifest(List<BackupInfo> backups) throws IOException { BackupInfo newestBackup = backups.get(0); return HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR), newestBackup.getBackupId()); @@ -289,14 +288,4 @@ public class TestContinuousBackup extends TestBackupBase { } } - private void deleteContinuousBackupReplicationPeerIfExists(Admin admin) throws IOException { - if ( - admin.listReplicationPeers().stream() - .anyMatch(peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER)) - ) { - admin.disableReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER); - admin.removeReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER); - } - } - } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java new file mode 100644 index 00000000000..79d1df645b9 --- /dev/null +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup; + +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; +import org.apache.hadoop.hbase.backup.impl.BackupManifest; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.backup.impl.BulkLoad; +import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.tool.BulkLoadHFiles; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.HFileTestUtil; +import org.apache.hadoop.util.ToolRunner; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.Sets; + +@Category(LargeTests.class) +public class TestIncrementalBackupWithContinuous extends TestContinuousBackup { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestIncrementalBackupWithContinuous.class); + + private static final Logger LOG = + LoggerFactory.getLogger(TestIncrementalBackupWithContinuous.class); + + private byte[] ROW = Bytes.toBytes("row1"); + private final byte[] FAMILY = Bytes.toBytes("family"); + private final byte[] COLUMN = Bytes.toBytes("col"); + private static final int ROWS_IN_BULK_LOAD = 100; + + @Test + public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception { + LOG.info("Testing incremental backup with continuous backup"); + conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true); + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + TableName tableName = TableName.valueOf("table_" + methodName); + Table t1 = TEST_UTIL.createTable(tableName, FAMILY); + + try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) { + int before = table.getBackupHistory().size(); + + // Run continuous backup + String[] args = buildBackupArgs("full", new TableName[] { tableName }, true); + int ret = ToolRunner.run(conf1, new BackupDriver(), args); + assertEquals("Full Backup should succeed", 0, ret); + + // Verify backup history increased and all the backups are succeeded + LOG.info("Verify backup history increased and all the backups are succeeded"); + List<BackupInfo> backups = table.getBackupHistory(); + assertEquals("Backup history should increase", before + 1, backups.size()); + for (BackupInfo data : 
List.of(backups.get(0))) { + String backupId = data.getBackupId(); + assertTrue(checkSucceeded(backupId)); + } + + // Verify backup manifest contains the correct tables + LOG.info("Verify backup manifest contains the correct tables"); + BackupManifest manifest = getLatestBackupManifest(backups); + assertEquals("Backup should contain the expected tables", Sets.newHashSet(tableName), + new HashSet<>(manifest.getTableList())); + + Put p = new Put(ROW); + p.addColumn(FAMILY, COLUMN, COLUMN); + t1.put(p); + Thread.sleep(5000); + + // Run incremental backup + LOG.info("Run incremental backup now"); + before = table.getBackupHistory().size(); + args = buildBackupArgs("incremental", new TableName[] { tableName }, false); + ret = ToolRunner.run(conf1, new BackupDriver(), args); + assertEquals("Incremental Backup should succeed", 0, ret); + LOG.info("Incremental backup completed"); + + // Verify backup history increased and all the backups are succeeded + backups = table.getBackupHistory(); + String incrementalBackupid = null; + assertEquals("Backup history should increase", before + 1, backups.size()); + for (BackupInfo data : List.of(backups.get(0))) { + String backupId = data.getBackupId(); + incrementalBackupid = backupId; + assertTrue(checkSucceeded(backupId)); + } + + TEST_UTIL.truncateTable(tableName); + // Restore incremental backup + TableName[] tables = new TableName[] { tableName }; + BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection()); + client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupid, false, + tables, tables, true)); + + verifyTable(t1); + conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT); + } + } + + @Test + public void testContinuousBackupWithIncrementalBackupAndBulkloadSuccess() throws Exception { + conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true); + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) { + // The test starts with some data, and no bulk loaded rows. 
+ int expectedRowCount = NB_ROWS_IN_BATCH; + assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); + assertTrue(systemTable.readBulkloadRows(List.of(table1)).isEmpty()); + + // Bulk loads aren't tracked if the table isn't backed up yet + performBulkLoad("bulk1", methodName); + expectedRowCount += ROWS_IN_BULK_LOAD; + assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); + assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size()); + + // Create a backup, bulk loads are now being tracked + String backup1 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR, true); + assertTrue(checkSucceeded(backup1)); + + loadTable(TEST_UTIL.getConnection().getTable(table1)); + assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); + performBulkLoad("bulk2", methodName); + expectedRowCount += ROWS_IN_BULK_LOAD; + assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); + assertEquals(1, systemTable.readBulkloadRows(List.of(table1)).size()); + + // Creating an incremental backup clears the bulk loads + performBulkLoad("bulk4", methodName); + performBulkLoad("bulk5", methodName); + performBulkLoad("bulk6", methodName); + expectedRowCount += 3 * ROWS_IN_BULK_LOAD; + assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); + assertEquals(4, systemTable.readBulkloadRows(List.of(table1)).size()); + String backup2 = backupTables(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR, true); + assertTrue(checkSucceeded(backup2)); + assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); + assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size()); + int rowCountAfterBackup2 = expectedRowCount; + + // Doing another bulk load, to check that this data will disappear after a restore operation + performBulkLoad("bulk7", methodName); + expectedRowCount += ROWS_IN_BULK_LOAD; + assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); + List<BulkLoad> bulkloadsTemp = systemTable.readBulkloadRows(List.of(table1)); + assertEquals(1, bulkloadsTemp.size()); + BulkLoad bulk7 = bulkloadsTemp.get(0); + + // Doing a restore. Overwriting the table implies clearing the bulk loads, + // but the loading of restored data involves loading bulk data, we expect 2 bulk loads + // associated with backup 3 (loading of full backup, loading of incremental backup). 
+ BackupAdmin client = getBackupAdmin(); + client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false, + new TableName[] { table1 }, new TableName[] { table1 }, true)); + assertEquals(rowCountAfterBackup2, TEST_UTIL.countRows(table1)); + List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(List.of(table1)); + assertEquals(3, bulkLoads.size()); + conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT); + } + } + + private void verifyTable(Table t1) throws IOException { + Get g = new Get(ROW); + Result r = t1.get(g); + assertEquals(1, r.size()); + assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN)); + } + + private void performBulkLoad(String keyPrefix, String testDir) throws IOException { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path baseDirectory = TEST_UTIL.getDataTestDirOnTestFS(testDir); + Path hfilePath = + new Path(baseDirectory, Bytes.toString(famName) + Path.SEPARATOR + "hfile_" + keyPrefix); + + HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, hfilePath, famName, qualName, + Bytes.toBytes(keyPrefix), Bytes.toBytes(keyPrefix + "z"), ROWS_IN_BULK_LOAD); + + listFiles(fs, baseDirectory, baseDirectory); + + Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result = + BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(table1, baseDirectory); + assertFalse(result.isEmpty()); + } + + private static Set<String> listFiles(final FileSystem fs, final Path root, final Path dir) + throws IOException { + Set<String> files = new HashSet<>(); + FileStatus[] list = CommonFSUtils.listStatus(fs, dir); + if (list != null) { + for (FileStatus fstat : list) { + if (fstat.isDirectory()) { + LOG.info("Found directory {}", Objects.toString(fstat.getPath())); + files.addAll(listFiles(fs, root, fstat.getPath())); + } else { + LOG.info("Found file {}", Objects.toString(fstat.getPath())); + String file = fstat.getPath().makeQualified(fs).toString(); + files.add(file); + } + } + } + return files; + } + + protected static void loadTable(Table table) throws Exception { + Put p; // 100 + 1 row to t1_syncup + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(famName, qualName, Bytes.toBytes("val" + i)); + table.put(p); + } + } +}
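For anyone who wants to try the new code path, below is a minimal, untested sketch (not part of this commit) of requesting an incremental backup for a table that is already under continuous backup, using the same BackupRequest.Builder#withContinuousBackupEnabled flag exercised by the new tests above. The configuration setup, table name, and backup root are placeholders.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class ContinuousIncrementalBackupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Placeholder values: point these at your own table and backup root.
    List<TableName> tables = new ArrayList<>(List.of(TableName.valueOf("my_table")));
    String backupRootDir = "hdfs:///backup";

    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      // The table must already be in the continuous backup table set (i.e. an
      // initial full backup with continuous backup enabled has been taken);
      // otherwise BackupAdminImpl rejects the request, as shown in the patch.
      BackupRequest request = new BackupRequest.Builder()
        .withBackupType(BackupType.INCREMENTAL)
        .withTableList(tables)
        .withTargetRootDir(backupRootDir)
        .withContinuousBackupEnabled(true)
        .build();
      String backupId = new BackupAdminImpl(conn).backupTables(request);
      System.out.println("Incremental backup finished with id " + backupId);
    }
  }
}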