swamirishi commented on code in PR #9132:
URL: https://github.com/apache/ozone/pull/9132#discussion_r2589553036


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/InodeMetadataRocksDBCheckpoint.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RocksDB checkpoint implementation that uses hardlinks to optimize disk space
+ * for inode-based metadata checkpoints.
+ *
+ * <p>During construction, reads a hardlink mapping file and creates hardlinks
+ * from checkpoint files to the checkpoint_data directory. Original files are
+ * then deleted since they're accessible via hardlinks, saving disk space while
+ * maintaining checkpoint functionality.
+ * </p>
+ */
+public class InodeMetadataRocksDBCheckpoint implements DBCheckpoint {
+
+  private final Path checkpointLocation;
+  private final long checkpointTimestamp = System.currentTimeMillis();
+  private final long latestSequenceNumber = -1;
+  private final long checkpointCreationTimeTaken = 0L;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(InodeMetadataRocksDBCheckpoint.class);
+
+  public static final String OM_HARDLINK_FILE = "hardLinkFile";
+
+  public InodeMetadataRocksDBCheckpoint(Path checkpointLocation) throws IOException {
+    this.checkpointLocation = checkpointLocation;
+    installHardLinks();
+  }
+
+  @Override
+  public Path getCheckpointLocation() {
+    return this.checkpointLocation;
+  }
+
+  @Override
+  public long getCheckpointTimestamp() {
+    return this.checkpointTimestamp;
+  }
+
+  @Override
+  public long getLatestSequenceNumber() {
+    return this.latestSequenceNumber;
+  }
+
+  @Override
+  public long checkpointCreationTimeTaken() {
+    return this.checkpointCreationTimeTaken;
+  }
+
+  @Override
+  public void cleanupCheckpoint() throws IOException {
+    LOG.info("Cleaning up RocksDB checkpoint at {}",
+        checkpointLocation.toString());
+    FileUtils.deleteDirectory(checkpointLocation.toFile());
+  }
+
+  private void installHardLinks() throws IOException {
+    File hardLinkFile = new File(checkpointLocation.toFile(),
+        OM_HARDLINK_FILE);
+
+    if (!hardLinkFile.exists()) {
+      LOG.error("Hardlink file : {} does not exist.", hardLinkFile);
+      return;
+    }
+
+    // Track source files that need to be deleted after hardlink creation
+    List<Path> sourceFilesToDelete = new ArrayList<>();
+
+    // Read file and create hardlinks directly in checkpointLocation
+    try (Stream<String> s = Files.lines(hardLinkFile.toPath())) {
+      List<String> lines = s.collect(Collectors.toList());
+
+      // Create hardlinks directly in checkpointLocation
+      for (String l : lines) {
+        String[] parts = l.split("\t");

Review Comment:
   make the "\t" a delimiter constant and use constant both here and 
CreateHardLinkFile function
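
   A minimal sketch of that extraction (the constant's name and where it lives are suggestions, not something this PR already defines):

   ```java
   // Hypothetical constant name; place it where both this reader and the
   // CreateHardLinkFile writer can share it.
   public static final String HARDLINK_FILE_DELIMITER = "\t";

   String[] parts = l.split(HARDLINK_FILE_DELIMITER);
   ```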



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/InodeMetadataRocksDBCheckpoint.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RocksDB checkpoint implementation that uses hardlinks to optimize disk space
+ * for inode-based metadata checkpoints.
+ *
+ * <p>During construction, reads a hardlink mapping file and creates hardlinks
+ * from checkpoint files to the checkpoint_data directory. Original files are
+ * then deleted since they're accessible via hardlinks, saving disk space while
+ * maintaining checkpoint functionality.
+ * </p>
+ */
+public class InodeMetadataRocksDBCheckpoint implements DBCheckpoint {
+
+  private final Path checkpointLocation;
+  private final long checkpointTimestamp = System.currentTimeMillis();
+  private final long latestSequenceNumber = -1;
+  private final long checkpointCreationTimeTaken = 0L;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(InodeMetadataRocksDBCheckpoint.class);
+
+  public static final String OM_HARDLINK_FILE = "hardLinkFile";
+
+  public InodeMetadataRocksDBCheckpoint(Path checkpointLocation) throws IOException {
+    this.checkpointLocation = checkpointLocation;
+    installHardLinks();
+  }
+
+  @Override
+  public Path getCheckpointLocation() {
+    return this.checkpointLocation;
+  }
+
+  @Override
+  public long getCheckpointTimestamp() {
+    return this.checkpointTimestamp;
+  }
+
+  @Override
+  public long getLatestSequenceNumber() {
+    return this.latestSequenceNumber;
+  }
+
+  @Override
+  public long checkpointCreationTimeTaken() {
+    return this.checkpointCreationTimeTaken;
+  }
+
+  @Override
+  public void cleanupCheckpoint() throws IOException {
+    LOG.info("Cleaning up RocksDB checkpoint at {}",
+        checkpointLocation.toString());
+    FileUtils.deleteDirectory(checkpointLocation.toFile());
+  }
+
+  private void installHardLinks() throws IOException {
+    File hardLinkFile = new File(checkpointLocation.toFile(),
+        OM_HARDLINK_FILE);
+
+    if (!hardLinkFile.exists()) {
+      LOG.error("Hardlink file : {} does not exist.", hardLinkFile);
+      return;
+    }
+
+    // Track source files that need to be deleted after hardlink creation
+    List<Path> sourceFilesToDelete = new ArrayList<>();
+
+    // Read file and create hardlinks directly in checkpointLocation
+    try (Stream<String> s = Files.lines(hardLinkFile.toPath())) {
+      List<String> lines = s.collect(Collectors.toList());
+
+      // Create hardlinks directly in checkpointLocation
+      for (String l : lines) {
+        String[] parts = l.split("\t");
+        if (parts.length != 2) {
+          LOG.warn("Skipping malformed line in hardlink file: {}", l);
+          continue;
+        }
+        String to = parts[0];      // Destination path (relative)
+        String from = parts[1];    // Source path (relative to checkpointLocation)
+
+        Path sourcePath = Paths.get(checkpointLocation.toString(), from);

Review Comment:
   `Path sourcePath = checkpointLocation.resolve(from).toAbsolutePath();`
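
   Besides being shorter, `resolve` stays on the `FileSystem` of `checkpointLocation` and skips the Path-to-String-to-Path round trip.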



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java:
##########
@@ -183,7 +184,7 @@ public String getSnapshotFileName(String leaderNodeID) {
    * @return {@link RocksDBCheckpoint}
    * @throws IOException
    */
-  public RocksDBCheckpoint getCheckpointFromSnapshotFile(File snapshot,
+  public Path untarContentsOfTarball(File snapshot,

Review Comment:
   I believe getCheckpointFromSnapshotFile should be overridden in the OmRatisSnapshotProvider class instead of changing this function's definition.
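
   A rough sketch of that override, assuming the base method's return type is widened to `DBCheckpoint` (otherwise the inode-based checkpoint can't be returned) and that the parameter names match the call site in this diff:

   ```java
   // In OmRatisSnapshotProvider -- a sketch, not code from this PR.
   @Override
   public DBCheckpoint getCheckpointFromSnapshotFile(File snapshot,
       File candidateDir, boolean deleteSnapshot) throws IOException {
     // Reuse the base class's untar behaviour, then wrap the result in the
     // hardlink-aware checkpoint this PR introduces.
     DBCheckpoint checkpoint =
         super.getCheckpointFromSnapshotFile(snapshot, candidateDir, deleteSnapshot);
     return new InodeMetadataRocksDBCheckpoint(checkpoint.getCheckpointLocation());
   }
   ```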



##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotUtils.java:
##########
@@ -75,4 +77,44 @@ public void testLinkFiles(@TempDir File tempDir) throws Exception {
 
     assertEquals(tree1Files, tree2Files);
   }
+
+  /**
+   * Test createHardLinks().
+   */
+  @Test
+  public void testCreateHardLinksWithOmDbPrefix(@TempDir File tempDir) throws Exception {

Review Comment:
   Please move this to TestInodeMetadataRocksdbCheckpoint



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/InodeMetadataRocksDBCheckpoint.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RocksDB checkpoint implementation that uses hardlinks to optimize disk space
+ * for inode-based metadata checkpoints.
+ *
+ * <p>During construction, reads a hardlink mapping file and creates hardlinks
+ * from checkpoint files to the checkpoint_data directory. Original files are
+ * then deleted since they're accessible via hardlinks, saving disk space while
+ * maintaining checkpoint functionality.
+ * </p>
+ */
+public class InodeMetadataRocksDBCheckpoint implements DBCheckpoint {
+
+  private final Path checkpointLocation;
+  private final long checkpointTimestamp = System.currentTimeMillis();
+  private final long latestSequenceNumber = -1;
+  private final long checkpointCreationTimeTaken = 0L;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(InodeMetadataRocksDBCheckpoint.class);
+
+  public static final String OM_HARDLINK_FILE = "hardLinkFile";
+
+  public InodeMetadataRocksDBCheckpoint(Path checkpointLocation) throws IOException {
+    this.checkpointLocation = checkpointLocation;
+    installHardLinks();
+  }
+
+  @Override
+  public Path getCheckpointLocation() {
+    return this.checkpointLocation;
+  }
+
+  @Override
+  public long getCheckpointTimestamp() {
+    return this.checkpointTimestamp;
+  }
+
+  @Override
+  public long getLatestSequenceNumber() {
+    return this.latestSequenceNumber;
+  }
+
+  @Override
+  public long checkpointCreationTimeTaken() {
+    return this.checkpointCreationTimeTaken;
+  }
+
+  @Override
+  public void cleanupCheckpoint() throws IOException {
+    LOG.info("Cleaning up RocksDB checkpoint at {}",
+        checkpointLocation.toString());
+    FileUtils.deleteDirectory(checkpointLocation.toFile());
+  }
+
+  private void installHardLinks() throws IOException {
+    File hardLinkFile = new File(checkpointLocation.toFile(),
+        OM_HARDLINK_FILE);
+
+    if (!hardLinkFile.exists()) {
+      LOG.error("Hardlink file : {} does not exist.", hardLinkFile);
+      return;
+    }
+
+    // Track source files that need to be deleted after hardlink creation
+    List<Path> sourceFilesToDelete = new ArrayList<>();
+
+    // Read file and create hardlinks directly in checkpointLocation
+    try (Stream<String> s = Files.lines(hardLinkFile.toPath())) {
+      List<String> lines = s.collect(Collectors.toList());
+
+      // Create hardlinks directly in checkpointLocation
+      for (String l : lines) {
+        String[] parts = l.split("\t");
+        if (parts.length != 2) {
+          LOG.warn("Skipping malformed line in hardlink file: {}", l);
+          continue;
+        }
+        String to = parts[0];      // Destination path (relative)
+        String from = parts[1];    // Source path (relative to checkpointLocation)
+
+        Path sourcePath = Paths.get(checkpointLocation.toString(), from);
+        Path targetPath = Paths.get(checkpointLocation.toString(), to);
+
+        // Track source file for later deletion
+        if (Files.exists(sourcePath)) {
+          sourceFilesToDelete.add(sourcePath);
+        }
+
+        // Make parent directory if it doesn't exist
+        Path parent = targetPath.getParent();
+        if (parent != null && !Files.exists(parent)) {

Review Comment:
   Why the redundant null check? `targetPath` is always resolved under `checkpointLocation`, so its parent can never be null here.
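
   Assuming the guarded body only creates the missing parent, the whole thing collapses to one call, since `Files.createDirectories` already tolerates existing directories:

   ```java
   // No null or exists() guard needed: targetPath is resolved under
   // checkpointLocation, so getParent() never returns null here, and
   // createDirectories is a no-op for directories that already exist.
   Files.createDirectories(targetPath.getParent());
   ```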



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java:
##########
@@ -121,13 +122,13 @@ public DBCheckpoint downloadDBSnapshotFromLeader(String leaderNodeID)
       numDownloaded.incrementAndGet();
       injectPause();
 
-      RocksDBCheckpoint checkpoint = getCheckpointFromSnapshotFile(targetFile,
+      Path unTarredDb = untarContentsOfTarball(targetFile,
           candidateDir, true);
       LOG.info("Successfully untar the downloaded snapshot {} at {}.",
-          targetFile, checkpoint.getCheckpointLocation());
-      if (ratisSnapshotComplete(checkpoint.getCheckpointLocation())) {
+          targetFile, unTarredDb.toFile().getAbsolutePath());

Review Comment:
   ```suggestion
             targetFile, unTarredDb.toAbsolutePath().toString());
   ```
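
   Same result, without bouncing through `File` first.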



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java:
##########
@@ -4240,89 +4240,118 @@ private void stopTrashEmptier() {
   }
 
   /**
-   * Replace the current OM DB with the new DB checkpoint.
+   * Replaces the OM DB with checkpoint data from leader.
+   * Creates a single parent backup directory containing all current state.
    *
-   * @param lastAppliedIndex the last applied index in the current OM DB.
-   * @param checkpointPath   path to the new DB checkpoint
-   * @return location of backup of the original DB
+   * @param lastAppliedIndex last applied transaction index
+   * @param oldDB current DB directory
+   * @param checkpointLocation checkpoint data directory from leader
+   * @return backup directory containing original state
+   * @throws IOException if operations fail
    */
   File replaceOMDBWithCheckpoint(long lastAppliedIndex, File oldDB,
-      Path checkpointPath) throws IOException {
+      Path checkpointLocation) throws IOException {
 
-    // Take a backup of the current DB
+    // Create single parent backup directory
     String dbBackupName = OzoneConsts.OM_DB_BACKUP_PREFIX +
         lastAppliedIndex + "_" + System.currentTimeMillis();
     File dbDir = oldDB.getParentFile();
-
-    // Backup the active fs and snapshot dirs.
     File dbBackupDir = new File(dbDir, dbBackupName);
+
     if (!dbBackupDir.mkdirs()) {
-      throw new IOException("Failed to make db backup dir: " +
-          dbBackupDir);
+      throw new IOException("Failed to create backup directory: " + 
dbBackupDir);
     }
-    File dbBackup = new File(dbBackupDir, oldDB.getName());
-    File dbSnapshotsDir = new File(dbDir, OM_SNAPSHOT_DIR);
-    File dbSnapshotsBackup = new File(dbBackupDir, OM_SNAPSHOT_DIR);
-    Files.move(oldDB.toPath(), dbBackup.toPath());
-    if (dbSnapshotsDir.exists()) {
-      Files.move(dbSnapshotsDir.toPath(),
-          dbSnapshotsBackup.toPath());
+
+    // Move entire current state to backup (everything in dbDir that we care about)
+    File[] currentContents = dbDir.listFiles();
+    if (currentContents != null) {
+      for (File item : currentContents) {
+        // Skip backup directories and marker files
+        if (item.getName().startsWith(OzoneConsts.OM_DB_BACKUP_PREFIX) ||
+            item.getName().equals(DB_TRANSIENT_MARKER)) {
+          continue;
+        }
+
+        // Move to backup - Files.move handles both files and directories recursively
+        Path targetPath = dbBackupDir.toPath().resolve(item.getName());
+        Files.move(item.toPath(), targetPath, StandardCopyOption.REPLACE_EXISTING);
+      }
     }
 
-    moveCheckpointFiles(oldDB, checkpointPath, dbDir, dbBackup, dbSnapshotsDir,
-        dbSnapshotsBackup);
+    // Move checkpoint files
+    moveCheckpointFiles(oldDB, checkpointLocation, dbDir, dbBackupDir);
+
     return dbBackupDir;
   }
 
-  private void moveCheckpointFiles(File oldDB, Path checkpointPath, File dbDir,
-                                   File dbBackup, File dbSnapshotsDir,
-                                   File dbSnapshotsBackup) throws IOException {
-    // Move the new DB checkpoint into the om metadata dir
+  /**
+   * Moves all contents from checkpointLocation to dbDir, replacing existing files/dirs.
+   * Uses a single parent backup for rollback on failure.
+   *
+   * @param oldDB the old DB directory (will be replaced)
+   * @param checkpointLocation source directory containing checkpoint data
+   * @param dbDir target directory (parent of oldDB)
+   * @param dbBackupDir backup directory containing the original state
+   * @throws IOException if file operations fail
+   */
+  private void moveCheckpointFiles(File oldDB, Path checkpointLocation, File dbDir,
+      File dbBackupDir) throws IOException {
     Path markerFile = new File(dbDir, DB_TRANSIENT_MARKER).toPath();
+
     try {
-      // Create a Transient Marker file. This file will be deleted if the
-      // checkpoint DB is successfully moved to the old DB location or if the
-      // old DB backup is reset to its location. If not, then the OM DB is in
-      // an inconsistent state and this marker file will fail OM from
-      // starting up.
+      // Create transient marker file
       Files.createFile(markerFile);
-      // Link each of the candidate DB files to real DB directory.  This
-      // preserves the links that already exist between files in the
-      // candidate db.
-      OmSnapshotUtils.linkFiles(checkpointPath.toFile(),
-          oldDB);
-      moveOmSnapshotData(oldDB.toPath(), dbSnapshotsDir.toPath());
+      // Move everything from checkpointLocation to dbDir, replacing existing
+      if (!Files.exists(checkpointLocation) || !Files.isDirectory(checkpointLocation)) {
+        throw new IOException("Checkpoint data directory does not exist: " + checkpointLocation);
+      }
+      try (Stream<Path> checkpointContents = Files.list(checkpointLocation)) {
+        for (Path sourcePath : checkpointContents.collect(Collectors.toList())) {
+          Path targetPath = dbDir.toPath().resolve(sourcePath.getFileName());
+          // Delete target if it exists (file or directory)
+          if (Files.exists(targetPath)) {
+            if (Files.isDirectory(targetPath)) {
+              FileUtil.fullyDelete(targetPath.toFile());
+            } else {
+              Files.delete(targetPath);
+            }
+          }
+          // Move source to target
+          Files.move(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING);

Review Comment:
   Remove REPLACE_EXISTING
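
   The target is deleted just above, so a plain move is enough and will fail loudly if something unexpectedly reappears:

   ```suggestion
             Files.move(sourcePath, targetPath);
   ```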



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java:
##########
@@ -4240,89 +4240,118 @@ private void stopTrashEmptier() {
   }
 
   /**
-   * Replace the current OM DB with the new DB checkpoint.
+   * Replaces the OM DB with checkpoint data from leader.
+   * Creates a single parent backup directory containing all current state.
    *
-   * @param lastAppliedIndex the last applied index in the current OM DB.
-   * @param checkpointPath   path to the new DB checkpoint
-   * @return location of backup of the original DB
+   * @param lastAppliedIndex last applied transaction index
+   * @param oldDB current DB directory
+   * @param checkpointLocation checkpoint data directory from leader
+   * @return backup directory containing original state
+   * @throws IOException if operations fail
    */
   File replaceOMDBWithCheckpoint(long lastAppliedIndex, File oldDB,
-      Path checkpointPath) throws IOException {
+      Path checkpointLocation) throws IOException {
 
-    // Take a backup of the current DB
+    // Create single parent backup directory
     String dbBackupName = OzoneConsts.OM_DB_BACKUP_PREFIX +
         lastAppliedIndex + "_" + System.currentTimeMillis();
     File dbDir = oldDB.getParentFile();
-
-    // Backup the active fs and snapshot dirs.
     File dbBackupDir = new File(dbDir, dbBackupName);
+
     if (!dbBackupDir.mkdirs()) {
-      throw new IOException("Failed to make db backup dir: " +
-          dbBackupDir);
+      throw new IOException("Failed to create backup directory: " + 
dbBackupDir);
     }
-    File dbBackup = new File(dbBackupDir, oldDB.getName());
-    File dbSnapshotsDir = new File(dbDir, OM_SNAPSHOT_DIR);
-    File dbSnapshotsBackup = new File(dbBackupDir, OM_SNAPSHOT_DIR);
-    Files.move(oldDB.toPath(), dbBackup.toPath());
-    if (dbSnapshotsDir.exists()) {
-      Files.move(dbSnapshotsDir.toPath(),
-          dbSnapshotsBackup.toPath());
+
+    // Move entire current state to backup (everything in dbDir that we care about)
+    File[] currentContents = dbDir.listFiles();
+    if (currentContents != null) {
+      for (File item : currentContents) {
+        // Skip backup directories and marker files
+        if (item.getName().startsWith(OzoneConsts.OM_DB_BACKUP_PREFIX) ||
+            item.getName().equals(DB_TRANSIENT_MARKER)) {
+          continue;
+        }
+
+        // Move to backup - Files.move handles both files and directories recursively
+        Path targetPath = dbBackupDir.toPath().resolve(item.getName());
+        Files.move(item.toPath(), targetPath, StandardCopyOption.REPLACE_EXISTING);
+      }
     }
 
-    moveCheckpointFiles(oldDB, checkpointPath, dbDir, dbBackup, dbSnapshotsDir,
-        dbSnapshotsBackup);
+    // Move checkpoint files
+    moveCheckpointFiles(oldDB, checkpointLocation, dbDir, dbBackupDir);
+
     return dbBackupDir;
   }
 
-  private void moveCheckpointFiles(File oldDB, Path checkpointPath, File dbDir,
-                                   File dbBackup, File dbSnapshotsDir,
-                                   File dbSnapshotsBackup) throws IOException {
-    // Move the new DB checkpoint into the om metadata dir
+  /**
+   * Moves all contents from checkpointLocation to dbDir, replacing existing files/dirs.
+   * Uses a single parent backup for rollback on failure.
+   *
+   * @param oldDB the old DB directory (will be replaced)
+   * @param checkpointLocation source directory containing checkpoint data
+   * @param dbDir target directory (parent of oldDB)
+   * @param dbBackupDir backup directory containing the original state
+   * @throws IOException if file operations fail
+   */
+  private void moveCheckpointFiles(File oldDB, Path checkpointLocation, File dbDir,
+      File dbBackupDir) throws IOException {
     Path markerFile = new File(dbDir, DB_TRANSIENT_MARKER).toPath();
+
     try {
-      // Create a Transient Marker file. This file will be deleted if the
-      // checkpoint DB is successfully moved to the old DB location or if the
-      // old DB backup is reset to its location. If not, then the OM DB is in
-      // an inconsistent state and this marker file will fail OM from
-      // starting up.
+      // Create transient marker file
       Files.createFile(markerFile);
-      // Link each of the candidate DB files to real DB directory.  This
-      // preserves the links that already exist between files in the
-      // candidate db.
-      OmSnapshotUtils.linkFiles(checkpointPath.toFile(),
-          oldDB);
-      moveOmSnapshotData(oldDB.toPath(), dbSnapshotsDir.toPath());
+      // Move everything from checkpointLocation to dbDir, replacing existing
+      if (!Files.exists(checkpointLocation) || !Files.isDirectory(checkpointLocation)) {
+        throw new IOException("Checkpoint data directory does not exist: " + checkpointLocation);
+      }
+      try (Stream<Path> checkpointContents = Files.list(checkpointLocation)) {
+        for (Path sourcePath : checkpointContents.collect(Collectors.toList())) {
+          Path targetPath = dbDir.toPath().resolve(sourcePath.getFileName());
+          // Delete target if it exists (file or directory)
+          if (Files.exists(targetPath)) {

Review Comment:
   If the target path still exists, we should throw an exception. This is an illegal state and we should not silently proceed.
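
   Something along these lines (a sketch; exact message aside):

   ```java
   // Fail fast instead of silently deleting whatever sits at the target.
   if (Files.exists(targetPath)) {
     throw new IllegalStateException(
         "Target path already exists while installing checkpoint: " + targetPath);
   }
   Files.move(sourcePath, targetPath);
   ```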



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

