kgeisz commented on code in PR #7417:
URL: https://github.com/apache/hbase/pull/7417#discussion_r2476125740


##########
hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java:
##########
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.IntegrationTestingUtility.createPreSplitLoadTestTable;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.IntegrationTestBase;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
+import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
+import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
+import org.apache.hadoop.hbase.chaos.policies.Policy;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+
+/**
+ * An abstract base class that is used to run backup, restore, and delete integration tests. This
+ * class performs both full backups and incremental backups. Both continuous backup and
+ * non-continuous backup test cases are supported. The number of incremental backups performed
+ * depends on the number of iterations defined by the user. The class performs the backup/restore in
+ * a separate thread, where one thread is created per table. The number of tables is user-defined,
+ * along with other various configurations.
+ */
+public abstract class IntegrationTestBackupRestoreBase extends IntegrationTestBase {
+  protected static final Logger LOG =
+    LoggerFactory.getLogger(IntegrationTestBackupRestoreBase.class);
+  protected static final String NUMBER_OF_TABLES_KEY = "num_tables";
+  protected static final String COLUMN_NAME = "f";
+  protected static final String REGION_COUNT_KEY = "regions_per_rs";
+  protected static final String REGIONSERVER_COUNT_KEY = "region_servers";
+  protected static final String ROWS_PER_ITERATION_KEY = "rows_in_iteration";
+  protected static final String NUM_ITERATIONS_KEY = "num_iterations";
+  protected static final int DEFAULT_REGION_COUNT = 10;
+  protected static final int DEFAULT_REGIONSERVER_COUNT = 5;
+  protected static final int DEFAULT_NUMBER_OF_TABLES = 1;
+  protected static final int DEFAULT_NUM_ITERATIONS = 10;
+  protected static final int DEFAULT_ROWS_IN_ITERATION = 10000;
+  protected static final String SLEEP_TIME_KEY = "sleeptime";
+  // short default interval because tests don't run very long.
+  protected static final long SLEEP_TIME_DEFAULT = 50000L;
+
+  protected static int rowsInIteration;
+  protected static int regionsCountPerServer;
+  protected static int regionServerCount;
+
+  protected static int numIterations;
+  protected static int numTables;
+  protected static TableName[] tableNames;
+  protected long sleepTime;
+  protected static Object lock = new Object();
+
+  protected FileSystem fs;
+  protected String backupRootDir = "backupRootDir";
+
+  /*
+   * This class is used to run the backup and restore thread(s). Throwing an exception in this
+   * thread will not cause the test to fail, so the purpose of this class is to both kick off the
+   * backup and restore, as well as record any exceptions that occur so they can be thrown in the
+   * main thread.
+   */
+  protected class BackupAndRestoreThread implements Runnable {
+    private final TableName table;
+    private final boolean isContinuousBackupEnabled;
+    private Throwable throwable;
+
+    public BackupAndRestoreThread(TableName table, boolean isContinuousBackupEnabled) {
+      this.table = table;
+      this.isContinuousBackupEnabled = isContinuousBackupEnabled;
+      this.throwable = null;
+    }
+
+    public Throwable getThrowable() {
+      return this.throwable;
+    }
+
+    @Override
+    public void run() {
+      try {
+        LOG.info("Running backup and restore test for {} in thread {}", 
this.table,
+          Thread.currentThread());
+        runTestSingle(this.table, isContinuousBackupEnabled);
+      } catch (Throwable t) {
+        LOG.error(
+          "An error occurred in thread {} when performing a backup and restore 
with table {}: ",
+          Thread.currentThread().getName(), this.table.getNameAsString(), t);
+        this.throwable = t;
+      }
+    }
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    LOG.info("Cleaning up after test.");
+    if (util.isDistributedCluster()) {
+      deleteTablesIfAny();
+      LOG.info("Cleaning up after test. Deleted tables");
+      cleanUpBackupDir();
+    }
+    LOG.info("Restoring cluster.");
+    util.restoreCluster();
+    LOG.info("Cluster restored.");
+  }
+
+  @Override
+  public void setUpMonkey() throws Exception {
+    Policy p =
+      new PeriodicRandomActionPolicy(sleepTime, new RestartRandomRsExceptMetaAction(sleepTime));
+    this.monkey = new PolicyBasedChaosMonkey(util, p);
+    startMonkey();
+  }
+
+  private void deleteTablesIfAny() throws IOException {
+    for (TableName table : tableNames) {
+      util.deleteTableIfAny(table);
+    }
+  }
+
+  protected void createTables(String tableBaseName) throws Exception {
+    tableNames = new TableName[numTables];
+    for (int i = 0; i < numTables; i++) {
+      tableNames[i] = TableName.valueOf(tableBaseName + ".table." + i);
+    }
+    for (TableName table : tableNames) {
+      createTable(table);
+    }
+  }
+
+  /**
+   * Creates a directory specified by backupWALDir and sets this directory to
+   * CONF_CONTINUOUS_BACKUP_WAL_DIR in the configuration.
+   */
+  protected void createAndSetBackupWalDir() throws IOException {
+    Path root = util.getDataTestDirOnTestFS();
+    Path backupWalDir = new Path(root, "backupWALDir");
+    FileSystem fs = FileSystem.get(conf);
+    fs.mkdirs(backupWalDir);
+    conf.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString());
+    LOG.info(
+      "The continuous backup WAL directory has been created and set in the 
configuration to: {}",
+      backupWalDir);
+  }
+
+  private void cleanUpBackupDir() throws IOException {
+    FileSystem fs = FileSystem.get(util.getConfiguration());
+    fs.delete(new Path(backupRootDir), true);
+  }
+
+  /**
+   * This is the main driver method used by tests that extend this abstract base class. This method
+   * starts one backup and restore thread per table.
+   * @param isContinuousBackupEnabled Boolean flag used to specify if the backups should have
+   *                                  continuous backup enabled.
+   */
+  protected void runTestMulti(boolean isContinuousBackupEnabled) {
+    Thread[] workers = new Thread[numTables];
+    BackupAndRestoreThread[] backupAndRestoreThreads = new BackupAndRestoreThread[numTables];
+    for (int i = 0; i < numTables; i++) {
+      final TableName table = tableNames[i];
+      BackupAndRestoreThread backupAndRestoreThread =
+        new BackupAndRestoreThread(table, isContinuousBackupEnabled);
+      backupAndRestoreThreads[i] = backupAndRestoreThread;
+      workers[i] = new Thread(backupAndRestoreThread);
+      workers[i].start();
+    }
+    // Wait for all workers to finish and check for errors
+    Throwable error = null;
+    Throwable threadThrowable;
+    for (int i = 0; i < numTables; i++) {
+      Uninterruptibles.joinUninterruptibly(workers[i]);
+      threadThrowable = backupAndRestoreThreads[i].getThrowable();
+      if (threadThrowable == null) {
+        continue;
+      }
+      if (error == null) {
+        error = threadThrowable;
+      } else {
+        error.addSuppressed(threadThrowable);
+      }
+    }
+    // Throw any found errors after all threads have completed
+    if (error != null) {
+      throw new AssertionError("An error occurred in a backup and restore thread", error);
+    }
+    LOG.info("IT backup & restore finished");
+  }
+
+  /**
+   * This method is what performs the actual backup, restore, merge, and delete operations. This
+   * method is run in a separate thread. It first performs a full backup. Afterwards, it iteratively
+   * performs a series of incremental backups and restores. Finally, it deletes the backups.
+   * @param table                     The table the backups are performed on
+   * @param isContinuousBackupEnabled Boolean flag used to indicate if the backups should have
+   *                                  continuous backup enabled.
+   */
+  private void runTestSingle(TableName table, boolean isContinuousBackupEnabled)
+    throws IOException, InterruptedException {
+    String enabledOrDisabled = isContinuousBackupEnabled ? "enabled" : "disabled";
+    List<String> backupIds = new ArrayList<>();
+
+    try (Connection conn = util.getConnection(); BackupAdmin client = new BackupAdminImpl(conn)) {
+      loadData(table, rowsInIteration);
+
+      // First create a full backup for the table
+      LOG.info("Creating full backup image for {} with continuous backup {}", 
table,
+        enabledOrDisabled);
+      List<TableName> tables = Lists.newArrayList(table);
+      BackupRequest.Builder builder = new BackupRequest.Builder();
+      BackupRequest request = builder.withBackupType(BackupType.FULL).withTableList(tables)
+        .withTargetRootDir(backupRootDir).withContinuousBackupEnabled(isContinuousBackupEnabled)
+        .build();
+
+      String fullBackupId = backup(request, client, backupIds);
+      LOG.info("Created full backup with ID: {}", fullBackupId);
+
+      verifySnapshotExists(table, fullBackupId);
+
+      // Run full backup verifications specific to continuous backup
+      if (isContinuousBackupEnabled) {
+        BackupTestUtil.verifyReplicationPeerSubscription(util, table);
+        Path backupWALs = verifyWALsDirectoryExists();
+        Path walPartitionDir = verifyWALPartitionDirExists(backupWALs);
+        verifyBackupWALFiles(walPartitionDir);
+      }
+
+      // Now continue with incremental backups
+      String incrementalBackupId;
+      for (int count = 1; count <= numIterations; count++) {
+        LOG.info("{} - Starting incremental backup iteration {} of {} for {}",
+          Thread.currentThread().getName(), count, numIterations, table);
+        loadData(table, rowsInIteration);
+
+        // Do incremental backup
+        LOG.info("Creating incremental backup number {} with continuous backup 
{} for {}", count,
+          enabledOrDisabled, table);
+        builder = new BackupRequest.Builder();
+        request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables)
+          .withTargetRootDir(backupRootDir).withContinuousBackupEnabled(isContinuousBackupEnabled)
+          .build();
+        incrementalBackupId = backup(request, client, backupIds);
+        LOG.info("Created incremental backup with ID: {}", 
incrementalBackupId);
+
+        // Restore table using backup taken "two backups ago"
+        // On the first iteration, this backup will be the full backup
+        String previousBackupId = backupIds.get(backupIds.size() - 2);
+        if (previousBackupId.equals(fullBackupId)) {
+          LOG.info("Restoring {} using original full backup with ID: {}", 
table, previousBackupId);
+        } else {
+          LOG.info("Restoring {} using second most recent incremental backup 
with ID: {}", table,
+            previousBackupId);
+        }
+        restoreTableAndVerifyRowCount(conn, client, table, previousBackupId,
+          (long) rowsInIteration * count);
+
+        // Restore table using the most recently created incremental backup
+        LOG.info("Restoring {} using most recent incremental backup with ID: 
{}", table,
+          incrementalBackupId);
+        restoreTableAndVerifyRowCount(conn, client, table, incrementalBackupId,
+          (long) rowsInIteration * (count + 1));
+        LOG.info("{} - Finished incremental backup iteration {} of {} for {}",
+          Thread.currentThread().getName(), count, numIterations, table);
+      }
+
+      // Now merge all incremental and restore
+      String[] incrementalBackupIds = getAllIncrementalBackupIds(backupIds);
+      merge(incrementalBackupIds, client);
+      verifyBackupExistenceAfterMerge(backupIds);
+      removeNonexistentBackups(backupIds);
+      // Restore the last incremental backup
+      incrementalBackupId = incrementalBackupIds[incrementalBackupIds.length - 1];
+      // restore incremental backup for table, with overwrite
+      TableName[] tablesToRestoreFrom = new TableName[] { table };
+      restore(createRestoreRequest(incrementalBackupId, false, tablesToRestoreFrom, null, true),
+        client);
+      Table hTable = conn.getTable(table);
+      Assert.assertEquals(rowsInIteration * (numIterations + 1),
+        HBaseTestingUtil.countRows(hTable));
+      hTable.close();
+
+      // Create another incremental backup to show it can be deleted on its own
+      backup(request, client, backupIds);
+      deleteMostRecentIncrementalBackup(backupIds, client);
+      // The full backup and the second most recent incremental backup should still exist
+      assertEquals(2, backupIds.size());
+      verifyAllBackupsExist(backupIds);
+      // Delete the full backup, which should also automatically delete any incremental backups that
+      // depend on it
+      LOG.info("Deleting full backup: {}. This will also delete any remaining 
incremental backups",
+        fullBackupId);
+      delete(new String[] { fullBackupId }, client);
+      verifyNoBackupsExist(backupIds);

Review Comment:
   Yes, correct. The original test wasn't performing any deletes, so I thought this would be good to add. We need to cover deletes for the continuous backup test case anyway, so I figured I could add some deletes here, since they can be done for both continuous and non-continuous backups.
   
   Deleting a full backup also deletes the incremental backups that depend on it. I wanted to show this can be done, along with showing that an incremental backup can be deleted on its own.
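   
   To make those delete semantics concrete, here's a minimal sketch against the same `BackupAdmin` API this test drives. `fullId` and `incrId` are hypothetical placeholders for IDs returned by earlier `backupTables()` calls, where `incrId` is an incremental backup whose base image is `fullId`:
   
   ```java
   import static org.junit.Assert.assertNotNull;
   import static org.junit.Assert.assertNull;
   
   import org.apache.hadoop.hbase.backup.BackupAdmin;
   import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
   import org.apache.hadoop.hbase.client.Connection;
   
   public class BackupDeleteSemanticsSketch {
     // conn is assumed to be a Connection to a cluster where both backups exist.
     static void demonstrateDeletes(Connection conn, String fullId, String incrId) throws Exception {
       try (BackupAdmin admin = new BackupAdminImpl(conn)) {
         // An incremental backup can be deleted on its own; its base full backup stays.
         admin.deleteBackups(new String[] { incrId });
         assertNull(admin.getBackupInfo(incrId));
         assertNotNull(admin.getBackupInfo(fullId));
   
         // Deleting the full backup also removes any remaining incremental backups
         // that depend on it, since they are unusable without their base image.
         admin.deleteBackups(new String[] { fullId });
         assertNull(admin.getBackupInfo(fullId));
       }
     }
   }
   ```
   
   That's exactly the ordering the new test exercises: one standalone incremental delete, then the cascading delete via the full backup.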



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
