vinayakphegde commented on code in PR #7400:
URL: https://github.com/apache/hbase/pull/7400#discussion_r2451833630


##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -469,7 +479,7 @@ protected void convertWALsToHFiles() throws IOException {
     }
   }
 
-  private List<String> getBackupLogs(long startTs) throws IOException {
+  private List<String> getBackupLogs(long startTs, long endTs) throws IOException {

Review Comment:
   Let's avoid duplicating code. We already have similar functionality for retrieving
   log files within a time range in
   `org.apache.hadoop.hbase.backup.impl.AbstractPitrRestoreHandler#getValidWalDirs`.
   Can we use that instead? We could move that logic to a common location such as
   `src/main/java/org/apache/hadoop/hbase/backup/util`.
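   For illustration, a minimal sketch of what the shared helper might look like if that
   logic were lifted into a utility class under the util package. The class name, method
   signature, and the day-partitioned WAL directory layout below are assumptions, not the
   actual implementation; the real body should come from `getValidWalDirs`:

```java
package org.apache.hadoop.hbase.backup.util;

import java.io.IOException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Sketch of a shared helper for listing backed-up WAL directories in a time range.
 * Hypothetical; the real logic should be moved here from
 * AbstractPitrRestoreHandler#getValidWalDirs rather than rewritten.
 */
public final class BackupWalUtils {

  private static final DateTimeFormatter DAY_FORMAT = DateTimeFormatter.ISO_LOCAL_DATE;

  private BackupWalUtils() {
  }

  /** Returns WAL day directories whose dates fall within [startTs, endTs] (epoch millis). */
  public static List<String> getValidWalDirs(Configuration conf, Path walBackupDir,
    long startTs, long endTs) throws IOException {
    FileSystem fs = walBackupDir.getFileSystem(conf);
    LocalDate startDay = LocalDate.ofEpochDay(startTs / 86_400_000L);
    LocalDate endDay = LocalDate.ofEpochDay(endTs / 86_400_000L);

    List<String> walDirs = new ArrayList<>();
    for (FileStatus dayDir : fs.listStatus(walBackupDir)) {
      if (!dayDir.isDirectory()) {
        continue;
      }
      LocalDate day;
      try {
        // Assumes day directories are named yyyy-MM-dd; adjust to whatever
        // getValidWalDirs actually parses.
        day = LocalDate.parse(dayDir.getPath().getName(), DAY_FORMAT);
      } catch (RuntimeException e) {
        continue; // not a day-partitioned WAL directory, skip it
      }
      if (!day.isBefore(startDay) && !day.isAfter(endDay)) {
        walDirs.add(dayDir.getPath().toString());
      }
    }
    return walDirs;
  }
}
```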



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java:
##########
@@ -113,6 +114,12 @@ protected void beginBackup(BackupManager backupManager, BackupInfo backupInfo)
     // set the start timestamp of the overall backup
     long startTs = EnvironmentEdgeManager.currentTime();
     backupInfo.setStartTs(startTs);
+    if (backupInfo.getType() == BackupType.INCREMENTAL && backupInfo.isContinuousBackupEnabled()) {

Review Comment:
   Why was this logic added to TableBackupClient? Wouldn't it be more appropriate
   to place it in IncrementalTableBackupClient?
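   As a rough sketch of the idea (assuming `beginBackup` stays overridable and keeps its
   current signature; the body below is illustrative only, not the PR's actual logic):

```java
// In IncrementalTableBackupClient: keep TableBackupClient#beginBackup generic and
// apply the continuous-backup handling only in the incremental subclass.
@Override
protected void beginBackup(BackupManager backupManager, BackupInfo backupInfo)
  throws IOException {
  super.beginBackup(backupManager, backupInfo);
  if (backupInfo.isContinuousBackupEnabled()) {
    // the INCREMENTAL + continuous-backup specific setup currently added to
    // TableBackupClient would move here
  }
}
```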



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -137,89 +139,88 @@ protected static int getIndex(TableName tbl, List<TableName> sTableList) {
    * the backup is marked as complete.
    * @param tablesToBackup list of tables to be backed up
    */
-  protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
+  protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup,
+    Map<TableName, List<String>> tablesToWALFileList, Map<TableName, Long> tablesToPrevBackupTs)
+    throws IOException {
     Map<TableName, MergeSplitBulkloadInfo> toBulkload = new HashMap<>();
-    List<BulkLoad> bulkLoads;
-    if (backupInfo.isContinuousBackupEnabled()) {
-      bulkLoads =
-        backupManager.readBulkloadRows(tablesToBackup, backupInfo.getIncrCommittedWalTs());
-    } else {
-      bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
-    }
+    List<BulkLoad> bulkLoads = new ArrayList<>();
+
     FileSystem tgtFs;
     try {
       tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
     } catch (URISyntaxException use) {
       throw new IOException("Unable to get FileSystem", use);
     }
+
     Path rootdir = CommonFSUtils.getRootDir(conf);
     Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
 
-    for (BulkLoad bulkLoad : bulkLoads) {
-      TableName srcTable = bulkLoad.getTableName();
-      MergeSplitBulkloadInfo bulkloadInfo =
-        toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
-      String regionName = bulkLoad.getRegion();
-      String fam = bulkLoad.getColumnFamily();
-      String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
+    if (!backupInfo.isContinuousBackupEnabled()) {
+      bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
+      for (BulkLoad bulkLoad : bulkLoads) {
+        TableName srcTable = bulkLoad.getTableName();
+        MergeSplitBulkloadInfo bulkloadInfo =
+          toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
+        String regionName = bulkLoad.getRegion();
+        String fam = bulkLoad.getColumnFamily();
+        String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
+
+        if (!tablesToBackup.contains(srcTable)) {
+          LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
+          continue;
+        }
+        Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
+        Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
+
+        String srcTableQualifier = srcTable.getQualifierAsString();
+        String srcTableNs = srcTable.getNamespaceAsString();
+        Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
+          + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
+        if (!tgtFs.mkdirs(tgtFam)) {
+          throw new IOException("couldn't create " + tgtFam);
+        }
+        Path tgt = new Path(tgtFam, filename);
+
+        Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
+        Path archive = new Path(archiveDir, filename);
 
-      if (!tablesToBackup.contains(srcTable)) {
-        LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
-        continue;
-      }
-      Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
-      Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
-
-      // For continuous backup: bulkload files are copied from backup directory defined by
-      // CONF_CONTINUOUS_BACKUP_WAL_DIR instead of source cluster.
-      String backupRootDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
-      if (backupInfo.isContinuousBackupEnabled() && !Strings.isNullOrEmpty(backupRootDir)) {
-        String dayDirectoryName = BackupUtils.formatToDateString(bulkLoad.getTimestamp());
-        Path bulkLoadBackupPath =
-          new Path(backupRootDir, BULKLOAD_FILES_DIR + Path.SEPARATOR + dayDirectoryName);
-        Path bulkLoadDir = new Path(bulkLoadBackupPath,
-          srcTable.getNamespaceAsString() + Path.SEPARATOR + srcTable.getNameAsString());
-        FileSystem backupFs = FileSystem.get(bulkLoadDir.toUri(), conf);
-        Path fullBulkLoadBackupPath =
-          new Path(bulkLoadDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
-        if (backupFs.exists(fullBulkLoadBackupPath)) {
-          LOG.debug("Backup bulkload file found {}", fullBulkLoadBackupPath);
-          p = fullBulkLoadBackupPath;
-        } else {
-          LOG.warn("Backup bulkload file not found {}", 
fullBulkLoadBackupPath);
+        if (fs.exists(p)) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("found bulk hfile {} in {} for {}", 
bulkLoad.getHfilePath(), p.getParent(),
+              srcTableQualifier);
+            LOG.trace("copying {} to {}", p, tgt);
+          }
+          bulkloadInfo.addActiveFile(p.toString());
+        } else if (fs.exists(archive)) {
+          LOG.debug("copying archive {} to {}", archive, tgt);
+          bulkloadInfo.addArchiveFiles(archive.toString());
         }
       }
 
-      String srcTableQualifier = srcTable.getQualifierAsString();
-      String srcTableNs = srcTable.getNamespaceAsString();
-      Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
-        + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
-      if (!tgtFs.mkdirs(tgtFam)) {
-        throw new IOException("couldn't create " + tgtFam);
+      for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) {
+        mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(),
+          bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs);
       }
-      Path tgt = new Path(tgtFam, filename);
+    } else {
+      // Continuous incremental backup: run BulkLoadCollectorJob over backed-up WALs
+      Path collectorOutput = new Path(getBulkOutputDir(), BULKLOAD_COLLECTOR_OUTPUT);
+      for (TableName table : tablesToBackup) {
+        String walDirsCsv = String.join(",", tablesToWALFileList.get(table));
 
-      Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
-      Path archive = new Path(archiveDir, filename);
+        List<Path> bulkloadPaths =
+          BulkFilesCollector.collectFromWalDirs(conf, walDirsCsv, collectorOutput, table, table,

Review Comment:
   Rather than calling `BulkFilesCollector` directly, we can use the
   `org.apache.hadoop.hbase.backup.impl.AbstractPitrRestoreHandler#collectBulkFiles()`
   method, which provides a higher-level entry point and internally invokes
   `BulkFilesCollector.collectFromWalDirs()`. This helps us avoid duplicating code.
   In both restore and incremental backup scenarios we need to extract bulkload files
   by reading WAL files within a given time range, so it makes sense to have a single
   implementation for this. We should consider placing this common logic in a utility
   class under the util package.
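   For illustration only, the per-table loop might reduce to something like the sketch
   below once that helper is extracted. `BackupUtils.collectBulkFiles` is a hypothetical
   relocated name and its parameter list is assumed; the real signature should follow
   whatever `collectBulkFiles()` already takes:

```java
// Sketch: delegate WAL scanning to the shared helper instead of calling
// BulkFilesCollector.collectFromWalDirs() directly.
for (TableName table : tablesToBackup) {
  long prevBackupTs = tablesToPrevBackupTs.get(table);
  List<Path> bulkloadPaths = BackupUtils.collectBulkFiles(conf, table, table,
    prevBackupTs, backupInfo.getIncrCommittedWalTs(), collectorOutput,
    tablesToWALFileList.get(table));
  // merge/copy the collected bulkload HFiles into the backup as before
}
```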



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java:
##########
@@ -96,8 +96,11 @@ private void registerBulkLoad(ObserverContext<? extends RegionCoprocessorEnviron
     try (Connection connection = ConnectionFactory.createConnection(cfg);
       BackupSystemTable tbl = new BackupSystemTable(connection)) {
       Set<TableName> fullyBackedUpTables = tbl.getTablesIncludedInBackups();
+      Map<TableName, Long> continuousBackupTableSet = tbl.getContinuousBackupTableSet();
 
-      if (fullyBackedUpTables.contains(tableName)) {
+      if (
+        fullyBackedUpTables.contains(tableName) && !continuousBackupTableSet.containsKey(tableName)

Review Comment:
   I have a suggestion: perhaps we could add a comment stating that for tables under
   continuous backup this isn't necessary, as everything will be picked up from the
   WAL backup location.
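   Concretely, something along these lines (the wording is just a suggestion):

```java
// For tables under continuous backup we intentionally skip registering the bulk
// load here: incremental backups pick the bulkloaded HFiles up from the WAL
// backup location instead of the backup system table.
if (
  fullyBackedUpTables.contains(tableName) && !continuousBackupTableSet.containsKey(tableName)
) {
```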



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
