This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 11dffb4751fc83bce406ca5da189b3916f06439f
Author: asolomon <ankitsolo...@gmail.com>
AuthorDate: Tue Jul 22 22:13:49 2025 +0530

    HBASE-29459 Capture bulkload files only till IncrCommittedWalTs during Incremental Backup (#7166)

    Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org>
    Reviewed by: Kevin Geiszler <kevin.j.geisz...@gmail.com>
---
 .../hadoop/hbase/backup/impl/BackupManager.java    |   5 ++
 .../hbase/backup/impl/BackupSystemTable.java       |  29 ++++--
 .../apache/hadoop/hbase/backup/impl/BulkLoad.java  |  15 +++-
 .../backup/impl/IncrementalTableBackupClient.java  |   8 +-
 .../TestIncrementalBackupWithContinuous.java       | 100 +++++++++------------
 5 files changed, 88 insertions(+), 69 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index 8b17e93868b..c2ed4f7fa1f 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -363,6 +363,11 @@ public class BackupManager implements Closeable {
     return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
   }
 
+  public List<BulkLoad> readBulkloadRows(List<TableName> tableList, long endTimestamp)
+    throws IOException {
+    return systemTable.readBulkloadRows(tableList, endTimestamp);
+  }
+
   public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
     return systemTable.readBulkloadRows(tableList);
   }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 7a7d948e934..788194c1963 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -380,26 +380,36 @@ public final class BackupSystemTable implements Closeable {
   }
 
   /**
-   * Reads all registered bulk loads.
+   * Reads the rows from backup table recording bulk loaded hfiles
    */
  public List<BulkLoad> readBulkloadRows() throws IOException {
     Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(null);
-    return processBulkLoadRowScan(scan);
+    return processBulkLoadRowScan(scan, Long.MAX_VALUE);
   }
 
   /**
-   * Reads the registered bulk loads for the given tables.
+   * Reads the rows from backup table recording bulk loaded hfiles
+   * @param tableList list of table names
    */
   public List<BulkLoad> readBulkloadRows(Collection<TableName> tableList) throws IOException {
+    return readBulkloadRows(tableList, Long.MAX_VALUE);
+  }
+
+  /**
+   * Reads the rows from backup table recording bulk loaded hfiles
+   * @param tableList    list of table names
+   * @param endTimestamp upper bound timestamp for bulkload entries retrieval
+   */
+  public List<BulkLoad> readBulkloadRows(Collection<TableName> tableList, long endTimestamp)
+    throws IOException {
     List<BulkLoad> result = new ArrayList<>();
     for (TableName table : tableList) {
       Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
-      result.addAll(processBulkLoadRowScan(scan));
+      result.addAll(processBulkLoadRowScan(scan, endTimestamp));
     }
     return result;
   }
 
-  private List<BulkLoad> processBulkLoadRowScan(Scan scan) throws IOException {
+  private List<BulkLoad> processBulkLoadRowScan(Scan scan, long endTimestamp) throws IOException {
     List<BulkLoad> result = new ArrayList<>();
     try (Table bulkLoadTable = connection.getTable(bulkLoadTableName);
       ResultScanner scanner = bulkLoadTable.getScanner(scan)) {
@@ -411,8 +421,10 @@ public final class BackupSystemTable implements Closeable {
         String path = null;
         String region = null;
         byte[] row = null;
+        long timestamp = 0L;
         for (Cell cell : res.listCells()) {
           row = CellUtil.cloneRow(cell);
+          timestamp = cell.getTimestamp();
           String rowStr = Bytes.toString(row);
           region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
           if (
@@ -432,8 +444,11 @@ public final class BackupSystemTable implements Closeable {
             path = Bytes.toString(CellUtil.cloneValue(cell));
           }
         }
-        result.add(new BulkLoad(table, region, fam, path, row));
-        LOG.debug("Found bulk load entry for table {}, family {}: {}", table, fam, path);
+        LOG.debug("Found orig path {} for family {} of table {} and region {} with timestamp {}",
+          path, fam, table, region, timestamp);
+        if (timestamp <= endTimestamp) {
+          result.add(new BulkLoad(table, region, fam, path, row, timestamp));
+        }
       }
     }
     return result;
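The heart of the change is the timestamp <= endTimestamp cut-off in processBulkLoadRowScan above: bulk load entries recorded after the bound are skipped and left for a later backup. A minimal standalone sketch of that rule, where Entry and upTo are illustrative names rather than code from this patch:

import java.util.List;
import java.util.stream.Collectors;

public class BulkLoadCutoffSketch {
  // Illustrative stand-in for a tracked bulk load entry; not the BulkLoad class above.
  record Entry(String hfilePath, long timestamp) {}

  // Keep only entries recorded at or before the cut-off, mirroring the
  // timestamp <= endTimestamp check added to processBulkLoadRowScan.
  static List<Entry> upTo(List<Entry> entries, long endTimestamp) {
    return entries.stream()
      .filter(e -> e.timestamp() <= endTimestamp)
      .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Entry> all = List.of(
      new Entry("hfile-before", 100L),
      new Entry("hfile-at-cutoff", 200L),
      new Entry("hfile-after", 300L));
    // Prints only the first two entries; "hfile-after" is left for the next backup.
    System.out.println(upTo(all, 200L));
  }
}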
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java
index 0f1e79c976b..1befe7c469c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java
@@ -34,14 +34,16 @@ public class BulkLoad {
   private final String columnFamily;
   private final String hfilePath;
   private final byte[] rowKey;
+  private final long timestamp;
 
   public BulkLoad(TableName tableName, String region, String columnFamily, String hfilePath,
-    byte[] rowKey) {
+    byte[] rowKey, long timestamp) {
     this.tableName = tableName;
     this.region = region;
     this.columnFamily = columnFamily;
     this.hfilePath = hfilePath;
     this.rowKey = rowKey;
+    this.timestamp = timestamp;
   }
 
   public TableName getTableName() {
@@ -64,6 +66,10 @@ public class BulkLoad {
     return rowKey;
   }
 
+  public long getTimestamp() {
+    return timestamp;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -75,19 +81,20 @@ public class BulkLoad {
     BulkLoad that = (BulkLoad) o;
     return new EqualsBuilder().append(tableName, that.tableName).append(region, that.region)
       .append(columnFamily, that.columnFamily).append(hfilePath, that.hfilePath)
-      .append(rowKey, that.rowKey).isEquals();
+      .append(rowKey, that.rowKey).append(timestamp, that.timestamp).isEquals();
   }
 
   @Override
   public int hashCode() {
     return new HashCodeBuilder().append(tableName).append(region).append(columnFamily)
-      .append(hfilePath).append(rowKey).toHashCode();
+      .append(hfilePath).append(rowKey).append(timestamp).toHashCode();
   }
 
   @Override
   public String toString() {
     return new ToStringBuilder(this, ToStringStyle.NO_CLASS_NAME_STYLE)
       .append("tableName", tableName).append("region", region).append("columnFamily", columnFamily)
-      .append("hfilePath", hfilePath).append("rowKey", rowKey).toString();
+      .append("hfilePath", hfilePath).append("rowKey", rowKey).append("timestamp", timestamp)
+      .toString();
   }
 }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 4bb04b11261..c51e3a0a041 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -138,7 +138,13 @@ public class IncrementalTableBackupClient extends TableBackupClient {
    */
   protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
     Map<TableName, MergeSplitBulkloadInfo> toBulkload = new HashMap<>();
-    List<BulkLoad> bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
+    List<BulkLoad> bulkLoads;
+    if (backupInfo.isContinuousBackupEnabled()) {
+      bulkLoads =
+        backupManager.readBulkloadRows(tablesToBackup, backupInfo.getIncrCommittedWalTs());
+    } else {
+      bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
+    }
     FileSystem tgtFs;
     try {
       tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
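Condensed, the new branch in handleBulkLoad just picks an upper bound per backup mode. The fragment below restates it as a ternary; it assumes the surrounding IncrementalTableBackupClient fields (backupInfo, backupManager, tablesToBackup), compiles only against the hbase-backup module, and is a restatement rather than the committed code:

// Continuous backup: capture only entries committed up to the incremental WAL
// cut-off; otherwise keep the old unbounded read (internally Long.MAX_VALUE).
List<BulkLoad> bulkLoads = backupInfo.isContinuousBackupEnabled()
  ? backupManager.readBulkloadRows(tablesToBackup, backupInfo.getIncrCommittedWalTs())
  : backupManager.readBulkloadRows(tablesToBackup);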
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
index 79d1df645b9..170cc866568 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
-import org.apache.hadoop.hbase.backup.impl.BulkLoad;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -70,7 +69,6 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
     LoggerFactory.getLogger(TestIncrementalBackupWithContinuous.class);
 
   private byte[] ROW = Bytes.toBytes("row1");
-  private final byte[] FAMILY = Bytes.toBytes("family");
   private final byte[] COLUMN = Bytes.toBytes("col");
   private static final int ROWS_IN_BULK_LOAD = 100;
 
@@ -80,7 +78,7 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
     conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
     String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
     TableName tableName = TableName.valueOf("table_" + methodName);
-    Table t1 = TEST_UTIL.createTable(tableName, FAMILY);
+    Table t1 = TEST_UTIL.createTable(tableName, famName);
 
     try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
       int before = table.getBackupHistory().size();
@@ -105,10 +103,8 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
       assertEquals("Backup should contain the expected tables", Sets.newHashSet(tableName),
         new HashSet<>(manifest.getTableList()));
 
-      Put p = new Put(ROW);
-      p.addColumn(FAMILY, COLUMN, COLUMN);
-      t1.put(p);
-      Thread.sleep(5000);
+      loadTable(t1);
+      Thread.sleep(10000);
 
       // Run incremental backup
       LOG.info("Run incremental backup now");
@@ -135,68 +131,57 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
       client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupid, false,
         tables, tables, true));
 
-      verifyTable(t1);
+      assertEquals(NB_ROWS_IN_BATCH, TEST_UTIL.countRows(tableName));
+
     } finally {
       conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
     }
   }
 
   @Test
-  public void testContinuousBackupWithIncrementalBackupAndBulkloadSuccess() throws Exception {
+  public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws Exception {
     conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
     String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+    TableName tableName1 = TableName.valueOf("table_" + methodName);
+    TEST_UTIL.createTable(tableName1, famName);
 
     try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
-      // The test starts with some data, and no bulk loaded rows.
-      int expectedRowCount = NB_ROWS_IN_BATCH;
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      assertTrue(systemTable.readBulkloadRows(List.of(table1)).isEmpty());
-
-      // Bulk loads aren't tracked if the table isn't backed up yet
-      performBulkLoad("bulk1", methodName);
-      expectedRowCount += ROWS_IN_BULK_LOAD;
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
+      // The test starts with no data, and no bulk loaded rows.
+      int expectedRowCount = 0;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
+      assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty());
 
-      // Create a backup, bulk loads are now being tracked
-      String backup1 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR, true);
+      // Create continuous backup, bulk loads are now being tracked
+      String backup1 = backupTables(BackupType.FULL, List.of(tableName1), BACKUP_ROOT_DIR, true);
       assertTrue(checkSucceeded(backup1));
-      loadTable(TEST_UTIL.getConnection().getTable(table1));
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      performBulkLoad("bulk2", methodName);
+      loadTable(TEST_UTIL.getConnection().getTable(tableName1));
+      expectedRowCount = expectedRowCount + NB_ROWS_IN_BATCH;
+      performBulkLoad("bulkPreIncr", methodName, tableName1);
       expectedRowCount += ROWS_IN_BULK_LOAD;
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      assertEquals(1, systemTable.readBulkloadRows(List.of(table1)).size());
-
-      // Creating an incremental backup clears the bulk loads
-      performBulkLoad("bulk4", methodName);
-      performBulkLoad("bulk5", methodName);
-      performBulkLoad("bulk6", methodName);
-      expectedRowCount += 3 * ROWS_IN_BULK_LOAD;
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      assertEquals(4, systemTable.readBulkloadRows(List.of(table1)).size());
-      String backup2 = backupTables(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR, true);
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
+      assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+      loadTable(TEST_UTIL.getConnection().getTable(tableName1));
+      Thread.sleep(10000);
+
+      performBulkLoad("bulkPostIncr", methodName, tableName1);
+      assertEquals(2, systemTable.readBulkloadRows(List.of(tableName1)).size());
+
+      // Incremental backup
+      String backup2 =
+        backupTables(BackupType.INCREMENTAL, List.of(tableName1), BACKUP_ROOT_DIR, true);
       assertTrue(checkSucceeded(backup2));
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
-      int rowCountAfterBackup2 = expectedRowCount;
 
-      // Doing another bulk load, to check that this data will disappear after a restore operation
-      performBulkLoad("bulk7", methodName);
-      expectedRowCount += ROWS_IN_BULK_LOAD;
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      List<BulkLoad> bulkloadsTemp = systemTable.readBulkloadRows(List.of(table1));
-      assertEquals(1, bulkloadsTemp.size());
-      BulkLoad bulk7 = bulkloadsTemp.get(0);
-
-      // Doing a restore. Overwriting the table implies clearing the bulk loads,
-      // but the loading of restored data involves loading bulk data, we expect 2 bulk loads
-      // associated with backup 3 (loading of full backup, loading of incremental backup).
-      BackupAdmin client = getBackupAdmin();
-      client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false,
-        new TableName[] { table1 }, new TableName[] { table1 }, true));
-      assertEquals(rowCountAfterBackup2, TEST_UTIL.countRows(table1));
-      List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(List.of(table1));
-      assertEquals(3, bulkLoads.size());
+      // bulkPostIncr Bulkload entry should not be deleted post incremental backup
+      assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+
+      TEST_UTIL.truncateTable(tableName1);
+      // Restore incremental backup
+      TableName[] tables = new TableName[] { tableName1 };
+      BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection());
+      client.restore(
+        BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false, tables, tables, true));
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
+
     } finally {
       conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
     }
   }
@@ -208,7 +193,8 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
     assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN));
   }
 
-  private void performBulkLoad(String keyPrefix, String testDir) throws IOException {
+  private void performBulkLoad(String keyPrefix, String testDir, TableName tableName)
+    throws IOException {
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Path baseDirectory = TEST_UTIL.getDataTestDirOnTestFS(testDir);
     Path hfilePath =
@@ -220,7 +206,7 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
       listFiles(fs, baseDirectory, baseDirectory);
 
     Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
-      BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(table1, baseDirectory);
+      BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(tableName, baseDirectory);
     assertFalse(result.isEmpty());
   }
 
@@ -246,7 +232,7 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
   protected static void loadTable(Table table) throws Exception {
     Put p; // 100 + 1 row to t1_syncup
     for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      p = new Put(Bytes.toBytes("row" + i));
+      p = new Put(Bytes.toBytes("rowLoad" + i));
       p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
       table.put(p);
     }
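The timestamp recorded on each BulkLoad entry is the ordinary HBase cell timestamp read during the system-table scan. A sketch of that read using the standard client API; the connection and table name are assumed, and dumpTimestamps is an illustrative helper, not code from this patch:

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanTimestampSketch {
  // Prints each cell's row key and timestamp; Cell#getTimestamp() is the same
  // call the patch uses in processBulkLoadRowScan to date a bulk load entry.
  static void dumpTimestamps(Connection conn, TableName tableName) throws IOException {
    try (Table table = conn.getTable(tableName);
      ResultScanner scanner = table.getScanner(new Scan())) {
      for (Result res : scanner) {
        for (Cell cell : res.listCells()) {
          System.out.println(
            Bytes.toString(CellUtil.cloneRow(cell)) + " @ " + cell.getTimestamp());
        }
      }
    }
  }
}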