This is an automated email from the ASF dual-hosted git repository.

prashantpogde pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 66c4c06b67 HDDS-8943. [Snapshot] Limit the total size of sst files in bootstrapping tarball. (#5014)
66c4c06b67 is described below

commit 66c4c06b67b281a0f4950ae5cf6515c6a693d0dc
Author: GeorgeJahad <[email protected]>
AuthorDate: Tue Jul 11 13:07:28 2023 -0700

    HDDS-8943. [Snapshot] Limit the total size of sst files in bootstrapping tarball. (#5014)
---
 .../common/src/main/resources/ozone-default.xml    |   8 ++
 .../apache/hadoop/hdds/utils/HddsServerUtil.java   |  18 +++
 .../hadoop/hdds/utils/RDBSnapshotProvider.java     |  36 +++---
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   5 +
 .../hadoop/ozone/om/TestOMDbCheckpointServlet.java |   5 +-
 .../hadoop/ozone/om/TestOMRatisSnapshots.java      | 130 +++++++++++++++++++++
 .../hadoop/ozone/om/OMDBCheckpointServlet.java     |  84 +++++++++----
 .../hadoop/ozone/om/snapshot/OmSnapshotUtils.java  |   8 ++
 .../hadoop/ozone/om/TestOmSnapshotManager.java     |  43 +++++--
 9 files changed, 286 insertions(+), 51 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index ba0e549bb3..3e9279dda8 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2021,6 +2021,14 @@
       This fallback approach is not recommended for production environments.
     </description>
   </property>
+  <property>
+    <name>ozone.om.ratis.snapshot.max.total.sst.size</name>
+    <value>100000000</value>
+    <tag>OZONE, OM, RATIS</tag>
+    <description>
+      Max size of SST files in OM Ratis Snapshot tarball.
+    </description>
+  </property>
   <property>
     <name>ozone.om.snapshot.provider.socket.timeout</name>
     <value>5000s</value>
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index c6840d39d5..2c59bc1e3c 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -109,6 +109,9 @@ public final class HddsServerUtil {
   private HddsServerUtil() {
   }
 
+  public static final String OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME =
+      "OZONE_RATIS_SNAPSHOT_COMPLETE";
+
   private static final Logger LOG = LoggerFactory.getLogger(
       HddsServerUtil.class);
 
@@ -590,6 +593,7 @@ public final class HddsServerUtil {
           }
         }
       }
+      includeRatisSnapshotCompleteFlag(archiveOutputStream);
     }
   }
 
@@ -605,6 +609,20 @@ public final class HddsServerUtil {
     archiveOutputStream.closeArchiveEntry();
   }
 
