vinayakphegde commented on code in PR #7400:
URL: https://github.com/apache/hbase/pull/7400#discussion_r2452745608


##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -137,89 +139,88 @@ protected static int getIndex(TableName tbl, 
List<TableName> sTableList) {
    * the backup is marked as complete.
    * @param tablesToBackup list of tables to be backed up
    */
-  protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) 
throws IOException {
+  protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup,
+    Map<TableName, List<String>> tablesToWALFileList, Map<TableName, Long> 
tablesToPrevBackupTs)
+    throws IOException {
     Map<TableName, MergeSplitBulkloadInfo> toBulkload = new HashMap<>();
-    List<BulkLoad> bulkLoads;
-    if (backupInfo.isContinuousBackupEnabled()) {
-      bulkLoads =
-        backupManager.readBulkloadRows(tablesToBackup, 
backupInfo.getIncrCommittedWalTs());
-    } else {
-      bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
-    }
+    List<BulkLoad> bulkLoads = new ArrayList<>();
+
     FileSystem tgtFs;
     try {
       tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
     } catch (URISyntaxException use) {
       throw new IOException("Unable to get FileSystem", use);
     }
+
     Path rootdir = CommonFSUtils.getRootDir(conf);
     Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
 
-    for (BulkLoad bulkLoad : bulkLoads) {
-      TableName srcTable = bulkLoad.getTableName();
-      MergeSplitBulkloadInfo bulkloadInfo =
-        toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
-      String regionName = bulkLoad.getRegion();
-      String fam = bulkLoad.getColumnFamily();
-      String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
+    if (!backupInfo.isContinuousBackupEnabled()) {
+      bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
+      for (BulkLoad bulkLoad : bulkLoads) {
+        TableName srcTable = bulkLoad.getTableName();
+        MergeSplitBulkloadInfo bulkloadInfo =
+          toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
+        String regionName = bulkLoad.getRegion();
+        String fam = bulkLoad.getColumnFamily();
+        String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
+
+        if (!tablesToBackup.contains(srcTable)) {
+          LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
+          continue;
+        }
+        Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
+        Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + 
Path.SEPARATOR + filename);
+
+        String srcTableQualifier = srcTable.getQualifierAsString();
+        String srcTableNs = srcTable.getNamespaceAsString();
+        Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + 
srcTableQualifier
+          + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
+        if (!tgtFs.mkdirs(tgtFam)) {
+          throw new IOException("couldn't create " + tgtFam);
+        }
+        Path tgt = new Path(tgtFam, filename);
+
+        Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, 
regionName, fam);
+        Path archive = new Path(archiveDir, filename);
 
-      if (!tablesToBackup.contains(srcTable)) {
-        LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
-        continue;
-      }
-      Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
-      Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + 
Path.SEPARATOR + filename);
-
-      // For continuous backup: bulkload files are copied from backup 
directory defined by
-      // CONF_CONTINUOUS_BACKUP_WAL_DIR instead of source cluster.
-      String backupRootDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
-      if (backupInfo.isContinuousBackupEnabled() && 
!Strings.isNullOrEmpty(backupRootDir)) {
-        String dayDirectoryName = 
BackupUtils.formatToDateString(bulkLoad.getTimestamp());
-        Path bulkLoadBackupPath =
-          new Path(backupRootDir, BULKLOAD_FILES_DIR + Path.SEPARATOR + 
dayDirectoryName);
-        Path bulkLoadDir = new Path(bulkLoadBackupPath,
-          srcTable.getNamespaceAsString() + Path.SEPARATOR + 
srcTable.getNameAsString());
-        FileSystem backupFs = FileSystem.get(bulkLoadDir.toUri(), conf);
-        Path fullBulkLoadBackupPath =
-          new Path(bulkLoadDir, regionName + Path.SEPARATOR + fam + 
Path.SEPARATOR + filename);
-        if (backupFs.exists(fullBulkLoadBackupPath)) {
-          LOG.debug("Backup bulkload file found {}", fullBulkLoadBackupPath);
-          p = fullBulkLoadBackupPath;
-        } else {
-          LOG.warn("Backup bulkload file not found {}", 
fullBulkLoadBackupPath);
+        if (fs.exists(p)) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("found bulk hfile {} in {} for {}", 
bulkLoad.getHfilePath(), p.getParent(),
+              srcTableQualifier);
+            LOG.trace("copying {} to {}", p, tgt);
+          }
+          bulkloadInfo.addActiveFile(p.toString());
+        } else if (fs.exists(archive)) {
+          LOG.debug("copying archive {} to {}", archive, tgt);
+          bulkloadInfo.addArchiveFiles(archive.toString());
         }
       }
 
-      String srcTableQualifier = srcTable.getQualifierAsString();
-      String srcTableNs = srcTable.getNamespaceAsString();
-      Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + 
srcTableQualifier
-        + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
-      if (!tgtFs.mkdirs(tgtFam)) {
-        throw new IOException("couldn't create " + tgtFam);
+      for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) {
+        mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(),
+          bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs);
       }
-      Path tgt = new Path(tgtFam, filename);
+    } else {
+      // Continuous incremental backup: run BulkLoadCollectorJob over 
backed-up WALs
+      Path collectorOutput = new Path(getBulkOutputDir(), 
BULKLOAD_COLLECTOR_OUTPUT);
+      for (TableName table : tablesToBackup) {
+        String walDirsCsv = String.join(",", tablesToWALFileList.get(table));
 
-      Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, 
regionName, fam);
-      Path archive = new Path(archiveDir, filename);
+        List<Path> bulkloadPaths =
+          BulkFilesCollector.collectFromWalDirs(conf, walDirsCsv, 
collectorOutput, table, table,

Review Comment:
   > If I call AbstractPitrRestoreHandler#collectBulkFiles() it would again 
call BackupUtils#getValidWalDirs()
   
   instead of calling BackupUtils#getValidWalDirs() inside 
AbstractPitrRestoreHandler#collectBulkFiles(), take the output of 
BackupUtils#getValidWalDirs() as parameter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to