This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 053ab55fe97 HDDS-14721. Make OM bootstrap implementation configurable 
(#9830)
053ab55fe97 is described below

commit 053ab55fe97e358241fc26eba0c94c8d3b9cc2a4
Author: Sadanand Shenoy <[email protected]>
AuthorDate: Tue Mar 3 13:26:01 2026 +0530

    HDDS-14721. Make OM bootstrap implementation configurable (#9830)
---
 .../common/src/main/resources/ozone-default.xml    |   9 +
 .../java/org/apache/hadoop/hdds/utils/HAUtils.java |  29 +++
 .../utils/db/InodeMetadataRocksDBCheckpoint.java   |  83 +++++++--
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   4 +
 .../hadoop/ozone/om/helpers/OMNodeDetails.java     |  17 +-
 .../hadoop/ozone/om/helpers/TestOMNodeDetails.java |  24 +++
 .../apache/hadoop/ozone/om/TestOMBootstrap.java    | 202 +++++++++++++++++++++
 .../hadoop/ozone/om/TestOMRatisSnapshots.java      | 103 ++++++++++-
 .../hadoop/ozone/shell/TestOzoneTenantShell.java   |   5 +-
 .../src/test/resources/auditlog.properties         |   6 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  28 ++-
 .../ozone/om/ratis/OzoneManagerStateMachine.java   |   9 +-
 .../om/ratis_snapshot/OmRatisSnapshotProvider.java |  16 +-
 .../TestOmRatisSnapshotProvider.java               |   4 +-
 .../defrag/TestInodeMetadataRocksDBCheckpoint.java |  24 +++
 15 files changed, 522 insertions(+), 41 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index dc52800642a..8e83a4da5ed 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1603,6 +1603,15 @@
     </description>
   </property>
 
+  <property>
+    <name>ozone.om.db.checkpoint.use.inode.based.transfer</name>
+    <value>true</value>
+    <tag>OZONE, OM</tag>
+    <description>
+      Denotes whether the OM bootstrap uses the inode-based transfer implementation by default.
+    </description>
+  </property>
+
   <property>
     <name>hdds.datanode.plugins</name>
     <value/>
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
index 844978c44f6..5740450419c 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
@@ -23,6 +23,7 @@
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INFO_WAIT_DURATION_DEFAULT;
 import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
 import static org.apache.hadoop.ozone.OzoneConsts.DB_TRANSIENT_MARKER;
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -358,6 +359,34 @@ public static List<String> getExistingFiles(File db) 
throws IOException {
     return sstList;
   }
 
+  /**
+   * Old implementation, i.e. when useInodeBasedCheckpoint = false:
+   * the relative paths are sent in the toExcludeFile list to the leader OM.
+   * @param db candidate OM Dir
+   * @return a list of SST File paths relative to the DB.
+   * @throws IOException in case of failure
+   */
+  public static List<String> getExistingSstFilesRelativeToDbDir(File db)
+      throws IOException {
+    List<String> sstList = new ArrayList<>();
+    if (!db.exists()) {
+      return sstList;
+    }
+
+    int truncateLength = db.toString().length() + 1;
+    // Walk the db dir and get all sst files including omSnapshot files.
+    try (Stream<Path> files = Files.walk(db.toPath())) {
+      sstList =
+          files.filter(path -> path.toString().endsWith(ROCKSDB_SST_SUFFIX)).
+              map(p -> p.toString().substring(truncateLength)).
+              collect(Collectors.toList());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Scanned SST files {} in {}.", sstList, 
db.getAbsolutePath());
+      }
+    }
+    return sstList;
+  }
+
   /**
    * Retry forever until CA list matches expected count.
    * Fails fast on authentication exceptions.
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/InodeMetadataRocksDBCheckpoint.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/InodeMetadataRocksDBCheckpoint.java
index f70e4b2a515..db5909d9e25 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/InodeMetadataRocksDBCheckpoint.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/InodeMetadataRocksDBCheckpoint.java
@@ -27,6 +27,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,10 +37,12 @@
  * for inode-based metadata checkpoints.
  *
  * <p>During construction, reads a hardlink mapping file and creates hardlinks
- * from checkpoint files to the checkpoint_data directory. Original files are
- * then deleted since they're accessible via hardlinks, saving disk space while
- * maintaining checkpoint functionality.
+ * from checkpoint files to the checkpoint directory. When deleteSourceFiles
+ * is true (v2 format), original files are deleted since they're accessible via
+ * hardlinks. When false (v1 format), source files are preserved as they are real paths.
+ * </p>
+ */
 public class InodeMetadataRocksDBCheckpoint implements DBCheckpoint {
 
@@ -51,9 +54,23 @@ public class InodeMetadataRocksDBCheckpoint implements 
DBCheckpoint {
 
   public static final String OM_HARDLINK_FILE = "hardLinkFile";
 
+  /**
+   * Creates checkpoint with deleteSourceFiles=true (v2 format).
+   */
   public InodeMetadataRocksDBCheckpoint(Path checkpointLocation) throws 
IOException {
+    this(checkpointLocation, true);
+  }
+
+  /**
+   * @param checkpointLocation path to untarred checkpoint directory
+   * @param deleteSourceFiles when true (v2), delete source files after creating
+   *        hardlinks; when false (v1), preserve them.
+   */
+  public InodeMetadataRocksDBCheckpoint(Path checkpointLocation,
+      boolean deleteSourceFiles) throws IOException {
     this.checkpointLocation = checkpointLocation;
-    installHardLinks();
+    installHardLinks(deleteSourceFiles);
+    moveActiveDbFilesToOmDbIfNeeded();
   }
 
   @Override
@@ -83,7 +100,7 @@ public void cleanupCheckpoint() throws IOException {
     FileUtils.deleteDirectory(checkpointLocation.toFile());
   }
 
-  private void installHardLinks() throws IOException {
+  private void installHardLinks(boolean deleteSourceFiles) throws IOException {
     File hardLinkFile = new File(checkpointLocation.toFile(),
         OM_HARDLINK_FILE);
 
@@ -112,8 +129,8 @@ private void installHardLinks() throws IOException {
         Path sourcePath = checkpointLocation.resolve(from).toAbsolutePath();
         Path targetPath = checkpointLocation.resolve(to).toAbsolutePath();
 
-        // Track source file for later deletion
-        if (Files.exists(sourcePath)) {
+        // Track source file for later deletion (only when deleteSourceFiles)
+        if (deleteSourceFiles && Files.exists(sourcePath)) {
           sourceFilesToDelete.add(sourcePath);
         }
 
@@ -132,19 +149,51 @@ private void installHardLinks() throws IOException {
         throw new IOException("Failed to delete: " + hardLinkFile);
       }
 
-      // Delete all source files after hardlinks are created
-      for (Path sourcePath : sourceFilesToDelete) {
-        try {
-          if (Files.isDirectory(sourcePath)) {
-            FileUtils.deleteDirectory(sourcePath.toFile());
-          } else {
-            Files.delete(sourcePath);
+      // Delete source files only when deleteSourceFiles is true (v2 format)
+      if (deleteSourceFiles) {
+        for (Path sourcePath : sourceFilesToDelete) {
+          try {
+            if (Files.isDirectory(sourcePath)) {
+              FileUtils.deleteDirectory(sourcePath.toFile());
+            } else {
+              Files.delete(sourcePath);
+            }
+          } catch (IOException e) {
+            LOG.warn("Failed to delete source file {}: {}", sourcePath, 
e.getMessage());
+            // Continue with other files
           }
-        } catch (IOException e) {
-          LOG.warn("Failed to delete source file {}: {}", sourcePath, 
e.getMessage());
-          // Continue with other files
         }
       }
     }
   }
+
+  /**
+   * For v1 format, active DB files (SST, CURRENT, etc.) will be at checkpoint 
root
+   * with no om.db. Create om.db and move them so installCheckpoint always sees
+   * checkpointLocation/om.db and checkpointLocation/db.snapshots.
+   */
+  private void moveActiveDbFilesToOmDbIfNeeded() throws IOException {
+    Path omDbDir = checkpointLocation.resolve(OzoneConsts.OM_DB_NAME);
+    if (Files.exists(omDbDir) && Files.isDirectory(omDbDir)) {
+      return;
+    }
+    try (Stream<Path> entries = Files.list(checkpointLocation)) {
+      List<Path> toMove = entries
+          .filter(p -> {
+            String name = p.getFileName() != null ? p.getFileName().toString() 
: "";
+            return p.toFile().isFile() && !name.equals(OM_HARDLINK_FILE)
+                && 
!name.equals(HddsServerUtil.OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME);
+          })
+          .collect(Collectors.toList());
+      if (toMove.isEmpty()) {
+        return;
+      }
+      LOG.info("Creating om.db and moving {} active DB files for v1 format", 
toMove.size());
+      Files.createDirectories(omDbDir);
+      for (Path source : toMove) {
+        Path target = omDbDir.resolve(source.getFileName());
+        Files.move(source, target);
+      }
+    }
+  }
 }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index b831af6d9c0..44dc80f01ec 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -226,6 +226,10 @@ public final class OMConfigKeys {
   public static final long
       OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT = 10737418240L;
 
+  public static final String OZONE_OM_DB_CHECKPOINT_USE_INODE_BASED_KEY
+      = "ozone.om.db.checkpoint.use.inode.based.transfer";
+  public static final boolean OZONE_OM_DB_CHECKPOINT_USE_INODE_BASED_DEFAULT = 
true;
+
   // OM Ratis server configurations
   public static final String OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_KEY
       = "ozone.om.ratis.server.request.timeout";
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
index ff51cfa7adc..e04641618ac 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.om.helpers;
 
+import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
 import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT_V2;
 import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA;
 import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
@@ -182,14 +183,28 @@ public OMNodeDetails build() {
     }
   }
 
+  /**
+   * Returns v2 endpoint URL. Use {@link 
#getOMDBCheckpointEndpointUrl(boolean, boolean, boolean)}
+   * for configurable v1/v2 selection.
+   */
   public URL getOMDBCheckpointEndpointUrl(boolean isHttp, boolean flush)
       throws IOException {
+    return getOMDBCheckpointEndpointUrl(true, isHttp, flush);
+  }
+
+  /**
+   * @param useV2 when true, use /v2/dbCheckpoint; when false, use /dbCheckpoint
+   */
+  public URL getOMDBCheckpointEndpointUrl(boolean useV2, boolean isHttp,
+      boolean flush) throws IOException {
+    String path = useV2 ? OZONE_DB_CHECKPOINT_HTTP_ENDPOINT_V2
+        : OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
     URL url;
     try {
       URIBuilder urlBuilder = new URIBuilder().
           setScheme(isHttp ? "http" : "https").
           setHost(isHttp ? getHttpAddress() : getHttpsAddress()).
-          setPath(OZONE_DB_CHECKPOINT_HTTP_ENDPOINT_V2).
+          setPath(path).
           addParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA, "true").
           addParameter(OZONE_DB_CHECKPOINT_REQUEST_FLUSH,
               flush ? "true" : "false");
diff --git 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOMNodeDetails.java
 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOMNodeDetails.java
index 9a9951c20a2..4b3eaef0018 100644
--- 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOMNodeDetails.java
+++ 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOMNodeDetails.java
@@ -342,6 +342,30 @@ public void testGetOMDBCheckpointEndpointUrlHttps() throws 
IOException {
     assertEquals(HOST_ADDRESS + ":9875", url.getAuthority());
   }
 
+  /**
+   * Test getOMDBCheckpointEndpointUrl with useV2 parameter.
+   */
+  @Test
+  public void testGetOMDBCheckpointEndpointUrlUseV2() throws IOException {
+    OMNodeDetails nodeDetails = new OMNodeDetails.Builder()
+        .setOMServiceId(OM_SERVICE_ID)
+        .setOMNodeId(OM_NODE_ID)
+        .setHostAddress(HOST_ADDRESS)
+        .setRpcPort(RPC_PORT)
+        .setRatisPort(RATIS_PORT)
+        .setHttpAddress(HOST_ADDRESS + ":9874")
+        .setHttpsAddress(HOST_ADDRESS + ":9875")
+        .build();
+
+    URL urlV2 = nodeDetails.getOMDBCheckpointEndpointUrl(true, true, false);
+    assertNotNull(urlV2);
+    assertTrue(urlV2.getPath().endsWith("/v2/dbCheckpoint"));
+
+    URL urlV1 = nodeDetails.getOMDBCheckpointEndpointUrl(false, true, false);
+    assertNotNull(urlV1);
+    assertTrue(urlV1.getPath().endsWith("/dbCheckpoint"));
+  }
+
   /**
    * Test getOMNodeAddressFromConf.
    */
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMBootstrap.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMBootstrap.java
new file mode 100644
index 00000000000..2f63f0b6278
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMBootstrap.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
+import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT_V2;
+import static org.apache.hadoop.ozone.om.TestOMRatisSnapshots.checkSnapshot;
+import static 
org.apache.hadoop.ozone.om.TestOMRatisSnapshots.createOzoneSnapshot;
+import static org.apache.hadoop.ozone.om.TestOMRatisSnapshots.writeKeys;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.TransactionInfo;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.conf.OMClientConfig;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServerConfig;
+import org.apache.hadoop.ozone.om.ratis_snapshot.OmRatisSnapshotProvider;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils.LogCapturer;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.Parameter;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Tests that bootstrapping a follower works for both v1/v2 checkpoint format.
+ * A stopped follower is restarted after the leader has advanced beyond
+ * the purge gap, forcing InstallSnapshot/bootstrap.
+ */
+@ParameterizedClass
+@ValueSource(booleans = {false, true})
+public class TestOMBootstrap {
+
+  private static final String OM_SERVICE_ID = "om-service-test1";
+  private static final int NUM_OF_OMS = 3;
+  private static final long SNAPSHOT_THRESHOLD = 50;
+  private static final BucketLayout TEST_BUCKET_LAYOUT = 
BucketLayout.OBJECT_STORE;
+
+  private MiniOzoneHAClusterImpl cluster;
+  private ObjectStore objectStore;
+  private OzoneBucket ozoneBucket;
+  private String volumeName;
+  private String bucketName;
+  private OzoneClient client;
+
+  @Parameter
+  private boolean useV2Checkpoint;
+
+  @BeforeEach
+  public void init() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setBoolean(OMConfigKeys.OZONE_OM_DB_CHECKPOINT_USE_INODE_BASED_KEY, 
useV2Checkpoint);
+    conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, 5);
+    conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_KEY, 16, 
StorageUnit.KB);
+    
conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, 
16, StorageUnit.KB);
+    
conf.setLong(OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY, 
SNAPSHOT_THRESHOLD);
+
+    OzoneManagerRatisServerConfig omRatisConf = 
conf.getObject(OzoneManagerRatisServerConfig.class);
+    omRatisConf.setLogAppenderWaitTimeMin(10);
+    conf.setFromObject(omRatisConf);
+
+    OMClientConfig clientConfig = conf.getObject(OMClientConfig.class);
+    clientConfig.setRpcTimeOut(TimeUnit.SECONDS.toMillis(5));
+    conf.setFromObject(clientConfig);
+
+    cluster = MiniOzoneCluster.newHABuilder(conf)
+        .setOMServiceId(OM_SERVICE_ID)
+        .setNumOfOzoneManagers(NUM_OF_OMS)
+        .setNumOfActiveOMs(NUM_OF_OMS)
+        .build();
+    cluster.waitForClusterToBeReady();
+    client = OzoneClientFactory.getRpcClient(OM_SERVICE_ID, conf);
+    objectStore = client.getObjectStore();
+
+    volumeName = "volume" + RandomStringUtils.secure().nextNumeric(5);
+    bucketName = "bucket" + RandomStringUtils.secure().nextNumeric(5);
+    VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+        .setOwner("user" + RandomStringUtils.secure().nextNumeric(5))
+        .setAdmin("admin" + RandomStringUtils.secure().nextNumeric(5))
+        .build();
+    objectStore.createVolume(volumeName, createVolumeArgs);
+    OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+    retVolumeinfo.createBucket(bucketName,
+        BucketArgs.newBuilder().setBucketLayout(TEST_BUCKET_LAYOUT).build());
+    ozoneBucket = retVolumeinfo.getBucket(bucketName);
+  }
+
+  @AfterEach
+  public void shutdown() {
+    IOUtils.closeQuietly(client);
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testBootstrapFollower() throws Exception {
+    final String leaderOMNodeId = OmTestUtil
+        .getCurrentOmProxyNodeId(objectStore);
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+    cluster.stopOzoneManager(followerNodeId);
+    GenericTestUtils.waitFor(() -> !followerOM.isRunning() &&
+            !followerOM.isOmRpcServerRunning(),
+        100, 15_000);
+
+    List<String> keys = writeKeys(ozoneBucket, 25);
+    SnapshotInfo snapshotInfo1 = createOzoneSnapshot(objectStore, volumeName,
+        bucketName, leaderOM, "snap1");
+    List<String> moreKeys = writeKeys(ozoneBucket, 5);
+    SnapshotInfo snapshotInfo2 = createOzoneSnapshot(objectStore, volumeName,
+        bucketName, leaderOM, "snap2");
+
+    LogCapturer ozoneManagerLog = LogCapturer.captureLogs(OzoneManager.class);
+    LogCapturer omRatisSnapshotProviderLog = LogCapturer.captureLogs(
+        OmRatisSnapshotProvider.class);
+    cluster.restartOzoneManager(followerOM, true);
+
+    TransactionInfo transactionInfo = TransactionInfo.readTransactionInfo(
+        leaderOM.getMetadataManager());
+    TermIndex leaderOMTermIndex = TermIndex.valueOf(transactionInfo.getTerm(),
+        transactionInfo.getTransactionIndex());
+    long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+
+    GenericTestUtils.waitFor(() -> {
+      long index = 
followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex();
+      return index >= leaderOMSnapshotIndex - 1;
+    }, 100, 30_000);
+
+    assertLogCapture(ozoneManagerLog, "Reloaded OM state");
+    assertLogCapture(ozoneManagerLog, "Install Checkpoint is finished");
+    assertLogCapture(ozoneManagerLog, "Install Checkpoint is finished");
+    if (useV2Checkpoint) {
+      assertLogCapture(omRatisSnapshotProviderLog, 
OZONE_DB_CHECKPOINT_HTTP_ENDPOINT_V2);
+    } else {
+      assertLogCapture(omRatisSnapshotProviderLog, 
OZONE_DB_CHECKPOINT_HTTP_ENDPOINT);
+    }
+
+    OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager();
+    assertNotNull(followerOMMetaMngr.getVolumeTable().get(
+        followerOMMetaMngr.getVolumeKey(volumeName)));
+    assertNotNull(followerOMMetaMngr.getBucketTable().get(
+        followerOMMetaMngr.getBucketKey(volumeName, bucketName)));
+    for (String key : keys) {
+      assertNotNull(followerOMMetaMngr.getKeyTable(TEST_BUCKET_LAYOUT)
+          .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+    }
+    for (String key : moreKeys) {
+      assertNotNull(followerOMMetaMngr.getKeyTable(TEST_BUCKET_LAYOUT)
+          .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+    }
+
+    GenericTestUtils.waitFor(() -> followerOM.isOmRpcServerRunning(),
+        100, 30_000);
+
+    checkSnapshot(volumeName, bucketName, leaderOM, followerOM,
+        "snap1", keys, snapshotInfo1);
+    checkSnapshot(volumeName, bucketName, leaderOM, followerOM,
+        "snap2", moreKeys, snapshotInfo2);
+  }
+
+  private void assertLogCapture(LogCapturer logCapture, String msg)
+      throws InterruptedException, TimeoutException {
+    GenericTestUtils.waitFor(() -> logCapture.getOutput().contains(msg), 100, 
30_000);
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index 7134463db3a..80db722b179 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -98,6 +98,9 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.Parameter;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
@@ -107,6 +110,8 @@
 /**
  * Tests the Ratis snapshots feature in OM.
  */
+@ParameterizedClass
+@ValueSource(booleans = {false, true})
 public class TestOMRatisSnapshots {
   // tried up to 1000 snapshots and this test works, but some of the
   //  timeouts have to be increased.
@@ -131,7 +136,8 @@ public class TestOMRatisSnapshots {
   private static final BucketLayout TEST_BUCKET_LAYOUT =
       BucketLayout.OBJECT_STORE;
   private OzoneClient client;
-  private GenericTestUtils.PrintStreamCapturer output;
+  @Parameter
+  private boolean useInodeBasedCheckpoint;
 
   @BeforeEach
   public void init(TestInfo testInfo) throws Exception {
@@ -141,6 +147,7 @@ public void init(TestInfo testInfo) throws Exception {
         StorageUnit.KB);
     conf.setStorageSize(OMConfigKeys.
         OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, 16, StorageUnit.KB);
+    conf.setBoolean(OMConfigKeys.OZONE_OM_DB_CHECKPOINT_USE_INODE_BASED_KEY, 
useInodeBasedCheckpoint);
     long snapshotThreshold = SNAPSHOT_THRESHOLD;
     // TODO: refactor tests to run under a new class with different configs.
     if (testInfo.getTestMethod().isPresent() &&
@@ -148,7 +155,6 @@ public void init(TestInfo testInfo) throws Exception {
             .equals("testInstallSnapshot")) {
       snapshotThreshold = SNAPSHOT_THRESHOLD * 10;
       AuditLogTestUtils.enableAuditLog();
-      output = GenericTestUtils.captureOut();
     }
     conf.setLong(
         OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
@@ -213,8 +219,8 @@ public void testInstallSnapshot(@TempDir Path tempDir) 
throws Exception {
     List<Set<String>> sstSetList = new ArrayList<>();
     FaultInjector faultInjector =
         new SnapshotMaxSizeInjector(leaderOM,
-            followerOM.getOmSnapshotProvider().getSnapshotDir(),
-            sstSetList, tempDir);
+            followerOM.getOmSnapshotProvider().getSnapshotDir(), sstSetList,
+            tempDir, useInodeBasedCheckpoint);
     followerOM.getOmSnapshotProvider().setInjector(faultInjector);
 
     // Create some snapshots, each with new keys
@@ -288,8 +294,10 @@ public void testInstallSnapshot(@TempDir Path tempDir) 
throws Exception {
 
     assertLogCapture(logCapture,
         "Install Checkpoint is finished");
-    assertThat(output.get()).contains("op=DB_CHECKPOINT_INSTALL 
{\"leaderId\":\"" + leaderOMNodeId + "\",\"term\":\"" +
-        leaderOMSnapshotTermIndex, "\"lastAppliedIndex\":\"" + 
followerOMLastAppliedIndex);
+    String toMatch = String.format(
+        "op=DB_CHECKPOINT_INSTALL 
{\"leaderId\":\"%s\",\"term\":\"%d\",\"lastAppliedIndex\":\"%d\"}",
+        leaderOMNodeId, leaderOMSnapshotTermIndex, followerOMLastAppliedIndex);
+    assertTrue(AuditLogTestUtils.auditLogContains(toMatch));
 
     // Read & Write after snapshot installed.
     List<String> newKeys = writeKeys(1);
@@ -313,11 +321,16 @@ public void testInstallSnapshot(@TempDir Path tempDir) 
throws Exception {
     // Confirm that there was no overlap of sst files
     // between the individual tarballs.
     assertEquals(sstFileUnion.size(), sstFileCount);
-
-    output.reset();
   }
 
   private void checkSnapshot(OzoneManager leaderOM, OzoneManager followerOM,
+      String snapshotName,
+      List<String> keys, SnapshotInfo snapshotInfo) throws RocksDBException, 
IOException {
+    checkSnapshot(volumeName, bucketName, leaderOM, followerOM, snapshotName, 
keys, snapshotInfo);
+  }
+
+  static void checkSnapshot(String volumeName, String bucketName,
+      OzoneManager leaderOM, OzoneManager followerOM,
                              String snapshotName,
                              List<String> keys, SnapshotInfo snapshotInfo)
       throws IOException, RocksDBException {
@@ -1040,6 +1053,46 @@ public void testInstallCorruptedCheckpointFailure() 
throws Exception {
     assertLogCapture(logCapture, msg);
   }
 
+  @Test
+  public void testInstallSnapshotFromLeaderFailedDownloadCleanupSucceeds()
+      throws Exception {
+    final String leaderOMNodeId = 
OmTestUtil.getCurrentOmProxyNodeId(objectStore);
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+    if (cluster.isOMActive(followerNodeId)) {
+      followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+    }
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+    File candidateDir = followerOM.getOmSnapshotProvider().getCandidateDir();
+    assertTrue(candidateDir.exists(),
+        "Candidate dir should exist before download attempt");
+
+    // Inject fault: throw on first pause (after first download part, before 
untar)
+    FaultInjector faultInjector =
+        new ThrowOnPauseFaultInjector("Simulated download failure for test");
+    followerOM.getOmSnapshotProvider().setInjector(faultInjector);
+
+    // Advance leader so follower will need install snapshot when started
+    OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
+    writeKeysToIncreaseLogIndex(leaderRatisServer, 100);
+
+    // Start follower - Ratis will trigger install snapshot
+    cluster.startInactiveOM(followerNodeId);
+
+    // Wait for install snapshot attempt to complete (download fails, cleanup 
runs)
+    GenericTestUtils.waitFor(() -> {
+      return !candidateDir.exists() || (candidateDir.list() != null && 
candidateDir.list().length == 0);
+    }, 500, 10_000);
+
+    // Verify cleanup succeeded: candidate dir is empty
+    String[] filesInCandidate = candidateDir.exists() ? candidateDir.list() : 
new String[0];
+    assertNotNull(filesInCandidate);
+    assertEquals(0, filesInCandidate.length,
+        "Candidate dir should be cleaned after failed download");
+    // Clear injector
+    followerOM.getOmSnapshotProvider().setInjector(null);
+  }
+
   /**
    * Moves all contents from the checkpoint location into the omDbDir.
    * This reorganizes the checkpoint structure so that all checkpoint files
@@ -1097,6 +1150,12 @@ private void moveCheckpointContentsToOmDbDir(Path 
checkpointLocation, Path omDbD
 
   private SnapshotInfo createOzoneSnapshot(OzoneManager leaderOM, String name)
       throws IOException {
+    return createOzoneSnapshot(objectStore, volumeName, bucketName, leaderOM, 
name);
+  }
+
+  static SnapshotInfo createOzoneSnapshot(ObjectStore objectStore, String 
volumeName, String bucketName,
+      OzoneManager leaderOM, String name)
+      throws IOException {
     objectStore.createSnapshot(volumeName, bucketName, name);
 
     String tableKey = SnapshotInfo.getTableKey(volumeName,
@@ -1130,6 +1189,10 @@ private List<String> writeKeysToIncreaseLogIndex(
   }
 
   private List<String> writeKeys(long keyCount) throws IOException {
+    return writeKeys(ozoneBucket, keyCount);
+  }
+
+  static List<String> writeKeys(OzoneBucket ozoneBucket, long keyCount) throws 
IOException {
     List<String> keys = new ArrayList<>();
     long index = 0;
     while (index < keyCount) {
@@ -1193,13 +1256,16 @@ private static class SnapshotMaxSizeInjector extends 
FaultInjector {
     private final File snapshotDir;
     private final List<Set<String>> sstSetList;
     private final Path tempDir;
+    private boolean useInodeBasedCheckpoint;
 
     SnapshotMaxSizeInjector(OzoneManager om, File snapshotDir,
-                            List<Set<String>> sstSetList, Path tempDir) {
+                            List<Set<String>> sstSetList, Path tempDir,
+        boolean useInodeBasedCheckpoint) {
       this.om = om;
       this.snapshotDir = snapshotDir;
       this.sstSetList = sstSetList;
       this.tempDir = tempDir;
+      this.useInodeBasedCheckpoint = useInodeBasedCheckpoint;
       init();
     }
 
@@ -1234,7 +1300,7 @@ public void pause() throws IOException {
     private long getSizeOfSstFiles(File tarball) throws IOException {
       FileUtil.unTar(tarball, tempDir.toFile());
       InodeMetadataRocksDBCheckpoint obtainedCheckpoint =
-          new InodeMetadataRocksDBCheckpoint(tempDir);
+          new InodeMetadataRocksDBCheckpoint(tempDir, useInodeBasedCheckpoint);
       assertNotNull(obtainedCheckpoint);
       Path omDbDir = 
Paths.get(obtainedCheckpoint.getCheckpointLocation().toString(), OM_DB_NAME);
       assertNotNull(omDbDir);
@@ -1295,4 +1361,21 @@ public void reset() throws IOException {
       init();
     }
   }
+
+  /**
+   * FaultInjector that throws IOException on pause(), simulating a download 
failure
+   * after the first part completes. Used to test cleanup on failed download.
+   */
+  private static class ThrowOnPauseFaultInjector extends FaultInjector {
+    private final IOException toThrow;
+
+    ThrowOnPauseFaultInjector(String message) {
+      this.toThrow = new IOException(message);
+    }
+
+    @Override
+    public void pause() throws IOException {
+      throw toThrow;
+    }
+  }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
index 5d06b3ad90c..ac0b9b25842 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
@@ -36,6 +36,7 @@
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.cli.GenericCli;
@@ -353,7 +354,9 @@ public void testAssignAdmin() throws IOException {
   @SuppressWarnings("methodlength")
   public void testOzoneTenantBasicOperations() throws IOException {
 
-    List<String> lines = FileUtils.readLines(AUDIT_LOG_FILE, (String)null);
+    List<String> lines = FileUtils.readLines(AUDIT_LOG_FILE, (String) 
null).stream()
+        .filter(line -> !line.contains("OMSystemAudit"))
+            .collect(Collectors.toList());
     assertEquals(0, lines.size());
 
     executeHA(tenantShell, new String[] {"list"});
diff --git 
a/hadoop-ozone/integration-test/src/test/resources/auditlog.properties 
b/hadoop-ozone/integration-test/src/test/resources/auditlog.properties
index fb644786d0b..c5e9f8d2c7d 100644
--- a/hadoop-ozone/integration-test/src/test/resources/auditlog.properties
+++ b/hadoop-ozone/integration-test/src/test/resources/auditlog.properties
@@ -64,11 +64,15 @@ appender.audit.fileName=audit.log
 appender.audit.layout.type=PatternLayout
 appender.audit.layout.pattern= %d{DEFAULT} | %-5level | %c{1} | %msg | 
%throwable{3} %n
 
-loggers=audit
+loggers=audit,omSystemAudit
 logger.audit.name=OMAudit
 logger.audit.level = INFO
 logger.audit.appenderRefs = audit
 logger.audit.appenderRef.file.ref = AUDITLOG
+logger.omSystemAudit.name=OMSystemAudit
+logger.omSystemAudit.level = INFO
+logger.omSystemAudit.appenderRefs = audit
+logger.omSystemAudit.appenderRef.file.ref = AUDITLOG
 
 rootLogger.level = INFO
 rootLogger.appenderRefs = stdout
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 7a9d66f86df..070dd6d68b5 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -4010,20 +4010,22 @@ public List<OzoneAcl> getAcl(OzoneObj obj) throws 
IOException {
    * @param leaderId peerNodeID of the leader OM
    * @return If checkpoint is installed successfully, return the
    *         corresponding termIndex. Otherwise, return null.
+   * @throws IOException if download or cleanup fails
    */
-  public synchronized TermIndex installSnapshotFromLeader(String leaderId) {
+  public synchronized TermIndex installSnapshotFromLeader(String leaderId) 
throws IOException {
     if (omRatisSnapshotProvider == null) {
       LOG.error("OM Snapshot Provider is not configured as there are no peer " 
+
           "nodes.");
       return null;
     }
 
-    DBCheckpoint omDBCheckpoint;
+    DBCheckpoint omDBCheckpoint = null;
     try {
       omDBCheckpoint = omRatisSnapshotProvider.
           downloadDBSnapshotFromLeader(leaderId);
     } catch (IOException ex) {
       LOG.error("Failed to download snapshot from Leader {}.", leaderId,  ex);
+      cleanupCheckpoint(omDBCheckpoint);
       return null;
     }
 
@@ -4037,13 +4039,31 @@ public synchronized TermIndex 
installSnapshotFromLeader(String leaderId) {
     } catch (Exception ex) {
       LOG.error("Failed to install snapshot from Leader OM.", ex);
     } finally {
+      cleanupCheckpoint(omDBCheckpoint);
+    }
+    return termIndex;
+  }
+
+  private void cleanupCheckpoint(DBCheckpoint omDBCheckpoint) throws 
IOException {
+    if (omDBCheckpoint != null) {
       try {
         omDBCheckpoint.cleanupCheckpoint();
       } catch (IOException e) {
-        LOG.error("Failed to cleanup checkpoint at {}", 
omDBCheckpoint.getCheckpointLocation(), e);
+        LOG.error("Failed to cleanup checkpoint at {}",
+            omDBCheckpoint.getCheckpointLocation(), e);
+      }
+    } else {
+      // Download failed; clean up any partial content in candidate dir
+      File candidateDir = omRatisSnapshotProvider.getCandidateDir();
+      if (candidateDir.exists()) {
+        try {
+          org.apache.commons.io.FileUtils.deleteDirectory(candidateDir);
+        } catch (IOException ioe) {
+          LOG.error("Failed to delete candidate dir: {}", candidateDir, ioe);
+          throw new IOException("Failed to cleanup candidate dir after 
download failure", ioe);
+        }
       }
     }
-    return termIndex;
   }
 
   /**
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index ddad0365291..2cd27687694 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -30,6 +30,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -582,7 +583,13 @@ public CompletableFuture<TermIndex> 
notifyInstallSnapshotFromLeader(
             "term index: {}", leaderNodeId, firstTermIndexInLog);
 
     return CompletableFuture.supplyAsync(
-        () -> ozoneManager.installSnapshotFromLeader(leaderNodeId),
+        () -> {
+          try {
+            return ozoneManager.installSnapshotFromLeader(leaderNodeId);
+          } catch (IOException e) {
+            throw new CompletionException(e);
+          }
+        },
         installSnapshotExecutor);
   }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java
index 8d6baf02022..a67c2e9e53b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java
@@ -22,6 +22,8 @@
 import static org.apache.hadoop.ozone.OzoneConsts.MULTIPART_FORM_DATA_BOUNDARY;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_CHECKPOINT_USE_INODE_BASED_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_CHECKPOINT_USE_INODE_BASED_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_AUTH_TYPE;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_PROVIDER_CONNECTION_TIMEOUT_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_PROVIDER_CONNECTION_TIMEOUT_KEY;
@@ -83,6 +85,7 @@ public class OmRatisSnapshotProvider extends 
RDBSnapshotProvider {
   private final HttpConfig.Policy httpPolicy;
   private final boolean spnegoEnabled;
   private final URLConnectionFactory connectionFactory;
+  private final boolean useV2CheckpointApi;
 
   public OmRatisSnapshotProvider(File snapshotDir,
       Map<String, OMNodeDetails> peerNodesMap, HttpConfig.Policy httpPolicy,
@@ -92,6 +95,7 @@ public OmRatisSnapshotProvider(File snapshotDir,
     this.httpPolicy = httpPolicy;
     this.spnegoEnabled = spnegoEnabled;
     this.connectionFactory = connectionFactory;
+    this.useV2CheckpointApi = OZONE_OM_DB_CHECKPOINT_USE_INODE_BASED_DEFAULT;
   }
 
   public OmRatisSnapshotProvider(MutableConfigurationSource conf,
@@ -100,6 +104,8 @@ public OmRatisSnapshotProvider(MutableConfigurationSource 
conf,
     LOG.info("Initializing OM Snapshot Provider");
     this.peerNodesMap = new ConcurrentHashMap<>();
     peerNodesMap.putAll(peerNodeDetails);
+    this.useV2CheckpointApi = 
conf.getBoolean(OZONE_OM_DB_CHECKPOINT_USE_INODE_BASED_KEY,
+        OZONE_OM_DB_CHECKPOINT_USE_INODE_BASED_DEFAULT);
 
     this.httpPolicy = HttpConfig.getHttpPolicy(conf);
     this.spnegoEnabled = conf.get(OZONE_OM_HTTP_AUTH_TYPE, "simple")
@@ -143,7 +149,7 @@ public void downloadSnapshot(String leaderNodeID, File 
targetFile)
       throws IOException {
     OMNodeDetails leader = peerNodesMap.get(leaderNodeID);
     URL omCheckpointUrl = leader.getOMDBCheckpointEndpointUrl(
-        httpPolicy.isHttpEnabled(), true);
+        useV2CheckpointApi, httpPolicy.isHttpEnabled(), true);
     LOG.info("Downloading latest checkpoint from Leader OM {}. Checkpoint: {} 
URL: {}",
         leaderNodeID, targetFile.getName(), omCheckpointUrl);
     SecurityUtil.doAsCurrentUser(() -> {
@@ -155,8 +161,10 @@ public void downloadSnapshot(String leaderNodeID, File 
targetFile)
           MULTIPART_FORM_DATA_BOUNDARY;
       connection.setRequestProperty("Content-Type", contentTypeValue);
       connection.setDoOutput(true);
-      writeFormData(connection,
-          HAUtils.getExistingFiles(getCandidateDir()));
+
+      List<String> existingFiles = useV2CheckpointApi ? 
HAUtils.getExistingFiles(getCandidateDir())
+          : HAUtils.getExistingSstFilesRelativeToDbDir(getCandidateDir());
+      writeFormData(connection, existingFiles);
 
       connection.connect();
       int errorCode = connection.getResponseCode();
@@ -212,7 +220,7 @@ public static void downloadFileWithProgress(InputStream 
inputStream, File target
 
   @Override
   public DBCheckpoint getCheckpointFromUntarredDb(Path untarredDbDir) throws 
IOException {
-    return new InodeMetadataRocksDBCheckpoint(untarredDbDir);
+    return new InodeMetadataRocksDBCheckpoint(untarredDbDir, 
useV2CheckpointApi);
   }
 
   /**
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis_snapshot/TestOmRatisSnapshotProvider.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis_snapshot/TestOmRatisSnapshotProvider.java
index d2a8f92f29f..32e567dd44b 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis_snapshot/TestOmRatisSnapshotProvider.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis_snapshot/TestOmRatisSnapshotProvider.java
@@ -85,8 +85,8 @@ public void testDownloadSnapshot() throws IOException,
       AuthenticationException {
     URL omCheckpointUrl = mock(URL.class);
     StringBuilder sb = getStringBuilder();
-    when(leader.getOMDBCheckpointEndpointUrl(anyBoolean(), anyBoolean()))
-        .thenReturn(omCheckpointUrl);
+    when(leader.getOMDBCheckpointEndpointUrl(anyBoolean(),
+        anyBoolean(), anyBoolean())).thenReturn(omCheckpointUrl);
 
     HttpURLConnection connection = mock(HttpURLConnection.class);
     when(connectionFactory.openConnection(any(URL.class), anyBoolean()))
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestInodeMetadataRocksDBCheckpoint.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestInodeMetadataRocksDBCheckpoint.java
index 9a8816f9e46..50d5b244cd3 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestInodeMetadataRocksDBCheckpoint.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestInodeMetadataRocksDBCheckpoint.java
@@ -73,4 +73,28 @@ public void testCreateHardLinksWithOmDbPrefix(@TempDir File 
tempDir) throws Exce
     assertEquals(sourceFileInode, getINode(target2.toPath()),
         "Hard links should have same inode as source");
   }
+
+  /**
+   * Test with deleteSourceFiles=false (v1 format) - source files are 
preserved.
+   */
+  @Test
+  public void testCreateHardLinksWithoutDeletingSources(@TempDir File tempDir)
+      throws Exception {
+    File testDir = new File(tempDir, "testDir");
+    assertTrue(testDir.mkdir(), "Failed to create test dir");
+    File sourceFile = new File(testDir, "source.sst");
+    Files.write(sourceFile.toPath(), "test content".getBytes(UTF_8));
+
+    File hardlinkFile = new File(testDir, "hardLinkFile");
+    String hardlinkContent = "om.db/target.sst\tsource.sst\n";
+    Files.write(hardlinkFile.toPath(), hardlinkContent.getBytes(UTF_8));
+
+    InodeMetadataRocksDBCheckpoint obtainedCheckpoint =
+        new InodeMetadataRocksDBCheckpoint(testDir.toPath(), false);
+    assertNotNull(obtainedCheckpoint);
+
+    assertTrue(sourceFile.exists(), "Source file should be preserved");
+    assertTrue(new File(testDir, "om.db/target.sst").exists(),
+        "Hard link should be created");
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to