This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new d85440b3292 HDDS-13785. Remove orphan versions from SnapshotLocalData 
Yaml file (#9150)
d85440b3292 is described below

commit d85440b3292c5004900e153a5548b05b8beec116
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Thu Nov 6 01:45:15 2025 -0500

    HDDS-13785. Remove orphan versions from SnapshotLocalData Yaml file (#9150)
---
 .../common/src/main/resources/ozone-default.xml    |   5 +
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   3 +
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  |  17 +-
 .../om/snapshot/OmSnapshotLocalDataManager.java    | 197 ++++++++++++++++++---
 .../snapshot/TestOmSnapshotLocalDataManager.java   | 131 +++++++++++---
 .../filter/AbstractReclaimableFilterTest.java      |   4 +-
 6 files changed, 307 insertions(+), 50 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 0bfa98f991b..1e8df1c6747 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4853,4 +4853,9 @@
     <value>10000</value>
     <description>Maximum number of lock objects that could be present in the 
pool.</description>
   </property>
+  <property>
+    <name>ozone.om.snapshot.local.data.manager.service.interval</name>
+    <value>5m</value>
+    <description>Interval for cleaning up orphan snapshot local data versions 
corresponding to snapshots</description>
+  </property>
 </configuration>
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 0828be91ed4..469900aa8ea 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -678,6 +678,9 @@ public final class OMConfigKeys {
   public static final String OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_HARD_LIMIT =
       "ozone.om.hierarchical.resource.locks.hard.limit";
   public static final int 
OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_HARD_LIMIT_DEFAULT = 10000;
+  public static final String 
OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL =
+      "ozone.om.snapshot.local.data.manager.service.interval";
+  public static final String 
OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL_DEFAULT = "5m";
 
   /**
    * Never constructed.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 0954b029ab6..4fcb8ed22e8 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -196,9 +196,10 @@ public final class OmSnapshotManager implements 
AutoCloseable {
   private final AtomicInteger inFlightSnapshotCount = new AtomicInteger(0);
 
   public OmSnapshotManager(OzoneManager ozoneManager) throws IOException {
-    this.snapshotLocalDataManager = new 
OmSnapshotLocalDataManager(ozoneManager.getMetadataManager());
-    boolean isFilesystemSnapshotEnabled =
-        ozoneManager.isFilesystemSnapshotEnabled();
+    OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl) 
ozoneManager.getMetadataManager();
+    this.snapshotLocalDataManager = new 
OmSnapshotLocalDataManager(ozoneManager.getMetadataManager(),
+        omMetadataManager.getSnapshotChainManager(), 
ozoneManager.getConfiguration());
+    boolean isFilesystemSnapshotEnabled = 
ozoneManager.isFilesystemSnapshotEnabled();
     LOG.info("Ozone filesystem snapshot feature is {}.",
         isFilesystemSnapshotEnabled ? "enabled" : "disabled");
 
@@ -344,6 +345,16 @@ public OmSnapshotManager(OzoneManager ozoneManager) throws 
IOException {
     }
   }
 
+  public static boolean isSnapshotPurged(SnapshotChainManager chainManager, 
OMMetadataManager omMetadataManager,
+      UUID snapshotId, TransactionInfo transactionInfo) throws IOException {
+    String tableKey = chainManager.getTableKey(snapshotId);
+    if (tableKey == null) {
+      return true;
+    }
+    return !omMetadataManager.getSnapshotInfoTable().isExist(tableKey) && 
transactionInfo != null &&
+        isTransactionFlushedToDisk(omMetadataManager, transactionInfo);
+  }
+
   /**
    * Help reject OM startup if snapshot feature is disabled
    * but there are snapshots remaining in this OM. Note: snapshots that are
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java
index 33caddc9232..70955fa0578 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java
@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.ozone.om.snapshot;
 
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml.YAML_FILE_EXTENSION;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -41,10 +43,13 @@
 import java.util.Stack;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.Scheduler;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -52,6 +57,7 @@
 import org.apache.hadoop.ozone.om.OmSnapshotLocalData.VersionMeta;
 import org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.SnapshotChainManager;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.lock.FlatResource;
 import org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager;
@@ -73,6 +79,7 @@
 public class OmSnapshotLocalDataManager implements AutoCloseable {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(OmSnapshotLocalDataManager.class);
+  private static final String LOCAL_DATA_MANAGER_SERVICE_NAME = 
"OmSnapshotLocalDataManagerService";
 
   private final ObjectSerializer<OmSnapshotLocalData> 
snapshotLocalDataSerializer;
   // In-memory DAG of snapshot-version dependencies. Each node represents a
@@ -101,8 +108,13 @@ public class OmSnapshotLocalDataManager implements 
AutoCloseable {
   private final ReadWriteLock internalLock;
   // Locks should be always acquired by iterating through the snapshot chain 
to avoid deadlocks.
   private HierarchicalResourceLockManager locks;
+  private Map<UUID, Integer> snapshotToBeCheckedForOrphans;
+  private Scheduler scheduler;
+  private volatile boolean closed;
 
-  public OmSnapshotLocalDataManager(OMMetadataManager omMetadataManager) 
throws IOException {
+  public OmSnapshotLocalDataManager(OMMetadataManager omMetadataManager,
+      SnapshotChainManager snapshotChainManager,
+      OzoneConfiguration configuration) throws IOException {
     this.localDataGraph = GraphBuilder.directed().build();
     this.omMetadataManager = omMetadataManager;
     this.snapshotLocalDataSerializer = new YamlSerializer<OmSnapshotLocalData>(
@@ -116,7 +128,7 @@ public void computeAndSetChecksum(Yaml yaml, 
OmSnapshotLocalData data) throws IO
     this.versionNodeMap = new ConcurrentHashMap<>();
     this.fullLock = new ReentrantReadWriteLock();
     this.internalLock = new ReentrantReadWriteLock();
-    init();
+    init(configuration, snapshotChainManager);
   }
 
   @VisibleForTesting
@@ -216,7 +228,7 @@ private LocalDataVersionNode getVersionNode(UUID 
snapshotId, int version) {
 
   private void addSnapshotVersionMeta(UUID snapshotId, SnapshotVersionsMeta 
snapshotVersionsMeta)
       throws IOException {
-    if (!versionNodeMap.containsKey(snapshotId)) {
+    if (!versionNodeMap.containsKey(snapshotId) && 
!snapshotVersionsMeta.getSnapshotVersions().isEmpty()) {
       for (LocalDataVersionNode versionNode : 
snapshotVersionsMeta.getSnapshotVersions().values()) {
         validateVersionAddition(versionNode);
         LocalDataVersionNode previousVersionNode =
@@ -261,8 +273,33 @@ void addVersionNodeWithDependents(OmSnapshotLocalData 
snapshotLocalData) throws
     }
   }
 
-  private void init() throws IOException {
+  private void incrementOrphanCheckCount(UUID snapshotId) {
+    if (snapshotId != null) {
+      this.snapshotToBeCheckedForOrphans.compute(snapshotId, (k, v) -> v == 
null ? 1 : (v + 1));
+    }
+  }
+
+  private void decrementOrphanCheckCount(UUID snapshotId, int decrementBy) {
+    this.snapshotToBeCheckedForOrphans.compute(snapshotId, (k, v) -> {
+      if (v == null) {
+        return null;
+      }
+      int newValue = v - decrementBy;
+      if (newValue <= 0) {
+        return null;
+      }
+      return newValue;
+    });
+  }
+
+  @VisibleForTesting
+  Map<UUID, Integer> getSnapshotToBeCheckedForOrphans() {
+    return snapshotToBeCheckedForOrphans;
+  }
+
+  private void init(OzoneConfiguration configuration, SnapshotChainManager 
chainManager) throws IOException {
     this.locks = omMetadataManager.getHierarchicalLockManager();
+    this.snapshotToBeCheckedForOrphans = new ConcurrentHashMap<>();
     RDBStore store = (RDBStore) omMetadataManager.getStore();
     String checkpointPrefix = store.getDbLocation().getName();
     File snapshotDir = new File(store.getSnapshotsParentDir());
@@ -283,6 +320,74 @@ private void init() throws IOException {
       }
       addVersionNodeWithDependents(snapshotLocalData);
     }
+    for (UUID snapshotId : versionNodeMap.keySet()) {
+      incrementOrphanCheckCount(snapshotId);
+    }
+    long snapshotLocalDataManagerServiceInterval = 
configuration.getTimeDuration(
+        OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL,
+        OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    if (snapshotLocalDataManagerServiceInterval > 0) {
+      this.scheduler = new Scheduler(LOCAL_DATA_MANAGER_SERVICE_NAME, true, 1);
+      this.scheduler.scheduleWithFixedDelay(
+          () -> {
+            try {
+              checkOrphanSnapshotVersions(omMetadataManager, chainManager);
+            } catch (Exception e) {
+              LOG.error("Exception while checking orphan snapshot versions", 
e);
+            }
+          }, snapshotLocalDataManagerServiceInterval, 
snapshotLocalDataManagerServiceInterval, TimeUnit.MILLISECONDS);
+    }
+
+  }
+
+  private void checkOrphanSnapshotVersions(OMMetadataManager metadataManager, 
SnapshotChainManager chainManager)
+      throws IOException {
+    for (Map.Entry<UUID, Integer> entry : 
snapshotToBeCheckedForOrphans.entrySet()) {
+      UUID snapshotId = entry.getKey();
+      int countBeforeCheck = entry.getValue();
+      checkOrphanSnapshotVersions(metadataManager, chainManager, snapshotId);
+      decrementOrphanCheckCount(snapshotId, countBeforeCheck);
+    }
+  }
+
+  @VisibleForTesting
+  void checkOrphanSnapshotVersions(OMMetadataManager metadataManager, 
SnapshotChainManager chainManager,
+      UUID snapshotId) throws IOException {
+    LOG.info("Checking orphan snapshot versions for snapshot {}", snapshotId);
+    try (WritableOmSnapshotLocalDataProvider snapshotLocalDataProvider = new 
WritableOmSnapshotLocalDataProvider(
+        snapshotId)) {
+      OmSnapshotLocalData snapshotLocalData = 
snapshotLocalDataProvider.getSnapshotLocalData();
+      boolean isSnapshotPurged = 
OmSnapshotManager.isSnapshotPurged(chainManager, metadataManager, snapshotId,
+          snapshotLocalData.getTransactionInfo());
+      for (Map.Entry<Integer, LocalDataVersionNode> 
integerLocalDataVersionNodeEntry : getVersionNodeMap()
+          .get(snapshotId).getSnapshotVersions().entrySet()) {
+        LocalDataVersionNode versionEntry = 
integerLocalDataVersionNodeEntry.getValue();
+        // remove the version entry if it is not referenced by any other 
snapshot version node. For version node 0
+        // a newly created snapshot version could point to a version with 
indegree 0 in such a scenario a version 0
+        // node can be only deleted if the snapshot is also purged.
+        internalLock.readLock().lock();
+        try {
+          boolean toRemove = localDataGraph.inDegree(versionEntry) == 0
+              && ((versionEntry.getVersion() != 0 && versionEntry.getVersion() 
!= snapshotLocalData.getVersion())
+              || isSnapshotPurged);
+          if (toRemove) {
+            LOG.info("Removing snapshot Id : {} version: {} from local data, 
snapshotLocalDataVersion : {}, " +
+                    "snapshotPurged: {}, inDegree : {}", snapshotId, 
versionEntry.getVersion(),
+                snapshotLocalData.getVersion(), isSnapshotPurged, 
localDataGraph.inDegree(versionEntry));
+            snapshotLocalDataProvider.removeVersion(versionEntry.getVersion());
+          }
+        } finally {
+          internalLock.readLock().unlock();
+        }
+      }
+      // If Snapshot is purged but not flushed completely to disk then this 
needs to wait for the next iteration
+      // which can be done by incrementing the orphan check count for the 
snapshotId.
+      if (!snapshotLocalData.getVersionSstFileInfos().isEmpty() && 
snapshotLocalData.getTransactionInfo() != null) {
+        incrementOrphanCheckCount(snapshotId);
+      }
+      snapshotLocalDataProvider.commit();
+    }
   }
 
   /**
@@ -326,13 +431,19 @@ private void validateVersionAddition(LocalDataVersionNode 
versionNode) throws IO
   }
 
   @Override
-  public void close() {
-    if (snapshotLocalDataSerializer != null) {
-      try {
-        snapshotLocalDataSerializer.close();
-      } catch (IOException e) {
-        LOG.error("Failed to close snapshot local data serializer", e);
+  public synchronized void close() {
+    if (!closed) {
+      if (snapshotLocalDataSerializer != null) {
+        try {
+          snapshotLocalDataSerializer.close();
+        } catch (IOException e) {
+          LOG.error("Failed to close snapshot local data serializer", e);
+        }
       }
+      if (scheduler != null) {
+        scheduler.close();
+      }
+      closed = true;
     }
   }
 
@@ -730,30 +841,66 @@ public synchronized void commit() throws IOException {
       // Need to update the disk state if and only if the dirty bit is set.
       if (isDirty()) {
         String filePath = getSnapshotLocalPropertyYamlPath(super.snapshotId);
-        String tmpFilePath = filePath + ".tmp";
-        File tmpFile = new File(tmpFilePath);
-        boolean tmpFileExists = tmpFile.exists();
-        if (tmpFileExists) {
-          tmpFileExists = !tmpFile.delete();
-        }
-        if (tmpFileExists) {
-          throw new IOException("Unable to delete tmp file " + tmpFilePath);
+        File snapshotLocalDataFile = new File(filePath);
+        if (!localDataVersionNodes.getSnapshotVersions().isEmpty()) {
+          String tmpFilePath = filePath + ".tmp";
+          File tmpFile = new File(tmpFilePath);
+          boolean tmpFileExists = tmpFile.exists();
+          if (tmpFileExists) {
+            tmpFileExists = !tmpFile.delete();
+          }
+          if (tmpFileExists) {
+            throw new IOException("Unable to delete tmp file " + tmpFilePath);
+          }
+          snapshotLocalDataSerializer.save(new File(tmpFilePath), 
super.snapshotLocalData);
+          Files.move(tmpFile.toPath(), Paths.get(filePath), 
StandardCopyOption.ATOMIC_MOVE,
+              StandardCopyOption.REPLACE_EXISTING);
+        } else if (snapshotLocalDataFile.exists()) {
+          LOG.info("Deleting YAML file corresponding to snapshotId: {} in path 
: {}",
+              super.snapshotId, snapshotLocalDataFile.getAbsolutePath());
+          if (!snapshotLocalDataFile.delete()) {
+            throw new IOException("Unable to delete file " + 
snapshotLocalDataFile.getAbsolutePath());
+          }
         }
-        snapshotLocalDataSerializer.save(new File(tmpFilePath), 
super.snapshotLocalData);
-        Files.move(tmpFile.toPath(), Paths.get(filePath), 
StandardCopyOption.ATOMIC_MOVE,
-            StandardCopyOption.REPLACE_EXISTING);
-        upsertNode(super.snapshotId, localDataVersionNodes);
+        SnapshotVersionsMeta previousVersionMeta = 
upsertNode(super.snapshotId, localDataVersionNodes);
+        checkForOphanVersionsAndIncrementCount(super.snapshotId, 
previousVersionMeta, localDataVersionNodes,
+            getSnapshotLocalData().getTransactionInfo() != null);
         // Reset dirty bit
         resetDirty();
       }
     }
 
-    private void upsertNode(UUID snapshotId, SnapshotVersionsMeta 
snapshotVersions) throws IOException {
+    private void checkForOphanVersionsAndIncrementCount(UUID snapshotId, 
SnapshotVersionsMeta previousVersionsMeta,
+        SnapshotVersionsMeta currentVersionMeta, boolean 
isPurgeTransactionSet) {
+      if (previousVersionsMeta != null) {
+        Map<Integer, LocalDataVersionNode> currentVersionNodeMap = 
currentVersionMeta.getSnapshotVersions();
+        Map<Integer, LocalDataVersionNode> previousVersionNodeMap = 
previousVersionsMeta.getSnapshotVersions();
+        boolean versionsRemoved = previousVersionNodeMap.keySet().stream()
+            .anyMatch(version -> !currentVersionNodeMap.containsKey(version));
+
+        // The previous snapshotId could have become an orphan entry or could 
have orphan versions.(In case of
+        // version removals)
+        if (versionsRemoved || 
!Objects.equals(previousVersionsMeta.getPreviousSnapshotId(),
+            currentVersionMeta.getPreviousSnapshotId())) {
+          
incrementOrphanCheckCount(previousVersionsMeta.getPreviousSnapshotId());
+        }
+        // If the transactionInfo set, this means the snapshot has been purged 
and the entire YAML file could have
+        // become an orphan. Otherwise if the version is updated it
+        // could mean that there could be some orphan version present within 
the
+        // same snapshot.
+        if (isPurgeTransactionSet || previousVersionsMeta.getVersion() != 
currentVersionMeta.getVersion()) {
+          incrementOrphanCheckCount(snapshotId);
+        }
+      }
+    }
+
+    private SnapshotVersionsMeta upsertNode(UUID snapshotId, 
SnapshotVersionsMeta snapshotVersions) throws IOException {
       internalLock.writeLock().lock();
       try {
         SnapshotVersionsMeta existingSnapVersions = 
getVersionNodeMap().remove(snapshotId);
         Map<Integer, LocalDataVersionNode> existingVersions = 
existingSnapVersions == null ? Collections.emptyMap() :
             existingSnapVersions.getSnapshotVersions();
+        Map<Integer, LocalDataVersionNode> newVersions = 
snapshotVersions.getSnapshotVersions();
         Map<Integer, List<LocalDataVersionNode>> predecessors = new 
HashMap<>();
         // Track all predecessors of the existing versions and remove the node 
from the graph.
         for (Map.Entry<Integer, LocalDataVersionNode> existingVersion : 
existingVersions.entrySet()) {
@@ -763,14 +910,16 @@ private void upsertNode(UUID snapshotId, 
SnapshotVersionsMeta snapshotVersions)
           predecessors.put(existingVersion.getKey(), new 
ArrayList<>(localDataGraph.predecessors(existingVersionNode)));
           localDataGraph.removeNode(existingVersionNode);
         }
+
         // Add the nodes to be added in the graph and map.
         addSnapshotVersionMeta(snapshotId, snapshotVersions);
         // Reconnect all the predecessors for existing nodes.
-        for (Map.Entry<Integer, LocalDataVersionNode> entry : 
snapshotVersions.getSnapshotVersions().entrySet()) {
+        for (Map.Entry<Integer, LocalDataVersionNode> entry : 
newVersions.entrySet()) {
           for (LocalDataVersionNode predecessor : 
predecessors.getOrDefault(entry.getKey(), Collections.emptyList())) {
             localDataGraph.putEdge(predecessor, entry.getValue());
           }
         }
+        return existingSnapVersions;
       } finally {
         internalLock.writeLock().unlock();
       }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java
index df26fa742e8..9aa56d2dd02 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java
@@ -19,6 +19,7 @@
 
 import static org.apache.hadoop.hdds.StringUtils.bytes2String;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_SEPARATOR;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml.YAML_FILE_EXTENSION;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DIRECTORY_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE;
@@ -31,7 +32,9 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.when;
 
@@ -40,6 +43,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -61,6 +65,7 @@
 import org.apache.commons.compress.utils.Sets;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.hdds.utils.db.RocksDatabase;
@@ -88,6 +93,7 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mock;
+import org.mockito.MockedStatic;
 import org.mockito.MockitoAnnotations;
 import org.rocksdb.LiveFileMetaData;
 import org.yaml.snakeyaml.Yaml;
@@ -100,6 +106,8 @@ public class TestOmSnapshotLocalDataManager {
 
   private static YamlSerializer<OmSnapshotLocalData> 
snapshotLocalDataYamlSerializer;
   private static List<String> lockCapturor;
+  private static OzoneConfiguration conf;
+  private static Map<UUID, Boolean> purgedSnapshotIdMap;
 
   @Mock
   private OMMetadataManager omMetadataManager;
@@ -120,6 +128,7 @@ public class TestOmSnapshotLocalDataManager {
   private AutoCloseable mocks;
 
   private File snapshotsDir;
+  private MockedStatic<OmSnapshotManager> snapshotUtilMock;
 
   private static final String READ_LOCK_MESSAGE_ACQUIRE = "readLock acquire";
   private static final String READ_LOCK_MESSAGE_UNLOCK = "readLock unlock";
@@ -128,6 +137,7 @@ public class TestOmSnapshotLocalDataManager {
 
   @BeforeAll
   public static void setupClass() {
+    conf = new OzoneConfiguration();
     snapshotLocalDataYamlSerializer = new YamlSerializer<OmSnapshotLocalData>(
         new OmSnapshotLocalDataYaml.YamlFactory()) {
 
@@ -137,6 +147,7 @@ public void computeAndSetChecksum(Yaml yaml, 
OmSnapshotLocalData data) throws IO
       }
     };
     lockCapturor = new ArrayList<>();
+    purgedSnapshotIdMap = new HashMap<>();
   }
 
   @AfterAll
@@ -162,6 +173,11 @@ public void setUp() throws IOException {
 
     
when(rdbStore.getSnapshotsParentDir()).thenReturn(snapshotsDir.getAbsolutePath());
     when(rdbStore.getDbLocation()).thenReturn(dbLocation);
+    this.snapshotUtilMock = mockStatic(OmSnapshotManager.class, 
CALLS_REAL_METHODS);
+    purgedSnapshotIdMap.clear();
+    snapshotUtilMock.when(() -> OmSnapshotManager.isSnapshotPurged(any(), 
any(), any(), any()))
+        .thenAnswer(i -> purgedSnapshotIdMap.getOrDefault(i.getArgument(2), 
false));
+    conf.setInt(OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL, -1);
   }
 
   @AfterEach
@@ -172,6 +188,9 @@ public void tearDown() throws Exception {
     if (mocks != null) {
       mocks.close();
     }
+    if (snapshotUtilMock != null) {
+      snapshotUtilMock.close();
+    }
   }
 
   private String getReadLockMessageAcquire(UUID snapshotId) {
@@ -276,7 +295,7 @@ private void mockSnapshotStore(UUID snapshotId, 
List<LiveFileMetaData> sstFiles)
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testLockOrderingAgainstAnotherSnapshot(boolean read) throws 
IOException {
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
     List<UUID> snapshotIds = new ArrayList<>();
     snapshotIds.add(null);
     snapshotIds.addAll(createSnapshotLocalData(localDataManager, 20));
@@ -328,7 +347,7 @@ public void testLockOrderingAgainstAnotherSnapshot(boolean 
read) throws IOExcept
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testVersionLockResolution(boolean read) throws IOException {
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
     List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 5);
     for (int snapIdx = 0; snapIdx < snapshotIds.size(); snapIdx++) {
       UUID snapId = snapshotIds.get(snapIdx);
@@ -366,7 +385,7 @@ public void testVersionLockResolution(boolean read) throws 
IOException {
 
   @Test
   public void 
testWriteVersionAdditionValidationWithoutPreviousSnapshotVersionExisting() 
throws IOException {
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
     List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 2);
     UUID snapId = snapshotIds.get(1);
     try (WritableOmSnapshotLocalDataProvider omSnapshotLocalDataProvider =
@@ -382,7 +401,7 @@ public void 
testWriteVersionAdditionValidationWithoutPreviousSnapshotVersionExis
 
   @Test
   public void testUpdateTransactionInfo() throws IOException {
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
     TransactionInfo transactionInfo = 
TransactionInfo.valueOf(ThreadLocalRandom.current().nextLong(),
         ThreadLocalRandom.current().nextLong());
     UUID snapshotId = createSnapshotLocalData(localDataManager, 1).get(0);
@@ -401,7 +420,7 @@ public void testUpdateTransactionInfo() throws IOException {
 
   @Test
   public void testAddVersionFromRDB() throws IOException {
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
     List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 2);
     addVersionsToLocalData(localDataManager, snapshotIds.get(0), 
ImmutableMap.of(4, 5, 6, 8));
     UUID snapId = snapshotIds.get(1);
@@ -435,10 +454,79 @@ private void validateVersions(OmSnapshotLocalDataManager 
snapshotLocalDataManage
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans =  {true, false})
+  public void testOrphanVersionDeletionWithVersionDeletion(boolean 
purgeSnapshot) throws IOException {
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
+    List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 3);
+    UUID firstSnapId = snapshotIds.get(0);
+    UUID secondSnapId = snapshotIds.get(1);
+    UUID thirdSnapId = snapshotIds.get(2);
+
+    addVersionsToLocalData(localDataManager, firstSnapId, ImmutableMap.of(1, 
1, 2, 2, 3, 3));
+    addVersionsToLocalData(localDataManager, secondSnapId, ImmutableMap.of(4, 
2, 8, 1, 10, 3, 11, 3));
+    addVersionsToLocalData(localDataManager, thirdSnapId, ImmutableMap.of(5, 
8, 13, 10));
+    assertEquals(new HashSet<>(snapshotIds), 
localDataManager.getSnapshotToBeCheckedForOrphans().keySet());
+    localDataManager.getSnapshotToBeCheckedForOrphans().clear();
+    purgedSnapshotIdMap.put(secondSnapId, purgeSnapshot);
+    localDataManager.checkOrphanSnapshotVersions(omMetadataManager, null, 
thirdSnapId);
+    try (ReadableOmSnapshotLocalDataProvider snap = 
localDataManager.getOmSnapshotLocalData(thirdSnapId)) {
+      OmSnapshotLocalData snapshotLocalData = snap.getSnapshotLocalData();
+      assertEquals(Sets.newHashSet(0, 13), 
snapshotLocalData.getVersionSstFileInfos().keySet());
+    }
+    
assertTrue(localDataManager.getSnapshotToBeCheckedForOrphans().containsKey(secondSnapId));
+    localDataManager.checkOrphanSnapshotVersions(omMetadataManager, null, 
secondSnapId);
+    try (ReadableOmSnapshotLocalDataProvider snap = 
localDataManager.getOmSnapshotLocalData(secondSnapId)) {
+      OmSnapshotLocalData snapshotLocalData = snap.getSnapshotLocalData();
+      if (purgeSnapshot) {
+        assertEquals(Sets.newHashSet(0, 10), 
snapshotLocalData.getVersionSstFileInfos().keySet());
+      } else {
+        assertEquals(Sets.newHashSet(0, 10, 11), 
snapshotLocalData.getVersionSstFileInfos().keySet());
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans =  {true, false})
+  public void testOrphanVersionDeletionWithChainUpdate(boolean purgeSnapshot) 
throws IOException {
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
+    List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 3);
+    UUID firstSnapId = snapshotIds.get(0);
+    UUID secondSnapId = snapshotIds.get(1);
+    UUID thirdSnapId = snapshotIds.get(2);
+
+    addVersionsToLocalData(localDataManager, firstSnapId, ImmutableMap.of(1, 
1, 2, 2, 3, 3));
+    addVersionsToLocalData(localDataManager, secondSnapId, ImmutableMap.of(4, 
2, 8, 1, 10, 3, 11, 3));
+    addVersionsToLocalData(localDataManager, thirdSnapId, ImmutableMap.of(5, 
8, 13, 10));
+    purgedSnapshotIdMap.put(secondSnapId, purgeSnapshot);
+    try (WritableOmSnapshotLocalDataProvider snapshotLocalDataProvider =
+             localDataManager.getWritableOmSnapshotLocalData(thirdSnapId, 
firstSnapId)) {
+      snapshotLocalDataProvider.commit();
+    }
+    try (ReadableOmSnapshotLocalDataProvider snap = 
localDataManager.getOmSnapshotLocalData(thirdSnapId)) {
+      OmSnapshotLocalData snapshotLocalData = snap.getSnapshotLocalData();
+      assertEquals(Sets.newHashSet(0, 5, 13), 
snapshotLocalData.getVersionSstFileInfos().keySet());
+      assertEquals(firstSnapId, snapshotLocalData.getPreviousSnapshotId());
+    }
+
+    
assertTrue(localDataManager.getSnapshotToBeCheckedForOrphans().containsKey(secondSnapId));
+    localDataManager.checkOrphanSnapshotVersions(omMetadataManager, null, 
secondSnapId);
+    if (purgeSnapshot) {
+      assertThrows(NoSuchFileException.class,
+          () -> localDataManager.getOmSnapshotLocalData(secondSnapId));
+      
assertFalse(localDataManager.getVersionNodeMap().containsKey(secondSnapId));
+    } else {
+      try (ReadableOmSnapshotLocalDataProvider snap = 
localDataManager.getOmSnapshotLocalData(secondSnapId)) {
+        OmSnapshotLocalData snapshotLocalData = snap.getSnapshotLocalData();
+        assertEquals(Sets.newHashSet(0, 11), 
snapshotLocalData.getVersionSstFileInfos().keySet());
+      }
+    }
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testWriteWithChainUpdate(boolean previousSnapshotExisting) 
throws IOException {
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
     List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 3 + 
(previousSnapshotExisting ? 1 : 0));
     int snapshotIdx = 1 + (previousSnapshotExisting ? 1 : 0);
     for (UUID snapshotId : snapshotIds) {
@@ -490,7 +578,7 @@ public void testWriteWithChainUpdate(boolean 
previousSnapshotExisting) throws IO
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testWriteVersionValidation(boolean nextVersionExisting) throws 
IOException {
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
     List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 3);
     UUID prevSnapId = snapshotIds.get(0);
     UUID snapId = snapshotIds.get(1);
@@ -564,7 +652,7 @@ private void 
addVersionsToLocalData(OmSnapshotLocalDataManager snapshotLocalData
   @ParameterizedTest
   @ValueSource(ints = {1, 2, 3})
   public void testNeedsDefrag(int previousVersion) throws IOException {
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
     List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 2);
     for (UUID snapshotId : snapshotIds) {
       try (ReadableOmSnapshotLocalDataProvider snap = 
localDataManager.getOmSnapshotLocalData(snapshotId)) {
@@ -584,7 +672,7 @@ public void testNeedsDefrag(int previousVersion) throws 
IOException {
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testVersionResolution(boolean read) throws IOException {
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
     List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 5);
     List<Map<Integer, Integer>> versionMaps = Arrays.asList(
         ImmutableMap.of(4, 1, 6, 3, 8, 9, 11, 15),
@@ -627,7 +715,7 @@ public void testVersionResolution(boolean read) throws 
IOException {
 
   @Test
   public void testConstructor() throws IOException {
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
     assertNotNull(localDataManager);
   }
 
@@ -636,7 +724,7 @@ public void 
testGetSnapshotLocalPropertyYamlPathWithSnapshotInfo() throws IOExce
     UUID snapshotId = UUID.randomUUID();
     SnapshotInfo snapshotInfo = createMockSnapshotInfo(snapshotId, null);
     
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
 
     File yamlPath = new 
File(localDataManager.getSnapshotLocalPropertyYamlPath(snapshotInfo));
     assertNotNull(yamlPath);
@@ -674,7 +762,7 @@ public void testCreateNewSnapshotLocalYaml() throws 
IOException {
     mockedLiveFiles.add(createMockLiveFileMetaData("ot2.sst", "otherTable", 
"k1", "k2"));
 
     mockSnapshotStore(snapshotId, mockedLiveFiles);
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
     Path snapshotYaml = 
Paths.get(localDataManager.getSnapshotLocalPropertyYamlPath(snapshotInfo));
     // Create an existing YAML file for the snapshot
     assertTrue(snapshotYaml.toFile().createNewFile());
@@ -715,7 +803,7 @@ public void testCreateNewOmSnapshotLocalDataFile() throws 
IOException {
             bytes2String(lfm.largestKey()), 
bytes2String(lfm.columnFamilyName()))).collect(Collectors.toList());
     mockSnapshotStore(snapshotId, sstFiles);
 
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
 
     localDataManager.createNewOmSnapshotLocalDataFile(snapshotStore, 
snapshotInfo);
     
@@ -740,7 +828,7 @@ public void testGetOmSnapshotLocalDataWithSnapshotInfo() 
throws IOException {
     // Create and write snapshot local data file
     OmSnapshotLocalData localData = createMockLocalData(snapshotId, null);
     
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
 
     // Write the file manually for testing
     Path yamlPath = 
Paths.get(localDataManager.getSnapshotLocalPropertyYamlPath(snapshotInfo.getSnapshotId()));
@@ -761,7 +849,7 @@ public void 
testGetOmSnapshotLocalDataWithMismatchedSnapshotId() throws IOExcept
     // Create local data with wrong snapshot ID
     OmSnapshotLocalData localData = createMockLocalData(wrongSnapshotId, null);
     
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
 
     Path yamlPath = 
Paths.get(localDataManager.getSnapshotLocalPropertyYamlPath(snapshotId));
     writeLocalDataToFile(localData, yamlPath);
@@ -777,7 +865,7 @@ public void testGetOmSnapshotLocalDataWithFile() throws 
IOException {
     
     OmSnapshotLocalData localData = createMockLocalData(snapshotId, null);
     
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
 
     Path yamlPath = tempDir.resolve("test-snapshot.yaml");
     writeLocalDataToFile(localData, yamlPath);
@@ -795,7 +883,7 @@ public void testAddVersionNodeWithDependents() throws 
IOException {
         
.sorted(Comparator.comparing(String::valueOf)).collect(Collectors.toList());
     UUID snapshotId = versionIds.get(0);
     UUID previousSnapshotId = versionIds.get(1);
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
     // Create snapshot directory structure and files
     createSnapshotLocalDataFile(snapshotId, previousSnapshotId);
     createSnapshotLocalDataFile(previousSnapshotId, null);
@@ -811,7 +899,7 @@ public void testAddVersionNodeWithDependentsAlreadyExists() 
throws IOException {
     
     createSnapshotLocalDataFile(snapshotId, null);
     
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
 
     OmSnapshotLocalData localData = createMockLocalData(snapshotId, null);
     
@@ -833,7 +921,7 @@ public void testInitWithExistingYamlFiles() throws 
IOException {
     createSnapshotLocalDataFile(snapshotId, previousSnapshotId);
     
     // Initialize - should load existing files
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
 
     assertNotNull(localDataManager);
     Map<UUID, OmSnapshotLocalDataManager.SnapshotVersionsMeta> versionMap =
@@ -853,14 +941,13 @@ public void testInitWithInvalidPathThrowsException() 
throws IOException {
     
     // Should throw IOException during init
     assertThrows(IOException.class, () -> {
-      new OmSnapshotLocalDataManager(omMetadataManager);
+      new OmSnapshotLocalDataManager(omMetadataManager, null, conf);
     });
   }
 
   @Test
   public void testClose() throws IOException {
-    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
-
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, 
conf);
     // Should not throw exception
     localDataManager.close();
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/filter/AbstractReclaimableFilterTest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/filter/AbstractReclaimableFilterTest.java
index ef97975ca8e..3c50e93625f 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/filter/AbstractReclaimableFilterTest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/filter/AbstractReclaimableFilterTest.java
@@ -52,6 +52,7 @@
 import org.apache.hadoop.ozone.om.BucketManager;
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshot;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -162,10 +163,11 @@ protected void teardown() throws IOException {
   }
 
   private void mockOzoneManager(BucketLayout bucketLayout) throws IOException {
-    OMMetadataManager metadataManager = mock(OMMetadataManager.class);
+    OmMetadataManagerImpl metadataManager = mock(OmMetadataManagerImpl.class);
     BucketManager bucketManager = mock(BucketManager.class);
     when(ozoneManager.getMetadataManager()).thenReturn(metadataManager);
     when(ozoneManager.getBucketManager()).thenReturn(bucketManager);
+    
when(metadataManager.getSnapshotChainManager()).thenReturn(snapshotChainManager);
     long volumeCount = 0;
     for (String volume : volumes) {
       when(metadataManager.getVolumeId(eq(volume))).thenReturn(volumeCount);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to