hemantk-12 commented on code in PR #3980:
URL: https://github.com/apache/ozone/pull/3980#discussion_r1027231762


##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java:
##########
@@ -259,69 +291,259 @@ public void testSpnegoEnabled() throws Exception {
   }
 
   @Test
-  public void testWriteCheckpointToOutputStream() throws Exception {
-
-    FileInputStream fis = null;
-    FileOutputStream fos = null;
-
-    try {
-      String testDirName = folder.newFolder().getAbsolutePath();
-      File file = new File(testDirName + "/temp1.txt");
-      OutputStreamWriter writer = new OutputStreamWriter(
-          new FileOutputStream(file), StandardCharsets.UTF_8);
-      writer.write("Test data 1");
-      writer.close();
-
-      file = new File(testDirName + "/temp2.txt");
-      writer = new OutputStreamWriter(
-          new FileOutputStream(file), StandardCharsets.UTF_8);
-      writer.write("Test data 2");
-      writer.close();
-
-      File outputFile =
-          new File(Paths.get(testDirName, "output_file.tgz").toString());
-      TestDBCheckpoint dbCheckpoint = new TestDBCheckpoint(
-          Paths.get(testDirName));
-      writeDBCheckpointToStream(dbCheckpoint,
-          new FileOutputStream(outputFile));
-      assertNotNull(outputFile);
-    } finally {
-      IOUtils.closeStream(fis);
-      IOUtils.closeStream(fos);
+  public void testWriteDbDataToStream()
+      throws Exception {

Review Comment:
   ```suggestion
     public void testWriteDbDataToStream() throws Exception {
   ```



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -164,19 +164,19 @@ public RocksDBCheckpointDiffer(String metadataDir, String 
sstBackupDir,
     this.activeDBLocationStr = activeDBLocation.toString() + "/";
   }
 
-  private void setCompactionLogDir(String metadataDir,
+  private void setCompactionLogDir(String snapshotDiffDir,
       String compactionLogDirName) {
 
-    final File parentDir = new File(metadataDir);
+    final File parentDir = new File(snapshotDiffDir);
     if (!parentDir.exists()) {
-      if (!parentDir.mkdir()) {
+      if (!parentDir.mkdirs()) {
         LOG.error("Error creating compaction log parent dir.");
         return;
       }
     }
 
     this.compactionLogDir =
-        Paths.get(metadataDir, compactionLogDirName).toString();
+        Paths.get(parentDir.toString(), compactionLogDirName).toString();

Review Comment:
   Should we use `getPath()` instead of `toString()` on file?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java:
##########
@@ -82,4 +112,165 @@ public void init() throws ServletException {
         allowedGroups,
         om.isSpnegoEnabled());
   }
+
+  @Override
+  public void writeDbDataToStream(DBCheckpoint checkpoint,
+                                  HttpServletRequest request,
+                                  OutputStream destination)
+      throws IOException, InterruptedException, CompressorException {
+    // Map of inodes to path
+    HashMap<Object, Path> copyFiles = new HashMap<>();
+    // Map of link to path
+    HashMap<Path, Path> hardLinkFiles = new HashMap<>();
+
+    getFilesForArchive(checkpoint, copyFiles, hardLinkFiles,
+        includeSnapshotData(request));
+
+    try (CompressorOutputStream gzippedOut = new CompressorStreamFactory()
+        .createCompressorOutputStream(CompressorStreamFactory.GZIP,
+            destination)) {
+
+      try (TarArchiveOutputStream archiveOutputStream =
+          new TarArchiveOutputStream(gzippedOut)) {
+        archiveOutputStream
+            .setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
+
+        writeFilesToArchive(copyFiles, hardLinkFiles, archiveOutputStream);
+      }
+    } catch (CompressorException e) {
+      throw new IOException(
+          "Can't compress the checkpoint: " +
+              checkpoint.getCheckpointLocation(), e);
+    }

Review Comment:
   ```suggestion
       try (CompressorOutputStream gzippedOut = new CompressorStreamFactory()
           .createCompressorOutputStream(CompressorStreamFactory.GZIP, 
destination);
            TarArchiveOutputStream archiveOutputStream = 
                new TarArchiveOutputStream(gzippedOut)
       ) {
         
archiveOutputStream.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
         writeFilesToArchive(copyFiles, hardLinkFiles, archiveOutputStream);
       } catch (CompressorException e) {
         throw new IOException(
             "Can't compress the checkpoint: " +
                 checkpoint.getCheckpointLocation(), e);
       }
   ````
   
   You can declare multiple resources in a single try statement. This will remove the unnecessary nesting.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java:
##########
@@ -82,4 +112,165 @@ public void init() throws ServletException {
         allowedGroups,
         om.isSpnegoEnabled());
   }
+
+  @Override
+  public void writeDbDataToStream(DBCheckpoint checkpoint,
+                                  HttpServletRequest request,
+                                  OutputStream destination)
+      throws IOException, InterruptedException, CompressorException {
+    // Map of inodes to path
+    HashMap<Object, Path> copyFiles = new HashMap<>();
+    // Map of link to path
+    HashMap<Path, Path> hardLinkFiles = new HashMap<>();
+
+    getFilesForArchive(checkpoint, copyFiles, hardLinkFiles,
+        includeSnapshotData(request));
+
+    try (CompressorOutputStream gzippedOut = new CompressorStreamFactory()
+        .createCompressorOutputStream(CompressorStreamFactory.GZIP,
+            destination)) {
+
+      try (TarArchiveOutputStream archiveOutputStream =
+          new TarArchiveOutputStream(gzippedOut)) {
+        archiveOutputStream
+            .setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
+
+        writeFilesToArchive(copyFiles, hardLinkFiles, archiveOutputStream);
+      }
+    } catch (CompressorException e) {
+      throw new IOException(
+          "Can't compress the checkpoint: " +
+              checkpoint.getCheckpointLocation(), e);
+    }
+  }
+
+  private void getFilesForArchive(DBCheckpoint checkpoint,
+                                  Map<Object, Path> copyFiles,
+                                  Map<Path, Path> hardLinkFiles,
+                                  boolean includeSnapshotData)
+      throws IOException, InterruptedException {
+
+    // Get the active fs files
+    Path dir = checkpoint.getCheckpointLocation();
+    processDir(dir, copyFiles, hardLinkFiles);
+
+    if (!includeSnapshotData) {
+      return;
+    }
+
+    // Get the snapshot files
+    waitForSnapshotDirs(checkpoint);
+    Path snapshotDir = Paths.get(OMStorage.getOmDbDir(getConf()).toString(),
+        OM_SNAPSHOT_DIR);
+    processDir(snapshotDir, copyFiles, hardLinkFiles);
+  }
+
+  // The snapshotInfo table may contain a snapshot that
+  //  doesn't yet exist on the fs, so wait a few seconds for it
+  private void waitForSnapshotDirs(DBCheckpoint checkpoint)
+      throws IOException, InterruptedException {
+
+    OzoneConfiguration conf = getConf();
+
+    // get snapshotInfo entries
+    OmMetadataManagerImpl checkpointMetadataManager =
+        OmMetadataManagerImpl.createCheckpointMetadataManager(
+            conf, checkpoint);
+    try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>>
+        iterator = checkpointMetadataManager
+        .getSnapshotInfoTable().iterator()) {
+
+      // for each entry, wait for corresponding directory
+      while (iterator.hasNext()) {
+        Table.KeyValue<String, SnapshotInfo> entry = iterator.next();
+        Path path = Paths.get(getSnapshotPath(conf, entry.getValue()));
+        waitForDirToExist(path);
+      }
+    }
+  }
+
+  private void waitForDirToExist(Path dir)
+      throws IOException, InterruptedException {
+    long endTime = System.currentTimeMillis() +
+        Duration.parse(DURATION_TO_WAIT_FOR_DIRECTORY).toMillis();
+    while (!dir.toFile().exists()) {
+      Thread.sleep(100);
+      if (System.currentTimeMillis() > endTime) {
+        break;
+      }
+    }
+    if (System.currentTimeMillis() > endTime) {
+      throw new IOException("snapshot dir doesn't exist: " + dir);

Review Comment:
   You can throw this exception at line #119 instead. You don't need to check and throw it here.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java:
##########
@@ -3568,7 +3570,7 @@ private DBCheckpoint getDBCheckpointFromLeader(String 
leaderId) {
         "from the checkpoint.", leaderId);
 
     try {
-      return omSnapshotProvider.getOzoneManagerDBSnapshot(leaderId);
+      return omRatisSnapshotProvider.getOzoneManagerDBSnapshot(leaderId);

Review Comment:
   Is it OK to return null in case of an IOException? We don't have a null check where the returned value is used at lines #3385-3387. Also, the same value is passed to `installCheckpoint` without a null check.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java:
##########
@@ -298,15 +301,19 @@ protected OmMetadataManagerImpl() {
     this.omEpoch = 0;
   }
 
-  // metadata constructor for snapshots
-  private OmMetadataManagerImpl(OzoneConfiguration conf, String 
snapshotDirName)
+  /**
+   * metadata constructor for checkpoints.

Review Comment:
   nit: `* Metadata ....`



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java:
##########
@@ -3597,10 +3599,22 @@ File replaceOMDBWithCheckpoint(long lastAppliedIndex, 
File oldDB,
     String dbBackupName = OzoneConsts.OM_DB_BACKUP_PREFIX +
         lastAppliedIndex + "_" + System.currentTimeMillis();
     File dbDir = oldDB.getParentFile();
-    File dbBackup = new File(dbDir, dbBackupName);
 
+    // Backup the active fs and snapshot dirs
+    File dbBackupDir = new File(dbDir, dbBackupName);
+    if (!dbBackupDir.mkdirs()) {
+      throw new IOException("Failed to make db backup dir: " +
+          dbBackupDir);
+    }
+    File dbBackup = new File(dbBackupDir, oldDB.getName());
+    File dbSnapshotsDir = new File(dbDir, OM_SNAPSHOT_DIR);
+    File dbSnapshotsBackup = new File(dbBackupDir, OM_SNAPSHOT_DIR);
     try {
       Files.move(oldDB.toPath(), dbBackup.toPath());
+      if (dbSnapshotsDir.exists()) {
+        Files.move(dbSnapshotsDir.toPath(),
+            dbSnapshotsBackup.toPath());
+      }
     } catch (IOException e) {

Review Comment:
   This catch block is unnecessary. It does nothing other than double logging and re-throwing the same exception.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java:
##########
@@ -259,69 +291,259 @@ public void testSpnegoEnabled() throws Exception {
   }
 
   @Test
-  public void testWriteCheckpointToOutputStream() throws Exception {
-
-    FileInputStream fis = null;
-    FileOutputStream fos = null;
-
-    try {
-      String testDirName = folder.newFolder().getAbsolutePath();
-      File file = new File(testDirName + "/temp1.txt");
-      OutputStreamWriter writer = new OutputStreamWriter(
-          new FileOutputStream(file), StandardCharsets.UTF_8);
-      writer.write("Test data 1");
-      writer.close();
-
-      file = new File(testDirName + "/temp2.txt");
-      writer = new OutputStreamWriter(
-          new FileOutputStream(file), StandardCharsets.UTF_8);
-      writer.write("Test data 2");
-      writer.close();
-
-      File outputFile =
-          new File(Paths.get(testDirName, "output_file.tgz").toString());
-      TestDBCheckpoint dbCheckpoint = new TestDBCheckpoint(
-          Paths.get(testDirName));
-      writeDBCheckpointToStream(dbCheckpoint,
-          new FileOutputStream(outputFile));
-      assertNotNull(outputFile);
-    } finally {
-      IOUtils.closeStream(fis);
-      IOUtils.closeStream(fos);
+  public void testWriteDbDataToStream()
+      throws Exception {
+    prepSnapshotData();
+    // set http param to include snapshot data
+    when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA))
+        .thenReturn("true");
+
+    // get the tarball
+    try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
+      omDbCheckpointServletMock.writeDbDataToStream(dbCheckpoint, requestMock,
+          fileOutputStream);
+    }
+
+    // Untar the file into a temp folder to be examined
+    String testDirName = folder.newFolder().getAbsolutePath();
+    int testDirLength = testDirName.length() + 1;
+    String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME;
+    int newDbDirLength = newDbDirName.length() + 1;
+    File newDbDir = new File(newDbDirName);
+    newDbDir.mkdirs();
+    FileUtil.unTar(tempFile, newDbDir);
+
+    // Move snapshot dir to correct location
+    new File(newDbDirName, OM_SNAPSHOT_DIR)
+        .renameTo(new File(newDbDir.getParent(), OM_SNAPSHOT_DIR));
+
+
+    // Confirm the checkpoint directories match, (after remove extras)
+    Path checkpointLocation = dbCheckpoint.getCheckpointLocation();
+    Set<String> initialCheckpointSet = getFiles(checkpointLocation,
+        checkpointLocation.toString().length() + 1);
+    Path finalCheckpointLocation = Paths.get(newDbDirName);
+    Set<String> finalCheckpointSet = getFiles(finalCheckpointLocation,
+        newDbDirLength);
+
+    Assert.assertTrue("hardlink file exists in checkpoint dir",
+        finalCheckpointSet.contains(OM_HARDLINK_FILE));
+    finalCheckpointSet.remove(OM_HARDLINK_FILE);
+    Assert.assertEquals(initialCheckpointSet, finalCheckpointSet);
+
+    int metaDirLength = metaDir.toString().length() + 1;
+    String shortSnapshotLocation =
+        truncateFileName(metaDirLength, Paths.get(snapshotDirName));
+    String shortSnapshotLocation2 =
+        truncateFileName(metaDirLength, Paths.get(snapshotDirName2));
+    String shortCompactionDirLocation =
+        truncateFileName(metaDirLength, compactionDirPath);
+
+    Set<String> finalFullSet =
+        getFiles(Paths.get(testDirName, OM_SNAPSHOT_DIR), testDirLength);
+
+    // check each line in the hard link file
+    Stream<String> lines = Files.lines(Paths.get(newDbDirName,
+        OM_HARDLINK_FILE));
+
+    List<String> fabricatedLinkLines = new ArrayList<>();
+    for (String line: lines.collect(Collectors.toList())) {
+      Assert.assertFalse("CURRENT file is not a hard link",
+          line.contains("CURRENT"));
+      if (line.contains("fabricatedFile")) {
+        fabricatedLinkLines.add(line);
+      } else {
+        checkLine(shortSnapshotLocation, shortSnapshotLocation2, line);
+        // add links to the final set
+        finalFullSet.add(line.split("\t")[0]);
+      }
     }
+
+    Set<String> directories = Sets.newHashSet(
+        shortSnapshotLocation, shortSnapshotLocation2,
+        shortCompactionDirLocation);
+    checkFabricatedLines(directories, fabricatedLinkLines, testDirName);
+
+    Set<String> initialFullSet =
+        getFiles(Paths.get(metaDir.toString(), OM_SNAPSHOT_DIR), 
metaDirLength);
+    Assert.assertEquals("found expected snapshot files",
+        initialFullSet, finalFullSet);
   }
-}
 
-class TestDBCheckpoint implements DBCheckpoint {
+  @Test
+  public void testWriteDbDataWithoutOmSnapshot()
+      throws Exception {
+    prepSnapshotData();
+
+    // set http param to exclude snapshot data
+    when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA))
+        .thenReturn(null);
+
+    // get the tarball
+    try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
+      omDbCheckpointServletMock.writeDbDataToStream(dbCheckpoint, requestMock,
+          fileOutputStream);
+    }
+
+    // Untar the file into a temp folder to be examined
+    String testDirName = folder.newFolder().getAbsolutePath();
+    int testDirLength = testDirName.length() + 1;
+    FileUtil.unTar(tempFile, new File(testDirName));
 
-  private final Path checkpointFile;
+    // Confirm the checkpoint directories match
+    Path checkpointLocation = dbCheckpoint.getCheckpointLocation();
+    Set<String> initialCheckpointSet = getFiles(checkpointLocation,
+        checkpointLocation.toString().length() + 1);
+    Path finalCheckpointLocation = Paths.get(testDirName);
+    Set<String> finalCheckpointSet = getFiles(finalCheckpointLocation,
+        testDirLength);
 
-  TestDBCheckpoint(Path checkpointFile) {
-    this.checkpointFile = checkpointFile;
+    Assert.assertEquals(initialCheckpointSet, finalCheckpointSet);
   }
 
-  @Override
-  public Path getCheckpointLocation() {
-    return checkpointFile;
+
+
+  private void prepSnapshotData() throws Exception {
+    setupCluster();
+    metaDir = OMStorage.getOmDbDir(conf);
+
+    OzoneBucket bucket = TestDataUtil.createVolumeAndBucket(cluster);
+
+    // Create dummy keys for snapshotting
+    TestDataUtil.createKey(bucket, UUID.randomUUID().toString(),
+        "content");
+    TestDataUtil.createKey(bucket, UUID.randomUUID().toString(),
+        "content");
+
+    // this sleep can be removed after this is fixed:
+    //  https://issues.apache.org/jira/browse/HDDS-7279
+    Thread.sleep(2000);
+    snapshotDirName =
+        createSnapshot(bucket.getVolumeName(), bucket.getName());
+    snapshotDirName2 =
+        createSnapshot(bucket.getVolumeName(), bucket.getName());
+
+
+    // create fabricated links to snapshot dirs
+    //  to confirm that links are recognized even if
+    //  they are don't point to the checkpoint directory
+    Path fabricatedFile = Paths.get(snapshotDirName, "fabricatedFile");
+    Path fabricatedLink = Paths.get(snapshotDirName2, "fabricatedFile");
+
+    Files.write(fabricatedFile,
+        "fabricatedData".getBytes(StandardCharsets.UTF_8));
+    Files.createLink(fabricatedLink, fabricatedFile);
+
+    // simulate links from the compaction dir
+    compactionDirPath = Paths.get(metaDir.toString(),
+        OM_SNAPSHOT_DIFF_DIR, OM_COMPACTION_BACKUP_DIR);
+    Path fabricatedLink2 = Paths.get(compactionDirPath.toString(),
+        "fabricatedFile");
+    Files.createLink(fabricatedLink2, fabricatedFile);
+    Path currentFile = Paths.get(metaDir.toString(),
+                                    OM_DB_NAME, "CURRENT");
+    Path currentLink = Paths.get(compactionDirPath.toString(), "CURRENT");
+    Files.createLink(currentLink, currentFile);
+
+    dbCheckpoint = cluster.getOzoneManager()
+        .getMetadataManager().getStore()
+        .getCheckpoint(true);
+
   }
 
-  @Override
-  public long getCheckpointTimestamp() {
-    return 0;
+  private String createSnapshot(String vname, String bname)
+      throws IOException, InterruptedException, TimeoutException {
+    final OzoneManager om = cluster.getOzoneManager();
+    String snapshotName = UUID.randomUUID().toString();
+    OzoneManagerProtocol writeClient = cluster.getRpcClient().getObjectStore()
+        .getClientProxy().getOzoneManagerClient();
+
+    writeClient.createSnapshot(vname, bname, snapshotName);
+    SnapshotInfo snapshotInfo = om.getMetadataManager().getSnapshotInfoTable()
+        .get(SnapshotInfo.getTableKey(vname, bname, snapshotName));
+    String snapshotPath = getSnapshotPath(conf, snapshotInfo)
+        + OM_KEY_PREFIX;
+    GenericTestUtils.waitFor(() -> new File(snapshotPath).exists(),
+        100, 2000);
+    return snapshotPath;
   }
 
-  @Override
-  public long getLatestSequenceNumber() {
-    return 0;
+  private Set<String> getFiles(Path path, int truncateLength)
+      throws IOException {
+    return getFiles(path, truncateLength, new HashSet<>());
+  }
+
+  // Get all files below path, recursively, (skipping fabricated files)
+  @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
+  private Set<String> getFiles(Path path, int truncateLength,
+      Set<String> fileSet) throws IOException {
+    try (Stream<Path> files = Files.list(path)) {
+      for (Path file : files.collect(Collectors.toList())) {
+        if (file.toFile().isDirectory()) {
+          getFiles(file, truncateLength, fileSet);
+        }
+        if (!file.getFileName().toString().equals("fabricatedFile")) {
+          fileSet.add(truncateFileName(truncateLength, file));
+        }
+      }
+    }
+    return fileSet;
   }
 
-  @Override
-  public long checkpointCreationTimeTaken() {
-    return 0;
+  // tests to see that fabricated link lines in hardlink file are

Review Comment:
   1. This is not a test.
   2. Isn't this supposed to be a Javadoc comment?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java:
##########
@@ -82,4 +112,165 @@ public void init() throws ServletException {
         allowedGroups,
         om.isSpnegoEnabled());
   }
+
+  @Override
+  public void writeDbDataToStream(DBCheckpoint checkpoint,
+                                  HttpServletRequest request,
+                                  OutputStream destination)
+      throws IOException, InterruptedException, CompressorException {
+    // Map of inodes to path
+    HashMap<Object, Path> copyFiles = new HashMap<>();
+    // Map of link to path
+    HashMap<Path, Path> hardLinkFiles = new HashMap<>();
+
+    getFilesForArchive(checkpoint, copyFiles, hardLinkFiles,
+        includeSnapshotData(request));
+
+    try (CompressorOutputStream gzippedOut = new CompressorStreamFactory()
+        .createCompressorOutputStream(CompressorStreamFactory.GZIP,
+            destination)) {
+
+      try (TarArchiveOutputStream archiveOutputStream =
+          new TarArchiveOutputStream(gzippedOut)) {
+        archiveOutputStream
+            .setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
+
+        writeFilesToArchive(copyFiles, hardLinkFiles, archiveOutputStream);
+      }
+    } catch (CompressorException e) {
+      throw new IOException(
+          "Can't compress the checkpoint: " +
+              checkpoint.getCheckpointLocation(), e);
+    }
+  }
+
+  private void getFilesForArchive(DBCheckpoint checkpoint,
+                                  Map<Object, Path> copyFiles,
+                                  Map<Path, Path> hardLinkFiles,
+                                  boolean includeSnapshotData)
+      throws IOException, InterruptedException {
+
+    // Get the active fs files
+    Path dir = checkpoint.getCheckpointLocation();
+    processDir(dir, copyFiles, hardLinkFiles);
+
+    if (!includeSnapshotData) {
+      return;
+    }
+
+    // Get the snapshot files
+    waitForSnapshotDirs(checkpoint);
+    Path snapshotDir = Paths.get(OMStorage.getOmDbDir(getConf()).toString(),
+        OM_SNAPSHOT_DIR);
+    processDir(snapshotDir, copyFiles, hardLinkFiles);
+  }
+
+  // The snapshotInfo table may contain a snapshot that
+  //  doesn't yet exist on the fs, so wait a few seconds for it
+  private void waitForSnapshotDirs(DBCheckpoint checkpoint)
+      throws IOException, InterruptedException {
+
+    OzoneConfiguration conf = getConf();
+
+    // get snapshotInfo entries
+    OmMetadataManagerImpl checkpointMetadataManager =
+        OmMetadataManagerImpl.createCheckpointMetadataManager(
+            conf, checkpoint);
+    try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>>
+        iterator = checkpointMetadataManager
+        .getSnapshotInfoTable().iterator()) {
+
+      // for each entry, wait for corresponding directory
+      while (iterator.hasNext()) {
+        Table.KeyValue<String, SnapshotInfo> entry = iterator.next();
+        Path path = Paths.get(getSnapshotPath(conf, entry.getValue()));
+        waitForDirToExist(path);
+      }
+    }
+  }
+
+  private void waitForDirToExist(Path dir)
+      throws IOException, InterruptedException {
+    long endTime = System.currentTimeMillis() +
+        Duration.parse(DURATION_TO_WAIT_FOR_DIRECTORY).toMillis();
+    while (!dir.toFile().exists()) {
+      Thread.sleep(100);
+      if (System.currentTimeMillis() > endTime) {
+        break;
+      }
+    }
+    if (System.currentTimeMillis() > endTime) {
+      throw new IOException("snapshot dir doesn't exist: " + dir);
+    }
+  }
+
+  private void processDir(Path dir, Map<Object, Path> copyFiles,
+                          Map<Path, Path> hardLinkFiles)
+      throws IOException {
+    try (Stream<Path> files = Files.list(dir)) {
+      for (Path file : files.collect(Collectors.toList())) {
+        if (file.toFile().isDirectory()) {
+          processDir(file, copyFiles, hardLinkFiles);
+        } else {
+          processFile(file, copyFiles, hardLinkFiles);
+        }
+      }
+    }
+  }
+
+  private void processFile(Path file, Map<Object, Path> copyFiles,
+                           Map<Path, Path> hardLinkFiles) throws IOException {
+    // get the inode
+    Object key = OmSnapshotManager.getINode(file);
+    // If we already have the inode, store as hard link
+    if (copyFiles.containsKey(key)) {
+      hardLinkFiles.put(file, copyFiles.get(key));
+    } else {
+      copyFiles.put(key, file);
+    }
+  }
+
+  // returns value of http request parameter
+  private boolean includeSnapshotData(HttpServletRequest request) {
+    String includeParam =
+        request.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA);
+    if (StringUtils.isNotEmpty(includeParam)) {
+      return Boolean.parseBoolean(includeParam);
+    }
+    return false;
+  }
+
+  @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
+  private void writeFilesToArchive(HashMap<Object, Path> copyFiles,
+                         HashMap<Path, Path> hardLinkFiles,
+                         ArchiveOutputStream archiveOutputStream)
+      throws IOException {
+
+    File metaDirPath = ServerUtils.getOzoneMetaDirPath(getConf());
+    int truncateLength = metaDirPath.toString().length() + 1;
+
+    // Go through each of the files to be copied and add to archive
+    for (Path file : copyFiles.values()) {
+      String fixedFile = truncateFileName(truncateLength, file);
+      if (fixedFile.startsWith(OM_CHECKPOINT_DIR)) {
+        // checkpoint files go to root of tarball
+        fixedFile = Paths.get(fixedFile).getFileName().toString();
+      }
+      includeFile(file.toFile(), fixedFile,
+          archiveOutputStream);

Review Comment:
   ```suggestion
         includeFile(file.toFile(), fixedFile, archiveOutputStream);
   ```
   This could fit on one line.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java:
##########
@@ -3632,7 +3650,19 @@ File replaceOMDBWithCheckpoint(long lastAppliedIndex, 
File oldDB,
       }
       throw e;
     }
-    return dbBackup;
+    return dbBackupDir;
+  }
+
+  // move the new snapshot directory into place and create hard links
+  private void moveOmSnapshotData(Path dbPath, Path dbSnapshotsDir)
+      throws IOException {
+    Path incomingSnapshotsDir = Paths.get(dbPath.toString(),
+        OM_SNAPSHOT_DIR);
+    if (incomingSnapshotsDir.toFile().exists()) {
+      Files.move(incomingSnapshotsDir, dbSnapshotsDir);
+      OmSnapshotManager.createHardLinks(dbPath);
+    }
+

Review Comment:
   Unnecessary blank line.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java:
##########
@@ -82,4 +112,165 @@ public void init() throws ServletException {
         allowedGroups,
         om.isSpnegoEnabled());
   }
+
+  @Override
+  public void writeDbDataToStream(DBCheckpoint checkpoint,
+                                  HttpServletRequest request,
+                                  OutputStream destination)
+      throws IOException, InterruptedException, CompressorException {
+    // Map of inodes to path
+    HashMap<Object, Path> copyFiles = new HashMap<>();
+    // Map of link to path
+    HashMap<Path, Path> hardLinkFiles = new HashMap<>();

Review Comment:
   Use `Map` instead of `HashMap` to declare variables. 
   
   
https://medium.com/javarevisited/oop-good-practices-coding-to-the-interface-baea84fd60d3
   



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java:
##########
@@ -227,6 +233,20 @@ public void testInstallSnapshot() throws Exception {
         getDefaultBucketLayout()).get(followerOMMetaMngr.getOzoneKey(
         volumeName, bucketName, newKeys.get(0))));
      */
+
+    // read back data from the OM snapshot
+    OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+            .setBucketName(bucketName)
+                .setKeyName(".snapshot/snap1/" + keys.get(0)).build();

Review Comment:
   ```suggestion
       OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
           .setVolumeName(volumeName)
           .setBucketName(bucketName)
           .setKeyName(".snapshot/snap1/" + keys.get(0)).build();
   ```
   
   Indentation is a bit off. It should be like this.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java:
##########
@@ -227,6 +233,20 @@ public void testInstallSnapshot() throws Exception {
         getDefaultBucketLayout()).get(followerOMMetaMngr.getOzoneKey(
         volumeName, bucketName, newKeys.get(0))));
      */
+
+    // read back data from the OM snapshot
+    OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+            .setBucketName(bucketName)
+                .setKeyName(".snapshot/snap1/" + keys.get(0)).build();
+    OmKeyInfo omKeyInfo = null;
+    try {
+      omKeyInfo = followerOM.lookupKey(omKeyArgs);
+    } catch (Exception e) {
+      Assertions.fail("received exception: " + e);
+    }
+    Assertions.assertTrue(omKeyInfo != null);

Review Comment:
   ```suggestion
       Assertions.assertNotNull(omKeyInfo);
   ```



##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java:
##########
@@ -114,6 +127,61 @@ public void testCloseOnEviction() throws IOException {
     verify(firstSnapshotStore, timeout(3000).times(1)).close();
   }
 
+  @Test
+  @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH"})
+  public void testHardLinkCreation() throws IOException {
+    byte[] dummyData = {0};
+
+

Review Comment:
   Remove extra line.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java:
##########
@@ -82,4 +112,165 @@ public void init() throws ServletException {
         allowedGroups,
         om.isSpnegoEnabled());
   }
+
+  @Override
+  public void writeDbDataToStream(DBCheckpoint checkpoint,
+                                  HttpServletRequest request,
+                                  OutputStream destination)
+      throws IOException, InterruptedException, CompressorException {
+    // Map of inodes to path
+    HashMap<Object, Path> copyFiles = new HashMap<>();
+    // Map of link to path
+    HashMap<Path, Path> hardLinkFiles = new HashMap<>();
+
+    getFilesForArchive(checkpoint, copyFiles, hardLinkFiles,
+        includeSnapshotData(request));
+
+    try (CompressorOutputStream gzippedOut = new CompressorStreamFactory()
+        .createCompressorOutputStream(CompressorStreamFactory.GZIP,
+            destination)) {
+
+      try (TarArchiveOutputStream archiveOutputStream =
+          new TarArchiveOutputStream(gzippedOut)) {
+        archiveOutputStream
+            .setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
+
+        writeFilesToArchive(copyFiles, hardLinkFiles, archiveOutputStream);
+      }
+    } catch (CompressorException e) {
+      throw new IOException(
+          "Can't compress the checkpoint: " +
+              checkpoint.getCheckpointLocation(), e);
+    }
+  }
+
+  private void getFilesForArchive(DBCheckpoint checkpoint,
+                                  Map<Object, Path> copyFiles,
+                                  Map<Path, Path> hardLinkFiles,
+                                  boolean includeSnapshotData)
+      throws IOException, InterruptedException {
+
+    // Get the active fs files
+    Path dir = checkpoint.getCheckpointLocation();
+    processDir(dir, copyFiles, hardLinkFiles);
+
+    if (!includeSnapshotData) {
+      return;
+    }
+
+    // Get the snapshot files
+    waitForSnapshotDirs(checkpoint);
+    Path snapshotDir = Paths.get(OMStorage.getOmDbDir(getConf()).toString(),
+        OM_SNAPSHOT_DIR);
+    processDir(snapshotDir, copyFiles, hardLinkFiles);
+  }
+
+  // The snapshotInfo table may contain a snapshot that
+  //  doesn't yet exist on the fs, so wait a few seconds for it
+  private void waitForSnapshotDirs(DBCheckpoint checkpoint)
+      throws IOException, InterruptedException {
+
+    OzoneConfiguration conf = getConf();
+
+    // get snapshotInfo entries
+    OmMetadataManagerImpl checkpointMetadataManager =
+        OmMetadataManagerImpl.createCheckpointMetadataManager(
+            conf, checkpoint);
+    try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>>
+        iterator = checkpointMetadataManager
+        .getSnapshotInfoTable().iterator()) {
+
+      // for each entry, wait for corresponding directory
+      while (iterator.hasNext()) {
+        Table.KeyValue<String, SnapshotInfo> entry = iterator.next();
+        Path path = Paths.get(getSnapshotPath(conf, entry.getValue()));
+        waitForDirToExist(path);
+      }
+    }
+  }
+
+  private void waitForDirToExist(Path dir)
+      throws IOException, InterruptedException {
+    long endTime = System.currentTimeMillis() +
+        Duration.parse(DURATION_TO_WAIT_FOR_DIRECTORY).toMillis();
+    while (!dir.toFile().exists()) {
+      Thread.sleep(100);
+      if (System.currentTimeMillis() > endTime) {
+        break;
+      }
+    }
+    if (System.currentTimeMillis() > endTime) {
+      throw new IOException("snapshot dir doesn't exist: " + dir);
+    }
+  }
+
+  private void processDir(Path dir, Map<Object, Path> copyFiles,
+                          Map<Path, Path> hardLinkFiles)
+      throws IOException {
+    try (Stream<Path> files = Files.list(dir)) {
+      for (Path file : files.collect(Collectors.toList())) {
+        if (file.toFile().isDirectory()) {
+          processDir(file, copyFiles, hardLinkFiles);
+        } else {
+          processFile(file, copyFiles, hardLinkFiles);
+        }
+      }
+    }
+  }
+
+  private void processFile(Path file, Map<Object, Path> copyFiles,
+                           Map<Path, Path> hardLinkFiles) throws IOException {
+    // get the inode
+    Object key = OmSnapshotManager.getINode(file);
+    // If we already have the inode, store as hard link
+    if (copyFiles.containsKey(key)) {
+      hardLinkFiles.put(file, copyFiles.get(key));
+    } else {
+      copyFiles.put(key, file);
+    }
+  }
+
+  // returns value of http request parameter
+  private boolean includeSnapshotData(HttpServletRequest request) {
+    String includeParam =
+        request.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA);
+    if (StringUtils.isNotEmpty(includeParam)) {
+      return Boolean.parseBoolean(includeParam);
+    }
+    return false;
+  }
+
+  @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
+  private void writeFilesToArchive(HashMap<Object, Path> copyFiles,
+                         HashMap<Path, Path> hardLinkFiles,
+                         ArchiveOutputStream archiveOutputStream)

Review Comment:
   Alignment is off for the declarations of `hardLinkFiles` and
`archiveOutputStream`. Also, use `Map` instead of `HashMap` in the parameter types.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java:
##########
@@ -227,6 +233,20 @@ public void testInstallSnapshot() throws Exception {
         getDefaultBucketLayout()).get(followerOMMetaMngr.getOzoneKey(
         volumeName, bucketName, newKeys.get(0))));
      */
+
+    // read back data from the OM snapshot
+    OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+            .setBucketName(bucketName)
+                .setKeyName(".snapshot/snap1/" + keys.get(0)).build();
+    OmKeyInfo omKeyInfo = null;
+    try {
+      omKeyInfo = followerOM.lookupKey(omKeyArgs);
+    } catch (Exception e) {

Review Comment:
   I feel it is unnecessary to catch the exception here.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java:
##########
@@ -537,6 +557,36 @@ public void testInstallCorruptedCheckpointFailure() throws 
Exception {
     Assert.assertTrue(logCapture.getOutput().contains(msg));
   }
 
+  private void createSnapshot(OzoneManager leaderOM, List<String> keys)
+      throws TimeoutException, InterruptedException, IOException {
+    // Avoid double buffer issue waiting for keys
+    GenericTestUtils.waitFor(() -> {
+      try {
+        OmKeyInfo key = leaderOM.getMetadataManager()
+            .getKeyTable(ozoneBucket.getBucketLayout())
+            .getSkipCache(leaderOM.getMetadataManager()
+            .getOzoneKey(volumeName, bucketName, keys.get(0)));
+        return key != null;
+      } catch (Exception e) {
+        return false;
+      }
+    }, 100, 10000);
+    objectStore.createSnapshot(volumeName, bucketName, "snap1");
+
+    // allow the snapshot to be written to the info table
+    GenericTestUtils.waitFor(() -> {
+      try {
+        SnapshotInfo snapshotInfo =

Review Comment:
   ```suggestion
           String tableKey = SnapshotInfo.getTableKey(volumeName,
               bucketName,
               "snap1");
           SnapshotInfo snapshotInfo = leaderOM.getMetadataManager()
               .getSnapshotInfoTable()
               .getSkipCache(tableKey);
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java:
##########
@@ -3617,13 +3631,17 @@ File replaceOMDBWithCheckpoint(long lastAppliedIndex, 
File oldDB,
       // starting up.
       Files.createFile(markerFile);
       FileUtils.moveDirectory(checkpointPath, oldDB.toPath());
+      moveOmSnapshotData(oldDB.toPath(), dbSnapshotsDir.toPath());
       Files.deleteIfExists(markerFile);
     } catch (IOException e) {
       LOG.error("Failed to move downloaded DB checkpoint {} to metadata " +
               "directory {}. Resetting to original DB.", checkpointPath,
           oldDB.toPath());
       try {

Review Comment:
   This try-catch block seems complex without surrounding context. It would be
better to extract this part of the code into a separate, well-named function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to