This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 416c8a62eabd166c8bd35979f17994fc14cc5062
Author: asolomon <ankitsolo...@gmail.com>
AuthorDate: Sat Jun 21 00:32:14 2025 +0530

    HBASE-28990 Modify Incremental Backup for Continuous Backup (#6788)
    
    Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org>
    Signed-off-by: Andor Molnár <an...@apache.org>
    Reviewed by: Kota-SH <shanmukhaharipr...@gmail.com>
    Reviewed by: Vinayak Hegde <vinayakph...@gmail.com>
    Reviewed by: Kevin Geiszler <kevin.j.geisz...@gmail.com>
---
 .../org/apache/hadoop/hbase/backup/BackupInfo.java |  14 ++
 .../hadoop/hbase/backup/impl/BackupAdminImpl.java  |  58 +++--
 .../backup/impl/IncrementalTableBackupClient.java  | 165 ++++++++++---
 .../apache/hadoop/hbase/backup/TestBackupBase.java |  29 ++-
 .../hadoop/hbase/backup/TestBackupDescribe.java    |   1 +
 .../hadoop/hbase/backup/TestContinuousBackup.java  |  15 +-
 .../TestIncrementalBackupWithContinuous.java       | 254 +++++++++++++++++++++
 7 files changed, 472 insertions(+), 64 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
index 862a9cbad10..0997aec19ec 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
@@ -124,6 +124,11 @@ public class BackupInfo implements Comparable<BackupInfo> {
    */
   private long completeTs;
 
+  /**
+   * Committed WAL timestamp for incremental backup
+   */
+  private long incrCommittedWalTs;
+
   /**
    * Total bytes of incremental logs copied
    */
@@ -293,6 +298,14 @@ public class BackupInfo implements Comparable<BackupInfo> {
     this.completeTs = endTs;
   }
 
+  public long getIncrCommittedWalTs() {
+    return incrCommittedWalTs;
+  }
+
+  public void setIncrCommittedWalTs(long timestamp) {
+    this.incrCommittedWalTs = timestamp;
+  }
+
   public long getTotalBytesCopied() {
     return totalBytesCopied;
   }
@@ -549,6 +562,7 @@ public class BackupInfo implements Comparable<BackupInfo> {
     sb.append("{");
     sb.append("ID=" + backupId).append(",");
     sb.append("Type=" + getType()).append(",");
+    sb.append("IsContinuous=" + isContinuousBackupEnabled()).append(",");
     sb.append("Tables=" + getTableListAsString()).append(",");
     sb.append("State=" + getState()).append(",");
     Calendar cal = Calendar.getInstance();
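
[A minimal sketch, not part of the patch: it shows how the new incrCommittedWalTs accessors added above might be consulted when picking the WAL lower bound for the next continuous incremental backup, mirroring the selection logic added to IncrementalTableBackupClient further down. The helper class itself is hypothetical.]

    import org.apache.hadoop.hbase.backup.BackupInfo;
    import org.apache.hadoop.hbase.backup.BackupType;

    public final class PreviousBackupBound {
      private PreviousBackupBound() {
      }

      /** Lower WAL-time bound for the next continuous incremental backup of a table. */
      public static long lowerWalBound(BackupInfo previous) {
        if (previous.getType() == BackupType.FULL) {
          // A full backup covers everything up to its start time.
          return previous.getStartTs();
        }
        // An incremental backup records the WAL timestamp it committed up to.
        return previous.getIncrCommittedWalTs();
      }
    }
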
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index e94389d6938..1e91258ba6c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -859,28 +859,47 @@ public class BackupAdminImpl implements BackupAdmin {
 
     String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime();
     if (type == BackupType.INCREMENTAL) {
-      Set<TableName> incrTableSet;
-      try (BackupSystemTable table = new BackupSystemTable(conn)) {
-        incrTableSet = table.getIncrementalBackupTableSet(targetRootDir);
-      }
+      if (request.isContinuousBackupEnabled()) {
+        Set<TableName> continuousBackupTableSet;
+        try (BackupSystemTable table = new BackupSystemTable(conn)) {
+          continuousBackupTableSet = table.getContinuousBackupTableSet().keySet();
+        }
+        if (continuousBackupTableSet.isEmpty()) {
+          String msg = "Continuous backup table set contains no tables. "
+            + "You need to run Continuous backup first "
+            + (tableList != null ? "on " + StringUtils.join(tableList, ",") : "");
+          throw new IOException(msg);
+        }
+        if (!continuousBackupTableSet.containsAll(tableList)) {
+          String extraTables = StringUtils.join(tableList, ",");
+          String msg = "Some tables (" + extraTables + ") haven't gone through 
Continuous backup. "
+            + "Perform Continuous backup on " + extraTables + " first, then 
retry the command";
+          throw new IOException(msg);
+        }
+      } else {
+        Set<TableName> incrTableSet;
+        try (BackupSystemTable table = new BackupSystemTable(conn)) {
+          incrTableSet = table.getIncrementalBackupTableSet(targetRootDir);
+        }
 
-      if (incrTableSet.isEmpty()) {
-        String msg =
-          "Incremental backup table set contains no tables. " + "You need to 
run full backup first "
+        if (incrTableSet.isEmpty()) {
+          String msg = "Incremental backup table set contains no tables. "
+            + "You need to run full backup first "
             + (tableList != null ? "on " + StringUtils.join(tableList, ",") : "");
 
-        throw new IOException(msg);
-      }
-      if (tableList != null) {
-        tableList.removeAll(incrTableSet);
-        if (!tableList.isEmpty()) {
-          String extraTables = StringUtils.join(tableList, ",");
-          String msg = "Some tables (" + extraTables + ") haven't gone through 
full backup. "
-            + "Perform full backup on " + extraTables + " first, " + "then 
retry the command";
           throw new IOException(msg);
         }
+        if (tableList != null) {
+          tableList.removeAll(incrTableSet);
+          if (!tableList.isEmpty()) {
+            String extraTables = StringUtils.join(tableList, ",");
+            String msg = "Some tables (" + extraTables + ") haven't gone 
through full backup. "
+              + "Perform full backup on " + extraTables + " first, then retry 
the command";
+            throw new IOException(msg);
+          }
+        }
+        tableList = Lists.newArrayList(incrTableSet);
       }
-      tableList = Lists.newArrayList(incrTableSet);
     }
     if (tableList != null && !tableList.isEmpty()) {
       for (TableName table : tableList) {
@@ -907,7 +926,12 @@ public class BackupAdminImpl implements BackupAdmin {
         }
       }
       if (nonExistingTableList != null) {
-        if (type == BackupType.INCREMENTAL) {
+        // Non-continuous incremental backup is controlled by the 'incremental backup table set'
+        // and not by the user-provided backup table list. This is an optimization to avoid copying
+        // the same set of WALs for incremental backups of different tables at different times
+        // (HBASE-14038). Since continuous incremental backup and full backup back up the
+        // user-provided table list, we should inform the user about non-existent input table(s).
+        if (type == BackupType.INCREMENTAL && !request.isContinuousBackupEnabled()) {
           // Update incremental backup set
           tableList = excludeNonExistingTables(tableList, nonExistingTableList);
         } else {
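
[A standalone rendering, not part of the patch, of the pre-check added above, under the assumption that the caller already holds the continuous backup table set: every table requested for a continuous incremental backup must already be covered by continuous backup, otherwise the request is rejected. The helper name and wording of the messages are illustrative.]

    import java.io.IOException;
    import java.util.List;
    import java.util.Set;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.hadoop.hbase.TableName;

    public final class ContinuousBackupPreCheck {
      private ContinuousBackupPreCheck() {
      }

      static void verifyTablesUnderContinuousBackup(Set<TableName> continuousSet,
        List<TableName> requested) throws IOException {
        if (continuousSet.isEmpty()) {
          // Nothing is under continuous backup yet, so an incremental request cannot proceed.
          throw new IOException("Continuous backup table set contains no tables. "
            + "You need to run Continuous backup first"
            + (requested != null ? " on " + StringUtils.join(requested, ",") : ""));
        }
        if (requested != null && !continuousSet.containsAll(requested)) {
          String tables = StringUtils.join(requested, ",");
          throw new IOException("Some tables (" + tables + ") haven't gone through "
            + "Continuous backup. Perform Continuous backup on " + tables
            + " first, then retry the command");
        }
      }
    }
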
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index bbc32b2ef8f..4bb04b11261 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -17,18 +17,28 @@
  */
 package org.apache.hadoop.hbase.backup.impl;
 
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
 
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -48,6 +58,7 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
@@ -62,6 +73,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -274,9 +286,19 @@ public class IncrementalTableBackupClient extends TableBackupClient {
       // case PREPARE_INCREMENTAL:
       beginBackup(backupManager, backupInfo);
       backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
-      LOG.debug("For incremental backup, current table set is "
-        + backupManager.getIncrementalBackupTableSet());
-      newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
+      // Non-continuous incremental backup is controlled by the 'incremental backup table set'
+      // and not by the user-provided backup table list. This is an optimization to avoid copying
+      // the same set of WALs for incremental backups of different tables at different times
+      // (HBASE-14038).
+      // Continuous-incremental backup backs up the user-provided table list/set.
+      Set<TableName> currentTableSet;
+      if (backupInfo.isContinuousBackupEnabled()) {
+        currentTableSet = backupInfo.getTables();
+      } else {
+        currentTableSet = backupManager.getIncrementalBackupTableSet();
+        newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
+      }
+      LOG.debug("For incremental backup, the current table set is {}", currentTableSet);
     } catch (Exception e) {
       // fail the overall backup and return
       failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
@@ -303,21 +325,24 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     // set overall backup status: complete. Here we make sure to complete the backup.
     // After this checkpoint, even if entering cancel process, will let the backup finished
     try {
-      // Set the previousTimestampMap which is before this current log roll to the manifest.
-      Map<TableName, Map<String, Long>> previousTimestampMap = backupManager.readLogTimestampMap();
-      backupInfo.setIncrTimestampMap(previousTimestampMap);
-
-      // The table list in backupInfo is good for both full backup and incremental backup.
-      // For incremental backup, it contains the incremental backup table set.
-      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
-
-      Map<TableName, Map<String, Long>> newTableSetTimestampMap =
-        backupManager.readLogTimestampMap();
-
-      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
-      Long newStartCode =
-        BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
-      backupManager.writeBackupStartCode(newStartCode);
+      if (!backupInfo.isContinuousBackupEnabled()) {
+        // Set the previousTimestampMap which is before this current log roll to the manifest.
+        Map<TableName, Map<String, Long>> previousTimestampMap =
+          backupManager.readLogTimestampMap();
+        backupInfo.setIncrTimestampMap(previousTimestampMap);
+
+        // The table list in backupInfo is good for both full backup and incremental backup.
+        // For incremental backup, it contains the incremental backup table set.
+        backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
+
+        Map<TableName, Map<String, Long>> newTableSetTimestampMap =
+          backupManager.readLogTimestampMap();
+
+        backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
+        Long newStartCode =
+          BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
+        backupManager.writeBackupStartCode(newStartCode);
+      }
 
       List<BulkLoad> bulkLoads = handleBulkLoad(backupInfo.getTableNames());
 
@@ -374,23 +399,88 @@ public class IncrementalTableBackupClient extends TableBackupClient {
   }
 
   protected void convertWALsToHFiles() throws IOException {
-    // get incremental backup file list and prepare parameters for DistCp
-    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
-    // Get list of tables in incremental backup set
-    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
-    // filter missing files out (they have been copied by previous backups)
-    incrBackupFileList = filterMissingFiles(incrBackupFileList);
-    List<String> tableList = new ArrayList<String>();
-    for (TableName table : tableSet) {
-      // Check if table exists
-      if (tableExists(table, conn)) {
-        tableList.add(table.getNameAsString());
-      } else {
-        LOG.warn("Table " + table + " does not exists. Skipping in WAL 
converter");
+    long previousBackupTs = 0L;
+    if (backupInfo.isContinuousBackupEnabled()) {
+      Set<TableName> tableSet = backupInfo.getTables();
+      List<BackupInfo> backupInfos = backupManager.getBackupHistory(true);
+      for (TableName table : tableSet) {
+        for (BackupInfo backup : backupInfos) {
+          // find previous backup for this table
+          if (backup.getTables().contains(table)) {
+            LOG.info("Found previous backup of type {} with id {} for table 
{}", backup.getType(),
+              backup.getBackupId(), table.getNameAsString());
+            List<String> walBackupFileList;
+            if (backup.getType() == BackupType.FULL) {
+              previousBackupTs = backup.getStartTs();
+            } else {
+              previousBackupTs = backup.getIncrCommittedWalTs();
+            }
+            walBackupFileList = getBackupLogs(previousBackupTs);
+            walToHFiles(walBackupFileList, Arrays.asList(table.getNameAsString()),
+              previousBackupTs);
+            break;
+          }
+        }
       }
+    } else {
+      // get incremental backup file list and prepare parameters for DistCp
+      List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
+      // Get list of tables in incremental backup set
+      Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
+      // filter missing files out (they have been copied by previous backups)
+      incrBackupFileList = filterMissingFiles(incrBackupFileList);
+      List<String> tableList = new ArrayList<String>();
+      for (TableName table : tableSet) {
+        // Check if table exists
+        if (tableExists(table, conn)) {
+          tableList.add(table.getNameAsString());
+        } else {
+          LOG.warn("Table " + table + " does not exists. Skipping in WAL 
converter");
+        }
+      }
+      walToHFiles(incrBackupFileList, tableList, previousBackupTs);
     }
-    walToHFiles(incrBackupFileList, tableList);
+  }
 
+  private List<String> getBackupLogs(long startTs) throws IOException {
+    // get log files from backup dir
+    String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    if (Strings.isNullOrEmpty(walBackupDir)) {
+      throw new IOException(
+        "Incremental backup requires the WAL backup directory " + 
CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    }
+    List<String> resultLogFiles = new ArrayList<>();
+    Path walBackupPath = new Path(walBackupDir);
+    FileSystem backupFs = FileSystem.get(walBackupPath.toUri(), conf);
+    FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+    for (FileStatus dayDir : dayDirs) {
+      if (!dayDir.isDirectory()) {
+        continue; // Skip files, only process directories
+      }
+
+      String dirName = dayDir.getPath().getName();
+      try {
+        Date dirDate = dateFormat.parse(dirName);
+        long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
+        long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
+
+        if (dirEndTime >= startTs) {
+          Path dirPath = dayDir.getPath();
+          FileStatus[] logs = backupFs.listStatus(dirPath);
+          for (FileStatus log : logs) {
+            String filepath = log.getPath().toString();
+            LOG.debug("Found WAL file: {}", filepath);
+            resultLogFiles.add(filepath);
+          }
+        }
+      } catch (ParseException e) {
+        LOG.warn("Skipping invalid directory name: " + dirName, e);
+      }
+    }
+    return resultLogFiles;
   }
 
   protected boolean tableExists(TableName table, Connection conn) throws IOException {
@@ -399,7 +489,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     }
   }
 
-  protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
+  protected void walToHFiles(List<String> dirPaths, List<String> tableList, long previousBackupTs)
+    throws IOException {
     Tool player = new WALPlayer();
 
     // Player reads all files in arbitrary directory structure and creates
@@ -413,6 +504,14 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
     conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
     conf.set(JOB_NAME_CONF_KEY, jobname);
+    if (backupInfo.isContinuousBackupEnabled()) {
+      conf.set(WALInputFormat.START_TIME_KEY, Long.toString(previousBackupTs));
+      // committedWALsTs is needed only for Incremental backups with continuous backup
+      // since these do not depend on log roll ts
+      long committedWALsTs = BackupUtils.getReplicationCheckpoint(conn);
+      backupInfo.setIncrCommittedWalTs(committedWALsTs);
+      conf.set(WALInputFormat.END_TIME_KEY, Long.toString(committedWALsTs));
+    }
     String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };
 
     try {
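
[A simplified, filesystem-free sketch, not part of the patch, of the day-directory selection performed by getBackupLogs() above: a UTC-date-named directory is kept when any part of that day falls at or after the previous backup timestamp. The "yyyy-MM-dd" pattern is an assumption here; the real code uses the DATE_FORMAT and ONE_DAY_IN_MILLISECONDS constants from ContinuousBackupReplicationEndpoint.]

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.TimeZone;

    public final class WalDayDirFilter {
      private static final long ONE_DAY_MS = 24L * 60 * 60 * 1000;

      private WalDayDirFilter() {
      }

      static List<String> selectDayDirs(List<String> dirNames, long startTs) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd"); // assumed pattern
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        List<String> selected = new ArrayList<>();
        for (String name : dirNames) {
          try {
            long dayStart = fmt.parse(name).getTime(); // 00:00:00 of that UTC day
            long dayEnd = dayStart + ONE_DAY_MS - 1;   // last millisecond of that day
            if (dayEnd >= startTs) {
              selected.add(name);
            }
          } catch (ParseException e) {
            // Non-date-named directories are skipped, as in the patch.
          }
        }
        return selected;
      }
    }
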
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index e5c78d04854..684b701daa7 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.backup;
 
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL;
@@ -414,15 +415,31 @@ public class TestBackupBase {
     return request;
   }
 
+  protected BackupRequest createBackupRequest(BackupType type, List<TableName> tables, String path,
+    boolean noChecksumVerify, boolean continuousBackupEnabled) {
+    BackupRequest.Builder builder = new BackupRequest.Builder();
+    BackupRequest request = builder.withBackupType(type).withTableList(tables)
+      .withTargetRootDir(path).withNoChecksumVerify(noChecksumVerify)
+      .withContinuousBackupEnabled(continuousBackupEnabled).build();
+    return request;
+  }
+
   protected String backupTables(BackupType type, List<TableName> tables, String path)
     throws IOException {
+    return backupTables(type, tables, path, false);
+  }
+
+  protected String backupTables(BackupType type, List<TableName> tables, String path,
+    boolean isContinuousBackup) throws IOException {
     Connection conn = null;
     BackupAdmin badmin = null;
     String backupId;
     try {
       conn = ConnectionFactory.createConnection(conf1);
       badmin = new BackupAdminImpl(conn);
-      BackupRequest request = createBackupRequest(type, new ArrayList<>(tables), path);
+
+      BackupRequest request =
+        createBackupRequest(type, new ArrayList<>(tables), path, false, isContinuousBackup);
       backupId = badmin.backupTables(request);
     } finally {
       if (badmin != null) {
@@ -554,4 +571,14 @@ public class TestBackupBase {
       LOG.debug(Objects.toString(it.next().getPath()));
     }
   }
+
+  void deleteContinuousBackupReplicationPeerIfExists(Admin admin) throws IOException {
+    if (
+      admin.listReplicationPeers().stream()
+        .anyMatch(peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER))
+    ) {
+      admin.disableReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER);
+      admin.removeReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER);
+    }
+  }
 }
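
[An illustrative use, not part of the patch, of the new createBackupRequest overload above: a continuous-backup-enabled request built through BackupRequest.Builder, exactly as the test helper does. The table list and root directory are placeholders.]

    import java.util.List;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.backup.BackupRequest;
    import org.apache.hadoop.hbase.backup.BackupType;

    public final class ContinuousBackupRequestExample {
      private ContinuousBackupRequestExample() {
      }

      static BackupRequest continuousFullBackup(List<TableName> tables, String backupRootDir) {
        // Same builder chain as the helper above, with continuous backup switched on.
        return new BackupRequest.Builder().withBackupType(BackupType.FULL)
          .withTableList(tables).withTargetRootDir(backupRootDir)
          .withNoChecksumVerify(false).withContinuousBackupEnabled(true).build();
      }
    }
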
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDescribe.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDescribe.java
index 7ce039fd666..6084dc730ee 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDescribe.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDescribe.java
@@ -94,6 +94,7 @@ public class TestBackupDescribe extends TestBackupBase {
     System.setOut(new PrintStream(baos));
 
     String[] args = new String[] { "describe", backupId };
+
     // Run backup
     int ret = ToolRunner.run(conf1, new BackupDriver(), args);
     assertTrue(ret == 0);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java
index fe44ebf420d..0cc34ed63eb 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java
@@ -259,8 +259,7 @@ public class TestContinuousBackup extends TestBackupBase {
     }
   }
 
-  private String[] buildBackupArgs(String backupType, TableName[] tables,
-    boolean continuousEnabled) {
+  String[] buildBackupArgs(String backupType, TableName[] tables, boolean continuousEnabled) {
     String tableNames =
       Arrays.stream(tables).map(TableName::getNameAsString).collect(Collectors.joining(","));
 
@@ -272,7 +271,7 @@ public class TestContinuousBackup extends TestBackupBase {
     }
   }
 
-  private BackupManifest getLatestBackupManifest(List<BackupInfo> backups) throws IOException {
+  BackupManifest getLatestBackupManifest(List<BackupInfo> backups) throws IOException {
     BackupInfo newestBackup = backups.get(0);
     return HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR),
       newestBackup.getBackupId());
@@ -289,14 +288,4 @@ public class TestContinuousBackup extends TestBackupBase {
     }
   }
 
-  private void deleteContinuousBackupReplicationPeerIfExists(Admin admin) throws IOException {
-    if (
-      admin.listReplicationPeers().stream()
-        .anyMatch(peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER))
-    ) {
-      admin.disableReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER);
-      admin.removeReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER);
-    }
-  }
-
 }
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
new file mode 100644
index 00000000000..79d1df645b9
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
+import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.impl.BulkLoad;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@Category(LargeTests.class)
+public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestIncrementalBackupWithContinuous.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestIncrementalBackupWithContinuous.class);
+
+  private byte[] ROW = Bytes.toBytes("row1");
+  private final byte[] FAMILY = Bytes.toBytes("family");
+  private final byte[] COLUMN = Bytes.toBytes("col");
+  private static final int ROWS_IN_BULK_LOAD = 100;
+
+  @Test
+  public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception {
+    LOG.info("Testing incremental backup with continuous backup");
+    conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
+    String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+    TableName tableName = TableName.valueOf("table_" + methodName);
+    Table t1 = TEST_UTIL.createTable(tableName, FAMILY);
+
+    try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
+      int before = table.getBackupHistory().size();
+
+      // Run continuous backup
+      String[] args = buildBackupArgs("full", new TableName[] { tableName }, 
true);
+      int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+      assertEquals("Full Backup should succeed", 0, ret);
+
+      // Verify backup history increased and all the backups are succeeded
+      LOG.info("Verify backup history increased and all the backups are 
succeeded");
+      List<BackupInfo> backups = table.getBackupHistory();
+      assertEquals("Backup history should increase", before + 1, 
backups.size());
+      for (BackupInfo data : List.of(backups.get(0))) {
+        String backupId = data.getBackupId();
+        assertTrue(checkSucceeded(backupId));
+      }
+
+      // Verify backup manifest contains the correct tables
+      LOG.info("Verify backup manifest contains the correct tables");
+      BackupManifest manifest = getLatestBackupManifest(backups);
+      assertEquals("Backup should contain the expected tables", 
Sets.newHashSet(tableName),
+        new HashSet<>(manifest.getTableList()));
+
+      Put p = new Put(ROW);
+      p.addColumn(FAMILY, COLUMN, COLUMN);
+      t1.put(p);
+      Thread.sleep(5000);
+
+      // Run incremental backup
+      LOG.info("Run incremental backup now");
+      before = table.getBackupHistory().size();
+      args = buildBackupArgs("incremental", new TableName[] { tableName }, 
false);
+      ret = ToolRunner.run(conf1, new BackupDriver(), args);
+      assertEquals("Incremental Backup should succeed", 0, ret);
+      LOG.info("Incremental backup completed");
+
+      // Verify backup history increased and all the backups are succeeded
+      backups = table.getBackupHistory();
+      String incrementalBackupid = null;
+      assertEquals("Backup history should increase", before + 1, 
backups.size());
+      for (BackupInfo data : List.of(backups.get(0))) {
+        String backupId = data.getBackupId();
+        incrementalBackupid = backupId;
+        assertTrue(checkSucceeded(backupId));
+      }
+
+      TEST_UTIL.truncateTable(tableName);
+      // Restore incremental backup
+      TableName[] tables = new TableName[] { tableName };
+      BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection());
+      client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupid, false,
+        tables, tables, true));
+
+      verifyTable(t1);
+      conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
+    }
+  }
+
+  @Test
+  public void testContinuousBackupWithIncrementalBackupAndBulkloadSuccess() throws Exception {
+    conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
+    String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
+      // The test starts with some data, and no bulk loaded rows.
+      int expectedRowCount = NB_ROWS_IN_BATCH;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+      assertTrue(systemTable.readBulkloadRows(List.of(table1)).isEmpty());
+
+      // Bulk loads aren't tracked if the table isn't backed up yet
+      performBulkLoad("bulk1", methodName);
+      expectedRowCount += ROWS_IN_BULK_LOAD;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
+
+      // Create a backup, bulk loads are now being tracked
+      String backup1 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR, true);
+      assertTrue(checkSucceeded(backup1));
+
+      loadTable(TEST_UTIL.getConnection().getTable(table1));
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+      performBulkLoad("bulk2", methodName);
+      expectedRowCount += ROWS_IN_BULK_LOAD;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+      assertEquals(1, systemTable.readBulkloadRows(List.of(table1)).size());
+
+      // Creating an incremental backup clears the bulk loads
+      performBulkLoad("bulk4", methodName);
+      performBulkLoad("bulk5", methodName);
+      performBulkLoad("bulk6", methodName);
+      expectedRowCount += 3 * ROWS_IN_BULK_LOAD;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+      assertEquals(4, systemTable.readBulkloadRows(List.of(table1)).size());
+      String backup2 = backupTables(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR, true);
+      assertTrue(checkSucceeded(backup2));
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
+      int rowCountAfterBackup2 = expectedRowCount;
+
+      // Doing another bulk load, to check that this data will disappear after a restore operation
+      performBulkLoad("bulk7", methodName);
+      expectedRowCount += ROWS_IN_BULK_LOAD;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+      List<BulkLoad> bulkloadsTemp = systemTable.readBulkloadRows(List.of(table1));
+      assertEquals(1, bulkloadsTemp.size());
+      BulkLoad bulk7 = bulkloadsTemp.get(0);
+
+      // Doing a restore. Overwriting the table implies clearing the bulk loads,
+      // but the loading of restored data involves loading bulk data, we expect 2 bulk loads
+      // associated with backup 3 (loading of full backup, loading of incremental backup).
+      BackupAdmin client = getBackupAdmin();
+      client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false,
+        new TableName[] { table1 }, new TableName[] { table1 }, true));
+      assertEquals(rowCountAfterBackup2, TEST_UTIL.countRows(table1));
+      List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(List.of(table1));
+      assertEquals(3, bulkLoads.size());
+      conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
+    }
+  }
+
+  private void verifyTable(Table t1) throws IOException {
+    Get g = new Get(ROW);
+    Result r = t1.get(g);
+    assertEquals(1, r.size());
+    assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN));
+  }
+
+  private void performBulkLoad(String keyPrefix, String testDir) throws IOException {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Path baseDirectory = TEST_UTIL.getDataTestDirOnTestFS(testDir);
+    Path hfilePath =
+      new Path(baseDirectory, Bytes.toString(famName) + Path.SEPARATOR + "hfile_" + keyPrefix);
+
+    HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, hfilePath, famName, qualName,
+      Bytes.toBytes(keyPrefix), Bytes.toBytes(keyPrefix + "z"), ROWS_IN_BULK_LOAD);
+
+    listFiles(fs, baseDirectory, baseDirectory);
+
+    Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
+      BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(table1, baseDirectory);
+    assertFalse(result.isEmpty());
+  }
+
+  private static Set<String> listFiles(final FileSystem fs, final Path root, final Path dir)
+    throws IOException {
+    Set<String> files = new HashSet<>();
+    FileStatus[] list = CommonFSUtils.listStatus(fs, dir);
+    if (list != null) {
+      for (FileStatus fstat : list) {
+        if (fstat.isDirectory()) {
+          LOG.info("Found directory {}", Objects.toString(fstat.getPath()));
+          files.addAll(listFiles(fs, root, fstat.getPath()));
+        } else {
+          LOG.info("Found file {}", Objects.toString(fstat.getPath()));
+          String file = fstat.getPath().makeQualified(fs).toString();
+          files.add(file);
+        }
+      }
+    }
+    return files;
+  }
+
+  protected static void loadTable(Table table) throws Exception {
+    Put p; // load NB_ROWS_IN_BATCH rows into the table
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+      table.put(p);
+    }
+  }
+}

