This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 11dffb4751fc83bce406ca5da189b3916f06439f
Author: asolomon <ankitsolo...@gmail.com>
AuthorDate: Tue Jul 22 22:13:49 2025 +0530

    HBASE-29459 Capture bulkload files only till IncrCommittedWalTs during Incremental Backup (#7166)
    
    Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org>
    Reviewed by: Kevin Geiszler <kevin.j.geisz...@gmail.com>
---
 .../hadoop/hbase/backup/impl/BackupManager.java    |   5 ++
 .../hbase/backup/impl/BackupSystemTable.java       |  29 ++++--
 .../apache/hadoop/hbase/backup/impl/BulkLoad.java  |  15 +++-
 .../backup/impl/IncrementalTableBackupClient.java  |   8 +-
 .../TestIncrementalBackupWithContinuous.java       | 100 +++++++++------------
 5 files changed, 88 insertions(+), 69 deletions(-)
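
In short: when continuous backup is enabled, the incremental backup client now
bounds its read of recorded bulk loads by the backup's committed WAL timestamp
(IncrCommittedWalTs), so HFiles bulk loaded after that cutoff are left in the
backup system table for the next incremental backup rather than being captured
(and later cleared) prematurely. A condensed sketch of the new call path,
simplified from the IncrementalTableBackupClient hunk below:

    // Simplified from handleBulkLoad(); the unbounded overload delegates
    // with endTimestamp = Long.MAX_VALUE, preserving the old behavior.
    List<BulkLoad> bulkLoads = backupInfo.isContinuousBackupEnabled()
      ? backupManager.readBulkloadRows(tablesToBackup, backupInfo.getIncrCommittedWalTs())
      : backupManager.readBulkloadRows(tablesToBackup);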

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index 8b17e93868b..c2ed4f7fa1f 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -363,6 +363,11 @@ public class BackupManager implements Closeable {
     return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
   }
 
+  public List<BulkLoad> readBulkloadRows(List<TableName> tableList, long endTimestamp)
+    throws IOException {
+    return systemTable.readBulkloadRows(tableList, endTimestamp);
+  }
+
   public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
     return systemTable.readBulkloadRows(tableList);
   }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 7a7d948e934..788194c1963 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -380,26 +380,36 @@ public final class BackupSystemTable implements Closeable {
   }
 
   /**
-   * Reads all registered bulk loads.
+   * Reads the rows from backup table recording bulk loaded hfiles
    */
   public List<BulkLoad> readBulkloadRows() throws IOException {
     Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(null);
-    return processBulkLoadRowScan(scan);
+    return processBulkLoadRowScan(scan, Long.MAX_VALUE);
   }
 
   /**
-   * Reads the registered bulk loads for the given tables.
+   * Reads the rows from backup table recording bulk loaded hfiles
+   * @param tableList list of table names
    */
   public List<BulkLoad> readBulkloadRows(Collection<TableName> tableList) throws IOException {
+    return readBulkloadRows(tableList, Long.MAX_VALUE);
+  }
+
+  /**
+   * Reads the rows from backup table recording bulk loaded hfiles
+   * @param tableList    list of table names
+   * @param endTimestamp upper bound timestamp for bulkload entries retrieval
+   */
+  public List<BulkLoad> readBulkloadRows(Collection<TableName> tableList, long endTimestamp) throws IOException {
     List<BulkLoad> result = new ArrayList<>();
     for (TableName table : tableList) {
       Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
-      result.addAll(processBulkLoadRowScan(scan));
+      result.addAll(processBulkLoadRowScan(scan, endTimestamp));
     }
     return result;
   }
 
-  private List<BulkLoad> processBulkLoadRowScan(Scan scan) throws IOException {
+  private List<BulkLoad> processBulkLoadRowScan(Scan scan, long endTimestamp) throws IOException {
     List<BulkLoad> result = new ArrayList<>();
     try (Table bulkLoadTable = connection.getTable(bulkLoadTableName);
       ResultScanner scanner = bulkLoadTable.getScanner(scan)) {
@@ -411,8 +421,10 @@ public final class BackupSystemTable implements Closeable {
         String path = null;
         String region = null;
         byte[] row = null;
+        long timestamp = 0L;
         for (Cell cell : res.listCells()) {
           row = CellUtil.cloneRow(cell);
+          timestamp = cell.getTimestamp();
           String rowStr = Bytes.toString(row);
           region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
           if (
@@ -432,8 +444,11 @@ public final class BackupSystemTable implements Closeable {
             path = Bytes.toString(CellUtil.cloneValue(cell));
           }
         }
-        result.add(new BulkLoad(table, region, fam, path, row));
-        LOG.debug("Found bulk load entry for table {}, family {}: {}", table, fam, path);
+        LOG.debug("Found orig path {} for family {} of table {} and region {} with timestamp {}",
+          path, fam, table, region, timestamp);
+        if (timestamp <= endTimestamp) {
+          result.add(new BulkLoad(table, region, fam, path, row, timestamp));
+        }
       }
     }
     return result;
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java
index 0f1e79c976b..1befe7c469c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java
@@ -34,14 +34,16 @@ public class BulkLoad {
   private final String columnFamily;
   private final String hfilePath;
   private final byte[] rowKey;
+  private final long timestamp;
 
   public BulkLoad(TableName tableName, String region, String columnFamily, String hfilePath,
-    byte[] rowKey) {
+    byte[] rowKey, long timestamp) {
     this.tableName = tableName;
     this.region = region;
     this.columnFamily = columnFamily;
     this.hfilePath = hfilePath;
     this.rowKey = rowKey;
+    this.timestamp = timestamp;
   }
 
   public TableName getTableName() {
@@ -64,6 +66,10 @@ public class BulkLoad {
     return rowKey;
   }
 
+  public long getTimestamp() {
+    return timestamp;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -75,19 +81,20 @@ public class BulkLoad {
     BulkLoad that = (BulkLoad) o;
     return new EqualsBuilder().append(tableName, that.tableName).append(region, that.region)
       .append(columnFamily, that.columnFamily).append(hfilePath, that.hfilePath)
-      .append(rowKey, that.rowKey).isEquals();
+      .append(rowKey, that.rowKey).append(timestamp, that.timestamp).isEquals();
   }
 
   @Override
   public int hashCode() {
     return new HashCodeBuilder().append(tableName).append(region).append(columnFamily)
-      .append(hfilePath).append(rowKey).toHashCode();
+      .append(hfilePath).append(rowKey).append(timestamp).toHashCode();
   }
 
   @Override
   public String toString() {
     return new ToStringBuilder(this, ToStringStyle.NO_CLASS_NAME_STYLE)
       .append("tableName", tableName).append("region", 
region).append("columnFamily", columnFamily)
-      .append("hfilePath", hfilePath).append("rowKey", rowKey).toString();
+      .append("hfilePath", hfilePath).append("rowKey", 
rowKey).append("timestamp", timestamp)
+      .toString();
   }
 }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 4bb04b11261..c51e3a0a041 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -138,7 +138,13 @@ public class IncrementalTableBackupClient extends TableBackupClient {
    */
   protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
     Map<TableName, MergeSplitBulkloadInfo> toBulkload = new HashMap<>();
-    List<BulkLoad> bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
+    List<BulkLoad> bulkLoads;
+    if (backupInfo.isContinuousBackupEnabled()) {
+      bulkLoads =
+        backupManager.readBulkloadRows(tablesToBackup, backupInfo.getIncrCommittedWalTs());
+    } else {
+      bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
+    }
     FileSystem tgtFs;
     try {
       tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
index 79d1df645b9..170cc866568 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
-import org.apache.hadoop.hbase.backup.impl.BulkLoad;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -70,7 +69,6 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
     LoggerFactory.getLogger(TestIncrementalBackupWithContinuous.class);
 
   private byte[] ROW = Bytes.toBytes("row1");
-  private final byte[] FAMILY = Bytes.toBytes("family");
   private final byte[] COLUMN = Bytes.toBytes("col");
   private static final int ROWS_IN_BULK_LOAD = 100;
 
@@ -80,7 +78,7 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
     conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
     String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
     TableName tableName = TableName.valueOf("table_" + methodName);
-    Table t1 = TEST_UTIL.createTable(tableName, FAMILY);
+    Table t1 = TEST_UTIL.createTable(tableName, famName);
 
     try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
       int before = table.getBackupHistory().size();
@@ -105,10 +103,8 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
       assertEquals("Backup should contain the expected tables", 
Sets.newHashSet(tableName),
         new HashSet<>(manifest.getTableList()));
 
-      Put p = new Put(ROW);
-      p.addColumn(FAMILY, COLUMN, COLUMN);
-      t1.put(p);
-      Thread.sleep(5000);
+      loadTable(t1);
+      Thread.sleep(10000);
 
       // Run incremental backup
       LOG.info("Run incremental backup now");
@@ -135,68 +131,57 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
       client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupid, false,
         tables, tables, true));
 
-      verifyTable(t1);
+      assertEquals(NB_ROWS_IN_BATCH, TEST_UTIL.countRows(tableName));
+    } finally {
       conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
     }
   }
 
   @Test
-  public void testContinuousBackupWithIncrementalBackupAndBulkloadSuccess() throws Exception {
+  public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws Exception {
     conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
     String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+    TableName tableName1 = TableName.valueOf("table_" + methodName);
+    TEST_UTIL.createTable(tableName1, famName);
     try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
-      // The test starts with some data, and no bulk loaded rows.
-      int expectedRowCount = NB_ROWS_IN_BATCH;
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      assertTrue(systemTable.readBulkloadRows(List.of(table1)).isEmpty());
 
-      // Bulk loads aren't tracked if the table isn't backed up yet
-      performBulkLoad("bulk1", methodName);
-      expectedRowCount += ROWS_IN_BULK_LOAD;
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
+      // The test starts with no data, and no bulk loaded rows.
+      int expectedRowCount = 0;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
+      assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty());
 
-      // Create a backup, bulk loads are now being tracked
-      String backup1 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR, true);
+      // Create continuous backup, bulk loads are now being tracked
+      String backup1 = backupTables(BackupType.FULL, List.of(tableName1), BACKUP_ROOT_DIR, true);
       assertTrue(checkSucceeded(backup1));
 
-      loadTable(TEST_UTIL.getConnection().getTable(table1));
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      performBulkLoad("bulk2", methodName);
+      loadTable(TEST_UTIL.getConnection().getTable(tableName1));
+      expectedRowCount = expectedRowCount + NB_ROWS_IN_BATCH;
+      performBulkLoad("bulkPreIncr", methodName, tableName1);
       expectedRowCount += ROWS_IN_BULK_LOAD;
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      assertEquals(1, systemTable.readBulkloadRows(List.of(table1)).size());
-
-      // Creating an incremental backup clears the bulk loads
-      performBulkLoad("bulk4", methodName);
-      performBulkLoad("bulk5", methodName);
-      performBulkLoad("bulk6", methodName);
-      expectedRowCount += 3 * ROWS_IN_BULK_LOAD;
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      assertEquals(4, systemTable.readBulkloadRows(List.of(table1)).size());
-      String backup2 = backupTables(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR, true);
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
+      assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+      loadTable(TEST_UTIL.getConnection().getTable(tableName1));
+      Thread.sleep(10000);
+
+      performBulkLoad("bulkPostIncr", methodName, tableName1);
+      assertEquals(2, systemTable.readBulkloadRows(List.of(tableName1)).size());
+
+      // Incremental backup
+      String backup2 =
+        backupTables(BackupType.INCREMENTAL, List.of(tableName1), BACKUP_ROOT_DIR, true);
       assertTrue(checkSucceeded(backup2));
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
-      int rowCountAfterBackup2 = expectedRowCount;
 
-      // Doing another bulk load, to check that this data will disappear after a restore operation
-      performBulkLoad("bulk7", methodName);
-      expectedRowCount += ROWS_IN_BULK_LOAD;
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      List<BulkLoad> bulkloadsTemp = systemTable.readBulkloadRows(List.of(table1));
-      assertEquals(1, bulkloadsTemp.size());
-      BulkLoad bulk7 = bulkloadsTemp.get(0);
-
-      // Doing a restore. Overwriting the table implies clearing the bulk loads,
-      // but the loading of restored data involves loading bulk data, we expect 2 bulk loads
-      // associated with backup 3 (loading of full backup, loading of incremental backup).
-      BackupAdmin client = getBackupAdmin();
-      client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false,
-        new TableName[] { table1 }, new TableName[] { table1 }, true));
-      assertEquals(rowCountAfterBackup2, TEST_UTIL.countRows(table1));
-      List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(List.of(table1));
-      assertEquals(3, bulkLoads.size());
+      // bulkPostIncr Bulkload entry should not be deleted post incremental backup
+      assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+
+      TEST_UTIL.truncateTable(tableName1);
+      // Restore incremental backup
+      TableName[] tables = new TableName[] { tableName1 };
+      BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection());
+      client.restore(
+        BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false, tables, tables, true));
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
+    } finally {
       conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
     }
   }
@@ -208,7 +193,8 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
     assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN));
   }
 
-  private void performBulkLoad(String keyPrefix, String testDir) throws IOException {
+  private void performBulkLoad(String keyPrefix, String testDir, TableName tableName)
+    throws IOException {
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Path baseDirectory = TEST_UTIL.getDataTestDirOnTestFS(testDir);
     Path hfilePath =
@@ -220,7 +206,7 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
     listFiles(fs, baseDirectory, baseDirectory);
 
     Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
-      BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(table1, baseDirectory);
+      BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(tableName, baseDirectory);
     assertFalse(result.isEmpty());
   }
 
@@ -246,7 +232,7 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
   protected static void loadTable(Table table) throws Exception {
     Put p; // 100 + 1 row to t1_syncup
     for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      p = new Put(Bytes.toBytes("row" + i));
+      p = new Put(Bytes.toBytes("rowLoad" + i));
       p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
       table.put(p);
     }
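
For reference, a minimal usage sketch of the bounded read added to
BackupSystemTable (the Connection, table name, and cutoff value here are
illustrative assumptions, not values from the patch):

    // Entries are kept only if their backup-table cell timestamp is
    // <= endTimestamp; see the filter added in processBulkLoadRowScan.
    try (BackupSystemTable systemTable = new BackupSystemTable(connection)) {
      long cutoff = backupInfo.getIncrCommittedWalTs();
      List<BulkLoad> bounded = systemTable.readBulkloadRows(List.of(tableName), cutoff);
      for (BulkLoad load : bounded) {
        assert load.getTimestamp() <= cutoff;
      }
    }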
