This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 06433c0da9 HDDS-6961. [Snapshot] Bootstrapping slow followers/new 
followers. (#3980)
06433c0da9 is described below

commit 06433c0da953d885ce39e439e0c2ddc8d96af6f6
Author: GeorgeJahad <[email protected]>
AuthorDate: Wed Mar 29 22:35:42 2023 -0700

    HDDS-6961. [Snapshot] Bootstrapping slow followers/new followers. (#3980)
    
    Co-authored-by: Hemant Kumar <[email protected]>
    Co-authored-by: Siyao Meng <[email protected]>
    Co-authored-by: George Jahad <[email protected]>
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   8 +
 .../hadoop/hdds/utils/DBCheckpointServlet.java     |  18 +-
 .../apache/hadoop/hdds/utils/HddsServerUtil.java   |   2 +-
 .../hadoop/hdds/utils/db/RDBCheckpointManager.java |  33 +--
 .../hadoop/hdds/utils/db/RDBCheckpointUtils.java   |  70 +++++
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  |  20 +-
 .../ozone/rocksdiff/RocksDBCheckpointDiffer.java   |   2 +-
 .../hadoop/ozone/om/helpers/OMNodeDetails.java     |   7 +-
 .../hadoop/fs/ozone/TestOzoneFsSnapshot.java       |  12 +-
 .../hdds/scm/TestSCMDbCheckpointServlet.java       |   9 +-
 .../hadoop/ozone/freon/TestOMSnapshotDAG.java      |  22 +-
 .../hadoop/ozone/om/TestOMDbCheckpointServlet.java | 328 ++++++++++++++++++---
 .../hadoop/ozone/om/TestOMRatisSnapshots.java      | 118 +++++++-
 .../org/apache/hadoop/ozone/om/TestOmSnapshot.java |  11 +-
 .../hadoop/ozone/om/TestOmSnapshotFileSystem.java  |  10 +-
 .../om/snapshot/TestOzoneManagerSnapshotAcl.java   |  35 ++-
 .../om/snapshot/TestOzoneSnapshotRestore.java      |  10 +-
 .../hadoop/ozone/om/OMDBCheckpointServlet.java     | 194 +++++++++++-
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  45 ++-
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  |  11 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  76 +++--
 .../hadoop/ozone/om/SstFilteringService.java       |   8 +-
 .../OmRatisSnapshotProvider.java}                  |  28 +-
 .../{snapshot => ratis_snapshot}/package-info.java |   2 +-
 .../hadoop/ozone/om/snapshot/OmSnapshotUtils.java  | 144 +++++++++
 .../hadoop/ozone/om/snapshot/package-info.java     |   2 +-
 .../hadoop/ozone/om/TestOmSnapshotManager.java     |  68 +++++
 .../hadoop/ozone/om/TestSstFilteringService.java   |   4 +-
 .../snapshot/TestOMSnapshotCreateResponse.java     |  14 +-
 .../snapshot/TestOMSnapshotDeleteResponse.java     |  15 +-
 30 files changed, 1097 insertions(+), 229 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 815bfc99f9..9ba770c214 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -141,6 +141,8 @@ public final class OzoneConsts {
   public static final String STORAGE_DIR_CHUNKS = "chunks";
   public static final String OZONE_DB_CHECKPOINT_REQUEST_FLUSH =
       "flushBeforeCheckpoint";
+  public static final String OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA =
+      "includeSnapshotData";
 
   public static final String RANGER_OZONE_SERVICE_VERSION_KEY =
       "#RANGEROZONESERVICEVERSION";
@@ -562,7 +564,13 @@ public final class OzoneConsts {
   public static final int OZONE_MAXIMUM_ACCESS_ID_LENGTH = 100;
 
   public static final String OM_SNAPSHOT_NAME = "snapshotName";
+  public static final String OM_CHECKPOINT_DIR = "db.checkpoints";
   public static final String OM_SNAPSHOT_DIR = "db.snapshots";
+  public static final String OM_SNAPSHOT_CHECKPOINT_DIR = OM_SNAPSHOT_DIR
+      + OM_KEY_PREFIX + "checkpointState";
+  public static final String OM_SNAPSHOT_DIFF_DIR = OM_SNAPSHOT_DIR
+      + OM_KEY_PREFIX + "diffState";
+
   public static final String OM_SNAPSHOT_INDICATOR = ".snapshot";
   public static final String OM_SNAPSHOT_DIFF_DB_NAME = "db.snapdiff";
 
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
index a8f99149e5..21b7c7cfde 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
@@ -22,6 +22,7 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.time.Instant;
@@ -162,7 +163,7 @@ public class DBCheckpointServlet extends HttpServlet {
                file + ".tar\"");
 
       Instant start = Instant.now();
-      writeDBCheckpointToStream(checkpoint,
+      writeDbDataToStream(checkpoint, request,
           response.getOutputStream());
       Instant end = Instant.now();
 
@@ -188,4 +189,19 @@ public class DBCheckpointServlet extends HttpServlet {
     }
   }
 
+  /**
+   * Write checkpoint to the stream.
+   *
+   * @param checkpoint The checkpoint to be written.
+   * @param ignoredRequest The httpRequest which generated this checkpoint.
+   *        (Parameter is ignored in this class but used in child classes).
+   * @param destination The stream to write to.
+   */
+  public void writeDbDataToStream(DBCheckpoint checkpoint,
+                                  HttpServletRequest ignoredRequest,
+                                  OutputStream destination)
+      throws IOException, InterruptedException {
+    writeDBCheckpointToStream(checkpoint, destination);
+  }
+
 }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index 33d8c178c7..f31ee3174d 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -548,7 +548,7 @@ public final class HddsServerUtil {
     }
   }
 
-  private static void includeFile(File file, String entryName,
+  public static void includeFile(File file, String entryName,
                                  ArchiveOutputStream archiveOutputStream)
       throws IOException {
     ArchiveEntry archiveEntry =
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBCheckpointManager.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBCheckpointManager.java
index ee5408b13b..e3b353164d 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBCheckpointManager.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBCheckpointManager.java
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hdds.utils.db;
 
 import java.io.Closeable;
-import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -28,12 +27,9 @@ import java.time.Duration;
 import java.time.Instant;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.utils.db.RocksDatabase.RocksCheckpoint;
-import org.awaitility.core.ConditionTimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.awaitility.Awaitility.with;
-
 /**
  * RocksDB Checkpoint Manager, used to create and cleanup checkpoints.
  */
@@ -44,9 +40,6 @@ public class RDBCheckpointManager implements Closeable {
   private static final Logger LOG =
       LoggerFactory.getLogger(RDBCheckpointManager.class);
   private final String checkpointNamePrefix;
-  private static final Duration POLL_DELAY_DURATION = Duration.ZERO;
-  private static final Duration POLL_INTERVAL_DURATION = 
Duration.ofMillis(100);
-  private static final Duration POLL_MAX_DURATION = Duration.ofSeconds(5);
 
   /**
    * Create a checkpoint manager with a prefix to be added to the
@@ -96,7 +89,8 @@ public class RDBCheckpointManager implements Closeable {
       LOG.info("Created checkpoint in rocksDB at {} in {} milliseconds",
               checkpointPath, duration);
 
-      waitForCheckpointDirectoryExist(checkpointPath.toFile());
+      RDBCheckpointUtils.waitForCheckpointDirectoryExist(
+          checkpointPath.toFile());
 
       return new RocksDBCheckpoint(
           checkpointPath,
@@ -109,29 +103,6 @@ public class RDBCheckpointManager implements Closeable {
     return null;
   }
 
-  /**
-   * Wait for checkpoint directory to be created for 5 secs with 100 millis
-   * poll interval.
-   */
-  public static void waitForCheckpointDirectoryExist(File file)
-      throws IOException {
-    Instant start = Instant.now();
-    try {
-      with().atMost(POLL_MAX_DURATION)
-          .pollDelay(POLL_DELAY_DURATION)
-          .pollInterval(POLL_INTERVAL_DURATION)
-          .await()
-          .until(file::exists);
-      LOG.info("Waited for {} milliseconds for checkpoint directory {}" +
-              " availability.",
-          Duration.between(start, Instant.now()).toMillis(),
-          file.getAbsoluteFile());
-    } catch (ConditionTimeoutException exception) {
-      LOG.info("Checkpoint directory: {} didn't get created in 5 secs.",
-          file.getAbsolutePath());
-    }
-  }
-
   /**
    * Create RocksDB snapshot by saving a checkpoint to a directory.
    *
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBCheckpointUtils.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBCheckpointUtils.java
new file mode 100644
index 0000000000..24033680a7
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBCheckpointUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import org.awaitility.core.ConditionTimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+
+import static org.awaitility.Awaitility.with;
+
+/**
+ * RocksDB Checkpoint Utilities.
+ */
+public final class RDBCheckpointUtils {
+  static final Logger LOG =
+      LoggerFactory.getLogger(RDBCheckpointUtils.class);
+  private static final Duration POLL_DELAY_DURATION = Duration.ZERO;
+  private static final Duration POLL_INTERVAL_DURATION = 
Duration.ofMillis(100);
+  private static final Duration POLL_MAX_DURATION = Duration.ofSeconds(5);
+
+  private RDBCheckpointUtils() { }
+
+  /**
+   * Wait for checkpoint directory to be created for 5 secs with 100 millis
+   * poll interval.
+   * @param file Checkpoint directory.
+   * @return true if found.
+   */
+  public static boolean waitForCheckpointDirectoryExist(File file)
+      throws IOException {
+    Instant start = Instant.now();
+    try {
+      with().atMost(POLL_MAX_DURATION)
+          .pollDelay(POLL_DELAY_DURATION)
+          .pollInterval(POLL_INTERVAL_DURATION)
+          .await()
+          .until(file::exists);
+      LOG.info("Waited for {} milliseconds for checkpoint directory {}" +
+              " availability.",
+          Duration.between(start, Instant.now()).toMillis(),
+          file.getAbsoluteFile());
+      return true;
+    } catch (ConditionTimeoutException exception) {
+      LOG.info("Checkpoint directory: {} didn't get created in 5 secs.",
+          file.getAbsolutePath());
+      return false;
+    }
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index 31bfccdb86..62723f547c 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -50,9 +50,12 @@ import org.rocksdb.TransactionLogIterator.BatchResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIFF_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.DB_COMPACTION_LOG_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.DB_COMPACTION_SST_BACKUP_DIR;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.SNAPSHOT_INFO_TABLE;
 
 /**
@@ -107,8 +110,9 @@ public class RDBStore implements DBStore {
     try {
       if (enableCompactionLog) {
         rocksDBCheckpointDiffer = new RocksDBCheckpointDiffer(
-            dbLocation.getParent(), DB_COMPACTION_SST_BACKUP_DIR,
-            DB_COMPACTION_LOG_DIR, dbLocation.toString(),
+            dbLocation.getParent() + OM_KEY_PREFIX + OM_SNAPSHOT_DIFF_DIR,
+            DB_COMPACTION_SST_BACKUP_DIR, DB_COMPACTION_LOG_DIR,
+            dbLocation.toString(),
             maxTimeAllowedForSnapshotInDag, compactionDagDaemonInterval);
         rocksDBCheckpointDiffer.setRocksDBForCompactionTracking(dbOptions);
       } else {
@@ -136,7 +140,7 @@ public class RDBStore implements DBStore {
 
       //create checkpoints directory if not exists.
       checkpointsParentDir =
-          Paths.get(dbLocation.getParent(), "db.checkpoints").toString();
+          dbLocation.getParent() + OM_KEY_PREFIX + OM_CHECKPOINT_DIR;
       File checkpointsDir = new File(checkpointsParentDir);
       if (!checkpointsDir.exists()) {
         boolean success = checkpointsDir.mkdir();
@@ -147,15 +151,15 @@ public class RDBStore implements DBStore {
         }
       }
 
-      //create snapshot directory if does not exist.
+      //create snapshot checkpoint directory if does not exist.
       snapshotsParentDir = Paths.get(dbLocation.getParent(),
-          OM_SNAPSHOT_DIR).toString();
+          OM_SNAPSHOT_CHECKPOINT_DIR).toString();
       File snapshotsDir = new File(snapshotsParentDir);
       if (!snapshotsDir.exists()) {
-        boolean success = snapshotsDir.mkdir();
+        boolean success = snapshotsDir.mkdirs();
         if (!success) {
           throw new IOException(
-              "Unable to create RocksDB snapshot directory: " +
+              "Unable to create RocksDB snapshot checkpoint directory: " +
               snapshotsParentDir);
         }
       }
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index 8955cf64a5..11ef743a12 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -230,7 +230,7 @@ public class RocksDBCheckpointDiffer implements 
AutoCloseable {
 
     final File parentDir = new File(metadataDir);
     if (!parentDir.exists()) {
-      if (!parentDir.mkdir()) {
+      if (!parentDir.mkdirs()) {
         LOG.error("Error creating compaction log parent dir.");
         return null;
       }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
index 1a06601588..ee680ef854 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.NodeDetails;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA;
 import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
 import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
@@ -164,13 +165,15 @@ public final class OMNodeDetails extends NodeDetails {
       if (StringUtils.isNotEmpty(getHttpAddress())) {
         return "http://"; + getHttpAddress() +
             OZONE_DB_CHECKPOINT_HTTP_ENDPOINT +
-            "?" + OZONE_DB_CHECKPOINT_REQUEST_FLUSH + "=true";
+            "?" + OZONE_DB_CHECKPOINT_REQUEST_FLUSH + "=true&" +
+            OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA + "=true";
       }
     } else {
       if (StringUtils.isNotEmpty(getHttpsAddress())) {
         return "https://"; + getHttpsAddress() +
             OZONE_DB_CHECKPOINT_HTTP_ENDPOINT +
-            "?" + OZONE_DB_CHECKPOINT_REQUEST_FLUSH + "=true";
+            "?" + OZONE_DB_CHECKPOINT_REQUEST_FLUSH + "=true&" +
+            OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA + "=true";
       }
     }
     return null;
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsSnapshot.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsSnapshot.java
index 4a1bf5a278..b6a4bb1fa1 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsSnapshot.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsSnapshot.java
@@ -29,7 +29,6 @@ import com.google.common.base.Strings;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.om.OMStorage;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.util.ToolRunner;
@@ -45,11 +44,10 @@ import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_INDICATOR;
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
 
 /**
  * Test client-side CRUD snapshot operations with Ozone Manager.
@@ -332,16 +330,14 @@ public class TestOzoneFsSnapshot {
     // Asserts that create request succeeded
     Assertions.assertEquals(0, res);
 
-    File metaDir = OMStorage
-        .getOmDbDir(ozoneManager.getConfiguration());
+    OzoneConfiguration conf = ozoneManager.getConfiguration();
 
     // wait till the snapshot directory exists
     SnapshotInfo snapshotInfo = ozoneManager.getMetadataManager()
         .getSnapshotInfoTable()
         .get(SnapshotInfo.getTableKey(VOLUME, BUCKET, snapshotName));
-    String snapshotDirName = metaDir + OM_KEY_PREFIX +
-        OM_SNAPSHOT_DIR + OM_KEY_PREFIX + OM_DB_NAME +
-        snapshotInfo.getCheckpointDirName() + OM_KEY_PREFIX + "CURRENT";
+    String snapshotDirName = getSnapshotPath(conf, snapshotInfo) +
+        OM_KEY_PREFIX + "CURRENT";
     GenericTestUtils.waitFor(() -> new File(snapshotDirName).exists(),
         1000, 100000);
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
index f473b62e74..8c47a197d6 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.UUID;
 
+import org.apache.commons.compress.compressors.CompressorException;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
 import org.apache.hadoop.hdds.scm.server.SCMDBCheckpointServlet;
@@ -46,6 +47,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.mockito.Matchers;
+
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
@@ -99,7 +102,9 @@ public class TestSCMDbCheckpointServlet {
   }
 
   @Test
-  public void testDoGet() throws ServletException, IOException {
+  public void testDoGet()
+      throws ServletException, IOException, CompressorException,
+      InterruptedException {
 
     File tempFile = null;
     try {
@@ -114,6 +119,8 @@ public class TestSCMDbCheckpointServlet {
           Collections.emptyList(),
           Collections.emptyList(),
           false);
+      doCallRealMethod().when(scmDbCheckpointServletMock)
+         .writeDbDataToStream(any(), any(), any());
 
       HttpServletRequest requestMock = mock(HttpServletRequest.class);
       HttpServletResponse responseMock = mock(HttpServletResponse.class);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
index d23f3389f2..e2ddfcdd7f 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.OMStorage;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
@@ -62,9 +62,10 @@ import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_S3_VOLUME_NAME_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.DB_COMPACTION_LOG_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.DB_COMPACTION_SST_BACKUP_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIFF_DIR;
 
 /**
  * Tests Freon, with MiniOzoneCluster.
@@ -77,9 +78,6 @@ public class TestOMSnapshotDAG {
   private static OzoneConfiguration conf;
   private static ObjectStore store;
   private static OzoneClient client;
-  private final File metaDir = OMStorage.getOmDbDir(conf);
-  private final String compactionLogDirName = "compaction-log";
-  private final String sstBackUpDirName = "compaction-sst-backup";
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -127,9 +125,7 @@ public class TestOMSnapshotDAG {
   }
 
   private String getDBCheckpointAbsolutePath(SnapshotInfo snapshotInfo) {
-    return metaDir + OM_KEY_PREFIX +
-        OM_SNAPSHOT_DIR + OM_KEY_PREFIX +
-        OM_DB_NAME + snapshotInfo.getCheckpointDirName();
+    return OmSnapshotManager.getSnapshotPath(conf, snapshotInfo);
   }
 
   private static String getSnapshotDBKey(String volumeName, String bucketName,
@@ -276,7 +272,7 @@ public class TestOMSnapshotDAG {
   }
 
   @Test
-  public void testSkipTrackingWithZeroSnapshot() throws IOException {
+  public void testSkipTrackingWithZeroSnapshot() {
     // Verify that the listener correctly skips compaction tracking
     // when there is no snapshot in SnapshotInfoTable.
 
@@ -302,7 +298,8 @@ public class TestOMSnapshotDAG {
     String omMetadataDir =
         cluster.getOzoneManager().getConfiguration().get(OZONE_METADATA_DIRS);
     // Verify that no compaction log entry has been written
-    Path logPath = Paths.get(omMetadataDir, compactionLogDirName);
+    Path logPath = Paths.get(omMetadataDir, OM_SNAPSHOT_DIFF_DIR,
+        DB_COMPACTION_LOG_DIR);
     File[] fileList = logPath.toFile().listFiles();
     Assertions.assertNotNull(fileList);
     for (File file : fileList) {
@@ -311,7 +308,8 @@ public class TestOMSnapshotDAG {
       }
     }
     // Verify that no SST has been backed up
-    Path sstBackupPath = Paths.get(omMetadataDir, sstBackUpDirName);
+    Path sstBackupPath = Paths.get(omMetadataDir, OM_SNAPSHOT_DIFF_DIR,
+        DB_COMPACTION_SST_BACKUP_DIR);
     fileList = sstBackupPath.toFile().listFiles();
     Assertions.assertNotNull(fileList);
     Assertions.assertEquals(0L, fileList.length);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
index 6c10b7bf0c..316130c2e2 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
@@ -26,35 +26,57 @@ import javax.servlet.http.HttpServletResponse;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.Principal;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
-
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.collect.Sets;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import org.apache.commons.io.FileUtils;
 
 import static 
org.apache.hadoop.hdds.recon.ReconConfig.ConfigStrings.OZONE_RECON_KERBEROS_PRINCIPAL_KEY;
-import static 
org.apache.hadoop.hdds.utils.HddsServerUtil.writeDBCheckpointToStream;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.DB_COMPACTION_SST_BACKUP_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIFF_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA;
 import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_AUTH_TYPE;
 
+
+import org.apache.ozone.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 
-import static org.junit.Assert.assertNotNull;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -62,6 +84,10 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.rules.Timeout;
 import org.mockito.Matchers;
 
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.OM_HARDLINK_FILE;
+import static 
org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.truncateFileName;
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
@@ -81,6 +107,11 @@ public class TestOMDbCheckpointServlet {
   private HttpServletRequest requestMock = null;
   private HttpServletResponse responseMock = null;
   private OMDBCheckpointServlet omDbCheckpointServletMock = null;
+  private File metaDir;
+  private String snapshotDirName;
+  private String snapshotDirName2;
+  private Path compactionDirPath;
+  private DBCheckpoint dbCheckpoint;
 
   @Rule
   public Timeout timeout = Timeout.seconds(240);
@@ -160,6 +191,9 @@ public class TestOMDbCheckpointServlet {
 
     doCallRealMethod().when(omDbCheckpointServletMock).doGet(requestMock,
         responseMock);
+
+    doCallRealMethod().when(omDbCheckpointServletMock)
+        .writeDbDataToStream(any(), any(), any());
   }
 
   @Test
@@ -257,63 +291,265 @@ public class TestOMDbCheckpointServlet {
   }
 
   @Test
-  public void testWriteCheckpointToOutputStream() throws Exception {
+  public void testWriteDbDataToStream() throws Exception {
+    prepSnapshotData();
+    // Set http param to include snapshot data.
+    when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA))
+        .thenReturn("true");
 
+    // Get the tarball.
+    try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
+      omDbCheckpointServletMock.writeDbDataToStream(dbCheckpoint, requestMock,
+          fileOutputStream);
+    }
+
+    // Untar the file into a temp folder to be examined.
     String testDirName = folder.newFolder().getAbsolutePath();
-    File checkpoint = new File(testDirName, "checkpoint");
-    checkpoint.mkdir();
-    File file = new File(checkpoint, "temp1.txt");
-    OutputStreamWriter writer = new OutputStreamWriter(
-        new FileOutputStream(file), StandardCharsets.UTF_8);
-    writer.write("Test data 1");
-    writer.close();
-
-    file = new File(checkpoint, "/temp2.txt");
-    writer = new OutputStreamWriter(
-        new FileOutputStream(file), StandardCharsets.UTF_8);
-    writer.write("Test data 2");
-    writer.close();
-
-    File outputFile =
-        new File(Paths.get(testDirName, "output_file.tar").toString());
-    TestDBCheckpoint dbCheckpoint = new TestDBCheckpoint(
-        checkpoint.toPath());
-    writeDBCheckpointToStream(dbCheckpoint,
-        new FileOutputStream(outputFile));
-    assertNotNull(outputFile);
+    int testDirLength = testDirName.length() + 1;
+    String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME;
+    int newDbDirLength = newDbDirName.length() + 1;
+    File newDbDir = new File(newDbDirName);
+    newDbDir.mkdirs();
+    FileUtil.unTar(tempFile, newDbDir);
+
+    // Move snapshot dir to correct location.
+    Assert.assertTrue(new File(newDbDirName, OM_SNAPSHOT_DIR)
+        .renameTo(new File(newDbDir.getParent(), OM_SNAPSHOT_DIR)));
+
+    // Confirm the checkpoint directories match, (after remove extras).
+    Path checkpointLocation = dbCheckpoint.getCheckpointLocation();
+    Set<String> initialCheckpointSet = getFiles(checkpointLocation,
+        checkpointLocation.toString().length() + 1);
+    Path finalCheckpointLocation = Paths.get(newDbDirName);
+    Set<String> finalCheckpointSet = getFiles(finalCheckpointLocation,
+        newDbDirLength);
+
+    Assert.assertTrue("hardlink file exists in checkpoint dir",
+        finalCheckpointSet.contains(OM_HARDLINK_FILE));
+    finalCheckpointSet.remove(OM_HARDLINK_FILE);
+    Assert.assertEquals(initialCheckpointSet, finalCheckpointSet);
+
+    int metaDirLength = metaDir.toString().length() + 1;
+    String shortSnapshotLocation =
+        truncateFileName(metaDirLength, Paths.get(snapshotDirName));
+    String shortSnapshotLocation2 =
+        truncateFileName(metaDirLength, Paths.get(snapshotDirName2));
+    String shortCompactionDirLocation =
+        truncateFileName(metaDirLength, compactionDirPath);
+
+    Set<String> finalFullSet =
+        getFiles(Paths.get(testDirName, OM_SNAPSHOT_DIR), testDirLength);
+
+    // Check each line in the hard link file.
+    List<String> fabricatedLinkLines = new ArrayList<>();
+    try (Stream<String> lines = Files.lines(Paths.get(newDbDirName,
+        OM_HARDLINK_FILE))) {
+
+      for (String line : lines.collect(Collectors.toList())) {
+        Assert.assertFalse("CURRENT file is not a hard link",
+            line.contains("CURRENT"));
+        if (line.contains("fabricatedFile")) {
+          fabricatedLinkLines.add(line);
+        } else {
+          checkLine(shortSnapshotLocation, shortSnapshotLocation2, line);
+          // add links to the final set
+          finalFullSet.add(line.split("\t")[0]);
+        }
+      }
+    }
+    Set<String> directories = Sets.newHashSet(
+        shortSnapshotLocation, shortSnapshotLocation2,
+        shortCompactionDirLocation);
+    checkFabricatedLines(directories, fabricatedLinkLines, testDirName);
+
+    Set<String> initialFullSet =
+        getFiles(Paths.get(metaDir.toString(), OM_SNAPSHOT_DIR), 
metaDirLength);
+    Assert.assertEquals("expected snapshot files not found",
+        initialFullSet, finalFullSet);
   }
-}
 
-class TestDBCheckpoint implements DBCheckpoint {
+  @Test
+  public void testWriteDbDataWithoutOmSnapshot()
+      throws Exception {
+    prepSnapshotData();
+
+    // Set http param to exclude snapshot data.
+    when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA))
+        .thenReturn(null);
+
+    // Get the tarball.
+    try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
+      omDbCheckpointServletMock.writeDbDataToStream(dbCheckpoint, requestMock,
+          fileOutputStream);
+    }
+
+    // Untar the file into a temp folder to be examined.
+    String testDirName = folder.newFolder().getAbsolutePath();
+    int testDirLength = testDirName.length() + 1;
+    FileUtil.unTar(tempFile, new File(testDirName));
+
+    // Confirm the checkpoint directories match.
+    Path checkpointLocation = dbCheckpoint.getCheckpointLocation();
+    Set<String> initialCheckpointSet = getFiles(checkpointLocation,
+        checkpointLocation.toString().length() + 1);
+    Path finalCheckpointLocation = Paths.get(testDirName);
+    Set<String> finalCheckpointSet = getFiles(finalCheckpointLocation,
+        testDirLength);
+
+    Assert.assertEquals(initialCheckpointSet, finalCheckpointSet);
+  }
 
-  private final Path checkpointFile;
+  private void prepSnapshotData() throws Exception {
+    setupCluster();
+    metaDir = OMStorage.getOmDbDir(conf);
+
+    OzoneBucket bucket = TestDataUtil
+        .createVolumeAndBucket(cluster.newClient());
+
+    // Create dummy keys for snapshotting.
+    TestDataUtil.createKey(bucket, UUID.randomUUID().toString(),
+        "content");
+    TestDataUtil.createKey(bucket, UUID.randomUUID().toString(),
+        "content");
+
+    snapshotDirName =
+        createSnapshot(bucket.getVolumeName(), bucket.getName());
+    snapshotDirName2 =
+        createSnapshot(bucket.getVolumeName(), bucket.getName());
+
+    // Create dummy snapshot to make sure it is not included.
+    Path fabricatedSnapshot  = Paths.get(
+        new File(snapshotDirName).getParent(),
+        "fabricatedSnapshot");
+    fabricatedSnapshot.toFile().mkdirs();
+    Assert.assertTrue(Paths.get(fabricatedSnapshot.toString(), 
"fabricatedFile")
+        .toFile().createNewFile());
+
+    // Create fabricated links to snapshot dirs
+    // to confirm that links are recognized even if
+    // they are don't point to the checkpoint directory.
+    Path fabricatedFile = Paths.get(snapshotDirName, "fabricatedFile");
+    Path fabricatedLink = Paths.get(snapshotDirName2, "fabricatedFile");
+
+    Files.write(fabricatedFile,
+        "fabricatedData".getBytes(StandardCharsets.UTF_8));
+    Files.createLink(fabricatedLink, fabricatedFile);
+
+    // Simulate links from the compaction dir.
+    compactionDirPath = Paths.get(metaDir.toString(),
+        OM_SNAPSHOT_DIFF_DIR, DB_COMPACTION_SST_BACKUP_DIR);
+    Path fabricatedLink2 = Paths.get(compactionDirPath.toString(),
+        "fabricatedFile");
+    Files.createLink(fabricatedLink2, fabricatedFile);
+    Path currentFile = Paths.get(metaDir.toString(),
+                                    OM_DB_NAME, "CURRENT");
+    Path currentLink = Paths.get(compactionDirPath.toString(), "CURRENT");
+    Files.createLink(currentLink, currentFile);
+
+    dbCheckpoint = cluster.getOzoneManager()
+        .getMetadataManager().getStore()
+        .getCheckpoint(true);
 
-  TestDBCheckpoint(Path checkpointFile) {
-    this.checkpointFile = checkpointFile;
   }
 
-  @Override
-  public Path getCheckpointLocation() {
-    return checkpointFile;
+  private String createSnapshot(String vname, String bname)
+      throws IOException, InterruptedException, TimeoutException {
+    final OzoneManager om = cluster.getOzoneManager();
+    String snapshotName = UUID.randomUUID().toString();
+    OzoneManagerProtocol writeClient = cluster.newClient().getObjectStore()
+        .getClientProxy().getOzoneManagerClient();
+
+    writeClient.createSnapshot(vname, bname, snapshotName);
+    SnapshotInfo snapshotInfo = om.getMetadataManager().getSnapshotInfoTable()
+        .get(SnapshotInfo.getTableKey(vname, bname, snapshotName));
+    String snapshotPath = getSnapshotPath(conf, snapshotInfo)
+        + OM_KEY_PREFIX;
+    GenericTestUtils.waitFor(() -> new File(snapshotPath).exists(),
+        100, 2000);
+    return snapshotPath;
   }
 
-  @Override
-  public long getCheckpointTimestamp() {
-    return 0;
+  private Set<String> getFiles(Path path, int truncateLength)
+      throws IOException {
+    return getFiles(path, truncateLength, new HashSet<>());
   }
 
-  @Override
-  public long getLatestSequenceNumber() {
-    return 0;
+  // Get all files below path, recursively, (skipping fabricated files).
+  @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
+  private Set<String> getFiles(Path path, int truncateLength,
+      Set<String> fileSet) throws IOException {
+    try (Stream<Path> files = Files.list(path)) {
+      for (Path file : files.collect(Collectors.toList())) {
+        if (file.toFile().isDirectory()) {
+          getFiles(file, truncateLength, fileSet);
+        }
+        if (!file.getFileName().toString().startsWith("fabricated")) {
+          fileSet.add(truncateFileName(truncateLength, file));
+        }
+      }
+    }
+    return fileSet;
   }
 
-  @Override
-  public long checkpointCreationTimeTaken() {
-    return 0;
+  /**
+   * Confirm fabricated link lines in hardlink file are properly
+   * formatted: "dir1/fabricatedFile dir2/fabricatedFile".
+   *
+   * The "fabricated" files/links are ones I've created by hand to
+   * fully test the code, (as opposed to the "natural" files/links
+   * created by the create snapshot process).
+   *
+   * @param directories Possible directories for the links to exist in.
+   * @param lines Text lines defining the link paths.
+   * @param testDirName Name of test directory.
+   */
+  @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
+  private void checkFabricatedLines(Set<String> directories, List<String> 
lines,
+                                    String testDirName) {
+    // find the real file
+    String realDir = null;
+    for (String dir: directories) {
+      if (Paths.get(testDirName, dir, "fabricatedFile").toFile().exists()) {
+        Assert.assertNull(
+            "Exactly one copy of the fabricated file exists in the tarball",
+            realDir);
+        realDir = dir;
+      }
+    }
+
+    Assert.assertNotNull("real directory found", realDir);
+    directories.remove(realDir);
+    Iterator<String> directoryIterator = directories.iterator();
+    String dir0 = directoryIterator.next();
+    String dir1 = directoryIterator.next();
+    Assert.assertNotEquals("link directories are different", dir0, dir1);
+
+    for (String line : lines) {
+      String[] files = line.split("\t");
+      Assert.assertTrue("fabricated entry contains valid first directory",
+          files[0].startsWith(dir0) || files[0].startsWith(dir1));
+      Assert.assertTrue("fabricated entry contains correct real directory",
+          files[1].startsWith(realDir));
+      Path path0 = Paths.get(files[0]);
+      Path path1 = Paths.get(files[1]);
+      Assert.assertTrue("fabricated entries contains correct file name",
+          path0.getFileName().toString().equals("fabricatedFile") &&
+              path1.getFileName().toString().equals("fabricatedFile"));
+    }
   }
 
-  @Override
-  public void cleanupCheckpoint() throws IOException {
-    FileUtils.deleteDirectory(checkpointFile.toFile());
+  // Validates line in hard link file. should look something like:
+  // "dir1/x.sst x.sst".
+  private void checkLine(String shortSnapshotLocation,
+                            String shortSnapshotLocation2,
+                            String line) {
+    String[] files = line.split("\t");
+    Assert.assertTrue("hl entry starts with valid snapshot dir",
+        files[0].startsWith(shortSnapshotLocation) ||
+        files[0].startsWith(shortSnapshotLocation2));
+
+    String file0 = files[0].substring(shortSnapshotLocation.length() + 1);
+    String file1 = files[1];
+    Assert.assertEquals("hl filenames are the same", file0, file1);
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index c20efd8b79..31ea777b7d 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -16,21 +16,14 @@
  */
 package org.apache.hadoop.ozone.om;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.ExitManager;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.RDBCheckpointUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
 import org.apache.hadoop.ozone.client.BucketArgs;
@@ -41,28 +34,46 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.VolumeArgs;
-import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
-import org.apache.hadoop.hdds.ExitManager;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ratis.server.protocol.TermIndex;
-
-import static org.apache.hadoop.ozone.om.TestOzoneManagerHAWithData.createKey;
-import static org.junit.Assert.assertTrue;
-
 import org.assertj.core.api.Fail;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.event.Level;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
+import static org.apache.hadoop.ozone.om.TestOzoneManagerHAWithData.createKey;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Tests the Ratis snapshots feature in OM.
  */
@@ -167,6 +178,8 @@ public class TestOMRatisSnapshots {
     // Do some transactions so that the log index increases
     List<String> keys = writeKeysToIncreaseLogIndex(leaderRatisServer, 200);
 
+    SnapshotInfo snapshotInfo = createOzoneSnapshot(leaderOM);
+
     // Get the latest db checkpoint from the leader OM.
     TransactionInfo transactionInfo =
         TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
@@ -237,6 +250,58 @@ public class TestOMRatisSnapshots {
         TEST_BUCKET_LAYOUT).get(followerOMMetaMngr.getOzoneKey(
         volumeName, bucketName, newKeys.get(0))));
      */
+
+    // Read back data from the OM snapshot.
+    OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(".snapshot/snap1/" + keys.get(0)).build();
+    OmKeyInfo omKeyInfo;
+    omKeyInfo = followerOM.lookupKey(omKeyArgs);
+    Assertions.assertNotNull(omKeyInfo);
+    Assertions.assertEquals(omKeyInfo.getKeyName(), omKeyArgs.getKeyName());
+
+    // Confirm followers snapshot hard links are as expected
+    File followerMetaDir = OMStorage.getOmDbDir(followerOM.getConfiguration());
+    Path followerActiveDir = Paths.get(followerMetaDir.toString(), OM_DB_NAME);
+    Path followerSnapshotDir =
+        Paths.get(getSnapshotPath(followerOM.getConfiguration(), 
snapshotInfo));
+    File leaderMetaDir = OMStorage.getOmDbDir(leaderOM.getConfiguration());
+    Path leaderActiveDir = Paths.get(leaderMetaDir.toString(), OM_DB_NAME);
+    Path leaderSnapshotDir =
+        Paths.get(getSnapshotPath(leaderOM.getConfiguration(), snapshotInfo));
+    // Get the list of hardlinks from the leader.  Then confirm those links
+    //  are on the follower
+    int hardLinkCount = 0;
+    try (Stream<Path>list = Files.list(leaderSnapshotDir)) {
+      for (Path leaderSnapshotSST: list.collect(Collectors.toList())) {
+        String fileName = leaderSnapshotSST.getFileName().toString();
+        if (fileName.toLowerCase().endsWith(".sst")) {
+
+          Path leaderActiveSST =
+              Paths.get(leaderActiveDir.toString(), fileName);
+          // Skip if not hard link on the leader
+          if (!leaderActiveSST.toFile().exists()) {
+            continue;
+          }
+          // If it is a hard link on the leader, it should be a hard
+          // link on the follower
+          if (OmSnapshotUtils.getINode(leaderActiveSST)
+              .equals(OmSnapshotUtils.getINode(leaderSnapshotSST))) {
+            Path followerSnapshotSST =
+                Paths.get(followerSnapshotDir.toString(), fileName);
+            Path followerActiveSST =
+                Paths.get(followerActiveDir.toString(), fileName);
+            Assertions.assertEquals(
+                OmSnapshotUtils.getINode(followerActiveSST),
+                OmSnapshotUtils.getINode(followerSnapshotSST),
+                "Snapshot sst file is supposed to be a hard link");
+            hardLinkCount++;
+          }
+        }
+      }
+    }
+    Assertions.assertTrue(hardLinkCount > 0, "No hard links were found");
   }
 
   @Ignore("Enable this unit test after RATIS-1481 used")
@@ -547,6 +612,27 @@ public class TestOMRatisSnapshots {
     Assert.assertTrue(logCapture.getOutput().contains(msg));
   }
 
+  private SnapshotInfo createOzoneSnapshot(OzoneManager leaderOM)
+      throws IOException {
+    objectStore.createSnapshot(volumeName, bucketName, "snap1");
+
+    String tableKey = SnapshotInfo.getTableKey(volumeName,
+                                               bucketName,
+                                               "snap1");
+    SnapshotInfo snapshotInfo = leaderOM.getMetadataManager()
+        .getSnapshotInfoTable()
+        .get(tableKey);
+    // Allow the snapshot to be written to disk
+    String fileName =
+        getSnapshotPath(leaderOM.getConfiguration(), snapshotInfo);
+    File snapshotDir = new File(fileName);
+    if (!RDBCheckpointUtils
+        .waitForCheckpointDirectoryExist(snapshotDir)) {
+      throw new IOException("snapshot directory doesn't exist");
+    }
+    return snapshotInfo;
+  }
+
   private List<String> writeKeysToIncreaseLogIndex(
       OzoneManagerRatisServer omRatisServer, long targetLogIndex)
       throws IOException, InterruptedException {
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
index 5af4e5d42f..6c10388133 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
@@ -76,9 +76,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.CONTAINS_SNAPSHOT;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMIZED;
@@ -109,7 +107,7 @@ public class TestOmSnapshot {
   private static boolean enabledFileSystemPaths;
   private static boolean forceFullSnapshotDiff;
   private static ObjectStore store;
-  private static File metaDir;
+  private static OzoneConfiguration leaderConfig;
   private static OzoneManager leaderOzoneManager;
 
   private static RDBStore rdbStore;
@@ -182,9 +180,9 @@ public class TestOmSnapshot {
     bucketName = ozoneBucket.getName();
 
     leaderOzoneManager = ((MiniOzoneHAClusterImpl) cluster).getOMLeader();
+    leaderConfig = leaderOzoneManager.getConfiguration();
     rdbStore =
         (RDBStore) leaderOzoneManager.getMetadataManager().getStore();
-    OzoneConfiguration leaderConfig = leaderOzoneManager.getConfiguration();
     cluster.setConf(leaderConfig);
 
     store = client.getObjectStore();
@@ -195,7 +193,6 @@ public class TestOmSnapshot {
 
     // stop the deletion services so that keys can still be read
     keyManager.stop();
-    metaDir = OMStorage.getOmDbDir(leaderConfig);
   }
 
   @AfterClass
@@ -845,8 +842,8 @@ public class TestOmSnapshot {
         leaderOzoneManager.getMetadataManager().getSnapshotInfoTable()
             .get(SnapshotInfo.getTableKey(volName, buckName, snapshotName));
     String snapshotDirName =
-        metaDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR + OM_KEY_PREFIX + OM_DB_NAME
-            + snapshotInfo.getCheckpointDirName() + OM_KEY_PREFIX + "CURRENT";
+        OmSnapshotManager.getSnapshotPath(leaderConfig, snapshotInfo) +
+        OM_KEY_PREFIX + "CURRENT";
     GenericTestUtils
         .waitFor(() -> new File(snapshotDirName).exists(), 1000, 120000);
     return snapshotKeyPrefix;
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotFileSystem.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotFileSystem.java
index ba5c82bdc3..cac8181743 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotFileSystem.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotFileSystem.java
@@ -79,10 +79,9 @@ import java.util.concurrent.TimeoutException;
 import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME;
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
@@ -103,7 +102,6 @@ public class TestOmSnapshotFileSystem {
   private static OzoneManagerProtocol writeClient;
   private static BucketLayout bucketLayout;
   private static boolean enabledFileSystemPaths;
-  private static File metaDir;
   private static OzoneManager ozoneManager;
   private static String keyPrefix;
 
@@ -179,7 +177,6 @@ public class TestOmSnapshotFileSystem {
     objectStore = client.getObjectStore();
     writeClient = objectStore.getClientProxy().getOzoneManagerClient();
     ozoneManager = cluster.getOzoneManager();
-    metaDir = OMStorage.getOmDbDir(conf);
 
     // stop the deletion services so that keys can still be read
     KeyManagerImpl keyManager = (KeyManagerImpl) ozoneManager.getKeyManager();
@@ -708,9 +705,8 @@ public class TestOmSnapshotFileSystem {
     SnapshotInfo snapshotInfo = ozoneManager.getMetadataManager()
         .getSnapshotInfoTable()
         .get(SnapshotInfo.getTableKey(volumeName, bucketName, snapshotName));
-    String snapshotDirName = metaDir + OM_KEY_PREFIX +
-        OM_SNAPSHOT_DIR + OM_KEY_PREFIX + OM_DB_NAME +
-        snapshotInfo.getCheckpointDirName() + OM_KEY_PREFIX + "CURRENT";
+    String snapshotDirName = getSnapshotPath(conf, snapshotInfo) +
+        OM_KEY_PREFIX + "CURRENT";
     GenericTestUtils.waitFor(() -> new File(snapshotDirName).exists(),
         1000, 120000);
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotAcl.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotAcl.java
index 9a14eb0e97..0c12a7fc89 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotAcl.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotAcl.java
@@ -22,13 +22,13 @@ package org.apache.hadoop.ozone.om.snapshot;
 import java.io.File;
 import java.io.IOException;
 import java.util.UUID;
-import java.util.concurrent.TimeoutException;
 import java.util.stream.Stream;
 
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils;
+import org.apache.hadoop.hdds.utils.db.RDBCheckpointUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.client.BucketArgs;
@@ -49,7 +49,6 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -66,10 +65,8 @@ import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConsts.ADMIN;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 /**
@@ -95,7 +92,6 @@ public class TestOzoneManagerSnapshotAcl {
       OzoneObj.ResourceType.KEY;
   private static MiniOzoneCluster cluster;
   private static ObjectStore objectStore;
-  private static File metaDir;
   private static OzoneManager ozoneManager;
   private static OzoneClient client;
   private String volumeName;
@@ -138,7 +134,7 @@ public class TestOzoneManagerSnapshotAcl {
 
     // stop the deletion services so that keys can still be read
     keyManager.stop();
-    metaDir = OMStorage.getOmDbDir(ozoneManagerConf);
+    OMStorage.getOmDbDir(ozoneManagerConf);
   }
 
   @AfterEach
@@ -191,7 +187,7 @@ public class TestOzoneManagerSnapshotAcl {
   @ParameterizedTest
   @EnumSource(BucketLayout.class)
   public void testGeyKeyInfoWithAllowedUser(BucketLayout bucketLayout)
-      throws IOException, InterruptedException, TimeoutException {
+      throws IOException {
     // GIVEN
     setup(bucketLayout);
     final OmKeyArgs snapshotKeyArgs = getOmKeyArgs(true);
@@ -205,7 +201,7 @@ public class TestOzoneManagerSnapshotAcl {
   @ParameterizedTest
   @EnumSource(BucketLayout.class)
   public void testGeyKeyInfoWithNotAllowedUser(BucketLayout bucketLayout)
-      throws IOException, InterruptedException, TimeoutException {
+      throws IOException {
     // GIVEN
     setup(bucketLayout);
     final OmKeyArgs snapshotKeyOmKeyArgs = getOmKeyArgs(true);
@@ -231,7 +227,7 @@ public class TestOzoneManagerSnapshotAcl {
   @MethodSource("getListStatusArguments")
   public void testListStatusWithAllowedUser(BucketLayout bucketLayout,
       boolean recursive, boolean allowPartialPrefixes)
-      throws IOException, InterruptedException, TimeoutException {
+      throws IOException {
     // GIVEN
     setup(bucketLayout);
     final OmKeyArgs snapshotKeyArgs = getOmKeyArgs(true);
@@ -248,7 +244,7 @@ public class TestOzoneManagerSnapshotAcl {
   @MethodSource("getListStatusArguments")
   public void testListStatusWithNotAllowedUser(BucketLayout bucketLayout,
       boolean recursive, boolean allowPartialPrefixes)
-      throws IOException, InterruptedException, TimeoutException {
+      throws IOException {
     // GIVEN
     setup(bucketLayout);
     final OmKeyArgs snapshotKeyArgs = getOmKeyArgs(true);
@@ -392,7 +388,7 @@ public class TestOzoneManagerSnapshotAcl {
   }
 
   private void setup(BucketLayout bucketLayout)
-      throws IOException, InterruptedException, TimeoutException {
+      throws IOException {
     UserGroupInformation.setLoginUser(UGI1);
 
     createVolume();
@@ -468,7 +464,7 @@ public class TestOzoneManagerSnapshotAcl {
   }
 
   private void createSnapshot()
-      throws IOException, InterruptedException, TimeoutException {
+      throws IOException {
     final String snapshotPrefix = "snapshot-";
     final String snapshotName =
         snapshotPrefix + RandomStringUtils.randomNumeric(5);
@@ -479,11 +475,14 @@ public class TestOzoneManagerSnapshotAcl {
         .getMetadataManager()
         .getSnapshotInfoTable()
         .get(SnapshotInfo.getTableKey(volumeName, bucketName, snapshotName));
-    final String snapshotDirName = metaDir + OM_KEY_PREFIX +
-        OM_SNAPSHOT_DIR + OM_KEY_PREFIX + OM_DB_NAME +
-        snapshotInfo.getCheckpointDirName() + OM_KEY_PREFIX + "CURRENT";
-    GenericTestUtils.waitFor(() -> new File(snapshotDirName).exists(),
-        1000, 120000);
+    // Allow the snapshot to be written to disk
+    String fileName =
+        getSnapshotPath(ozoneManager.getConfiguration(), snapshotInfo);
+    File snapshotDir = new File(fileName);
+    if (!RDBCheckpointUtils
+        .waitForCheckpointDirectoryExist(snapshotDir)) {
+      throw new IOException("snapshot directory doesn't exist");
+    }
   }
 
   private void setBucketAcl() throws IOException {
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneSnapshotRestore.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneSnapshotRestore.java
index 3a160b8ccc..d5faf7a8cc 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneSnapshotRestore.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneSnapshotRestore.java
@@ -58,9 +58,7 @@ import java.util.stream.Stream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 
@@ -72,7 +70,6 @@ public class TestOzoneSnapshotRestore {
   private static final String OM_SERVICE_ID = "om-service-test-1";
   private MiniOzoneCluster cluster;
   private ObjectStore store;
-  private File metaDir;
   private OzoneManager leaderOzoneManager;
   private OzoneConfiguration clientConf;
   private OzoneClient client;
@@ -122,7 +119,7 @@ public class TestOzoneSnapshotRestore {
 
     // stop the deletion services so that keys can still be read
     keyManager.stop();
-    metaDir = OMStorage.getOmDbDir(leaderConfig);
+    OMStorage.getOmDbDir(leaderConfig);
 
   }
 
@@ -160,9 +157,8 @@ public class TestOzoneSnapshotRestore {
             .getMetadataManager()
             .getSnapshotInfoTable()
             .get(SnapshotInfo.getTableKey(volName, buckName, snapshotName));
-    String snapshotDirName = metaDir + OM_KEY_PREFIX +
-            OM_SNAPSHOT_DIR + OM_KEY_PREFIX + OM_DB_NAME +
-            snapshotInfo.getCheckpointDirName() + OM_KEY_PREFIX + "CURRENT";
+    String snapshotDirName = OmSnapshotManager
+        .getSnapshotPath(clientConf, snapshotInfo) + OM_KEY_PREFIX + "CURRENT";
     GenericTestUtils.waitFor(() -> new File(snapshotDirName).exists(),
             1000, 120000);
     return snapshotKeyPrefix;
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
index 0f8413996d..023f2cb3e1 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
@@ -18,19 +18,48 @@
 
 package org.apache.hadoop.ozone.om;
 
-import javax.servlet.ServletException;
-
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.recon.ReconConfig;
+import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.utils.DBCheckpointServlet;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.RDBCheckpointUtils;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.OzoneConsts;
-
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeFile;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA;
+import static 
org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.createHardLinkList;
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
+import static 
org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.truncateFileName;
 
 /**
  * Provides the current checkpoint Snapshot of the OM DB. (tar.gz)
@@ -62,7 +91,7 @@ public class OMDBCheckpointServlet extends 
DBCheckpointServlet {
       return;
     }
 
-    OzoneConfiguration conf = om.getConfiguration();
+    OzoneConfiguration conf = getConf();
     // Only Ozone Admins and Recon are allowed
     Collection<String> allowedUsers =
             new LinkedHashSet<>(om.getOmAdminUsernames());
@@ -82,4 +111,161 @@ public class OMDBCheckpointServlet extends 
DBCheckpointServlet {
         allowedGroups,
         om.isSpnegoEnabled());
   }
+
+  @Override
+  public void writeDbDataToStream(DBCheckpoint checkpoint,
+                                  HttpServletRequest request,
+                                  OutputStream destination)
+      throws IOException, InterruptedException {
+    // Map of inodes to path.
+    Map<Object, Path> copyFiles = new HashMap<>();
+    // Map of link to path.
+    Map<Path, Path> hardLinkFiles = new HashMap<>();
+
+    getFilesForArchive(checkpoint, copyFiles, hardLinkFiles,
+        includeSnapshotData(request));
+
+    try (TarArchiveOutputStream archiveOutputStream =
+            new TarArchiveOutputStream(destination)) {
+      archiveOutputStream
+          .setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
+      writeFilesToArchive(copyFiles, hardLinkFiles, archiveOutputStream);
+    }
+  }
+
+  private void getFilesForArchive(DBCheckpoint checkpoint,
+                                  Map<Object, Path> copyFiles,
+                                  Map<Path, Path> hardLinkFiles,
+                                  boolean includeSnapshotData)
+      throws IOException {
+
+    // Get the active fs files.
+    Path dir = checkpoint.getCheckpointLocation();
+    processDir(dir, copyFiles, hardLinkFiles, new HashSet<>());
+
+    if (!includeSnapshotData) {
+      return;
+    }
+
+    // Get the snapshot files.
+    Set<Path> snapshotPaths = waitForSnapshotDirs(checkpoint);
+    Path snapshotDir = Paths.get(OMStorage.getOmDbDir(getConf()).toString(),
+        OM_SNAPSHOT_DIR);
+    processDir(snapshotDir, copyFiles, hardLinkFiles, snapshotPaths);
+  }
+
+  /**
+   * The snapshotInfo table may contain a snapshot that
+   * doesn't yet exist on the fs, so wait a few seconds for it.
+   * @param checkpoint Checkpoint containing snapshot entries expected.
+   * @return Set of expected snapshot dirs.
+   */
+  private Set<Path> waitForSnapshotDirs(DBCheckpoint checkpoint)
+      throws IOException {
+
+    OzoneConfiguration conf = getConf();
+
+    Set<Path> snapshotPaths = new HashSet<>();
+
+    // get snapshotInfo entries
+    OmMetadataManagerImpl checkpointMetadataManager =
+        OmMetadataManagerImpl.createCheckpointMetadataManager(
+            conf, checkpoint);
+    try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>>
+        iterator = checkpointMetadataManager
+        .getSnapshotInfoTable().iterator()) {
+
+      // For each entry, wait for corresponding directory.
+      while (iterator.hasNext()) {
+        Table.KeyValue<String, SnapshotInfo> entry = iterator.next();
+        Path path = Paths.get(getSnapshotPath(conf, entry.getValue()));
+        waitForDirToExist(path);
+        snapshotPaths.add(path);
+      }
+    }
+    return snapshotPaths;
+  }
+
+  private void waitForDirToExist(Path dir) throws IOException {
+    if (!RDBCheckpointUtils.waitForCheckpointDirectoryExist(dir.toFile())) {
+      throw new IOException("snapshot dir doesn't exist: " + dir);
+    }
+  }
+
+  private void processDir(Path dir, Map<Object, Path> copyFiles,
+                          Map<Path, Path> hardLinkFiles,
+                          Set<Path> snapshotPaths)
+      throws IOException {
+    try (Stream<Path> files = Files.list(dir)) {
+      for (Path file : files.collect(Collectors.toList())) {
+        File f = file.toFile();
+        if (f.isDirectory()) {
+          // Skip any unexpected snapshot files.
+          String parent = f.getParent();
+          if (parent != null && parent.endsWith(OM_SNAPSHOT_CHECKPOINT_DIR)
+              && !snapshotPaths.contains(file)) {
+            LOG.debug("Skipping unneeded file: " + file);
+            continue;
+          }
+          processDir(file, copyFiles, hardLinkFiles, snapshotPaths);
+        } else {
+          processFile(file, copyFiles, hardLinkFiles);
+        }
+      }
+    }
+  }
+
+  private void processFile(Path file, Map<Object, Path> copyFiles,
+                           Map<Path, Path> hardLinkFiles) throws IOException {
+    // Get the inode.
+    Object key = OmSnapshotUtils.getINode(file);
+    // If we already have the inode, store as hard link.
+    if (copyFiles.containsKey(key)) {
+      hardLinkFiles.put(file, copyFiles.get(key));
+    } else {
+      copyFiles.put(key, file);
+    }
+  }
+
+  // Returns value of http request parameter.
+  private boolean includeSnapshotData(HttpServletRequest request) {
+    String includeParam =
+        request.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA);
+    return Boolean.parseBoolean(includeParam);
+  }
+
+  private void writeFilesToArchive(Map<Object, Path> copyFiles,
+                         Map<Path, Path> hardLinkFiles,
+                         ArchiveOutputStream archiveOutputStream)
+      throws IOException {
+
+    File metaDirPath = ServerUtils.getOzoneMetaDirPath(getConf());
+    int truncateLength = metaDirPath.toString().length() + 1;
+
+    // Go through each of the files to be copied and add to archive.
+    for (Path file : copyFiles.values()) {
+      String fixedFile = truncateFileName(truncateLength, file);
+      if (fixedFile.startsWith(OM_CHECKPOINT_DIR)) {
+        // checkpoint files go to root of tarball
+        Path f = Paths.get(fixedFile).getFileName();
+        if (f != null) {
+          fixedFile = f.toString();
+        }
+      }
+      includeFile(file.toFile(), fixedFile, archiveOutputStream);
+    }
+
+    // Create list of hard links.
+    if (!hardLinkFiles.isEmpty()) {
+      Path hardLinkFile = createHardLinkList(truncateLength, hardLinkFiles);
+      includeFile(hardLinkFile.toFile(), OmSnapshotManager.OM_HARDLINK_FILE,
+          archiveOutputStream);
+    }
+  }
+
+  private OzoneConfiguration getConf() {
+    return ((OzoneManager) getServletContext()
+        .getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE))
+        .getConfiguration();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index ea40ba5be6..a5fbd60c1b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.om;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -36,13 +37,14 @@ import java.util.stream.Stream;
 
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.hdds.utils.TableCacheMetrics;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.RDBCheckpointUtils;
 import org.apache.hadoop.hdds.utils.db.RocksDBConfiguration;
 import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.RDBCheckpointManager;
 import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.hdds.utils.db.TypedTable;
@@ -108,7 +110,7 @@ import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_L
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_LIMIT_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR;
 
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.util.ExitUtils;
@@ -327,13 +329,48 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
     this.omEpoch = 0;
   }
 
+  public static OmMetadataManagerImpl createCheckpointMetadataManager(
+      OzoneConfiguration conf, DBCheckpoint checkpoint) throws IOException {
+    Path path = checkpoint.getCheckpointLocation();
+    Path parent = path.getParent();
+    if (parent == null) {
+      throw new IllegalStateException("DB checkpoint parent path should not "
+          + "have been null. Checkpoint path is " + path);
+    }
+    File dir = parent.toFile();
+    Path name = path.getFileName();
+    if (name == null) {
+      throw new IllegalStateException("DB checkpoint dir name should not "
+          + "have been null. Checkpoint path is " + path);
+    }
+    return new OmMetadataManagerImpl(conf, dir, name.toString());
+  }
+
+  /**
+   * Metadata constructor for checkpoints.
+   *
+   * @param conf - Ozone conf.
+   * @param dir - Checkpoint parent directory.
+   * @param name - Checkpoint directory name.
+   * @throws IOException
+   */
+  private OmMetadataManagerImpl(OzoneConfiguration conf, File dir, String name)
+      throws IOException {
+    lock = new OmReadOnlyLock();
+    omEpoch = 0;
+    setStore(loadDB(conf, dir, name, true,
+        java.util.Optional.of(Boolean.TRUE)));
+    initializeOmTables(false);
+  }
+
+
   // metadata constructor for snapshots
   OmMetadataManagerImpl(OzoneConfiguration conf, String snapshotDirName,
       boolean isSnapshotInCache) throws IOException {
     lock = new OmReadOnlyLock();
     omEpoch = 0;
     String snapshotDir = OMStorage.getOmDbDir(conf) +
-        OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
+        OM_KEY_PREFIX + OM_SNAPSHOT_CHECKPOINT_DIR;
     File metaDir = new File(snapshotDir);
     String dbName = OM_DB_NAME + snapshotDirName;
     // The check is only to prevent every snapshot read to perform a disk IO
@@ -341,7 +378,7 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
     // it is most likely DB entries will get flushed in this wait time.
     if (isSnapshotInCache) {
       File checkpoint = Paths.get(metaDir.toPath().toString(), 
dbName).toFile();
-      RDBCheckpointManager.waitForCheckpointDirectoryExist(checkpoint);
+      RDBCheckpointUtils.waitForCheckpointDirectoryExist(checkpoint);
     }
     setStore(loadDB(conf, metaDir, dbName, true,
             java.util.Optional.of(Boolean.TRUE)));
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 91fc4498c9..56253d0e5c 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -23,7 +23,6 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalListener;
-
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
@@ -64,9 +63,11 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static 
org.apache.hadoop.hdds.utils.db.DBStoreBuilder.DEFAULT_COLUMN_FAMILY_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIFF_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_INDICATOR;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_DB_DIR;
@@ -77,6 +78,7 @@ import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVA
  * This class is used to manage/create OM snapshots.
  */
 public final class OmSnapshotManager implements AutoCloseable {
+  public static final String OM_HARDLINK_FILE = "hardLinkFile";
   private static final Logger LOG =
       LoggerFactory.getLogger(OmSnapshotManager.class);
 
@@ -452,6 +454,13 @@ public final class OmSnapshotManager implements 
AutoCloseable {
         snapshotName + OM_KEY_PREFIX;
   }
 
+  public static String getSnapshotPath(OzoneConfiguration conf,
+                                     SnapshotInfo snapshotInfo) {
+    return OMStorage.getOmDbDir(conf) +
+        OM_KEY_PREFIX + OM_SNAPSHOT_CHECKPOINT_DIR + OM_KEY_PREFIX +
+        OM_DB_NAME + snapshotInfo.getCheckpointDirName();
+  }
+
   public static boolean isSnapshotKey(String[] keyParts) {
     return (keyParts.length > 1) &&
         (keyParts[0].compareTo(OM_SNAPSHOT_INDICATOR) == 0);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index c2e96d24c4..b78686df46 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -29,6 +29,7 @@ import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.security.PrivilegedExceptionAction;
 import java.time.Duration;
@@ -83,11 +84,13 @@ import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.OzoneManagerVersion;
+import org.apache.hadoop.ozone.om.ratis_snapshot.OmRatisSnapshotProvider;
 import org.apache.hadoop.ozone.om.ha.OMHAMetrics;
 import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.service.OMRangerBGSyncService;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature;
 import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
 import org.apache.hadoop.ozone.util.OzoneNetUtils;
@@ -166,7 +169,6 @@ import 
org.apache.hadoop.hdds.security.OzoneSecurityException;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
-import org.apache.hadoop.ozone.om.snapshot.OzoneManagerSnapshotProvider;
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
 import org.apache.hadoop.ozone.om.upgrade.OMUpgradeFinalizer;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.OzoneManagerAdminService;
@@ -244,6 +246,7 @@ import static 
org.apache.hadoop.ozone.OzoneConsts.DEFAULT_OM_UPDATE_ID;
 import static org.apache.hadoop.ozone.OzoneConsts.LAYOUT_VERSION_KEY;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_FILE;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_TEMP_FILE;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.PREPARE_MARKER_KEY;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.RPC_PORT;
@@ -379,7 +382,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
 
   private boolean isRatisEnabled;
   private OzoneManagerRatisServer omRatisServer;
-  private OzoneManagerSnapshotProvider omSnapshotProvider;
+  private OmRatisSnapshotProvider omRatisSnapshotProvider;
   private OMNodeDetails omNodeDetails;
   private Map<String, OMNodeDetails> peerNodesMap;
   private File omRatisSnapshotDir;
@@ -1411,7 +1414,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
       }
 
       if (peerNodesMap != null && !peerNodesMap.isEmpty()) {
-        this.omSnapshotProvider = new OzoneManagerSnapshotProvider(
+        this.omRatisSnapshotProvider = new OmRatisSnapshotProvider(
             configuration, omRatisSnapshotDir, peerNodesMap);
       }
     }
@@ -1447,8 +1450,8 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   }
 
   @VisibleForTesting
-  public OzoneManagerSnapshotProvider getOmSnapshotProvider() {
-    return omSnapshotProvider;
+  public OmRatisSnapshotProvider getOmSnapshotProvider() {
+    return omRatisSnapshotProvider;
   }
 
   @VisibleForTesting
@@ -1948,11 +1951,11 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
       exitManager.exitSystem(1, e.getLocalizedMessage(), e, LOG);
     }
 
-    if (omSnapshotProvider == null) {
-      omSnapshotProvider = new OzoneManagerSnapshotProvider(
+    if (omRatisSnapshotProvider == null) {
+      omRatisSnapshotProvider = new OmRatisSnapshotProvider(
           configuration, omRatisSnapshotDir, peerNodesMap);
     } else {
-      omSnapshotProvider.addNewPeerNode(newOMNodeDetails);
+      omRatisSnapshotProvider.addNewPeerNode(newOMNodeDetails);
     }
     omRatisServer.addRaftPeer(newOMNodeDetails);
     peerNodesMap.put(newOMNodeId, newOMNodeDetails);
@@ -1971,7 +1974,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
           "present in peer list");
     }
 
-    omSnapshotProvider.removeDecommissionedPeerNode(decommNodeId);
+    omRatisSnapshotProvider.removeDecommissionedPeerNode(decommNodeId);
     omRatisServer.removeRaftPeer(decommOMNodeDetails);
     peerNodesMap.remove(decommNodeId);
     LOG.info("Removed OM {} from OM Peer Nodes.", decommNodeId);
@@ -2199,8 +2202,8 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
       if (jvmPauseMonitor != null) {
         jvmPauseMonitor.stop();
       }
-      if (omSnapshotProvider != null) {
-        omSnapshotProvider.stop();
+      if (omRatisSnapshotProvider != null) {
+        omRatisSnapshotProvider.stop();
       }
       OMPerformanceMetrics.unregister();
       RatisDropwizardExports.clear(ratisMetricsMap, ratisReporterList);
@@ -3527,13 +3530,16 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
    *         corresponding termIndex. Otherwise, return null.
    */
   public TermIndex installSnapshotFromLeader(String leaderId) {
-    if (omSnapshotProvider == null) {
+    if (omRatisSnapshotProvider == null) {
       LOG.error("OM Snapshot Provider is not configured as there are no peer " 
+
           "nodes.");
       return null;
     }
 
     DBCheckpoint omDBCheckpoint = getDBCheckpointFromLeader(leaderId);
+    if (omDBCheckpoint == null) {
+      return null;
+    }
     LOG.info("Downloaded checkpoint from Leader {} to the location {}",
         leaderId, omDBCheckpoint.getCheckpointLocation());
 
@@ -3725,7 +3731,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
         "from the checkpoint.", leaderId);
 
     try {
-      return omSnapshotProvider.getOzoneManagerDBSnapshot(leaderId);
+      return omRatisSnapshotProvider.getOzoneManagerDBSnapshot(leaderId);
     } catch (IOException e) {
       LOG.error("Failed to download checkpoint from OM leader {}", leaderId, 
e);
     }
@@ -3754,16 +3760,30 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     String dbBackupName = OzoneConsts.OM_DB_BACKUP_PREFIX +
         lastAppliedIndex + "_" + System.currentTimeMillis();
     File dbDir = oldDB.getParentFile();
-    File dbBackup = new File(dbDir, dbBackupName);
 
-    try {
-      Files.move(oldDB.toPath(), dbBackup.toPath());
-    } catch (IOException e) {
-      LOG.error("Failed to create a backup of the current DB. Aborting " +
-          "snapshot installation.");
-      throw e;
+    // Backup the active fs and snapshot dirs.
+    File dbBackupDir = new File(dbDir, dbBackupName);
+    if (!dbBackupDir.mkdirs()) {
+      throw new IOException("Failed to make db backup dir: " +
+          dbBackupDir);
+    }
+    File dbBackup = new File(dbBackupDir, oldDB.getName());
+    File dbSnapshotsDir = new File(dbDir, OM_SNAPSHOT_DIR);
+    File dbSnapshotsBackup = new File(dbBackupDir, OM_SNAPSHOT_DIR);
+    Files.move(oldDB.toPath(), dbBackup.toPath());
+    if (dbSnapshotsDir.exists()) {
+      Files.move(dbSnapshotsDir.toPath(),
+          dbSnapshotsBackup.toPath());
     }
 
+    moveCheckpointFiles(oldDB, checkpointPath, dbDir, dbBackup, dbSnapshotsDir,
+        dbSnapshotsBackup);
+    return dbBackupDir;
+  }
+
+  private void moveCheckpointFiles(File oldDB, Path checkpointPath, File dbDir,
+                                   File dbBackup, File dbSnapshotsDir,
+                                   File dbSnapshotsBackup) throws IOException {
     // Move the new DB checkpoint into the om metadata dir
     Path markerFile = new File(dbDir, DB_TRANSIENT_MARKER).toPath();
     try {
@@ -3774,6 +3794,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
       // starting up.
       Files.createFile(markerFile);
       FileUtils.moveDirectory(checkpointPath, oldDB.toPath());
+      moveOmSnapshotData(oldDB.toPath(), dbSnapshotsDir.toPath());
       Files.deleteIfExists(markerFile);
     } catch (IOException e) {
       LOG.error("Failed to move downloaded DB checkpoint {} to metadata " +
@@ -3781,6 +3802,9 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
           oldDB.toPath());
       try {
         Files.move(dbBackup.toPath(), oldDB.toPath());
+        if (dbSnapshotsBackup.exists()) {
+          Files.move(dbSnapshotsBackup.toPath(), dbSnapshotsDir.toPath());
+        }
         Files.deleteIfExists(markerFile);
       } catch (IOException ex) {
         String errorMsg = "Failed to reset to original DB. OM is in an " +
@@ -3789,7 +3813,17 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
       }
       throw e;
     }
-    return dbBackup;
+  }
+
+  // Move the new snapshot directory into place and create hard links.
+  private void moveOmSnapshotData(Path dbPath, Path dbSnapshotsDir)
+      throws IOException {
+    Path incomingSnapshotsDir = Paths.get(dbPath.toString(),
+        OM_SNAPSHOT_DIR);
+    if (incomingSnapshotsDir.toFile().exists()) {
+      Files.move(incomingSnapshotsDir, dbSnapshotsDir);
+      OmSnapshotUtils.createHardLinks(dbPath);
+    }
   }
 
   /**
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
index 725190569d..cdc473646b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
@@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import static org.apache.hadoop.ozone.OzoneConsts.FILTERED_SNAPSHOTS;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT;
@@ -144,9 +145,12 @@ public class SstFilteringService extends BackgroundService 
{
 
           String dbName = OM_DB_NAME + snapshotInfo.getCheckpointDirName();
 
+          String snapshotCheckpointDir = omMetadataDir + OM_KEY_PREFIX +
+              OM_SNAPSHOT_CHECKPOINT_DIR;
           try (RDBStore rdbStore = (RDBStore) OmMetadataManagerImpl
-              .loadDB(ozoneManager.getConfiguration(), new File(snapshotDir),
-                  dbName, true, Optional.of(Boolean.TRUE))) {
+              .loadDB(ozoneManager.getConfiguration(),
+                      new File(snapshotCheckpointDir),
+                      dbName, true, Optional.of(Boolean.TRUE))) {
             RocksDatabase db = rdbStore.getDb();
             db.deleteFilesNotMatchingPrefix(prefixPairs, filterFunction);
           }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java
similarity index 86%
rename from 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
rename to 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java
index 5c043a1acc..e5b2fdf963 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.ozone.om.snapshot;
+package org.apache.hadoop.ozone.om.ratis_snapshot;
 
 import java.io.File;
 import java.io.IOException;
@@ -54,13 +54,27 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * OzoneManagerSnapshotProvider downloads the latest checkpoint from the
- * leader OM and loads the checkpoint into State Machine.
+ * OmRatisSnapshotProvider downloads the latest checkpoint from the
+ * leader OM and loads the checkpoint into State Machine.  In addtion
+ * to the latest checkpoint, it also downloads any previous
+ * omSnapshots the leader has created.
+ *
+ * The term "snapshot" has two related but slightly different meanings
+ * in ozone.  An "omSnapshot" is a copy of the om's metadata at a
+ * point in time.  It is created by users through the "ozone sh
+ * snapshot create" cli.
+ *
+ * A "ratisSnapshot", (provided by this class), is used by om
+ * followers to bootstrap themselves to the current state of the om
+ * leader.  ratisSnapshots will contain copies of all the individual
+ * "omSnapshot"s that exist on the leader at the time of the
+ * bootstrap.  The follower needs these copies to respond the users
+ * snapshot requests when it becomes the leader.
  */
-public class OzoneManagerSnapshotProvider {
+public class OmRatisSnapshotProvider {
 
   private static final Logger LOG =
-      LoggerFactory.getLogger(OzoneManagerSnapshotProvider.class);
+      LoggerFactory.getLogger(OmRatisSnapshotProvider.class);
 
   private final File omSnapshotDir;
   private Map<String, OMNodeDetails> peerNodesMap;
@@ -68,9 +82,7 @@ public class OzoneManagerSnapshotProvider {
   private final boolean spnegoEnabled;
   private final URLConnectionFactory connectionFactory;
 
-  private static final String OM_SNAPSHOT_DB = "om.snapshot.db";
-
-  public OzoneManagerSnapshotProvider(MutableConfigurationSource conf,
+  public OmRatisSnapshotProvider(MutableConfigurationSource conf,
       File omRatisSnapshotDir, Map<String, OMNodeDetails> peerNodeDetails) {
 
     LOG.info("Initializing OM Snapshot Provider");
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/package-info.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/package-info.java
similarity index 94%
copy from 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/package-info.java
copy to 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/package-info.java
index 47cd36ac74..fce79c0a1f 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/package-info.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/package-info.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.ozone.om.snapshot;
+package org.apache.hadoop.ozone.om.ratis_snapshot;
 
 /**
  * This package contains OM Ratis Snapshot related classes.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
new file mode 100644
index 0000000000..9aef593af8
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
+
+/**
+ * Ozone Manager Snapshot Utilities.
+ */
+public final class OmSnapshotUtils {
+
+  private OmSnapshotUtils() { }
+
+  /**
+   * Get the filename without the introductory metadata directory.
+   *
+   * @param truncateLength Length to remove.
+   * @param file           File to remove prefix from.
+   * @return Truncated string.
+   */
+  public static String truncateFileName(int truncateLength, Path file) {
+    return file.toString().substring(truncateLength);
+  }
+
+  /**
+   * Get the INode for file.
+   *
+   * @param file File whose INode is to be retrieved.
+   * @return INode for file.
+   */
+  @VisibleForTesting
+  public static Object getINode(Path file) throws IOException {
+    return Files.readAttributes(file, BasicFileAttributes.class).fileKey();
+  }
+
+  /**
+   * Create file of links to add to tarball.
+   * Format of entries are either:
+   * dir1/fileTo fileFrom
+   * for files in active db or:
+   * dir1/fileTo dir2/fileFrom
+   * for files in another directory, (either another snapshot dir or
+   * sst compaction backup directory)
+   *
+   * @param truncateLength - Length of initial path to trim in file path.
+   * @param hardLinkFiles  - Map of link->file paths.
+   * @return Path to the file of links created.
+   */
+  public static Path createHardLinkList(int truncateLength,
+                                        Map<Path, Path> hardLinkFiles)
+      throws IOException {
+    Path data = Files.createTempFile("data", "txt");
+    StringBuilder sb = new StringBuilder();
+    for (Map.Entry<Path, Path> entry : hardLinkFiles.entrySet()) {
+      String fixedFile = truncateFileName(truncateLength, entry.getValue());
+      // If this file is from the active db, strip the path.
+      if (fixedFile.startsWith(OM_CHECKPOINT_DIR)) {
+        Path f = Paths.get(fixedFile).getFileName();
+        if (f != null) {
+          fixedFile = f.toString();
+        }
+      }
+      sb.append(truncateFileName(truncateLength, entry.getKey())).append("\t")
+          .append(fixedFile).append("\n");
+    }
+    Files.write(data, sb.toString().getBytes(StandardCharsets.UTF_8));
+    return data;
+  }
+
+  /**
+   * Create hard links listed in OM_HARDLINK_FILE.
+   *
+   * @param dbPath Path to db to have links created.
+   */
+  public static void createHardLinks(Path dbPath) throws IOException {
+    File hardLinkFile =
+        new File(dbPath.toString(), OmSnapshotManager.OM_HARDLINK_FILE);
+    if (hardLinkFile.exists()) {
+      // Read file.
+      try (Stream<String> s = Files.lines(hardLinkFile.toPath())) {
+        List<String> lines = s.collect(Collectors.toList());
+
+        // Create a link for each line.
+        for (String l : lines) {
+          String from = l.split("\t")[1];
+          String to = l.split("\t")[0];
+          Path fullFromPath = getFullPath(dbPath, from);
+          Path fullToPath = getFullPath(dbPath, to);
+          Files.createLink(fullToPath, fullFromPath);
+        }
+        if (!hardLinkFile.delete()) {
+          throw new IOException("Failed to delete: " + hardLinkFile);
+        }
+      }
+    }
+  }
+
+  // Prepend the full path to the hard link entry entry.
+  private static Path getFullPath(Path dbPath, String fileName)
+      throws IOException {
+    File file = new File(fileName);
+    // If there is no directory then this file belongs in the db.
+    if (file.getName().equals(fileName)) {
+      return Paths.get(dbPath.toString(), fileName);
+    }
+    // Else this file belong in a directory parallel to the db.
+    Path parent = dbPath.getParent();
+    if (parent == null) {
+      throw new IOException("Invalid database " + dbPath);
+    }
+    return Paths.get(parent.toString(), fileName);
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/package-info.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/package-info.java
index 47cd36ac74..26b99086e0 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/package-info.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/package-info.java
@@ -19,5 +19,5 @@
 package org.apache.hadoop.ozone.om.snapshot;
 
 /**
- * This package contains OM Ratis Snapshot related classes.
+ * This package contains OM Snapshot related classes.
  */
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
index 534524c778..583e381d25 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.ozone.om;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -27,15 +28,28 @@ import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR;
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.OM_HARDLINK_FILE;
+import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.getINode;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
@@ -113,6 +127,60 @@ public class TestOmSnapshotManager {
     verify(firstSnapshotStore, timeout(3000).times(1)).close();
   }
 
+  @Test
+  @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH"})
+  public void testHardLinkCreation() throws IOException {
+    byte[] dummyData = {0};
+
+    // Create dummy files to be linked to.
+    File snapDir1 = new File(testDir.toString(),
+        OM_SNAPSHOT_CHECKPOINT_DIR + OM_KEY_PREFIX + "dir1");
+    if (!snapDir1.mkdirs()) {
+      throw new IOException("failed to make directory: " + snapDir1);
+    }
+    Files.write(Paths.get(snapDir1.toString(), "s1"), dummyData);
+
+    File snapDir2 = new File(testDir.toString(),
+        OM_SNAPSHOT_CHECKPOINT_DIR + OM_KEY_PREFIX + "dir2");
+    if (!snapDir2.mkdirs()) {
+      throw new IOException("failed to make directory: " + snapDir2);
+    }
+
+    File dbDir = new File(testDir.toString(), OM_DB_NAME);
+    Files.write(Paths.get(dbDir.toString(), "f1"), dummyData);
+
+    // Create map of links to dummy files.
+    File checkpointDir1 = new File(testDir.toString(),
+        OM_CHECKPOINT_DIR + OM_KEY_PREFIX + "dir1");
+    Map<Path, Path> hardLinkFiles = new HashMap<>();
+    hardLinkFiles.put(Paths.get(snapDir2.toString(), "f1"),
+        Paths.get(checkpointDir1.toString(), "f1"));
+    hardLinkFiles.put(Paths.get(snapDir2.toString(), "s1"),
+        Paths.get(snapDir1.toString(), "s1"));
+
+    // Create link list.
+    Path hardLinkList =
+        OmSnapshotUtils.createHardLinkList(
+            testDir.toString().length() + 1, hardLinkFiles);
+    Files.move(hardLinkList, Paths.get(dbDir.toString(), OM_HARDLINK_FILE));
+
+    // Create links from list.
+    OmSnapshotUtils.createHardLinks(dbDir.toPath());
+
+    // Confirm expected links.
+    for (Map.Entry<Path, Path> entry : hardLinkFiles.entrySet()) {
+      Assert.assertTrue(entry.getKey().toFile().exists());
+      Path value = entry.getValue();
+      // Convert checkpoint path to om.db.
+      if (value.toString().contains(OM_CHECKPOINT_DIR)) {
+        value = Paths.get(dbDir.toString(),
+                          value.getFileName().toString());
+      }
+      Assert.assertEquals("link matches original file",
+          getINode(entry.getKey()), getINode(value));
+    }
+  }
+
   private SnapshotInfo createSnapshotInfo() {
     String snapshotName = UUID.randomUUID().toString();
     String volumeName = UUID.randomUUID().toString();
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
index acc2803c50..4b0be53a6c 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
@@ -61,7 +61,6 @@ import java.util.concurrent.TimeoutException;
 
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
@@ -205,8 +204,7 @@ public class TestSstFilteringService {
 
     String dbSnapshots = rocksDbDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
     String snapshotDirName =
-        dbSnapshots + OM_KEY_PREFIX + OM_DB_NAME + snapshotInfo
-            .getCheckpointDirName();
+        OmSnapshotManager.getSnapshotPath(conf, snapshotInfo);
 
     for (LiveFileMetaData file : allFiles) {
       File sstFile =
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java
index 26a4265417..f8a67fbe96 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java
@@ -47,9 +47,7 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
 
 /**
  * This class tests OMSnapshotCreateResponse.
@@ -61,11 +59,11 @@ public class TestOMSnapshotCreateResponse {
   
   private OMMetadataManager omMetadataManager;
   private BatchOperation batchOperation;
-  private String fsPath;
+  private OzoneConfiguration ozoneConfiguration;
   @Before
   public void setup() throws Exception {
-    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
-    fsPath = folder.newFolder().getAbsolutePath();
+    ozoneConfiguration = new OzoneConfiguration();
+    String fsPath = folder.newFolder().getAbsolutePath();
     ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
         fsPath);
     omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
@@ -112,9 +110,7 @@ public class TestOMSnapshotCreateResponse {
     omMetadataManager.getStore().commitBatchOperation(batchOperation);
 
     // Confirm snapshot directory was created
-    String snapshotDir = fsPath + OM_KEY_PREFIX +
-        OM_SNAPSHOT_DIR + OM_KEY_PREFIX + OM_DB_NAME +
-        snapshotInfo.getCheckpointDirName();
+    String snapshotDir = getSnapshotPath(ozoneConfiguration, snapshotInfo);
     Assert.assertTrue((new File(snapshotDir)).exists());
 
     // Confirm table has 1 entry
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotDeleteResponse.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotDeleteResponse.java
index 8c861735d6..2c337383ed 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotDeleteResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotDeleteResponse.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateSnapshotResponse;
@@ -39,9 +40,6 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.util.UUID;
 
-import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
 
 /**
  * This class tests OMSnapshotDeleteResponse.
@@ -54,12 +52,12 @@ public class TestOMSnapshotDeleteResponse {
   
   private OMMetadataManager omMetadataManager;
   private BatchOperation batchOperation;
-  private String fsPath;
+  private OzoneConfiguration ozoneConfiguration;
 
   @Before
   public void setup() throws Exception {
-    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
-    fsPath = folder.newFolder().getAbsolutePath();
+    ozoneConfiguration = new OzoneConfiguration();
+    String fsPath = folder.newFolder().getAbsolutePath();
     ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
         fsPath);
     omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
@@ -102,9 +100,8 @@ public class TestOMSnapshotDeleteResponse {
     omMetadataManager.getStore().commitBatchOperation(batchOperation);
 
     // Confirm snapshot directory was created
-    String snapshotDir = fsPath + OM_KEY_PREFIX +
-        OM_SNAPSHOT_DIR + OM_KEY_PREFIX + OM_DB_NAME +
-        snapshotInfo.getCheckpointDirName();
+    String snapshotDir = OmSnapshotManager.getSnapshotPath(ozoneConfiguration,
+        snapshotInfo);
     Assert.assertTrue((new File(snapshotDir)).exists());
 
     // Confirm table has 1 entry


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to