vinayakphegde commented on code in PR #7400:
URL: https://github.com/apache/hbase/pull/7400#discussion_r2451833630
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -469,7 +479,7 @@ protected void convertWALsToHFiles() throws IOException {
}
}
- private List<String> getBackupLogs(long startTs) throws IOException {
+ private List<String> getBackupLogs(long startTs, long endTs) throws
IOException {
Review Comment:
Let's avoid duplicating code. We already have similar functionality for
retrieving log files within a time range in
`org.apache.hadoop.hbase.backup.impl.AbstractPitrRestoreHandler#getValidWalDirs`.
Can we use that instead? We could move that method to a common location such as
`src/main/java/org/apache/hadoop/hbase/backup/util`.
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java:
##########
@@ -113,6 +114,12 @@ protected void beginBackup(BackupManager backupManager,
BackupInfo backupInfo)
// set the start timestamp of the overall backup
long startTs = EnvironmentEdgeManager.currentTime();
backupInfo.setStartTs(startTs);
+ if (backupInfo.getType() == BackupType.INCREMENTAL &&
backupInfo.isContinuousBackupEnabled()) {
Review Comment:
Why was this logic added to TableBackupClient? Wouldn't it be more
appropriate to place it in IncrementalTableBackupClient?
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -137,89 +139,88 @@ protected static int getIndex(TableName tbl,
List<TableName> sTableList) {
* the backup is marked as complete.
* @param tablesToBackup list of tables to be backed up
*/
- protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup)
throws IOException {
+ protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup,
+ Map<TableName, List<String>> tablesToWALFileList, Map<TableName, Long>
tablesToPrevBackupTs)
+ throws IOException {
Map<TableName, MergeSplitBulkloadInfo> toBulkload = new HashMap<>();
- List<BulkLoad> bulkLoads;
- if (backupInfo.isContinuousBackupEnabled()) {
- bulkLoads =
- backupManager.readBulkloadRows(tablesToBackup,
backupInfo.getIncrCommittedWalTs());
- } else {
- bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
- }
+ List<BulkLoad> bulkLoads = new ArrayList<>();
+
FileSystem tgtFs;
try {
tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
} catch (URISyntaxException use) {
throw new IOException("Unable to get FileSystem", use);
}
+
Path rootdir = CommonFSUtils.getRootDir(conf);
Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
- for (BulkLoad bulkLoad : bulkLoads) {
- TableName srcTable = bulkLoad.getTableName();
- MergeSplitBulkloadInfo bulkloadInfo =
- toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
- String regionName = bulkLoad.getRegion();
- String fam = bulkLoad.getColumnFamily();
- String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
+ if (!backupInfo.isContinuousBackupEnabled()) {
+ bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
+ for (BulkLoad bulkLoad : bulkLoads) {
+ TableName srcTable = bulkLoad.getTableName();
+ MergeSplitBulkloadInfo bulkloadInfo =
+ toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
+ String regionName = bulkLoad.getRegion();
+ String fam = bulkLoad.getColumnFamily();
+ String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
+
+ if (!tablesToBackup.contains(srcTable)) {
+ LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
+ continue;
+ }
+ Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
+ Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam +
Path.SEPARATOR + filename);
+
+ String srcTableQualifier = srcTable.getQualifierAsString();
+ String srcTableNs = srcTable.getNamespaceAsString();
+ Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR +
srcTableQualifier
+ + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
+ if (!tgtFs.mkdirs(tgtFam)) {
+ throw new IOException("couldn't create " + tgtFam);
+ }
+ Path tgt = new Path(tgtFam, filename);
+
+ Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable,
regionName, fam);
+ Path archive = new Path(archiveDir, filename);
- if (!tablesToBackup.contains(srcTable)) {
- LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
- continue;
- }
- Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
- Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam +
Path.SEPARATOR + filename);
-
- // For continuous backup: bulkload files are copied from backup
directory defined by
- // CONF_CONTINUOUS_BACKUP_WAL_DIR instead of source cluster.
- String backupRootDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
- if (backupInfo.isContinuousBackupEnabled() &&
!Strings.isNullOrEmpty(backupRootDir)) {
- String dayDirectoryName =
BackupUtils.formatToDateString(bulkLoad.getTimestamp());
- Path bulkLoadBackupPath =
- new Path(backupRootDir, BULKLOAD_FILES_DIR + Path.SEPARATOR +
dayDirectoryName);
- Path bulkLoadDir = new Path(bulkLoadBackupPath,
- srcTable.getNamespaceAsString() + Path.SEPARATOR +
srcTable.getNameAsString());
- FileSystem backupFs = FileSystem.get(bulkLoadDir.toUri(), conf);
- Path fullBulkLoadBackupPath =
- new Path(bulkLoadDir, regionName + Path.SEPARATOR + fam +
Path.SEPARATOR + filename);
- if (backupFs.exists(fullBulkLoadBackupPath)) {
- LOG.debug("Backup bulkload file found {}", fullBulkLoadBackupPath);
- p = fullBulkLoadBackupPath;
- } else {
- LOG.warn("Backup bulkload file not found {}",
fullBulkLoadBackupPath);
+ if (fs.exists(p)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("found bulk hfile {} in {} for {}",
bulkLoad.getHfilePath(), p.getParent(),
+ srcTableQualifier);
+ LOG.trace("copying {} to {}", p, tgt);
+ }
+ bulkloadInfo.addActiveFile(p.toString());
+ } else if (fs.exists(archive)) {
+ LOG.debug("copying archive {} to {}", archive, tgt);
+ bulkloadInfo.addArchiveFiles(archive.toString());
}
}
- String srcTableQualifier = srcTable.getQualifierAsString();
- String srcTableNs = srcTable.getNamespaceAsString();
- Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR +
srcTableQualifier
- + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
- if (!tgtFs.mkdirs(tgtFam)) {
- throw new IOException("couldn't create " + tgtFam);
+ for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) {
+ mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(),
+ bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs);
}
- Path tgt = new Path(tgtFam, filename);
+ } else {
+ // Continuous incremental backup: run BulkLoadCollectorJob over
backed-up WALs
+ Path collectorOutput = new Path(getBulkOutputDir(),
BULKLOAD_COLLECTOR_OUTPUT);
+ for (TableName table : tablesToBackup) {
+ String walDirsCsv = String.join(",", tablesToWALFileList.get(table));
- Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable,
regionName, fam);
- Path archive = new Path(archiveDir, filename);
+ List<Path> bulkloadPaths =
+ BulkFilesCollector.collectFromWalDirs(conf, walDirsCsv,
collectorOutput, table, table,
Review Comment:
Rather than calling `BulkFilesCollector` directly, we can use the
`org.apache.hadoop.hbase.backup.impl.AbstractPitrRestoreHandler#collectBulkFiles()`
method, which serves as a higher-level approach and internally invokes
`BulkFilesCollector.collectFromWalDirs()`. This helps us avoid duplicating
code. In both restore and incremental backup scenarios, we need to extract
bulkload files by reading WAL files within a given time range, so it makes
sense to have a single logic for this. We should consider placing this common
logic in a utility class under the util package.
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java:
##########
@@ -96,8 +96,11 @@ private void registerBulkLoad(ObserverContext<? extends
RegionCoprocessorEnviron
try (Connection connection = ConnectionFactory.createConnection(cfg);
BackupSystemTable tbl = new BackupSystemTable(connection)) {
Set<TableName> fullyBackedUpTables = tbl.getTablesIncludedInBackups();
+ Map<TableName, Long> continuousBackupTableSet =
tbl.getContinuousBackupTableSet();
- if (fullyBackedUpTables.contains(tableName)) {
+ if (
+ fullyBackedUpTables.contains(tableName) &&
!continuousBackupTableSet.containsKey(tableName)
Review Comment:
I have a suggestion. Perhaps we could add a comment stating that for
continuous backup, registering the bulk load here isn't necessary, as
everything will be read from the WAL backup location.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]