+  // Mark tarball completed.
+  public static void includeRatisSnapshotCompleteFlag(
+      ArchiveOutputStream archiveOutput) throws IOException {
+    File file = File.createTempFile(
+        OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME, "");
+    String entryName = OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME;
+    includeFile(file, entryName, archiveOutput);
+  }
+
+  static boolean ratisSnapshotComplete(Path dir) {
+    return new File(dir.toString(),
+        OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME).exists();
+  }
+
   // optimize ugi lookup for RPC operations to avoid a trip through
   // UGI.getCurrentUser which is synch'ed
   public static UserGroupInformation getRemoteUser() throws IOException {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java
index 261e4e103d..2ebb56179c 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.ratisSnapshotComplete;
 import static org.apache.hadoop.ozone.OzoneConsts.SNAPSHOT_CANDIDATE_DIR;
 
 /**
@@ -110,21 +111,26 @@ public abstract class RDBSnapshotProvider implements Closeable {
         "reloading state from the snapshot.", leaderNodeID);
     checkLeaderConsistency(leaderNodeID);
 
-    String snapshotFileName = getSnapshotFileName(leaderNodeID);
-    File targetFile = new File(snapshotDir, snapshotFileName);
-    downloadSnapshot(leaderNodeID, targetFile);
-    LOG.info("Successfully download the latest snapshot {} from leader OM: {}",
-        targetFile, leaderNodeID);
-
-    numDownloaded.incrementAndGet();
-    injectPause();
-
-    RocksDBCheckpoint checkpoint = getCheckpointFromSnapshotFile(targetFile,
-        candidateDir, true);
-    LOG.info("Successfully untar the downloaded snapshot {} at {}.", targetFile,
-        checkpoint.getCheckpointLocation());
-
-    return checkpoint;
+    while (true) {
+      String snapshotFileName = getSnapshotFileName(leaderNodeID);
+      File targetFile = new File(snapshotDir, snapshotFileName);
+      downloadSnapshot(leaderNodeID, targetFile);
+      LOG.info(
+          "Successfully download the latest snapshot {} from leader OM: {}",
+          targetFile, leaderNodeID);
+
+      numDownloaded.incrementAndGet();
+      injectPause();
+
+      RocksDBCheckpoint checkpoint = getCheckpointFromSnapshotFile(targetFile,
+          candidateDir, true);
+      LOG.info("Successfully untar the downloaded snapshot {} at {}.",
+          targetFile, checkpoint.getCheckpointLocation());
+      if (ratisSnapshotComplete(checkpoint.getCheckpointLocation())) {
+        LOG.info("Ratis snapshot transfer is complete.");
+        return checkpoint;
+      }
+    }
   }
 
   /**
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 2d824de3bb..259f7ab3ee 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -202,6 +202,11 @@ public final class OMConfigKeys {
   public static final long
       OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_DEFAULT = 400000;
 
+  public static final String OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY
+      = "ozone.om.ratis.snapshot.max.total.sst.size";
+  public static final long
+      OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT = 100_000_000;
+
   // OM Ratis server configurations
   public static final String OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_KEY
       = "ozone.om.ratis.server.request.timeout";
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
index c16b31cc36..b57fbbb78e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.commons.io.FileUtils;
 
 import static org.apache.hadoop.hdds.recon.ReconConfig.ConfigStrings.OZONE_RECON_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
@@ -703,7 +704,9 @@ public class TestOMDbCheckpointServlet {
         if (file.toFile().isDirectory()) {
           getFiles(file, truncateLength, fileSet);
         }
-        if (!file.getFileName().toString().startsWith("fabricated")) {
+        String filename = file.getFileName().toString();
+        if (!filename.startsWith("fabricated") &&
+            !filename.startsWith(OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME)) {
           fileSet.add(truncateFileName(truncateLength, file));
         }
       }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index b65a17207e..2042f5a55a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -16,6 +16,9 @@
  */
 package org.apache.hadoop.ozone.om;
 
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.ExitManager;
@@ -63,6 +66,8 @@ import org.slf4j.Logger;
 import org.slf4j.event.Level;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -70,8 +75,10 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -82,6 +89,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.OM_HARDLINK_FILE;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
 import static org.apache.hadoop.ozone.om.TestOzoneManagerHAWithData.createKey;
@@ -201,6 +209,13 @@ public class TestOMRatisSnapshots {
     }
     OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
 
+    List<Set<String>> sstSetList = new ArrayList<>();
+    FaultInjector faultInjector =
+        new SnapshotMaxSizeInjector(leaderOM,
+            followerOM.getOmSnapshotProvider().getSnapshotDir(),
+            sstSetList);
+    followerOM.getOmSnapshotProvider().setInjector(faultInjector);
+
     // Create some snapshots, each with new keys
     int keyIncrement = 10;
     String snapshotNamePrefix = "snapshot";
@@ -287,6 +302,17 @@ public class TestOMRatisSnapshots {
      */
 
     checkSnapshot(leaderOM, followerOM, snapshotName, keys, snapshotInfo);
+    int sstFileCount = 0;
+    Set<String> sstFileUnion = new HashSet<>();
+    for (Set<String> sstFiles : sstSetList) {
+      sstFileCount += sstFiles.size();
+      sstFileUnion.addAll(sstFiles);
+    }
+    // Confirm that there were multiple tarballs.
+    assertTrue(sstSetList.size() > 1);
+    // Confirm that there was no overlap of sst files
+    // between the individual tarballs.
+    assertEquals(sstFileUnion.size(), sstFileCount);
   }
 
   private void checkSnapshot(OzoneManager leaderOM, OzoneManager followerOM,
@@ -1132,4 +1158,108 @@ public class TestOMRatisSnapshots {
       init();
     }
   }
+
+  // Interrupts the tarball download process to test creation of
+  // multiple tarballs as needed when the tarball size exceeds the
+  // max.
+  private static class SnapshotMaxSizeInjector extends FaultInjector {
+    private final OzoneManager om;
+    private int count;
+    private final File snapshotDir;
+    private final List<Set<String>> sstSetList;
+    private final Path tempDir;
+    SnapshotMaxSizeInjector(OzoneManager om, File snapshotDir,
+                            List<Set<String>> sstSetList) throws IOException {
+      this.om = om;
+      this.snapshotDir = snapshotDir;
+      this.sstSetList = sstSetList;
+      this.tempDir = Files.createTempDirectory("tmpDirPrefix");
+      init();
+    }
+
+    @Override
+    public void init() {
+    }
+
+    @Override
+    // Pause each time a tarball is received, to process it.
+    public void pause() throws IOException {
+      count++;
+      File tarball = getTarball(snapshotDir);
+      // First time through, get total size of sst files and reduce
+      // max size config.  That way next time through, we get multiple
+      // tarballs.
+      if (count == 1) {
+        long sstSize = getSizeOfSstFiles(tarball);
+        om.getConfiguration().setLong(
+            OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY, sstSize / 2);
+        // Now empty the tarball to restart the download
+        // process from the beginning.
+        createEmptyTarball(tarball);
+      } else {
+        // Each time we get a new tarball add a set of
+        // its sst file to the list, (i.e. one per tarball.)
+        sstSetList.add(getSstFilenames(tarball));
+      }
+    }
+
+    // Get Size of sstfiles in tarball.
+    private long getSizeOfSstFiles(File tarball) throws IOException {
+      FileUtil.unTar(tarball, tempDir.toFile());
+      List<Path> sstPaths = Files.walk(tempDir).filter(
+          path -> path.toString().endsWith(".sst")).
+          collect(Collectors.toList());
+      long sstSize = 0;
+      for (Path sstPath : sstPaths) {
+        sstSize += Files.size(sstPath);
+      }
+      return sstSize;
+    }
+
+    private void createEmptyTarball(File dummyTarFile)
+        throws IOException {
+      FileOutputStream fileOutputStream = new FileOutputStream(dummyTarFile);
+      TarArchiveOutputStream archiveOutputStream =
+          new TarArchiveOutputStream(fileOutputStream);
+      archiveOutputStream.close();
+    }
+
+    // Return a list of sst files in tarball.
+    private Set<String> getSstFilenames(File tarball)
+        throws IOException {
+      Set<String> sstFilenames = new HashSet<>();
+      try (TarArchiveInputStream tarInput =
+           new TarArchiveInputStream(new FileInputStream(tarball))) {
+        TarArchiveEntry entry;
+        while ((entry = tarInput.getNextTarEntry()) != null) {
+          String name = entry.getName();
+          if (name.toLowerCase().endsWith(".sst")) {
+            sstFilenames.add(entry.getName());
+          }
+        }
+      }
+      return sstFilenames;
+    }
+
+    // Find the tarball in the dir.
+    private File getTarball(File dir) {
+      File[] fileList = dir.listFiles();
+      assertNotNull(fileList);
+      for (File f : fileList) {
+        if (f.getName().toLowerCase().endsWith(".tar")) {
+          return f;
+        }
+      }
+      return null;
+    }
+
+    @Override
+    public void resume() throws IOException {
+    }
+
+    @Override
+    public void reset() throws IOException {
+      init();
+    }
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
index f6e14fe97a..7fb339d24c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
@@ -52,15 +52,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeFile;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeRatisSnapshotCompleteFlag;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA;
 import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY;
 import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.createHardLinkList;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
 import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.truncateFileName;
@@ -84,7 +88,7 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
       LoggerFactory.getLogger(OMDBCheckpointServlet.class);
   private static final long serialVersionUID = 1L;
   private transient BootstrapStateHandler.Lock lock;
-
+  private long maxTotalSstSize = 0;
   @Override
   public void init() throws ServletException {
 
@@ -144,9 +148,11 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
       Set<Path> toExcludeFiles = normalizeExcludeList(toExcludeList,
           checkpoint.getCheckpointLocation().toString(),
           ServerUtils.getOzoneMetaDirPath(getConf()).toString());
-      getFilesForArchive(checkpoint, copyFiles, hardLinkFiles, toExcludeFiles,
-          includeSnapshotData(request), excludedList);
-      writeFilesToArchive(copyFiles, hardLinkFiles, archiveOutputStream);
+      boolean completed = getFilesForArchive(checkpoint, copyFiles,
+          hardLinkFiles, toExcludeFiles, includeSnapshotData(request),
+          excludedList);
+      writeFilesToArchive(copyFiles, hardLinkFiles, archiveOutputStream,
+          completed);
     } catch (Exception e) {
       LOG.error("got exception writing to archive " + e);
       throw e;
@@ -169,7 +175,7 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
     return paths;
   }
 
-  private void getFilesForArchive(DBCheckpoint checkpoint,
+  private boolean getFilesForArchive(DBCheckpoint checkpoint,
                                   Set<Path> copyFiles,
                                   Map<Path, Path> hardLinkFiles,
                                   Set<Path> toExcludeFiles,
@@ -177,21 +183,28 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
                                   List<String> excluded)
       throws IOException {
 
+    maxTotalSstSize = getConf().getLong(
+        OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY,
+        OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT);
+
+    AtomicLong copySize = new AtomicLong(0L);
     // Get the active fs files.
     Path dir = checkpoint.getCheckpointLocation();
-    processDir(dir, copyFiles, hardLinkFiles, toExcludeFiles,
-        new HashSet<>(), excluded);
+    if (!processDir(dir, copyFiles, hardLinkFiles, toExcludeFiles,
+        new HashSet<>(), excluded, copySize)) {
+      return false;
+    }
 
     if (!includeSnapshotData) {
-      return;
+      return true;
     }
 
     // Get the snapshot files.
     Set<Path> snapshotPaths = waitForSnapshotDirs(checkpoint);
     Path snapshotDir = Paths.get(OMStorage.getOmDbDir(getConf()).toString(),
         OM_SNAPSHOT_DIR);
-    processDir(snapshotDir, copyFiles, hardLinkFiles, toExcludeFiles,
-        snapshotPaths, excluded);
+    return processDir(snapshotDir, copyFiles, hardLinkFiles, toExcludeFiles,
+        snapshotPaths, excluded, copySize);
   }
 
   /**
@@ -234,11 +247,12 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
     }
   }
 
-  private void processDir(Path dir, Set<Path> copyFiles,
+  private boolean processDir(Path dir, Set<Path> copyFiles,
                           Map<Path, Path> hardLinkFiles,
                           Set<Path> toExcludeFiles,
                           Set<Path> snapshotPaths,
-                          List<String> excluded)
+                          List<String> excluded,
+                          AtomicLong copySize)
       throws IOException {
     try (Stream<Path> files = Files.list(dir)) {
       for (Path file : files.collect(Collectors.toList())) {
@@ -251,13 +265,22 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
             LOG.debug("Skipping unneeded file: " + file);
             continue;
           }
-          processDir(file, copyFiles, hardLinkFiles, toExcludeFiles,
-              snapshotPaths, excluded);
+          if (!processDir(file, copyFiles, hardLinkFiles, toExcludeFiles,
+                          snapshotPaths, excluded, copySize)) {
+            return false;
+          }
         } else {
-          processFile(file, copyFiles, hardLinkFiles, toExcludeFiles, excluded);
+          long fileSize = processFile(file, copyFiles, hardLinkFiles,
+              toExcludeFiles, excluded);
+          if (copySize.get() + fileSize > maxTotalSstSize) {
+            return false;
+          } else {
+            copySize.addAndGet(fileSize);
+          }
         }
       }
     }
+    return true;
   }
 
   /**
@@ -272,10 +295,11 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
    * @param excluded The list of db files that actually were excluded.
    */
   @VisibleForTesting
-  public static void processFile(Path file, Set<Path> copyFiles,
+  public static long processFile(Path file, Set<Path> copyFiles,
                                  Map<Path, Path> hardLinkFiles,
                                  Set<Path> toExcludeFiles,
-                                 List<String> excluded) {
+                                 List<String> excluded) throws IOException {
+    long fileSize = 0;
     if (toExcludeFiles.contains(file)) {
       excluded.add(file.toString());
     } else {
@@ -297,6 +321,7 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
           } else {
             // Add to tarball.
             copyFiles.add(file);
+            fileSize = Files.size(file);
           }
         }
       } else {
@@ -304,6 +329,7 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
         copyFiles.add(file);
       }
     }
+    return fileSize;
   }
 
   // If fileName exists in "files" parameter,
@@ -326,14 +352,20 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
 
   private void writeFilesToArchive(Set<Path> copyFiles,
                                    Map<Path, Path> hardLinkFiles,
-                                   ArchiveOutputStream archiveOutputStream)
+                                   ArchiveOutputStream archiveOutputStream,
+                                   boolean completed)
       throws IOException {
 
     File metaDirPath = ServerUtils.getOzoneMetaDirPath(getConf());
     int truncateLength = metaDirPath.toString().length() + 1;
 
+    Set<Path> filteredCopyFiles = completed ? copyFiles :
+        copyFiles.stream().filter(path ->
+          path.getFileName().toString().toLowerCase().endsWith(".sst")).
+          collect(Collectors.toSet());
+
     // Go through each of the files to be copied and add to archive.
-    for (Path file : copyFiles) {
+    for (Path file : filteredCopyFiles) {
       String fixedFile = truncateFileName(truncateLength, file);
       if (fixedFile.startsWith(OM_CHECKPOINT_DIR)) {
         // checkpoint files go to root of tarball
@@ -345,11 +377,15 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
       includeFile(file.toFile(), fixedFile, archiveOutputStream);
     }
 
-    // Create list of hard links.
-    if (!hardLinkFiles.isEmpty()) {
-      Path hardLinkFile = createHardLinkList(truncateLength, hardLinkFiles);
-      includeFile(hardLinkFile.toFile(), OmSnapshotManager.OM_HARDLINK_FILE,
-          archiveOutputStream);
+    if (completed) {
+      // Only create the hard link list for the last tarball.
+      if (!hardLinkFiles.isEmpty()) {
+        Path hardLinkFile = createHardLinkList(truncateLength, hardLinkFiles);
+        includeFile(hardLinkFile.toFile(), OmSnapshotManager.OM_HARDLINK_FILE,
+            archiveOutputStream);
+      }
+      // Mark tarball completed.
+      includeRatisSnapshotCompleteFlag(archiveOutputStream);
     }
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
index eac63a54bc..0b432f1592 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
@@ -117,6 +117,14 @@ public final class OmSnapshotUtils {
           String to = l.split("\t")[0];
           Path fullFromPath = Paths.get(dbPath.toString(), from);
           Path fullToPath = Paths.get(dbPath.toString(), to);
+          // Make parent dir if it doesn't exist.
+          Path parent = fullToPath.getParent();
+          if ((parent != null) && (!parent.toFile().exists())) {
+            if (!parent.toFile().mkdirs()) {
+              throw new IOException(
+                  "Failed to create directory: " + parent.toString());
+            }
+          }
           Files.createLink(fullToPath, fullFromPath);
         }
         if (!hardLinkFile.delete()) {
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
index 6af88cf241..52f56deaa6 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
@@ -39,6 +39,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -340,67 +341,87 @@ public class TestOmSnapshotManager {
    * should be copied, linked, or excluded from the tarball entirely.
    */
   @Test
-  public void testProcessFile() {
+  public void testProcessFile() throws IOException {
+    Assert.assertTrue(new File(testDir.toString(), "snap1").mkdirs());
+    Assert.assertTrue(new File(testDir.toString(), "snap2").mkdirs());
     Path copyFile = Paths.get(testDir.toString(),
         "snap1/copyfile.sst");
+    Files.write(copyFile,
+        "dummyData".getBytes(StandardCharsets.UTF_8));
+    long expectedFileSize = Files.size(copyFile);
     Path excludeFile = Paths.get(testDir.toString(),
         "snap1/excludeFile.sst");
+    Files.write(excludeFile,
+        "dummyData".getBytes(StandardCharsets.UTF_8));
     Path linkToExcludedFile = Paths.get(testDir.toString(),
         "snap2/excludeFile.sst");
+    Files.write(linkToExcludedFile,
+        "dummyData".getBytes(StandardCharsets.UTF_8));
     Path linkToCopiedFile = Paths.get(testDir.toString(),
         "snap2/copyfile.sst");
+    Files.write(linkToCopiedFile,
+        "dummyData".getBytes(StandardCharsets.UTF_8));
     Path addToCopiedFiles = Paths.get(testDir.toString(),
         "snap1/copyfile2.sst");
+    Files.write(addToCopiedFiles,
+        "dummyData".getBytes(StandardCharsets.UTF_8));
     Path addNonSstToCopiedFiles = Paths.get(testDir.toString(),
         "snap1/nonSst");
+    Files.write(addNonSstToCopiedFiles,
+        "dummyData".getBytes(StandardCharsets.UTF_8));
 
     Set<Path> toExcludeFiles = new HashSet<>(
         Collections.singletonList(excludeFile));
     Set<Path> copyFiles = new HashSet<>(Collections.singletonList(copyFile));
     List<String> excluded = new ArrayList<>();
     Map<Path, Path> hardLinkFiles = new HashMap<>();
-
+    long fileSize;
     // Confirm the exclude file gets added to the excluded list,
     //  (and thus is excluded.)
-    processFile(excludeFile, copyFiles, hardLinkFiles, toExcludeFiles,
-        excluded);
+    fileSize = processFile(excludeFile, copyFiles, hardLinkFiles,
+        toExcludeFiles, excluded);
     Assert.assertEquals(excluded.size(), 1);
     Assert.assertEquals((excluded.get(0)), excludeFile.toString());
     Assert.assertEquals(copyFiles.size(), 1);
     Assert.assertEquals(hardLinkFiles.size(), 0);
+    Assert.assertEquals(fileSize, 0);
     excluded = new ArrayList<>();
 
     // Confirm the linkToExcludedFile gets added as a link.
-    processFile(linkToExcludedFile, copyFiles, hardLinkFiles, toExcludeFiles,
-        excluded);
+    fileSize = processFile(linkToExcludedFile, copyFiles, hardLinkFiles,
+        toExcludeFiles, excluded);
     Assert.assertEquals(excluded.size(), 0);
     Assert.assertEquals(copyFiles.size(), 1);
     Assert.assertEquals(hardLinkFiles.size(), 1);
     Assert.assertEquals(hardLinkFiles.get(linkToExcludedFile), excludeFile);
+    Assert.assertEquals(fileSize, 0);
     hardLinkFiles = new HashMap<>();
 
     // Confirm the linkToCopiedFile gets added as a link.
-    processFile(linkToCopiedFile, copyFiles, hardLinkFiles, toExcludeFiles,
-        excluded);
+    fileSize = processFile(linkToCopiedFile, copyFiles, hardLinkFiles,
+        toExcludeFiles, excluded);
     Assert.assertEquals(excluded.size(), 0);
     Assert.assertEquals(copyFiles.size(), 1);
     Assert.assertEquals(hardLinkFiles.size(), 1);
     Assert.assertEquals(hardLinkFiles.get(linkToCopiedFile), copyFile);
+    Assert.assertEquals(fileSize, 0);
     hardLinkFiles = new HashMap<>();
 
     // Confirm the addToCopiedFiles gets added to list of copied files
-    processFile(addToCopiedFiles, copyFiles, hardLinkFiles, toExcludeFiles,
-        excluded);
+    fileSize = processFile(addToCopiedFiles, copyFiles, hardLinkFiles,
+        toExcludeFiles, excluded);
     Assert.assertEquals(excluded.size(), 0);
     Assert.assertEquals(copyFiles.size(), 2);
     Assert.assertTrue(copyFiles.contains(addToCopiedFiles));
+    Assert.assertEquals(fileSize, expectedFileSize);
     copyFiles = new HashSet<>(Collections.singletonList(copyFile));
 
     // Confirm the addNonSstToCopiedFiles gets added to list of copied files
-    processFile(addNonSstToCopiedFiles, copyFiles, hardLinkFiles,
+    fileSize = processFile(addNonSstToCopiedFiles, copyFiles, hardLinkFiles,
         toExcludeFiles, excluded);
     Assert.assertEquals(excluded.size(), 0);
     Assert.assertEquals(copyFiles.size(), 2);
+    Assert.assertEquals(fileSize, 0);
     Assert.assertTrue(copyFiles.contains(addNonSstToCopiedFiles));
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to