This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f6cb9de55 HDDS-8208. [SNAPSHOT] Allow om snapshot bootstrap tarballs to be incremental. (#4770)
9f6cb9de55 is described below

commit 9f6cb9de5596fc2228bb6baf58ead6b713036757
Author: GeorgeJahad <[email protected]>
AuthorDate: Fri Jun 16 00:50:05 2023 -0700

    HDDS-8208. [SNAPSHOT] Allow om snapshot bootstrap tarballs to be incremental. (#4770)
---
 .../common/src/main/resources/ozone-default.xml    |   2 +-
 .../java/org/apache/hadoop/hdds/utils/HAUtils.java |  26 +--
 .../hadoop/hdds/utils/RDBSnapshotProvider.java     |  19 +-
 .../hadoop/hdds/utils/TestRDBSnapshotProvider.java |  28 ++-
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   2 +-
 .../hadoop/ozone/om/TestOMDbCheckpointServlet.java |  25 +-
 .../hadoop/ozone/om/TestOMRatisSnapshots.java      | 230 ++++++++++++++----
 .../hadoop/ozone/om/OMDBCheckpointServlet.java     | 141 ++++++++---
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  15 +-
 .../hadoop/ozone/om/snapshot/OmSnapshotUtils.java  |  49 ++--
 .../hadoop/ozone/om/TestOmSnapshotManager.java     | 257 +++++++++++++++++----
 .../ozone/om/snapshot/TestOmSnapshotUtils.java     |  81 +++++++
 12 files changed, 699 insertions(+), 176 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 813073b6a0..9849ae88bd 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2033,7 +2033,7 @@
   </property>
   <property>
     <name>ozone.om.snapshot.provider.request.timeout</name>
-    <value>5000ms</value>
+    <value>300000ms</value>
     <tag>OZONE, OM, HA, MANAGEMENT</tag>
     <description>
       Connection request timeout for HTTP call made by OM Snapshot Provider to
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
index 8855546a1e..6ed7ce9214 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
@@ -55,18 +55,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.cert.X509Certificate;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CA_LIST_RETRY_INTERVAL;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CA_LIST_RETRY_INTERVAL_DEFAULT;
@@ -359,26 +359,26 @@ public final class HAUtils {
   }
 
   /**
-   * Scan the DB dir and return the existing SST files.
+   * Scan the DB dir and return the existing SST files,
+   * including omSnapshot sst files.
    * SSTs could be used for avoiding repeated download.
    *
    * @param db the file representing the DB to be scanned
    * @return the list of SST file name. If db not exist, will return empty list
    */
-  public static List<String> getExistingSstFiles(File db) {
+  public static List<String> getExistingSstFiles(File db) throws IOException {
     List<String> sstList = new ArrayList<>();
     if (!db.exists()) {
       return sstList;
     }
-    FilenameFilter filter = new FilenameFilter() {
-      @Override
-      public boolean accept(File dir, String name) {
-        return name.endsWith(ROCKSDB_SST_SUFFIX);
-      }
-    };
-    String[] tempArray = db.list(filter);
-    if (tempArray != null) {
-      sstList = Arrays.asList(tempArray);
+
+    int truncateLength = db.toString().length() + 1;
+    // Walk the db dir and get all sst files including omSnapshot files.
+    try (Stream<Path> files = Files.walk(db.toPath())) {
+      sstList =
+          files.filter(path -> path.toString().endsWith(ROCKSDB_SST_SUFFIX)).
+              map(p -> p.toString().substring(truncateLength)).
+              collect(Collectors.toList());
       if (LOG.isDebugEnabled()) {
         LOG.debug("Scanned SST files {} in {}.", sstList, db.getAbsolutePath());
       }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java
index c934e7d2b9..261e4e103d 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java
@@ -60,6 +60,8 @@ public abstract class RDBSnapshotProvider implements Closeable {
   private final AtomicReference<String> lastLeaderRef;
   private final AtomicLong numDownloaded;
   private FaultInjector injector;
+  // The number of times init() is called
+  private final AtomicLong initCount;
 
   public RDBSnapshotProvider(File snapshotDir, String dbName) {
     this.snapshotDir = snapshotDir;
@@ -68,6 +70,7 @@ public abstract class RDBSnapshotProvider implements Closeable {
     this.injector = null;
     this.lastLeaderRef = new AtomicReference<>(null);
     this.numDownloaded = new AtomicLong();
+    this.initCount = new AtomicLong();
     init();
   }
 
@@ -91,6 +94,7 @@ public abstract class RDBSnapshotProvider implements Closeable {
 
     // reset leader info
     lastLeaderRef.set(null);
+    initCount.incrementAndGet();
   }
 
   /**
@@ -104,7 +108,7 @@ public abstract class RDBSnapshotProvider implements Closeable {
       throws IOException {
     LOG.info("Prepare to download the snapshot from leader OM {} and " +
         "reloading state from the snapshot.", leaderNodeID);
-    checkLeaderConsistent(leaderNodeID);
+    checkLeaderConsistency(leaderNodeID);
 
     String snapshotFileName = getSnapshotFileName(leaderNodeID);
     File targetFile = new File(snapshotDir, snapshotFileName);
@@ -112,13 +116,14 @@ public abstract class RDBSnapshotProvider implements Closeable {
     LOG.info("Successfully download the latest snapshot {} from leader OM: {}",
         targetFile, leaderNodeID);
 
+    numDownloaded.incrementAndGet();
+    injectPause();
+
     RocksDBCheckpoint checkpoint = getCheckpointFromSnapshotFile(targetFile,
         candidateDir, true);
     LOG.info("Successfully untar the downloaded snapshot {} at {}.", targetFile,
         checkpoint.getCheckpointLocation());
 
-    numDownloaded.incrementAndGet();
-    injectPause();
     return checkpoint;
   }
 
@@ -131,7 +136,8 @@ public abstract class RDBSnapshotProvider implements Closeable {
    *
    * @param currentLeader the ID of leader node
    */
-  private void checkLeaderConsistent(String currentLeader) {
+  @VisibleForTesting
+  void checkLeaderConsistency(String currentLeader) throws IOException {
     String lastLeader = lastLeaderRef.get();
     if (lastLeader != null) {
       if (!lastLeader.equals(currentLeader)) {
@@ -230,4 +236,9 @@ public abstract class RDBSnapshotProvider implements Closeable {
   public long getNumDownloaded() {
     return numDownloaded.get();
   }
+
+  @VisibleForTesting
+  public long getInitCount() {
+    return initCount.get();
+  }
 }
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java
index 54fb72645a..585c635d5a 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java
@@ -46,6 +46,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -58,6 +59,7 @@ import static org.apache.hadoop.hdds.utils.HddsServerUtil.writeDBCheckpointToStr
 import static org.apache.hadoop.hdds.utils.db.TestRDBStore.newRDBStore;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -98,7 +100,7 @@ public class TestRDBSnapshotProvider {
         MAX_DB_UPDATES_SIZE_THRESHOLD);
     rdbSnapshotProvider = new RDBSnapshotProvider(testDir, "test.db") {
       @Override
-      public void close() throws IOException {
+      public void close() {
       }
 
       @Override
@@ -236,4 +238,28 @@ public class TestRDBSnapshotProvider {
       throw new IOException(e);
     }
   }
+
+  @Test
+  public void testCheckLeaderConsistency() throws IOException {
+    // Leader initialized to null at startup.
+    assertEquals(1, rdbSnapshotProvider.getInitCount());
+    File dummyFile = new File(rdbSnapshotProvider.getCandidateDir(),
+        "file1.sst");
+    Files.write(dummyFile.toPath(),
+        "dummyData".getBytes(StandardCharsets.UTF_8));
+    assertTrue(dummyFile.exists());
+
+    // Set the leader.
+    rdbSnapshotProvider.checkLeaderConsistency("node1");
+    assertEquals(2, rdbSnapshotProvider.getInitCount());
+    assertFalse(dummyFile.exists());
+
+    // Confirm setting the same leader doesn't reinitialize.
+    rdbSnapshotProvider.checkLeaderConsistency("node1");
+    assertEquals(2, rdbSnapshotProvider.getInitCount());
+
+    // Confirm setting different leader does reinitialize.
+    rdbSnapshotProvider.checkLeaderConsistency("node2");
+    assertEquals(3, rdbSnapshotProvider.getInitCount());
+  }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 9a7acb02f5..e6df3e5946 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -250,7 +250,7 @@ public final class OMConfigKeys {
       "ozone.om.snapshot.provider.request.timeout";
   public static final TimeDuration
       OZONE_OM_SNAPSHOT_PROVIDER_REQUEST_TIMEOUT_DEFAULT =
-      TimeDuration.valueOf(5000, TimeUnit.MILLISECONDS);
+      TimeDuration.valueOf(300000, TimeUnit.MILLISECONDS);
 
   public static final String OZONE_OM_FS_SNAPSHOT_MAX_LIMIT =
       "ozone.om.fs.snapshot.max.limit";
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
index 498a446be9..ade5196304 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
@@ -121,6 +121,7 @@ public class TestOMDbCheckpointServlet {
   private String snapshotDirName2;
   private Path compactionDirPath;
   private DBCheckpoint dbCheckpoint;
+  private static final String FABRICATED_FILE_NAME = "fabricatedFile.sst";
 
   @Rule
   public Timeout timeout = Timeout.seconds(240);
@@ -322,7 +323,7 @@ public class TestOMDbCheckpointServlet {
     String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME;
     int newDbDirLength = newDbDirName.length() + 1;
     File newDbDir = new File(newDbDirName);
-    newDbDir.mkdirs();
+    Assert.assertTrue(newDbDir.mkdirs());
     FileUtil.unTar(tempFile, newDbDir);
 
     // Move snapshot dir to correct location.
@@ -361,7 +362,7 @@ public class TestOMDbCheckpointServlet {
       for (String line : lines.collect(Collectors.toList())) {
         Assert.assertFalse("CURRENT file is not a hard link",
             line.contains("CURRENT"));
-        if (line.contains("fabricatedFile")) {
+        if (line.contains(FABRICATED_FILE_NAME)) {
           fabricatedLinkLines.add(line);
         } else {
           checkLine(shortSnapshotLocation, shortSnapshotLocation2, line);
@@ -477,15 +478,15 @@ public class TestOMDbCheckpointServlet {
     Path fabricatedSnapshot  = Paths.get(
         new File(snapshotDirName).getParent(),
         "fabricatedSnapshot");
-    fabricatedSnapshot.toFile().mkdirs();
-    Assert.assertTrue(Paths.get(fabricatedSnapshot.toString(), "fabricatedFile")
-        .toFile().createNewFile());
+    Assert.assertTrue(fabricatedSnapshot.toFile().mkdirs());
+    Assert.assertTrue(Paths.get(fabricatedSnapshot.toString(),
+        FABRICATED_FILE_NAME).toFile().createNewFile());
 
     // Create fabricated links to snapshot dirs
     // to confirm that links are recognized even if
-    // they are don't point to the checkpoint directory.
-    Path fabricatedFile = Paths.get(snapshotDirName, "fabricatedFile");
-    Path fabricatedLink = Paths.get(snapshotDirName2, "fabricatedFile");
+    // they don't point to the checkpoint directory.
+    Path fabricatedFile = Paths.get(snapshotDirName, FABRICATED_FILE_NAME);
+    Path fabricatedLink = Paths.get(snapshotDirName2, FABRICATED_FILE_NAME);
 
     Files.write(fabricatedFile,
         "fabricatedData".getBytes(StandardCharsets.UTF_8));
@@ -495,7 +496,7 @@ public class TestOMDbCheckpointServlet {
     compactionDirPath = Paths.get(metaDir.toString(),
         OM_SNAPSHOT_DIFF_DIR, DB_COMPACTION_SST_BACKUP_DIR);
     Path fabricatedLink2 = Paths.get(compactionDirPath.toString(),
-        "fabricatedFile");
+        FABRICATED_FILE_NAME);
     Files.createLink(fabricatedLink2, fabricatedFile);
     Path currentFile = Paths.get(metaDir.toString(),
                                     OM_DB_NAME, "CURRENT");
@@ -565,7 +566,7 @@ public class TestOMDbCheckpointServlet {
     // find the real file
     String realDir = null;
     for (String dir: directories) {
-      if (Paths.get(testDirName, dir, "fabricatedFile").toFile().exists()) {
+      if (Paths.get(testDirName, dir, FABRICATED_FILE_NAME).toFile().exists()) {
         Assert.assertNull(
             "Exactly one copy of the fabricated file exists in the tarball",
             realDir);
@@ -589,8 +590,8 @@ public class TestOMDbCheckpointServlet {
       Path path0 = Paths.get(files[0]);
       Path path1 = Paths.get(files[1]);
       Assert.assertTrue("fabricated entries contains correct file name",
-          path0.getFileName().toString().equals("fabricatedFile") &&
-              path1.getFileName().toString().equals("fabricatedFile"));
+          path0.getFileName().toString().equals(FABRICATED_FILE_NAME) &&
+              path1.getFileName().toString().equals(FABRICATED_FILE_NAME));
     }
   }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index 1850ba0b5a..cbe3575124 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.ozone.om;
 
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.ExitManager;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -48,13 +49,15 @@ import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.assertj.core.api.Fail;
-import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.event.Level;
 
@@ -64,8 +67,10 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -76,9 +81,12 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.OM_HARDLINK_FILE;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
 import static org.apache.hadoop.ozone.om.TestOzoneManagerHAWithData.createKey;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
@@ -166,15 +174,17 @@ public class TestOMRatisSnapshots {
     }
   }
 
-  @Test
-  public void testInstallSnapshot() throws Exception {
+  @ParameterizedTest
+  @ValueSource(ints = {100})
+  // tried up to 1000 snapshots and this test works, but some of the
+  //  timeouts have to be increased.
+  public void testInstallSnapshot(int numSnapshotsToCreate) throws Exception {
     // Get the leader OM
     String leaderOMNodeId = OmFailoverProxyUtil
         .getFailoverProxyProvider(objectStore.getClientProxy())
         .getCurrentProxyOMNodeId();
 
     OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
-    OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
 
     // Find the inactive OM
     String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
@@ -183,10 +193,19 @@ public class TestOMRatisSnapshots {
     }
     OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
 
-    // Do some transactions so that the log index increases
-    List<String> keys = writeKeysToIncreaseLogIndex(leaderRatisServer, 200);
+    // Create some snapshots, each with new keys
+    int keyIncrement = 10;
+    String snapshotNamePrefix = "snapshot";
+    String snapshotName = "";
+    List<String> keys = new ArrayList<>();
+    SnapshotInfo snapshotInfo = null;
+    for (int snapshotCount = 0; snapshotCount < numSnapshotsToCreate;
+        snapshotCount++) {
+      snapshotName = snapshotNamePrefix + snapshotCount;
+      keys = writeKeys(keyIncrement);
+      snapshotInfo = createOzoneSnapshot(leaderOM, snapshotName);
+    }
 
-    SnapshotInfo snapshotInfo = createOzoneSnapshot(leaderOM);
 
     // Get the latest db checkpoint from the leader OM.
     TransactionInfo transactionInfo =
@@ -204,11 +223,11 @@ public class TestOMRatisSnapshots {
 
     // The recently started OM should be lagging behind the leader OM.
     // Wait & for follower to update transactions to leader snapshot index.
-    // Timeout error if follower does not load update within 3s
+    // Timeout error if follower does not load update within 10s
     GenericTestUtils.waitFor(() -> {
       return followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
           >= leaderOMSnapshotIndex - 1;
-    }, 100, 3000);
+    }, 100, 10000);
 
     long followerOMLastAppliedIndex =
         followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex();
@@ -231,12 +250,12 @@ public class TestOMRatisSnapshots {
     // Verify that the follower OM's DB contains the transactions which were
     // made while it was inactive.
     OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager();
-    Assert.assertNotNull(followerOMMetaMngr.getVolumeTable().get(
+    assertNotNull(followerOMMetaMngr.getVolumeTable().get(
         followerOMMetaMngr.getVolumeKey(volumeName)));
-    Assert.assertNotNull(followerOMMetaMngr.getBucketTable().get(
+    assertNotNull(followerOMMetaMngr.getBucketTable().get(
         followerOMMetaMngr.getBucketKey(volumeName, bucketName)));
     for (String key : keys) {
-      Assert.assertNotNull(followerOMMetaMngr.getKeyTable(
+      assertNotNull(followerOMMetaMngr.getKeyTable(
           TEST_BUCKET_LAYOUT)
           .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
     }
@@ -259,11 +278,19 @@ public class TestOMRatisSnapshots {
         volumeName, bucketName, newKeys.get(0))));
      */
 
-    // Read back data from the OM snapshot.
+    checkSnapshot(leaderOM, followerOM, snapshotName, keys, snapshotInfo);
+  }
+
+  private void checkSnapshot(OzoneManager leaderOM, OzoneManager followerOM,
+                             String snapshotName,
+                             List<String> keys, SnapshotInfo snapshotInfo)
+      throws IOException {
+    // Read back data from snapshot.
     OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
-        .setKeyName(".snapshot/snap1/" + keys.get(0)).build();
+        .setKeyName(".snapshot/" + snapshotName + "/" +
+            keys.get(keys.size() - 1)).build();
     OmKeyInfo omKeyInfo;
     omKeyInfo = followerOM.lookupKey(omKeyArgs);
     Assertions.assertNotNull(omKeyInfo);
@@ -314,7 +341,8 @@ public class TestOMRatisSnapshots {
 
   @Test
   @Timeout(300)
-  public void testInstallIncrementalSnapshot() throws Exception {
+  public void testInstallIncrementalSnapshot(@TempDir Path tempDir)
+      throws Exception {
     // Get the leader OM
     String leaderOMNodeId = OmFailoverProxyUtil
         .getFailoverProxyProvider(objectStore.getClientProxy())
@@ -338,6 +366,8 @@ public class TestOMRatisSnapshots {
     List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
         80);
 
+    SnapshotInfo snapshotInfo2 = createOzoneSnapshot(leaderOM, "snap80");
+
     // Start the inactive OM. Checkpoint installation will happen spontaneously.
     cluster.startInactiveOM(followerNodeId);
 
@@ -346,10 +376,11 @@ public class TestOMRatisSnapshots {
       return followerOM.getOmSnapshotProvider().getNumDownloaded() == 1;
     }, 1000, 10000);
 
-    // Do some transactions, let leader OM take a new snapshot and purge the
-    // old logs, so that follower must download the new snapshot again.
-    List<String> secondKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
-        160);
+    // Get two incremental tarballs, adding new keys/snapshot for each.
+    IncrementData firstIncrement = getNextIncrementalTarball(160, 2, leaderOM,
+        leaderRatisServer, faultInjector, followerOM, tempDir);
+    IncrementData secondIncrement = getNextIncrementalTarball(240, 3, leaderOM,
+        leaderRatisServer, faultInjector, followerOM, tempDir);
 
     // Resume the follower thread, it would download the incremental snapshot.
     faultInjector.resume();
@@ -364,13 +395,13 @@ public class TestOMRatisSnapshots {
 
     // The recently started OM should be lagging behind the leader OM.
     // Wait & for follower to update transactions to leader snapshot index.
-    // Timeout error if follower does not load update within 10s
+    // Timeout error if follower does not load update within 30s
     GenericTestUtils.waitFor(() -> {
       return followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
           >= leaderOMSnapshotIndex - 1;
-    }, 1000, 10000);
+    }, 1000, 30000);
 
-    assertEquals(2, followerOM.getOmSnapshotProvider().getNumDownloaded());
+    assertEquals(3, followerOM.getOmSnapshotProvider().getNumDownloaded());
 
     // Verify that the follower OM's DB contains the transactions which were
     // made while it was inactive.
@@ -384,7 +415,12 @@ public class TestOMRatisSnapshots {
       assertNotNull(followerOMMetaMngr.getKeyTable(TEST_BUCKET_LAYOUT)
           .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
     }
-    for (String key : secondKeys) {
+    for (String key : firstIncrement.getKeys()) {
+      assertNotNull(followerOMMetaMngr.getKeyTable(TEST_BUCKET_LAYOUT)
+          .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+    }
+
+    for (String key : secondIncrement.getKeys()) {
       assertNotNull(followerOMMetaMngr.getKeyTable(TEST_BUCKET_LAYOUT)
           .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
     }
@@ -394,7 +430,7 @@ public class TestOMRatisSnapshots {
         getDBCheckpointMetrics();
     Assertions.assertTrue(
         dbMetrics.getLastCheckpointStreamingNumSSTExcluded() > 0);
-    assertEquals(1, dbMetrics.getNumIncrementalCheckpoints());
+    assertEquals(2, dbMetrics.getNumIncrementalCheckpoints());
 
     // Verify RPC server is running
     GenericTestUtils.waitFor(() -> {
@@ -413,6 +449,99 @@ public class TestOMRatisSnapshots {
         getCandidateDir().list();
     assertNotNull(filesInCandidate);
     assertEquals(0, filesInCandidate.length);
+
+    checkSnapshot(leaderOM, followerOM, "snap80", firstKeys, snapshotInfo2);
+    checkSnapshot(leaderOM, followerOM, "snap160", firstIncrement.getKeys(),
+        firstIncrement.getSnapshotInfo());
+    checkSnapshot(leaderOM, followerOM, "snap240", secondIncrement.getKeys(),
+        secondIncrement.getSnapshotInfo());
+    Assertions.assertEquals(
+        followerOM.getOmSnapshotProvider().getInitCount(), 2,
+        "Only initialized twice");
+  }
+
+  static class IncrementData {
+    private List<String> keys;
+    private SnapshotInfo snapshotInfo;
+    public List<String> getKeys() {
+      return keys;
+    }
+    public SnapshotInfo getSnapshotInfo() {
+      return snapshotInfo;
+    }
+  }
+
+  private IncrementData getNextIncrementalTarball(
+      int numKeys, int expectedNumDownloads,
+      OzoneManager leaderOM, OzoneManagerRatisServer leaderRatisServer,
+      FaultInjector faultInjector, OzoneManager followerOM, Path tempDir)
+      throws IOException, InterruptedException, TimeoutException {
+    IncrementData id = new IncrementData();
+
+    // Get the latest db checkpoint from the leader OM.
+    TransactionInfo transactionInfo =
+        TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+    TermIndex leaderOMTermIndex =
+        TermIndex.valueOf(transactionInfo.getTerm(),
+            transactionInfo.getTransactionIndex());
+    long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+    // Do some transactions, let leader OM take a new snapshot and purge the
+    // old logs, so that follower must download the new increment.
+    id.keys = writeKeysToIncreaseLogIndex(leaderRatisServer,
+        numKeys);
+
+    id.snapshotInfo = createOzoneSnapshot(leaderOM, "snap" + numKeys);
+    // Resume the follower thread, it would download the incremental snapshot.
+    faultInjector.resume();
+
+    // Pause the follower thread again to block the next install
+    faultInjector.reset();
+
+    // Wait for the follower to download the incremental snapshot and get
+    // stuck at the injector
+    GenericTestUtils.waitFor(() ->
+        followerOM.getOmSnapshotProvider().getNumDownloaded() ==
+        expectedNumDownloads, 1000, 10000);
+
+    assertTrue(followerOM.getOmRatisServer().
+        getLastAppliedTermIndex().getIndex()
+        >= leaderOMSnapshotIndex - 1);
+
+    // Now confirm tarball is just incremental and contains no unexpected
+    //  files/links.
+    Path increment = Paths.get(tempDir.toString(), "increment" + numKeys);
+    assertTrue(increment.toFile().mkdirs());
+    unTarLatestTarBall(followerOM, increment);
+    List<String> sstFiles = HAUtils.getExistingSstFiles(increment.toFile());
+    Path followerCandidatePath = followerOM.getOmSnapshotProvider().
+        getCandidateDir().toPath();
+
+    // Confirm that none of the files in the tarball match one in the
+    // candidate dir.
+    assertTrue(sstFiles.size() > 0);
+    for (String s: sstFiles) {
+      File sstFile = Paths.get(followerCandidatePath.toString(), s).toFile();
+      assertFalse(sstFile.exists(),
+          sstFile + " should not duplicate existing files");
+    }
+
+    // Confirm that none of the links in the tarball's hardLinkFile
+    // match the existing files
+    Path hardLinkFile = Paths.get(increment.toString(), OM_HARDLINK_FILE);
+    try (Stream<String> lines = Files.lines(hardLinkFile)) {
+      int lineCount = 0;
+      for (String line: lines.collect(Collectors.toList())) {
+        lineCount++;
+        String link = line.split("\t")[0];
+        File linkFile = Paths.get(
+            followerCandidatePath.toString(), link).toFile();
+        assertFalse(linkFile.exists(),
+            "Incremental checkpoint should not " +
+                "duplicate existing links");
+      }
+      assertTrue(lineCount > 0);
+    }
+    return id;
   }
 
   @Test
@@ -457,7 +586,7 @@ public class TestOMRatisSnapshots {
     // Resume the follower thread, it would download the incremental snapshot.
     faultInjector.resume();
 
-    // Pause the follower thread again to block the second-time install
+    // Pause the follower thread again to block the tarball install
     faultInjector.reset();
 
     // Wait the follower download the incremental snapshot, but get stuck
@@ -616,25 +745,25 @@ public class TestOMRatisSnapshots {
     // Verify that the follower OM's DB contains the transactions which were
     // made while it was inactive.
     OMMetadataManager followerOMMetaMgr = followerOM.getMetadataManager();
-    Assert.assertNotNull(followerOMMetaMgr.getVolumeTable().get(
+    assertNotNull(followerOMMetaMgr.getVolumeTable().get(
         followerOMMetaMgr.getVolumeKey(volumeName)));
-    Assert.assertNotNull(followerOMMetaMgr.getBucketTable().get(
+    assertNotNull(followerOMMetaMgr.getBucketTable().get(
         followerOMMetaMgr.getBucketKey(volumeName, bucketName)));
     for (String key : keys) {
-      Assert.assertNotNull(followerOMMetaMgr.getKeyTable(
+      assertNotNull(followerOMMetaMgr.getKeyTable(
           TEST_BUCKET_LAYOUT)
           .get(followerOMMetaMgr.getOzoneKey(volumeName, bucketName, key)));
     }
     OMMetadataManager leaderOmMetaMgr = leaderOM.getMetadataManager();
     for (String key : newKeys) {
-      Assert.assertNotNull(leaderOmMetaMgr.getKeyTable(
+      assertNotNull(leaderOmMetaMgr.getKeyTable(
           TEST_BUCKET_LAYOUT)
           .get(followerOMMetaMgr.getOzoneKey(volumeName, bucketName, key)));
     }
     Thread.sleep(5000);
     followerOMMetaMgr = followerOM.getMetadataManager();
     for (String key : newKeys) {
-      Assert.assertNotNull(followerOMMetaMgr.getKeyTable(
+      assertNotNull(followerOMMetaMgr.getKeyTable(
           TEST_BUCKET_LAYOUT)
           .get(followerOMMetaMgr.getOzoneKey(volumeName, bucketName, key)));
     }
@@ -684,7 +813,7 @@ public class TestOMRatisSnapshots {
         getKeys(keys, 10);
         readKeys(keys);
       } catch (IOException e) {
-        Fail.fail("Read Key failed", e);
+        assertTrue(Fail.fail("Read Key failed", e));
       }
       return null;
     });
@@ -715,12 +844,12 @@ public class TestOMRatisSnapshots {
     // Verify that the follower OM's DB contains the transactions which were
     // made while it was inactive.
     OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager();
-    Assert.assertNotNull(followerOMMetaMngr.getVolumeTable().get(
+    assertNotNull(followerOMMetaMngr.getVolumeTable().get(
         followerOMMetaMngr.getVolumeKey(volumeName)));
-    Assert.assertNotNull(followerOMMetaMngr.getBucketTable().get(
+    assertNotNull(followerOMMetaMngr.getBucketTable().get(
         followerOMMetaMngr.getBucketKey(volumeName, bucketName)));
     for (String key : keys) {
-      Assert.assertNotNull(followerOMMetaMngr.getKeyTable(
+      assertNotNull(followerOMMetaMngr.getKeyTable(
           TEST_BUCKET_LAYOUT)
           .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
     }
@@ -779,9 +908,10 @@ public class TestOMRatisSnapshots {
         "TermIndex " + followerTermIndex + " and checkpoint has lower " +
         "TermIndex";
     assertLogCapture(logCapture, errorMsg);
-    Assert.assertNull("OM installed checkpoint even though checkpoint " +
-        "logIndex is less than it's lastAppliedIndex", newTermIndex);
-    Assert.assertEquals(followerTermIndex,
+    assertNull(newTermIndex,
+        "OM installed checkpoint even though checkpoint " +
+            "logIndex is less than it's lastAppliedIndex");
+    assertEquals(followerTermIndex,
         followerRatisServer.getLastAppliedTermIndex());
     String msg = "OM DB is not stopped. Started services with Term: " +
         followerTermIndex.getTerm() + " and Index: " +
@@ -845,13 +975,13 @@ public class TestOMRatisSnapshots {
     assertLogCapture(logCapture, msg);
   }
 
-  private SnapshotInfo createOzoneSnapshot(OzoneManager leaderOM)
+  private SnapshotInfo createOzoneSnapshot(OzoneManager leaderOM, String name)
       throws IOException {
-    objectStore.createSnapshot(volumeName, bucketName, "snap1");
+    objectStore.createSnapshot(volumeName, bucketName, name);
 
     String tableKey = SnapshotInfo.getTableKey(volumeName,
                                                bucketName,
-                                               "snap1");
+                                               name);
     SnapshotInfo snapshotInfo = leaderOM.getMetadataManager()
         .getSnapshotInfoTable()
         .get(tableKey);
@@ -893,7 +1023,7 @@ public class TestOMRatisSnapshots {
     while (round > 0) {
       for (String keyName : keys) {
         OzoneKeyDetails key = ozoneBucket.getKey(keyName);
-        Assert.assertEquals(keyName, key.getName());
+        assertEquals(keyName, key.getName());
       }
       round--;
     }
@@ -916,6 +1046,17 @@ public class TestOMRatisSnapshots {
     }, 100, 5000);
   }
 
+  // Untars the latest tarball in the follower's snapshot dir into tempDir.
+  private void unTarLatestTarBall(OzoneManager followerOm, Path tempDir)
+      throws IOException {
+    File snapshotDir = followerOm.getOmSnapshotProvider().getSnapshotDir();
+    // Find the latest tarball.
+    String tarBall = Arrays.stream(Objects.requireNonNull(snapshotDir.list())).
+        filter(s -> s.toLowerCase().endsWith(".tar")).
+        reduce("", (s1, s2) -> s1.compareToIgnoreCase(s2) > 0 ? s1 : s2);
+    FileUtil.unTar(new File(snapshotDir, tarBall), tempDir.toFile());
+  }
+
   private static class DummyExitManager extends ExitManager {
     @Override
     public void exitSystem(int status, String message, Throwable throwable,
@@ -950,6 +1091,13 @@ public class TestOMRatisSnapshots {
 
     @Override
     public void resume() throws IOException {
+      // Make sure injector pauses before resuming.
+      try {
+        ready.await();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // keep the interrupt status
+        assertTrue(Fail.fail("resume interrupted", e));
+      }
       wait.countDown();
     }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
index bf66528ffb..4db64fba41 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.om;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.compress.archivers.ArchiveOutputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -31,7 +32,6 @@ import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
-import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +60,7 @@ import static 
org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
 import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA;
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
 import static 
org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.createHardLinkList;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
 import static 
org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.truncateFileName;
@@ -74,7 +75,7 @@ import static 
org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.truncateFileNa
  *
  * If Kerberos is enabled, the principal should be appended to
  * `ozone.administrator`, e.g. `scm/[email protected]`
- * If Kerberos is not enabled, simply append the login user name to
+ * If Kerberos is not enabled, simply append the login username to
  * `ozone.administrator`, e.g. `scm`
  */
 public class OMDBCheckpointServlet extends DBCheckpointServlet
@@ -129,8 +130,8 @@ public class OMDBCheckpointServlet extends 
DBCheckpointServlet
     Objects.requireNonNull(toExcludeList);
     Objects.requireNonNull(excludedList);
 
-    // Map of inodes to path.
-    Map<Object, Path> copyFiles = new HashMap<>();
+    // Files to be added to tarball
+    Set<Path> copyFiles = new HashSet<>();
     // Map of link to path.
     Map<Path, Path> hardLinkFiles = new HashMap<>();
 
@@ -141,32 +142,47 @@ public class OMDBCheckpointServlet extends 
DBCheckpointServlet
           .setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
       archiveOutputStream
           .setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
-      getFilesForArchive(checkpoint, copyFiles, hardLinkFiles,
-          includeSnapshotData(request));
-
-      // Exclude file
-      Map<Object, Path> finalCopyFiles = new HashMap<>();
-      copyFiles.forEach((o, path) -> {
-        String fName = path.getFileName().toString();
-        if (!toExcludeList.contains(fName)) {
-          finalCopyFiles.put(o, path);
-        } else {
-          excludedList.add(fName);
-        }
-      });
-      writeFilesToArchive(finalCopyFiles, hardLinkFiles, archiveOutputStream);
+      // Files to be excluded from tarball
+      Set<Path> toExcludeFiles = normalizeExcludeList(toExcludeList,
+          checkpoint.getCheckpointLocation().toString(),
+          ServerUtils.getOzoneMetaDirPath(getConf()).toString());
+      getFilesForArchive(checkpoint, copyFiles, hardLinkFiles, toExcludeFiles,
+          includeSnapshotData(request), excludedList);
+      writeFilesToArchive(copyFiles, hardLinkFiles, archiveOutputStream);
+    } catch (Exception e) {
+      LOG.error("got exception writing to archive " + e);
+      throw e;
     }
   }
 
+  // Format list from follower to match data on leader.
+  @VisibleForTesting
+  public static Set<Path> normalizeExcludeList(List<String> toExcludeList,
+      String checkpointLocation, String metaDirPath) {
+    Set<Path> paths = new HashSet<>();
+    for (String s: toExcludeList) {
+      if (!s.startsWith(OM_SNAPSHOT_DIR)) {
+        Path fixedPath = Paths.get(checkpointLocation, s);
+        paths.add(fixedPath);
+      } else {
+        paths.add(Paths.get(metaDirPath, s));
+      }
+    }
+    return paths;
+  }
+
   private void getFilesForArchive(DBCheckpoint checkpoint,
-                                  Map<Object, Path> copyFiles,
+                                  Set<Path> copyFiles,
                                   Map<Path, Path> hardLinkFiles,
-                                  boolean includeSnapshotData)
+                                  Set<Path> toExcludeFiles,
+                                  boolean includeSnapshotData,
+                                  List<String> excluded)
       throws IOException {
 
     // Get the active fs files.
     Path dir = checkpoint.getCheckpointLocation();
-    processDir(dir, copyFiles, hardLinkFiles, new HashSet<>());
+    processDir(dir, copyFiles, hardLinkFiles, toExcludeFiles,
+        new HashSet<>(), excluded);
 
     if (!includeSnapshotData) {
       return;
@@ -176,7 +192,8 @@ public class OMDBCheckpointServlet extends 
DBCheckpointServlet
     Set<Path> snapshotPaths = waitForSnapshotDirs(checkpoint);
     Path snapshotDir = Paths.get(OMStorage.getOmDbDir(getConf()).toString(),
         OM_SNAPSHOT_DIR);
-    processDir(snapshotDir, copyFiles, hardLinkFiles, snapshotPaths);
+    processDir(snapshotDir, copyFiles, hardLinkFiles, toExcludeFiles,
+        snapshotPaths, excluded);
   }
 
   /**
@@ -219,9 +236,11 @@ public class OMDBCheckpointServlet extends 
DBCheckpointServlet
     }
   }
 
-  private void processDir(Path dir, Map<Object, Path> copyFiles,
+  private void processDir(Path dir, Set<Path> copyFiles,
                           Map<Path, Path> hardLinkFiles,
-                          Set<Path> snapshotPaths)
+                          Set<Path> toExcludeFiles,
+                          Set<Path> snapshotPaths,
+                          List<String> excluded)
       throws IOException {
     try (Stream<Path> files = Files.list(dir)) {
       for (Path file : files.collect(Collectors.toList())) {
@@ -234,24 +253,70 @@ public class OMDBCheckpointServlet extends 
DBCheckpointServlet
             LOG.debug("Skipping unneeded file: " + file);
             continue;
           }
-          processDir(file, copyFiles, hardLinkFiles, snapshotPaths);
+          processDir(file, copyFiles, hardLinkFiles, toExcludeFiles,
+              snapshotPaths, excluded);
         } else {
-          processFile(file, copyFiles, hardLinkFiles);
+          processFile(file, copyFiles, hardLinkFiles, toExcludeFiles, 
excluded);
         }
       }
     }
   }
 
-  private void processFile(Path file, Map<Object, Path> copyFiles,
-                           Map<Path, Path> hardLinkFiles) throws IOException {
-    // Get the inode.
-    Object key = OmSnapshotUtils.getINode(file);
-    // If we already have the inode, store as hard link.
-    if (copyFiles.containsKey(key)) {
-      hardLinkFiles.put(file, copyFiles.get(key));
+  /**
+   * Takes a db file and determines whether it should be included in
+   * the tarball, or added as a link, or excluded altogether.
+   * Uses the toExcludeFiles list to know what already
+   * exists on the follower.
+   * @param file The db file to be processed.
+   * @param copyFiles The db files to be added to tarball.
+   * @param hardLinkFiles The db files to be added as hard links.
+   * @param toExcludeFiles The db files to be excluded from tarball.
+   * @param excluded The list of db files that actually were excluded.
+   */
+  @VisibleForTesting
+  public static void processFile(Path file, Set<Path> copyFiles,
+                                 Map<Path, Path> hardLinkFiles,
+                                 Set<Path> toExcludeFiles,
+                                 List<String> excluded) {
+    if (toExcludeFiles.contains(file)) {
+      excluded.add(file.toString());
     } else {
-      copyFiles.put(key, file);
+      Path fileNamePath = file.getFileName();
+      if (fileNamePath == null) {
+        throw new NullPointerException("Bad file: " + file);
+      }
+      String fileName = fileNamePath.toString();
+      if (fileName.endsWith(ROCKSDB_SST_SUFFIX)) {
+        // If same as existing excluded file, add a link for it.
+        Path linkPath = findLinkPath(toExcludeFiles, fileName);
+        if (linkPath != null) {
+          hardLinkFiles.put(file, linkPath);
+        } else {
+          // If already in tarball add a link for it.
+          linkPath = findLinkPath(copyFiles, fileName);
+          if (linkPath != null) {
+            hardLinkFiles.put(file, linkPath);
+          } else {
+            // Add to tarball.
+            copyFiles.add(file);
+          }
+        }
+      } else {
+        // Not sst file.
+        copyFiles.add(file);
+      }
+    }
+  }
+
+  // Returns the path in "files" whose final name element is exactly
+  // fileName (the existing copy to hard-link to), or null if there is none.
+  private static Path findLinkPath(Set<Path> files, String fileName) {
+    for (Path p: files) {
+      if (p.endsWith(fileName)) {
+        return p;
+      }
     }
+    return null;
   }
 
   // Returns value of http request parameter.
@@ -261,16 +326,16 @@ public class OMDBCheckpointServlet extends 
DBCheckpointServlet
     return Boolean.parseBoolean(includeParam);
   }
 
-  private void writeFilesToArchive(Map<Object, Path> copyFiles,
-                         Map<Path, Path> hardLinkFiles,
-                         ArchiveOutputStream archiveOutputStream)
+  private void writeFilesToArchive(Set<Path> copyFiles,
+                                   Map<Path, Path> hardLinkFiles,
+                                   ArchiveOutputStream archiveOutputStream)
       throws IOException {
 
     File metaDirPath = ServerUtils.getOzoneMetaDirPath(getConf());
     int truncateLength = metaDirPath.toString().length() + 1;
 
     // Go through each of the files to be copied and add to archive.
-    for (Path file : copyFiles.values()) {
+    for (Path file : copyFiles) {
       String fixedFile = truncateFileName(truncateLength, file);
       if (fixedFile.startsWith(OM_CHECKPOINT_DIR)) {
         // checkpoint files go to root of tarball
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 2acde7f317..1e8cecd00b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -3537,6 +3537,8 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
 
     TermIndex termIndex = null;
     try {
+      // Install hard links.
+      OmSnapshotUtils.createHardLinks(omDBCheckpoint.getCheckpointLocation());
       termIndex = installCheckpoint(leaderId, omDBCheckpoint);
     } catch (Exception ex) {
       LOG.error("Failed to install snapshot from Leader OM.", ex);
@@ -3767,15 +3769,17 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
       // an inconsistent state and this marker file will fail OM from
       // starting up.
       Files.createFile(markerFile);
-      // Copy the candidate DB to real DB
-      org.apache.commons.io.FileUtils.copyDirectory(checkpointPath.toFile(),
+      // Link each of the candidate DB files to real DB directory.  This
+      // preserves the links that already exist between files in the
+      // candidate db.
+      OmSnapshotUtils.linkFiles(checkpointPath.toFile(),
           oldDB);
       moveOmSnapshotData(oldDB.toPath(), dbSnapshotsDir.toPath());
       Files.deleteIfExists(markerFile);
     } catch (IOException e) {
       LOG.error("Failed to move downloaded DB checkpoint {} to metadata " +
-              "directory {}. Resetting to original DB.", checkpointPath,
-          oldDB.toPath());
+              "directory {}. Exception: {}. Resetting to original DB.",
+          checkpointPath, oldDB.toPath(), e);
       try {
         FileUtil.fullyDelete(oldDB);
         Files.move(dbBackup.toPath(), oldDB.toPath());
@@ -3792,14 +3796,13 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     }
   }
 
-  // Move the new snapshot directory into place and create hard links.
+  // Move the new snapshot directory into place.
   private void moveOmSnapshotData(Path dbPath, Path dbSnapshotsDir)
       throws IOException {
     Path incomingSnapshotsDir = Paths.get(dbPath.toString(),
         OM_SNAPSHOT_DIR);
     if (incomingSnapshotsDir.toFile().exists()) {
       Files.move(incomingSnapshotsDir, dbSnapshotsDir);
-      OmSnapshotUtils.createHardLinks(dbPath);
     }
   }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
index 9aef593af8..eac63a54bc 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
@@ -115,8 +115,8 @@ public final class OmSnapshotUtils {
         for (String l : lines) {
           String from = l.split("\t")[1];
           String to = l.split("\t")[0];
-          Path fullFromPath = getFullPath(dbPath, from);
-          Path fullToPath = getFullPath(dbPath, to);
+          Path fullFromPath = Paths.get(dbPath.toString(), from);
+          Path fullToPath = Paths.get(dbPath.toString(), to);
           Files.createLink(fullToPath, fullFromPath);
         }
         if (!hardLinkFile.delete()) {
@@ -126,19 +126,40 @@ public final class OmSnapshotUtils {
     }
   }
 
-  // Prepend the full path to the hard link entry entry.
-  private static Path getFullPath(Path dbPath, String fileName)
-      throws IOException {
-    File file = new File(fileName);
-    // If there is no directory then this file belongs in the db.
-    if (file.getName().equals(fileName)) {
-      return Paths.get(dbPath.toString(), fileName);
+  /**
+   * Link each of the files in oldDir to newDir.
+   *
+   * @param oldDir The dir holding the existing files to be linked.
+   * @param newDir The dir in which the links (and subdirs) are created.
+   */
+  public static void linkFiles(File oldDir, File newDir) throws IOException {
+    int truncateLength = oldDir.toString().length() + 1;
+    List<String> oldDirList;
+    try (Stream<Path> files = Files.walk(oldDir.toPath())) {
+      oldDirList = files.map(Path::toString).
+          // Don't copy the directory itself
+          filter(s -> !s.equals(oldDir.toString())).
+          // Remove the old path
+          map(s -> s.substring(truncateLength)).
+          sorted().
+          collect(Collectors.toList());
     }
-    // Else this file belong in a directory parallel to the db.
-    Path parent = dbPath.getParent();
-    if (parent == null) {
-      throw new IOException("Invalid database " + dbPath);
+    for (String s: oldDirList) {
+      File oldFile = new File(oldDir, s);
+      File newFile = new File(newDir, s);
+      File newParent = newFile.getParentFile();
+      if (!newParent.exists()) {
+        if (!newParent.mkdirs()) {
+          throw new IOException("Directory create fails: " + newParent);
+        }
+      }
+      if (oldFile.isDirectory()) {
+        if (!newFile.mkdirs()) {
+          throw new IOException("Directory create fails: " + newFile);
+        }
+      } else {
+        Files.createLink(newFile.toPath(), oldFile.toPath());
+      }
     }
-    return Paths.get(parent.toString(), fileName);
   }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
index 6e0b98657a..445bdb2a4b 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
@@ -42,14 +42,24 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
+import static org.apache.commons.io.file.PathUtils.copyDirectory;
+import static org.apache.hadoop.hdds.utils.HAUtils.getExistingSstFiles;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.SNAPSHOT_CANDIDATE_DIR;
+import static org.apache.hadoop.ozone.om.OMDBCheckpointServlet.processFile;
 import static org.apache.hadoop.ozone.OzoneConsts.SNAPSHOT_INFO_TABLE;
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE;
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.VOLUME_TABLE;
@@ -63,12 +73,22 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
- * Unit test om snapshot manager.
+ * Unit test ozone snapshot manager.
  */
 public class TestOmSnapshotManager {
 
   private OzoneManager om;
   private File testDir;
+  private static final String CANDIDATE_DIR_NAME = OM_DB_NAME +
+      SNAPSHOT_CANDIDATE_DIR;
+  private File leaderDir;
+  private File leaderSnapDir1;
+  private File leaderSnapDir2;
+  private File followerSnapDir2;
+  private File leaderCheckpointDir;
+  private File candidateDir;
+  private File s1File;
+  private File f1File;
 
   @Before
   public void init() throws Exception {
@@ -86,6 +106,7 @@ public class TestOmSnapshotManager {
 
     OmTestManagers omTestManagers = new OmTestManagers(configuration);
     om = omTestManagers.getOzoneManager();
+    setupData();
   }
 
   @After
@@ -165,58 +186,204 @@ public class TestOmSnapshotManager {
     verify(firstSnapshotStore, timeout(3000).times(1)).close();
   }
 
+  private void setupData() throws IOException {
+    // Set up the leader with the following files:
+    // leader/db.checkpoints/checkpoint1/f1.sst
+    // leader/db.snapshots/checkpointState/snap1/s1.sst
+    // leader/db.snapshots/checkpointState/snap2/noLink.sst
+    // leader/db.snapshots/checkpointState/snap2/nonSstFile
+
+    // Set up the follower with the following files, (as if they came
+    // from the tarball from the leader)
+
+    // follower/om.db.candidate/f1.sst
+    // follower/om.db.candidate/db.snapshots/checkpointState/snap1/s1.sst
+    // follower/om.db.candidate/db.snapshots/checkpointState/snap2/noLink.sst
+    // follower/om.db.candidate/db.snapshots/checkpointState/snap2/nonSstFile
+
+    // Note that the layout between leader and follower is slightly
+    // different in that the f1.sst on the leader is in the
+    // db.checkpoints/checkpoint1 directory but on the follower is
+    // moved to the om.db.candidate directory; the links must be adjusted
+    // accordingly.
+
+    byte[] dummyData = {0};
+
+    // Create dummy leader files to calculate links.
+    leaderDir = new File(testDir.toString(),
+        "leader");
+    Assert.assertTrue(leaderDir.mkdirs());
+    String pathSnap1 = OM_SNAPSHOT_CHECKPOINT_DIR + OM_KEY_PREFIX + "snap1";
+    String pathSnap2 = OM_SNAPSHOT_CHECKPOINT_DIR + OM_KEY_PREFIX + "snap2";
+    leaderSnapDir1 = new File(leaderDir.toString(), pathSnap1);
+    Assert.assertTrue(leaderSnapDir1.mkdirs());
+    Files.write(Paths.get(leaderSnapDir1.toString(), "s1.sst"), dummyData);
+
+    leaderSnapDir2 = new File(leaderDir.toString(), pathSnap2);
+    Assert.assertTrue(leaderSnapDir2.mkdirs());
+    Files.write(Paths.get(leaderSnapDir2.toString(), "noLink.sst"), dummyData);
+    Files.write(Paths.get(leaderSnapDir2.toString(), "nonSstFile"), dummyData);
+
+    // Also create the follower files.
+    candidateDir = new File(testDir.toString(),
+        CANDIDATE_DIR_NAME);
+    File followerSnapDir1 = new File(candidateDir.toString(), pathSnap1);
+    followerSnapDir2 = new File(candidateDir.toString(), pathSnap2);
+    copyDirectory(leaderDir.toPath(), candidateDir.toPath());
+    f1File = new File(candidateDir, "f1.sst");
+    Files.write(f1File.toPath(), dummyData);
+    s1File = new File(followerSnapDir1, "s1.sst");
+    // confirm s1 file got copied over.
+    Assert.assertTrue(s1File.exists());
+
+    // Finish creating the leader's files that are not to be copied over,
+    //  because f1.sst belongs in a different directory as explained above.
+    leaderCheckpointDir = new File(leaderDir.toString(),
+        OM_CHECKPOINT_DIR + OM_KEY_PREFIX + "checkpoint1");
+    Assert.assertTrue(leaderCheckpointDir.mkdirs());
+    Files.write(Paths.get(leaderCheckpointDir.toString(), "f1.sst"), 
dummyData);
+  }
+
+  /*
+   * Create map of links to files on the leader:
+   *     leader/db.snapshots/checkpointState/snap2/<link to f1.sst>
+   *     leader/db.snapshots/checkpointState/snap2/<link to s1.sst>
+   * and test that corresponding links are created on the Follower:
+   *     follower/db.snapshots/checkpointState/snap2/f1.sst
+   *     follower/db.snapshots/checkpointState/snap2/s1.sst
+   */
   @Test
   @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH"})
   public void testHardLinkCreation() throws IOException {
-    byte[] dummyData = {0};
 
-    // Create dummy files to be linked to.
-    File snapDir1 = new File(testDir.toString(),
-        OM_SNAPSHOT_CHECKPOINT_DIR + OM_KEY_PREFIX + "dir1");
-    if (!snapDir1.mkdirs()) {
-      throw new IOException("failed to make directory: " + snapDir1);
-    }
-    Files.write(Paths.get(snapDir1.toString(), "s1"), dummyData);
-
-    File snapDir2 = new File(testDir.toString(),
-        OM_SNAPSHOT_CHECKPOINT_DIR + OM_KEY_PREFIX + "dir2");
-    if (!snapDir2.mkdirs()) {
-      throw new IOException("failed to make directory: " + snapDir2);
-    }
-
-    File dbDir = new File(testDir.toString(), OM_DB_NAME);
-    Files.write(Paths.get(dbDir.toString(), "f1"), dummyData);
-
-    // Create map of links to dummy files.
-    File checkpointDir1 = new File(testDir.toString(),
-        OM_CHECKPOINT_DIR + OM_KEY_PREFIX + "dir1");
+    // Map of links to files on the leader
     Map<Path, Path> hardLinkFiles = new HashMap<>();
-    hardLinkFiles.put(Paths.get(snapDir2.toString(), "f1"),
-        Paths.get(checkpointDir1.toString(), "f1"));
-    hardLinkFiles.put(Paths.get(snapDir2.toString(), "s1"),
-        Paths.get(snapDir1.toString(), "s1"));
+    hardLinkFiles.put(Paths.get(leaderSnapDir2.toString(), "f1.sst"),
+        Paths.get(leaderCheckpointDir.toString(), "f1.sst"));
+    hardLinkFiles.put(Paths.get(leaderSnapDir2.toString(), "s1.sst"),
+        Paths.get(leaderSnapDir1.toString(), "s1.sst"));
 
-    // Create link list.
+    // Create link list from leader map.
     Path hardLinkList =
         OmSnapshotUtils.createHardLinkList(
-            testDir.toString().length() + 1, hardLinkFiles);
-    Files.move(hardLinkList, Paths.get(dbDir.toString(), OM_HARDLINK_FILE));
-
-    // Create links from list.
-    OmSnapshotUtils.createHardLinks(dbDir.toPath());
-
-    // Confirm expected links.
-    for (Map.Entry<Path, Path> entry : hardLinkFiles.entrySet()) {
-      Assert.assertTrue(entry.getKey().toFile().exists());
-      Path value = entry.getValue();
-      // Convert checkpoint path to om.db.
-      if (value.toString().contains(OM_CHECKPOINT_DIR)) {
-        value = Paths.get(dbDir.toString(),
-                          value.getFileName().toString());
-      }
-      Assert.assertEquals("link matches original file",
-          getINode(entry.getKey()), getINode(value));
-    }
+            leaderDir.toString().length() + 1, hardLinkFiles);
+
+    Files.move(hardLinkList, Paths.get(candidateDir.toString(),
+        OM_HARDLINK_FILE));
+
+    // Pointers to follower links to be created.
+    File f1FileLink = new File(followerSnapDir2, "f1.sst");
+    File s1FileLink = new File(followerSnapDir2, "s1.sst");
+
+    // Create links on the follower from list.
+    OmSnapshotUtils.createHardLinks(candidateDir.toPath());
+
+    // Confirm expected follower links.
+    Assert.assertTrue(s1FileLink.exists());
+    Assert.assertEquals("link matches original file",
+        getINode(s1File.toPath()), getINode(s1FileLink.toPath()));
+
+    Assert.assertTrue(f1FileLink.exists());
+    Assert.assertEquals("link matches original file",
+        getINode(f1File.toPath()), getINode(f1FileLink.toPath()));
+  }
+
+  /*
+   * Test that exclude list is generated correctly.
+   */
+  @Test
+  public void testExcludeUtilities() throws IOException {
+    File noLinkFile = new File(followerSnapDir2, "noLink.sst");
+
+    // Confirm that the list of existing sst files is as expected.
+    List<String> existingSstList = getExistingSstFiles(candidateDir);
+    Set<String> existingSstFiles = new HashSet<>(existingSstList);
+    int truncateLength = candidateDir.toString().length() + 1;
+    Set<String> expectedSstFiles = new HashSet<>(Arrays.asList(
+        s1File.toString().substring(truncateLength),
+        noLinkFile.toString().substring(truncateLength),
+        f1File.toString().substring(truncateLength)));
+    Assert.assertEquals(expectedSstFiles, existingSstFiles);
+
+    // Confirm that the excluded list is normalized as expected.
+    //  (Normalizing means matches the layout on the leader.)
+    Set<Path> normalizedSet =
+        OMDBCheckpointServlet.normalizeExcludeList(existingSstList,
+            leaderCheckpointDir.toString(), leaderDir.toString());
+    Set<Path> expectedNormalizedSet = new HashSet<>(Arrays.asList(
+        Paths.get(leaderSnapDir1.toString(), "s1.sst"),
+        Paths.get(leaderSnapDir2.toString(), "noLink.sst"),
+        Paths.get(leaderCheckpointDir.toString(), "f1.sst")));
+    Assert.assertEquals(expectedNormalizedSet, normalizedSet);
+  }
+
+  /*
+   * Confirm that processFile() correctly determines whether a file
+   * should be copied, linked, or excluded from the tarball entirely.
+   */
+  @Test
+  public void testProcessFile() {
+    Path copyFile = Paths.get(testDir.toString(),
+        "snap1/copyfile.sst");
+    Path excludeFile = Paths.get(testDir.toString(),
+        "snap1/excludeFile.sst");
+    Path linkToExcludedFile = Paths.get(testDir.toString(),
+        "snap2/excludeFile.sst");
+    Path linkToCopiedFile = Paths.get(testDir.toString(),
+        "snap2/copyfile.sst");
+    Path addToCopiedFiles = Paths.get(testDir.toString(),
+        "snap1/copyfile2.sst");
+    Path addNonSstToCopiedFiles = Paths.get(testDir.toString(),
+        "snap1/nonSst");
+
+    Set<Path> toExcludeFiles = new HashSet<>(
+        Collections.singletonList(excludeFile));
+    Set<Path> copyFiles = new HashSet<>(Collections.singletonList(copyFile));
+    List<String> excluded = new ArrayList<>();
+    Map<Path, Path> hardLinkFiles = new HashMap<>();
+
+    // Confirm the exclude file gets added to the excluded list,
+    //  (and thus is excluded.)
+    processFile(excludeFile, copyFiles, hardLinkFiles, toExcludeFiles,
+        excluded);
+    Assert.assertEquals(1, excluded.size());
+    Assert.assertEquals(excludeFile.toString(), excluded.get(0));
+    Assert.assertEquals(1, copyFiles.size());
+    Assert.assertEquals(0, hardLinkFiles.size());
+    excluded = new ArrayList<>();
+
+    // Confirm the linkToExcludedFile gets added as a link.
+    processFile(linkToExcludedFile, copyFiles, hardLinkFiles, toExcludeFiles,
+        excluded);
+    Assert.assertEquals(0, excluded.size());
+    Assert.assertEquals(1, copyFiles.size());
+    Assert.assertEquals(1, hardLinkFiles.size());
+    Assert.assertEquals(excludeFile, hardLinkFiles.get(linkToExcludedFile));
+    hardLinkFiles = new HashMap<>();
+
+    // Confirm the linkToCopiedFile gets added as a link.
+    processFile(linkToCopiedFile, copyFiles, hardLinkFiles, toExcludeFiles,
+        excluded);
+    Assert.assertEquals(0, excluded.size());
+    Assert.assertEquals(1, copyFiles.size());
+    Assert.assertEquals(1, hardLinkFiles.size());
+    Assert.assertEquals(copyFile, hardLinkFiles.get(linkToCopiedFile));
+    hardLinkFiles = new HashMap<>();
+
+    // Confirm the addToCopiedFiles gets added to list of copied files
+    processFile(addToCopiedFiles, copyFiles, hardLinkFiles, toExcludeFiles,
+        excluded);
+    Assert.assertEquals(0, excluded.size());
+    Assert.assertEquals(2, copyFiles.size());
+    Assert.assertTrue(copyFiles.contains(addToCopiedFiles));
+    copyFiles = new HashSet<>(Collections.singletonList(copyFile));
+
+    // Confirm the addNonSstToCopiedFiles gets added to list of copied files
+    processFile(addNonSstToCopiedFiles, copyFiles, hardLinkFiles,
+        toExcludeFiles, excluded);
+    Assert.assertEquals(0, excluded.size());
+    Assert.assertEquals(2, copyFiles.size());
+    Assert.assertTrue(copyFiles.contains(addNonSstToCopiedFiles));
   }
 
   private SnapshotInfo createSnapshotInfo(String volumeName,
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotUtils.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotUtils.java
new file mode 100644
index 0000000000..7e2483e574
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotUtils.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.getINode;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+/**
+ * Tests for the snapshot utilities in {@link OmSnapshotUtils}.
+ */
+public class TestOmSnapshotUtils {
+
+  /**
+   * Test linkFiles(): after linking tree1 into tree2, tree2 must exist,
+   * must contain the same relative entries as tree1, and the linked copy of
+   * f1 must report the same getINode() value as the original file.
+   *
+   * @param tempDir scratch directory injected by JUnit.
+   * @throws Exception if creating, linking or listing the trees fails.
+   */
+  @Test
+  public void testLinkFiles(@TempDir File tempDir) throws Exception {
+
+    // Create the tree to link from: two directories, one of them holding
+    // a single file.
+    File tree1 = new File(tempDir, "tree1");
+    File dir1 = new File(tree1, "dir1");
+    File dir2 = new File(tree1, "dir2");
+    assertTrue(dir1.mkdirs());
+    assertTrue(dir2.mkdirs());
+    File f1 = new File(dir1, "f1");
+    Files.write(f1.toPath(), "dummyData".getBytes(UTF_8));
+
+    // Create pointers to expected files/links.
+    File tree2 = new File(tempDir, "tree2");
+    File f1Link = new File(tempDir, "tree2/dir1/f1");
+
+    // Expected files/links shouldn't exist yet.
+    assertFalse(tree2.exists());
+    assertFalse(f1Link.exists());
+
+    OmSnapshotUtils.linkFiles(tree1, tree2);
+
+    // Expected files/links should exist now.
+    assertTrue(tree2.exists());
+    assertTrue(f1Link.exists());
+    assertEquals(getINode(f1.toPath()), getINode(f1Link.toPath()));
+
+    // Both trees must contain exactly the same entries.  Comparing paths
+    // relative to each root avoids rewriting absolute path strings, which
+    // would also touch any "tree1" occurring elsewhere in the path.
+    assertEquals(relativePaths(tree1), relativePaths(tree2));
+
+    // Explicit cleanup of the scratch directory, as in the original test.
+    GenericTestUtils.deleteDirectory(tempDir);
+  }
+
+  /**
+   * Lists the given root and everything below it as paths relative to that
+   * root (the root itself shows up as the empty string).
+   *
+   * @param root directory to walk.
+   * @return the set of relative path strings found under root.
+   * @throws IOException if the directory tree cannot be walked.
+   */
+  private static Set<String> relativePaths(File root) throws IOException {
+    Path rootPath = root.toPath();
+    // Files.walk() keeps directory handles open until the stream is
+    // closed, so the stream is scoped with try-with-resources.
+    try (Stream<Path> paths = Files.walk(rootPath)) {
+      return paths
+          .map(p -> rootPath.relativize(p).toString())
+          .collect(Collectors.toSet());
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to