Copilot commented on code in PR #9140:
URL: https://github.com/apache/ozone/pull/9140#discussion_r2440388023


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java:
##########
@@ -228,6 +297,362 @@ public void close() {
     }
   }
 
+  private static final class LockDataProviderInitResult {
+    private final OmSnapshotLocalData snapshotLocalData;
+    private final HierarchicalResourceLock lock;
+    private final HierarchicalResourceLock previousLock;
+    private final UUID previousSnapshotId;
+
+    private LockDataProviderInitResult(HierarchicalResourceLock lock, 
OmSnapshotLocalData snapshotLocalData,
+        HierarchicalResourceLock previousLock, UUID previousSnapshotId) {
+      this.lock = lock;
+      this.snapshotLocalData = snapshotLocalData;
+      this.previousLock = previousLock;
+      this.previousSnapshotId = previousSnapshotId;
+    }
+
+    private HierarchicalResourceLock getLock() {
+      return lock;
+    }
+
+    private HierarchicalResourceLock getPreviousLock() {
+      return previousLock;
+    }
+
+    private UUID getPreviousSnapshotId() {
+      return previousSnapshotId;
+    }
+
+    private OmSnapshotLocalData getSnapshotLocalData() {
+      return snapshotLocalData;
+    }
+  }
+
+  /**
+   * The ReadableOmSnapshotLocalDataProvider class is responsible for managing 
the
+   * access and initialization of local snapshot data in a thread-safe manner.
+   * It provides mechanisms to handle snapshot data, retrieve associated 
previous
+   * snapshot data, and manage lock synchronization for safe concurrent 
operations.
+   *
+   * This class works with snapshot identifiers and ensures that the 
appropriate
+   * local data for a given snapshot is loaded and accessible. Additionally, it
+   * maintains locking mechanisms to ensure thread-safe initialization and 
access
+   * to both the current and previous snapshot local data. The implementation 
also
+   * supports handling errors in the snapshot data initialization process.
+   *
+   * Key Functionalities:
+   * - Initializes and provides access to snapshot local data associated with a
+   *   given snapshot identifier.
+   * - Resolves and retrieves data for the previous snapshot if applicable.
+   * - Ensures safe concurrent read operations using locking mechanisms.
+   * - Validates the integrity and consistency of snapshot data during 
initialization.
+   * - Ensures that appropriate locks are released upon closing.
+   *
+   * Thread-Safety:
+   * This class utilizes locks to guarantee thread-safe operations when 
accessing
+   * or modifying snapshot data. State variables relating to snapshot data are
+   * properly synchronized to ensure consistency during concurrent operations.
+   *
+   * Usage Considerations:
+   * - Ensure proper handling of exceptions while interacting with this class,
+   *   particularly during initialization and cleanup.
+   * - Always invoke the {@code close()} method after usage to release 
acquired locks
+   *   and avoid potential deadlocks.
+   */
+  public class ReadableOmSnapshotLocalDataProvider implements AutoCloseable {
+
+    private final UUID snapshotId;
+    private final HierarchicalResourceLock lock;
+    private final HierarchicalResourceLock previousLock;
+    private final OmSnapshotLocalData snapshotLocalData;
+    private OmSnapshotLocalData previousSnapshotLocalData;
+    private volatile boolean isPreviousSnapshotLoaded = false;
+    private final UUID resolvedPreviousSnapshotId;
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId) throws 
IOException {
+      this(snapshotId, true);
+    }
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, UUID 
snapIdToResolve) throws IOException {
+      this(snapshotId, true, null, snapIdToResolve, true);
+    }
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean 
readLock) throws IOException {
+      this(snapshotId, readLock, null, null, false);
+    }
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean 
readLock,
+        CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException> 
snapshotLocalDataSupplier,
+        UUID snapshotIdToBeResolved, boolean isSnapshotToBeResolvedNullable) 
throws IOException {
+      this.snapshotId = snapshotId;
+      LockDataProviderInitResult result = initialize(readLock, snapshotId, 
snapshotIdToBeResolved,
+          isSnapshotToBeResolvedNullable, snapshotLocalDataSupplier);
+      this.snapshotLocalData = result.getSnapshotLocalData();
+      this.lock = result.getLock();
+      this.previousLock = result.getPreviousLock();
+      this.resolvedPreviousSnapshotId = result.getPreviousSnapshotId();
+      this.previousSnapshotLocalData = null;
+      this.isPreviousSnapshotLoaded = false;
+    }
+
+    public OmSnapshotLocalData getSnapshotLocalData() {
+      return snapshotLocalData;
+    }
+
+    public synchronized OmSnapshotLocalData getPreviousSnapshotLocalData() 
throws IOException {
+      if (!isPreviousSnapshotLoaded) {
+        if (resolvedPreviousSnapshotId != null) {
+          File previousSnapshotLocalDataFile = new 
File(getSnapshotLocalPropertyYamlPath(resolvedPreviousSnapshotId));
+          this.previousSnapshotLocalData = 
snapshotLocalDataSerializer.load(previousSnapshotLocalDataFile);
+        }
+        this.isPreviousSnapshotLoaded = true;
+      }
+      return previousSnapshotLocalData;
+    }
+
+    private HierarchicalResourceLock acquireLock(UUID snapId, boolean 
readLock) throws IOException {
+      HierarchicalResourceLock acquiredLock = readLock ? 
locks.acquireReadLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK,
+          snapId.toString()) : 
locks.acquireWriteLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK, 
snapId.toString());
+      if (!acquiredLock.isLockAcquired()) {
+        throw new IOException("Unable to acquire lock for snapshotId: " + 
snapId);
+      }
+      return acquiredLock;
+    }
+
+    /**
+     * Initializes the snapshot local data by acquiring the lock on the 
snapshot and also acquires a read lock on the
+     * snapshotId to be resolved by iterating through the chain of previous 
snapshot ids.
+     */
+    private LockDataProviderInitResult initialize(
+        boolean readLock, UUID snapId, UUID toResolveSnapshotId, boolean 
isSnapshotToBeResolvedNullable,
+        CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException> 
snapshotLocalDataSupplier) throws IOException {
+      HierarchicalResourceLock snapIdLock = null;
+      HierarchicalResourceLock previousReadLockAcquired = null;
+      try {
+        snapIdLock = acquireLock(snapId, readLock);
+        snapshotLocalDataSupplier = snapshotLocalDataSupplier == null ? () -> {
+          File snapshotLocalDataFile = new 
File(getSnapshotLocalPropertyYamlPath(snapId));
+          return 
Pair.of(snapshotLocalDataSerializer.load(snapshotLocalDataFile), 
snapshotLocalDataFile);
+        } : snapshotLocalDataSupplier;
+        Pair<OmSnapshotLocalData, File> pair = snapshotLocalDataSupplier.get();
+        OmSnapshotLocalData ssLocalData = pair.getKey();
+        if (!Objects.equals(ssLocalData.getSnapshotId(), snapId)) {
+          String loadPath = pair.getValue() == null ? null : 
pair.getValue().getAbsolutePath();
+          throw new IOException("SnapshotId in path : " + loadPath + " 
contains snapshotLocalData corresponding " +
+              "to snapshotId " + ssLocalData.getSnapshotId() + ". Expected 
snapshotId " + snapId);
+        }
+        // Get previous snapshotId and acquire read lock on the id. We need to 
do this outside the loop instead of a
+        // do while loop since the nodes that need be added may not be present 
in the graph so it may not be possible
+        // to iterate through the chain.
+        UUID previousSnapshotId = ssLocalData.getPreviousSnapshotId();
+        // if flag toResolveSnapshotIdIsNull is true or toResolveSnapshotId is 
not null, then we resolve snapshot
+        // with previous snapshot id as null, which would mean if the snapshot 
local data is committed the snapshot
+        // local data would become first snapshot in the chain with no 
previous snapshot id.
+        toResolveSnapshotId = (isSnapshotToBeResolvedNullable || 
toResolveSnapshotId != null) ? toResolveSnapshotId :
+            ssLocalData.getPreviousSnapshotId();
+        if (toResolveSnapshotId != null && previousSnapshotId != null) {
+          if (!versionNodeMap.containsKey(previousSnapshotId)) {
+            throw new IOException(String.format("Operating on snapshot id : %s 
with previousSnapshotId: %s invalid " +
+                "since previousSnapshotId is not loaded.", snapId, 
previousSnapshotId));
+          }
+          previousReadLockAcquired = acquireLock(previousSnapshotId, true);
+          // Create a copy of the previous versionMap to get the previous 
versions corresponding to the previous
+          // snapshot. This map would be mutated to resolve the previous 
snapshot's version corresponding to the
+          // toResolveSnapshotId by iterating through the chain of previous 
snapshot ids.
+          Map<Integer, LocalDataVersionNode> previousVersionNodeMap =
+              new 
HashMap<>(versionNodeMap.get(previousSnapshotId).getSnapshotVersions());
+          UUID currentIteratedSnapshotId = previousSnapshotId;
+          // Iterate through the chain of previous snapshot ids until the 
snapshot id to be resolved is found.
+          while (!Objects.equals(currentIteratedSnapshotId, 
toResolveSnapshotId)) {
+            // All versions for the snapshot should point to the same previous 
snapshot id. Otherwise this is a sign
+            // of corruption.
+            Set<UUID> previousIds =
+                
previousVersionNodeMap.values().stream().map(LocalDataVersionNode::getPreviousSnapshotId)
+                .collect(Collectors.toSet());
+            if (previousIds.size() > 1) {
+              throw new IOException(String.format("Snapshot %s versions has 
multiple previous snapshotIds %s",
+                  currentIteratedSnapshotId, previousIds));
+            }
+            if (previousIds.isEmpty()) {
+              throw new IOException(String.format("Snapshot %s versions 
doesn't have previous Id thus snapshot " +
+                      "%s cannot be resolved against id %s",
+                  currentIteratedSnapshotId, snapId, toResolveSnapshotId));
+            }
+            UUID previousId = previousIds.iterator().next();
+            HierarchicalResourceLock previousToPreviousReadLockAcquired = 
acquireLock(previousId, true);
+
+            try {
+              // Get the version node for the snapshot and update the version 
node to the successor to point to the
+              // previous node.
+              for (Map.Entry<Integer, LocalDataVersionNode> entry : 
previousVersionNodeMap.entrySet()) {
+                Set<LocalDataVersionNode> versionNode = 
localDataGraph.successors(entry.getValue());
+                if (versionNode.size() > 1) {
+                  throw new IOException(String.format("Snapshot %s version %d 
has multiple successors %s",
+                      currentIteratedSnapshotId, entry.getValue(), 
versionNode));
+                }
+                if (versionNode.isEmpty()) {
+                  throw new IOException(String.format("Snapshot %s version %d 
doesn't have successor",
+                      currentIteratedSnapshotId, entry.getValue()));
+                }
+                // Set the version node for iterated version to the successor 
corresponding to the previous snapshot id.
+                entry.setValue(versionNode.iterator().next());

Review Comment:
   The graph is built with edges from a snapshot version to its previous 
version (current -> previous). To walk 'forward' along the chain from a 
previous snapshot to its dependent, you must use predecessors(), not 
successors(). Using successors() here will either return empty or the wrong 
node, breaking version resolution. Replace successors(...) with 
predecessors(...), and update the error messages accordingly.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java:
##########
@@ -228,6 +297,362 @@ public void close() {
     }
   }
 
+  private static final class LockDataProviderInitResult {
+    private final OmSnapshotLocalData snapshotLocalData;
+    private final HierarchicalResourceLock lock;
+    private final HierarchicalResourceLock previousLock;
+    private final UUID previousSnapshotId;
+
+    private LockDataProviderInitResult(HierarchicalResourceLock lock, 
OmSnapshotLocalData snapshotLocalData,
+        HierarchicalResourceLock previousLock, UUID previousSnapshotId) {
+      this.lock = lock;
+      this.snapshotLocalData = snapshotLocalData;
+      this.previousLock = previousLock;
+      this.previousSnapshotId = previousSnapshotId;
+    }
+
+    private HierarchicalResourceLock getLock() {
+      return lock;
+    }
+
+    private HierarchicalResourceLock getPreviousLock() {
+      return previousLock;
+    }
+
+    private UUID getPreviousSnapshotId() {
+      return previousSnapshotId;
+    }
+
+    private OmSnapshotLocalData getSnapshotLocalData() {
+      return snapshotLocalData;
+    }
+  }
+
+  /**
+   * The ReadableOmSnapshotLocalDataProvider class is responsible for managing 
the
+   * access and initialization of local snapshot data in a thread-safe manner.
+   * It provides mechanisms to handle snapshot data, retrieve associated 
previous
+   * snapshot data, and manage lock synchronization for safe concurrent 
operations.
+   *
+   * This class works with snapshot identifiers and ensures that the 
appropriate
+   * local data for a given snapshot is loaded and accessible. Additionally, it
+   * maintains locking mechanisms to ensure thread-safe initialization and 
access
+   * to both the current and previous snapshot local data. The implementation 
also
+   * supports handling errors in the snapshot data initialization process.
+   *
+   * Key Functionalities:
+   * - Initializes and provides access to snapshot local data associated with a
+   *   given snapshot identifier.
+   * - Resolves and retrieves data for the previous snapshot if applicable.
+   * - Ensures safe concurrent read operations using locking mechanisms.
+   * - Validates the integrity and consistency of snapshot data during 
initialization.
+   * - Ensures that appropriate locks are released upon closing.
+   *
+   * Thread-Safety:
+   * This class utilizes locks to guarantee thread-safe operations when 
accessing
+   * or modifying snapshot data. State variables relating to snapshot data are
+   * properly synchronized to ensure consistency during concurrent operations.
+   *
+   * Usage Considerations:
+   * - Ensure proper handling of exceptions while interacting with this class,
+   *   particularly during initialization and cleanup.
+   * - Always invoke the {@code close()} method after usage to release 
acquired locks
+   *   and avoid potential deadlocks.
+   */
+  public class ReadableOmSnapshotLocalDataProvider implements AutoCloseable {
+
+    private final UUID snapshotId;
+    private final HierarchicalResourceLock lock;
+    private final HierarchicalResourceLock previousLock;
+    private final OmSnapshotLocalData snapshotLocalData;
+    private OmSnapshotLocalData previousSnapshotLocalData;
+    private volatile boolean isPreviousSnapshotLoaded = false;
+    private final UUID resolvedPreviousSnapshotId;
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId) throws 
IOException {
+      this(snapshotId, true);
+    }
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, UUID 
snapIdToResolve) throws IOException {
+      this(snapshotId, true, null, snapIdToResolve, true);
+    }
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean 
readLock) throws IOException {
+      this(snapshotId, readLock, null, null, false);
+    }
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean 
readLock,
+        CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException> 
snapshotLocalDataSupplier,
+        UUID snapshotIdToBeResolved, boolean isSnapshotToBeResolvedNullable) 
throws IOException {
+      this.snapshotId = snapshotId;
+      LockDataProviderInitResult result = initialize(readLock, snapshotId, 
snapshotIdToBeResolved,
+          isSnapshotToBeResolvedNullable, snapshotLocalDataSupplier);
+      this.snapshotLocalData = result.getSnapshotLocalData();
+      this.lock = result.getLock();
+      this.previousLock = result.getPreviousLock();
+      this.resolvedPreviousSnapshotId = result.getPreviousSnapshotId();
+      this.previousSnapshotLocalData = null;
+      this.isPreviousSnapshotLoaded = false;
+    }
+
+    public OmSnapshotLocalData getSnapshotLocalData() {
+      return snapshotLocalData;
+    }
+
+    public synchronized OmSnapshotLocalData getPreviousSnapshotLocalData() 
throws IOException {
+      if (!isPreviousSnapshotLoaded) {
+        if (resolvedPreviousSnapshotId != null) {
+          File previousSnapshotLocalDataFile = new 
File(getSnapshotLocalPropertyYamlPath(resolvedPreviousSnapshotId));
+          this.previousSnapshotLocalData = 
snapshotLocalDataSerializer.load(previousSnapshotLocalDataFile);
+        }
+        this.isPreviousSnapshotLoaded = true;
+      }
+      return previousSnapshotLocalData;
+    }
+
+    private HierarchicalResourceLock acquireLock(UUID snapId, boolean 
readLock) throws IOException {
+      HierarchicalResourceLock acquiredLock = readLock ? 
locks.acquireReadLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK,
+          snapId.toString()) : 
locks.acquireWriteLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK, 
snapId.toString());
+      if (!acquiredLock.isLockAcquired()) {
+        throw new IOException("Unable to acquire lock for snapshotId: " + 
snapId);
+      }
+      return acquiredLock;
+    }
+
+    /**
+     * Initializes the snapshot local data by acquiring the lock on the 
snapshot and also acquires a read lock on the
+     * snapshotId to be resolved by iterating through the chain of previous 
snapshot ids.
+     */
+    private LockDataProviderInitResult initialize(
+        boolean readLock, UUID snapId, UUID toResolveSnapshotId, boolean 
isSnapshotToBeResolvedNullable,
+        CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException> 
snapshotLocalDataSupplier) throws IOException {
+      HierarchicalResourceLock snapIdLock = null;
+      HierarchicalResourceLock previousReadLockAcquired = null;
+      try {
+        snapIdLock = acquireLock(snapId, readLock);
+        snapshotLocalDataSupplier = snapshotLocalDataSupplier == null ? () -> {
+          File snapshotLocalDataFile = new 
File(getSnapshotLocalPropertyYamlPath(snapId));
+          return 
Pair.of(snapshotLocalDataSerializer.load(snapshotLocalDataFile), 
snapshotLocalDataFile);
+        } : snapshotLocalDataSupplier;
+        Pair<OmSnapshotLocalData, File> pair = snapshotLocalDataSupplier.get();
+        OmSnapshotLocalData ssLocalData = pair.getKey();
+        if (!Objects.equals(ssLocalData.getSnapshotId(), snapId)) {
+          String loadPath = pair.getValue() == null ? null : 
pair.getValue().getAbsolutePath();
+          throw new IOException("SnapshotId in path : " + loadPath + " 
contains snapshotLocalData corresponding " +
+              "to snapshotId " + ssLocalData.getSnapshotId() + ". Expected 
snapshotId " + snapId);
+        }
+        // Get previous snapshotId and acquire read lock on the id. We need to 
do this outside the loop instead of a
+        // do while loop since the nodes that need be added may not be present 
in the graph so it may not be possible
+        // to iterate through the chain.
+        UUID previousSnapshotId = ssLocalData.getPreviousSnapshotId();
+        // if flag toResolveSnapshotIdIsNull is true or toResolveSnapshotId is 
not null, then we resolve snapshot
+        // with previous snapshot id as null, which would mean if the snapshot 
local data is committed the snapshot
+        // local data would become first snapshot in the chain with no 
previous snapshot id.
+        toResolveSnapshotId = (isSnapshotToBeResolvedNullable || 
toResolveSnapshotId != null) ? toResolveSnapshotId :
+            ssLocalData.getPreviousSnapshotId();
+        if (toResolveSnapshotId != null && previousSnapshotId != null) {
+          if (!versionNodeMap.containsKey(previousSnapshotId)) {
+            throw new IOException(String.format("Operating on snapshot id : %s 
with previousSnapshotId: %s invalid " +
+                "since previousSnapshotId is not loaded.", snapId, 
previousSnapshotId));
+          }
+          previousReadLockAcquired = acquireLock(previousSnapshotId, true);
+          // Create a copy of the previous versionMap to get the previous 
versions corresponding to the previous
+          // snapshot. This map would be mutated to resolve the previous 
snapshot's version corresponding to the
+          // toResolveSnapshotId by iterating through the chain of previous 
snapshot ids.
+          Map<Integer, LocalDataVersionNode> previousVersionNodeMap =
+              new 
HashMap<>(versionNodeMap.get(previousSnapshotId).getSnapshotVersions());
+          UUID currentIteratedSnapshotId = previousSnapshotId;
+          // Iterate through the chain of previous snapshot ids until the 
snapshot id to be resolved is found.
+          while (!Objects.equals(currentIteratedSnapshotId, 
toResolveSnapshotId)) {
+            // All versions for the snapshot should point to the same previous 
snapshot id. Otherwise this is a sign
+            // of corruption.
+            Set<UUID> previousIds =
+                
previousVersionNodeMap.values().stream().map(LocalDataVersionNode::getPreviousSnapshotId)
+                .collect(Collectors.toSet());
+            if (previousIds.size() > 1) {
+              throw new IOException(String.format("Snapshot %s versions has 
multiple previous snapshotIds %s",
+                  currentIteratedSnapshotId, previousIds));
+            }
+            if (previousIds.isEmpty()) {
+              throw new IOException(String.format("Snapshot %s versions 
doesn't have previous Id thus snapshot " +
+                      "%s cannot be resolved against id %s",
+                  currentIteratedSnapshotId, snapId, toResolveSnapshotId));
+            }
+            UUID previousId = previousIds.iterator().next();
+            HierarchicalResourceLock previousToPreviousReadLockAcquired = 
acquireLock(previousId, true);
+
+            try {
+              // Get the version node for the snapshot and update the version 
node to the successor to point to the
+              // previous node.
+              for (Map.Entry<Integer, LocalDataVersionNode> entry : 
previousVersionNodeMap.entrySet()) {
+                Set<LocalDataVersionNode> versionNode = 
localDataGraph.successors(entry.getValue());
+                if (versionNode.size() > 1) {
+                  throw new IOException(String.format("Snapshot %s version %d 
has multiple successors %s",
+                      currentIteratedSnapshotId, entry.getValue(), 
versionNode));
+                }
+                if (versionNode.isEmpty()) {
+                  throw new IOException(String.format("Snapshot %s version %d 
doesn't have successor",
+                      currentIteratedSnapshotId, entry.getValue()));
+                }
+                // Set the version node for iterated version to the successor 
corresponding to the previous snapshot id.
+                entry.setValue(versionNode.iterator().next());
+              }
+            } finally {
+              // Release the read lock acquired on the previous snapshot id 
acquired. Now that the instance
+              // is no longer needed we can release the read lock for the 
snapshot iterated in the previous snapshot.
+              // Make previousToPrevious previous for next iteration.
+              previousReadLockAcquired.close();
+              previousReadLockAcquired = previousToPreviousReadLockAcquired;
+              currentIteratedSnapshotId = previousId;
+            }
+          }
+          ssLocalData.setPreviousSnapshotId(toResolveSnapshotId);
+          Map<Integer, OmSnapshotLocalData.VersionMeta> versionMetaMap = 
ssLocalData.getVersionSstFileInfos();
+          for (Map.Entry<Integer, OmSnapshotLocalData.VersionMeta> entry : 
versionMetaMap.entrySet()) {
+            OmSnapshotLocalData.VersionMeta versionMeta = entry.getValue();
+            // Get the relative version node which corresponds to the 
toResolveSnapshotId corresponding to the
+            // versionMeta which points to a particular version in the 
previous snapshot
+            LocalDataVersionNode relativePreviousVersionNode =
+                
previousVersionNodeMap.get(versionMeta.getPreviousSnapshotVersion());
+            if (relativePreviousVersionNode == null) {
+              throw new IOException(String.format("Unable to resolve previous 
version node for snapshot: %s" +
+                  " with version : %d against previous snapshot %s previous 
version : %d",
+                  snapId, entry.getKey(), toResolveSnapshotId, 
versionMeta.getPreviousSnapshotVersion()));
+            }
+            // Set the previous snapshot version to the 
relativePreviousVersionNode which was captured.
+            
versionMeta.setPreviousSnapshotVersion(relativePreviousVersionNode.getVersion());
+          }
+        } else {
+          toResolveSnapshotId = null;
+          ssLocalData.setPreviousSnapshotId(null);
+        }
+        return new LockDataProviderInitResult(snapIdLock, ssLocalData, 
previousReadLockAcquired, toResolveSnapshotId);
+      } catch (IOException e) {
+        // Release all the locks in case of an exception and rethrow the 
exception.
+        if (previousReadLockAcquired != null) {
+          previousReadLockAcquired.close();
+        }
+        if (snapIdLock != null) {
+          snapIdLock.close();
+        }
+        throw e;
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (previousLock != null) {
+        previousLock.close();
+      }
+      if (lock != null) {
+        lock.close();
+      }
+    }
+  }
+
+  /**
+   * This class represents a writable provider for managing local data of
+   * OmSnapshot. It extends the functionality of {@code 
ReadableOmSnapshotLocalDataProvider}
+   * and provides support for write operations, such as committing changes.
+   *
+   * The writable snapshot data provider interacts with version nodes and
+   * facilitates atomic updates to snapshot properties and files.
+   *
+   * This class is designed to ensure thread-safe operations and uses locks to
+   * guarantee consistent state across concurrent activities.
+   *
+   * The default usage includes creating an instance of this provider with
+   * specific snapshot identifiers and optionally handling additional 
parameters
+   * such as data resolution or a supplier for snapshot data.
+   */
+  public final class WritableOmSnapshotLocalDataProvider extends 
ReadableOmSnapshotLocalDataProvider {
+
+    private WritableOmSnapshotLocalDataProvider(UUID snapshotId) throws 
IOException {
+      super(snapshotId, false);
+      fullLock.readLock().lock();
+    }
+
+    private WritableOmSnapshotLocalDataProvider(UUID snapshotId, UUID 
snapshotIdToBeResolved) throws IOException {
+      super(snapshotId, false, null, snapshotIdToBeResolved, true);
+      fullLock.readLock().lock();
+    }

Review Comment:
   WritableOmSnapshotLocalDataProvider acquires fullLock.readLock() while 
performing mutations (commit/upsert) to versionNodeMap/localDataGraph. This can 
race with readers and other writers. Use fullLock.writeLock() for writes and 
hold fullLock.readLock() in ReadableOmSnapshotLocalDataProvider during graph 
reads. Specifically: change readLock() to writeLock() in these constructors and 
in close(); and acquire/release fullLock.readLock() in 
ReadableOmSnapshotLocalDataProvider to protect versionNodeMap/localDataGraph 
access.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java:
##########
@@ -228,6 +297,362 @@ public void close() {
     }
   }
 
+  private static final class LockDataProviderInitResult {
+    private final OmSnapshotLocalData snapshotLocalData;
+    private final HierarchicalResourceLock lock;
+    private final HierarchicalResourceLock previousLock;
+    private final UUID previousSnapshotId;
+
+    private LockDataProviderInitResult(HierarchicalResourceLock lock, 
OmSnapshotLocalData snapshotLocalData,
+        HierarchicalResourceLock previousLock, UUID previousSnapshotId) {
+      this.lock = lock;
+      this.snapshotLocalData = snapshotLocalData;
+      this.previousLock = previousLock;
+      this.previousSnapshotId = previousSnapshotId;
+    }
+
+    private HierarchicalResourceLock getLock() {
+      return lock;
+    }
+
+    private HierarchicalResourceLock getPreviousLock() {
+      return previousLock;
+    }
+
+    private UUID getPreviousSnapshotId() {
+      return previousSnapshotId;
+    }
+
+    private OmSnapshotLocalData getSnapshotLocalData() {
+      return snapshotLocalData;
+    }
+  }
+
+  /**
+   * The ReadableOmSnapshotLocalDataProvider class is responsible for managing 
the
+   * access and initialization of local snapshot data in a thread-safe manner.
+   * It provides mechanisms to handle snapshot data, retrieve associated 
previous
+   * snapshot data, and manage lock synchronization for safe concurrent 
operations.
+   *
+   * This class works with snapshot identifiers and ensures that the 
appropriate
+   * local data for a given snapshot is loaded and accessible. Additionally, it
+   * maintains locking mechanisms to ensure thread-safe initialization and 
access
+   * to both the current and previous snapshot local data. The implementation 
also
+   * supports handling errors in the snapshot data initialization process.
+   *
+   * Key Functionalities:
+   * - Initializes and provides access to snapshot local data associated with a
+   *   given snapshot identifier.
+   * - Resolves and retrieves data for the previous snapshot if applicable.
+   * - Ensures safe concurrent read operations using locking mechanisms.
+   * - Validates the integrity and consistency of snapshot data during 
initialization.
+   * - Ensures that appropriate locks are released upon closing.
+   *
+   * Thread-Safety:
+   * This class utilizes locks to guarantee thread-safe operations when 
accessing
+   * or modifying snapshot data. State variables relating to snapshot data are
+   * properly synchronized to ensure consistency during concurrent operations.
+   *
+   * Usage Considerations:
+   * - Ensure proper handling of exceptions while interacting with this class,
+   *   particularly during initialization and cleanup.
+   * - Always invoke the {@code close()} method after usage to release 
acquired locks
+   *   and avoid potential deadlocks.
+   */
+  public class ReadableOmSnapshotLocalDataProvider implements AutoCloseable {
+
+    private final UUID snapshotId;
+    private final HierarchicalResourceLock lock;
+    private final HierarchicalResourceLock previousLock;
+    private final OmSnapshotLocalData snapshotLocalData;
+    private OmSnapshotLocalData previousSnapshotLocalData;
+    private volatile boolean isPreviousSnapshotLoaded = false;
+    private final UUID resolvedPreviousSnapshotId;
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId) throws 
IOException {
+      this(snapshotId, true);
+    }
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, UUID 
snapIdToResolve) throws IOException {
+      this(snapshotId, true, null, snapIdToResolve, true);
+    }
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean 
readLock) throws IOException {
+      this(snapshotId, readLock, null, null, false);
+    }
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean 
readLock,
+        CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException> 
snapshotLocalDataSupplier,
+        UUID snapshotIdToBeResolved, boolean isSnapshotToBeResolvedNullable) 
throws IOException {
+      this.snapshotId = snapshotId;
+      LockDataProviderInitResult result = initialize(readLock, snapshotId, 
snapshotIdToBeResolved,
+          isSnapshotToBeResolvedNullable, snapshotLocalDataSupplier);
+      this.snapshotLocalData = result.getSnapshotLocalData();
+      this.lock = result.getLock();
+      this.previousLock = result.getPreviousLock();
+      this.resolvedPreviousSnapshotId = result.getPreviousSnapshotId();
+      this.previousSnapshotLocalData = null;
+      this.isPreviousSnapshotLoaded = false;
+    }
+
+    public OmSnapshotLocalData getSnapshotLocalData() {
+      return snapshotLocalData;
+    }
+
+    public synchronized OmSnapshotLocalData getPreviousSnapshotLocalData() 
throws IOException {
+      if (!isPreviousSnapshotLoaded) {
+        if (resolvedPreviousSnapshotId != null) {
+          File previousSnapshotLocalDataFile = new 
File(getSnapshotLocalPropertyYamlPath(resolvedPreviousSnapshotId));
+          this.previousSnapshotLocalData = 
snapshotLocalDataSerializer.load(previousSnapshotLocalDataFile);
+        }
+        this.isPreviousSnapshotLoaded = true;
+      }
+      return previousSnapshotLocalData;
+    }
+
+    private HierarchicalResourceLock acquireLock(UUID snapId, boolean 
readLock) throws IOException {
+      HierarchicalResourceLock acquiredLock = readLock ? 
locks.acquireReadLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK,
+          snapId.toString()) : 
locks.acquireWriteLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK, 
snapId.toString());
+      if (!acquiredLock.isLockAcquired()) {
+        throw new IOException("Unable to acquire lock for snapshotId: " + 
snapId);
+      }
+      return acquiredLock;
+    }
+
+    /**
+     * Intializer the snapshot local data by acquiring the lock on the 
snapshot and also acquires a read lock on the
+     * snapshotId to be resolved by iterating through the chain of previous 
snapshot ids.
+     */
+    private LockDataProviderInitResult initialize(
+        boolean readLock, UUID snapId, UUID toResolveSnapshotId, boolean 
isSnapshotToBeResolvedNullable,
+        CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException> 
snapshotLocalDataSupplier) throws IOException {
+      HierarchicalResourceLock snapIdLock = null;
+      HierarchicalResourceLock previousReadLockAcquired = null;
+      try {
+        snapIdLock = acquireLock(snapId, readLock);
+        snapshotLocalDataSupplier = snapshotLocalDataSupplier == null ? () -> {
+          File snapshotLocalDataFile = new 
File(getSnapshotLocalPropertyYamlPath(snapId));
+          return 
Pair.of(snapshotLocalDataSerializer.load(snapshotLocalDataFile), 
snapshotLocalDataFile);
+        } : snapshotLocalDataSupplier;
+        Pair<OmSnapshotLocalData, File> pair = snapshotLocalDataSupplier.get();
+        OmSnapshotLocalData ssLocalData = pair.getKey();
+        if (!Objects.equals(ssLocalData.getSnapshotId(), snapId)) {
+          String loadPath = pair.getValue() == null ? null : 
pair.getValue().getAbsolutePath();
+          throw new IOException("SnapshotId in path : " + loadPath + " 
contains snapshotLocalData corresponding " +
+              "to snapshotId " + ssLocalData.getSnapshotId() + ". Expected 
snapshotId " + snapId);
+        }
+        // Get previous snapshotId and acquire read lock on the id. We need to 
do this outside the loop instead of a
+        // do while loop since the nodes that need be added may not be present 
in the graph so it may not be possible
+        // to iterate through the chain.
+        UUID previousSnapshotId = ssLocalData.getPreviousSnapshotId();
+        // if flag toResolveSnapshotIdIsNull is true or toResolveSnapshotId is 
not null, then we resolve snapshot
+        // with previous snapshot id as null, which would mean if the snapshot 
local data is committed the snapshot
+        // local data would become first snapshot in the chain with no 
previous snapshot id.
+        toResolveSnapshotId = (isSnapshotToBeResolvedNullable || 
toResolveSnapshotId != null) ? toResolveSnapshotId :
+            ssLocalData.getPreviousSnapshotId();
+        if (toResolveSnapshotId != null && previousSnapshotId != null) {
+          if (!versionNodeMap.containsKey(previousSnapshotId)) {
+            throw new IOException(String.format("Operating on snapshot id : %s 
with previousSnapshotId: %s invalid " +
+                "since previousSnapshotId is not loaded.", snapId, 
previousSnapshotId));
+          }
+          previousReadLockAcquired = acquireLock(previousSnapshotId, true);
+          // Create a copy of the previous versionMap to get the previous 
versions corresponding to the previous
+          // snapshot. This map would mutated to resolve the previous 
snapshot's version corresponding to the
+          // toResolveSnapshotId by iterating through the chain of previous 
snapshot ids.
+          Map<Integer, LocalDataVersionNode> previousVersionNodeMap =
+              new 
HashMap<>(versionNodeMap.get(previousSnapshotId).getSnapshotVersions());
+          UUID currentIteratedSnapshotId = previousSnapshotId;
+          // Iterate through the chain of previous snapshot ids until the 
snapshot id to be resolved is found.
+          while (!Objects.equals(currentIteratedSnapshotId, 
toResolveSnapshotId)) {
+            // All versions for the snapshot should point to the same previous 
snapshot id. Otherwise this is a sign
+            // of corruption.
+            Set<UUID> previousIds =
+                
previousVersionNodeMap.values().stream().map(LocalDataVersionNode::getPreviousSnapshotId)
+                .collect(Collectors.toSet());
+            if (previousIds.size() > 1) {
+              throw new IOException(String.format("Snapshot %s versions has 
multiple previous snapshotIds %s",
+                  currentIteratedSnapshotId, previousIds));
+            }
+            if (previousIds.isEmpty()) {
+              throw new IOException(String.format("Snapshot %s versions 
doesn't have previous Id thus snapshot " +
+                      "%s cannot be resolved against id %s",
+                  currentIteratedSnapshotId, snapId, toResolveSnapshotId));
+            }
+            UUID previousId = previousIds.iterator().next();
+            HierarchicalResourceLock previousToPreviousReadLockAcquired = 
acquireLock(previousId, true);
+
+            try {
+              // Get the version node for the snapshot and update the version 
node to the successor to point to the
+              // previous node.
+              for (Map.Entry<Integer, LocalDataVersionNode> entry : 
previousVersionNodeMap.entrySet()) {
+                Set<LocalDataVersionNode> versionNode = 
localDataGraph.successors(entry.getValue());
+                if (versionNode.size() > 1) {
+                  throw new IOException(String.format("Snapshot %s version %d 
has multiple successors %s",
+                      currentIteratedSnapshotId, entry.getValue(), 
versionNode));
+                }
+                if (versionNode.isEmpty()) {
+                  throw new IOException(String.format("Snapshot %s version %d 
doesn't have successor",
+                      currentIteratedSnapshotId, entry.getValue()));
+                }
+                // Set the version node for iterated version to the successor 
corresponding to the previous snapshot id.
+                entry.setValue(versionNode.iterator().next());
+              }
+            } finally {
+              // Release the read lock acquired on the previous snapshot id 
acquired. Now that the instance
+              // is no longer needed we can release the read lock for the 
snapshot iterated in the previous snapshot.
+              // Make previousToPrevious previous for next iteration.
+              previousReadLockAcquired.close();
+              previousReadLockAcquired = previousToPreviousReadLockAcquired;
+              currentIteratedSnapshotId = previousId;
+            }
+          }
+          ssLocalData.setPreviousSnapshotId(toResolveSnapshotId);
+          Map<Integer, OmSnapshotLocalData.VersionMeta> versionMetaMap = 
ssLocalData.getVersionSstFileInfos();
+          for (Map.Entry<Integer, OmSnapshotLocalData.VersionMeta> entry : 
versionMetaMap.entrySet()) {
+            OmSnapshotLocalData.VersionMeta versionMeta = entry.getValue();
+            // Get the relative version node which corresponds to the 
toResolveSnapshotId corresponding to the
+            // versionMeta which points to a particular version in the 
previous snapshot
+            LocalDataVersionNode relativePreviousVersionNode =
+                
previousVersionNodeMap.get(versionMeta.getPreviousSnapshotVersion());
+            if (relativePreviousVersionNode == null) {
+              throw new IOException(String.format("Unable to resolve previous 
version node for snapshot: %s" +
+                  " with version : %d against previous snapshot %s previous 
version : %d",
+                  snapId, entry.getKey(), toResolveSnapshotId, 
versionMeta.getPreviousSnapshotVersion()));
+            }
+            // Set the previous snapshot version to the 
relativePreviousVersionNode which was captured.
+            
versionMeta.setPreviousSnapshotVersion(relativePreviousVersionNode.getVersion());
+          }
+        } else {
+          toResolveSnapshotId = null;
+          ssLocalData.setPreviousSnapshotId(null);
+        }
+        return new LockDataProviderInitResult(snapIdLock, ssLocalData, 
previousReadLockAcquired, toResolveSnapshotId);
+      } catch (IOException e) {
+        // Release all the locks in case of an exception and rethrow the 
exception.
+        if (previousReadLockAcquired != null) {
+          previousReadLockAcquired.close();
+        }
+        if (snapIdLock != null) {
+          snapIdLock.close();
+        }
+        throw e;
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (previousLock != null) {
+        previousLock.close();
+      }
+      if (lock != null) {
+        lock.close();
+      }
+    }
+  }
+
+  /**
+   * This class represents a writable provider for managing local data of
+   * OmSnapshot. It extends the functionality of {@code 
ReadableOmSnapshotLocalDataProvider}
+   * and provides support for write operations, such as committing changes.
+   *
+   * The writable snapshot data provider interacts with version nodes and
+   * facilitates atomic updates to snapshot properties and files.
+   *
+   * This class is designed to ensure thread-safe operations and uses locks to
+   * guarantee consistent state across concurrent activities.
+   *
+   * The default usage includes creating an instance of this provider with
+   * specific snapshot identifiers and optionally handling additional 
parameters
+   * such as data resolution or a supplier for snapshot data.
+   */
+  public final class WritableOmSnapshotLocalDataProvider extends 
ReadableOmSnapshotLocalDataProvider {
+
+    private WritableOmSnapshotLocalDataProvider(UUID snapshotId) throws 
IOException {
+      super(snapshotId, false);
+      fullLock.readLock().lock();
+    }
+
+    private WritableOmSnapshotLocalDataProvider(UUID snapshotId, UUID 
snapshotIdToBeResolved) throws IOException {
+      super(snapshotId, false, null, snapshotIdToBeResolved, true);
+      fullLock.readLock().lock();
+    }
+
+    private WritableOmSnapshotLocalDataProvider(UUID snapshotId,
+        CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException> 
snapshotLocalDataSupplier) throws IOException {
+      super(snapshotId, false, snapshotLocalDataSupplier, null, false);
+      fullLock.readLock().lock();
+    }
+
+    private SnapshotVersionsMeta validateModification(OmSnapshotLocalData 
snapshotLocalData)
+        throws IOException {
+      SnapshotVersionsMeta versionsToBeAdded = new 
SnapshotVersionsMeta(snapshotLocalData);
+      for (LocalDataVersionNode node : 
versionsToBeAdded.getSnapshotVersions().values()) {
+        validateVersionAddition(node);
+      }
+      UUID snapshotId = snapshotLocalData.getSnapshotId();
+      Map<Integer, LocalDataVersionNode> existingVersions = 
getVersionNodeMap().containsKey(snapshotId) ?
+          getVersionNodeMap().get(snapshotId).getSnapshotVersions() : 
Collections.emptyMap();
+      for (Map.Entry<Integer, LocalDataVersionNode> entry : 
existingVersions.entrySet()) {
+        if 
(!versionsToBeAdded.getSnapshotVersions().containsKey(entry.getKey())) {
+          validateVersionRemoval(snapshotId, entry.getKey());
+        }
+      }
+      return versionsToBeAdded;
+    }
+
+    private synchronized void upsertNode(UUID snapshotId, SnapshotVersionsMeta 
snapshotVersions) throws IOException {
+      SnapshotVersionsMeta existingSnapVersions = 
getVersionNodeMap().remove(snapshotId);
+      Map<Integer, LocalDataVersionNode> existingVersions = 
existingSnapVersions == null ? Collections.emptyMap() :
+          existingSnapVersions.getSnapshotVersions();
+      Map<Integer, Set<LocalDataVersionNode>> predecessors = new HashMap<>();
+      // Track all predecessors of the existing versions and remove the node 
from the graph.
+      for (Map.Entry<Integer, LocalDataVersionNode> existingVersion : 
existingVersions.entrySet()) {
+        LocalDataVersionNode existingVersionNode = existingVersion.getValue();
+        predecessors.put(existingVersion.getKey(), 
localDataGraph.predecessors(existingVersionNode));
+        localDataGraph.removeNode(existingVersionNode);
+      }
+      // Add the nodes to be added in the graph and map.
+      addSnapshotVersionMeta(snapshotId, snapshotVersions);
+      // Reconnect all the predecessors for existing nodes.
+      for (Map.Entry<Integer, LocalDataVersionNode> entry : 
snapshotVersions.getSnapshotVersions().entrySet()) {
+        for (LocalDataVersionNode predecessor : 
predecessors.getOrDefault(entry.getKey(), Collections.emptySet())) {
+          localDataGraph.putEdge(predecessor, entry.getValue());
+        }
+      }
+    }
+
+    public void addSnapshotVersion(RDBStore snapshotStore) throws IOException {
+      List<LiveFileMetaData> sstFiles = 
OmSnapshotManager.getSnapshotSSTFileList(snapshotStore);
+      OmSnapshotLocalData previousSnapshotLocalData = 
getPreviousSnapshotLocalData();
+      this.getSnapshotLocalData().addVersionSSTFileInfos(sstFiles, 
previousSnapshotLocalData == null ? 0 :
+          previousSnapshotLocalData.getVersion());
+    }
+
+    public synchronized void commit() throws IOException {
+      SnapshotVersionsMeta localDataVersionNodes = 
validateModification(super.snapshotLocalData);
+      String filePath = getSnapshotLocalPropertyYamlPath(super.snapshotId);
+      String tmpFilePath = filePath + ".tmp";
+      File tmpFile = new File(tmpFilePath);
+      boolean tmpFileExists = tmpFile.exists();
+      if (tmpFileExists) {
+        tmpFileExists = !tmpFile.delete();
+      }
+      if (tmpFileExists) {
+        throw new IOException("Unable to delete tmp file " + tmpFilePath);
+      }
+      snapshotLocalDataSerializer.save(new File(tmpFilePath), 
super.snapshotLocalData);
+      Files.move(tmpFile.toPath(), Paths.get(filePath), 
StandardCopyOption.ATOMIC_MOVE,
+          StandardCopyOption.REPLACE_EXISTING);
+      upsertNode(super.snapshotId, localDataVersionNodes);
+    }
+
+    @Override
+    public void close() throws IOException {
+      super.close();
+      fullLock.readLock().unlock();

Review Comment:
   WritableOmSnapshotLocalDataProvider acquires fullLock.readLock() while 
performing mutations (commit/upsert) to versionNodeMap/localDataGraph. This can 
race with readers and other writers. Use fullLock.writeLock() for writes and 
hold fullLock.readLock() in ReadableOmSnapshotLocalDataProvider during graph 
reads. Specifically: change readLock() to writeLock() in these constructors and 
in close(); and acquire/release fullLock.readLock() in 
ReadableOmSnapshotLocalDataProvider to protect versionNodeMap/localDataGraph 
access.
   ```suggestion
         fullLock.writeLock().unlock();
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalData.java:
##########
@@ -184,9 +184,14 @@ public void setPreviousSnapshotId(UUID previousSnapshotId) 
{
    * Adds an entry to the defragged SST file list.
    * @param sstFiles SST file name
    */
-  public void addVersionSSTFileInfos(List<SstFileInfo> sstFiles, int 
previousSnapshotVersion) {
+  public void addVersionSSTFileInfos(List<LiveFileMetaData> sstFiles, int 
previousSnapshotVersion) {
     version++;
-    this.versionSstFileInfos.put(version, new 
VersionMeta(previousSnapshotVersion, sstFiles));
+    this.versionSstFileInfos.put(version, new 
VersionMeta(previousSnapshotVersion, sstFiles.stream()
+        .map(SstFileInfo::new).collect(Collectors.toList())));
+  }

Review Comment:
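   Changing addVersionSSTFileInfos to accept List<LiveFileMetaData> is a 
breaking API change for callers that pass List<SstFileInfo>. To preserve 
compatibility, consider keeping an entry point that still accepts 
List<SstFileInfo> (it needs no mapping) alongside the new 
LiveFileMetaData-based one, with both sharing the same logic. Note that the 
two cannot simply be overloads of the same name: List<SstFileInfo> and 
List<LiveFileMetaData> have the same erasure, so one of them needs a distinct 
method name.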



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java:
##########
@@ -186,7 +213,8 @@ void addVersionNodeWithDependents(OmSnapshotLocalData 
snapshotLocalData) throws
       } else {
         UUID prevSnapId = snapshotVersionsMeta.getPreviousSnapshotId();
         if (prevSnapId != null && !versionNodeMap.containsKey(prevSnapId)) {
-          OmSnapshotLocalData prevSnapshotLocalData = 
getOmSnapshotLocalData(prevSnapId);
+          File previousSnapshotLocalDataFile = new 
File(getSnapshotLocalPropertyYamlPath(prevSnapId));
+          OmSnapshotLocalData prevSnapshotLocalData = 
snapshotLocalDataSerializer.load(previousSnapshotLocalDataFile);

Review Comment:
   This bypasses the existing validation that the YAML's embedded snapshotId 
matches the snapshot ID the file was loaded for (previously done in 
getOmSnapshotLocalData(UUID)). To keep the integrity check, either call 
getOmSnapshotLocalData(previousSnapshotLocalDataFile) (if it performs the 
validation) or add an explicit equality check and throw if mismatched.
   ```suggestion
             OmSnapshotLocalData prevSnapshotLocalData = 
snapshotLocalDataSerializer.load(previousSnapshotLocalDataFile);
             if (!prevSnapId.equals(prevSnapshotLocalData.getSnapshotId())) {
               throw new IOException("SnapshotId mismatch: expected " + 
prevSnapId +
                 " but found " + prevSnapshotLocalData.getSnapshotId() +
                 " in file " + previousSnapshotLocalDataFile.getAbsolutePath());
             }
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java:
##########
@@ -228,6 +297,362 @@ public void close() {
     }
   }
 
+  private static final class LockDataProviderInitResult {
+    private final OmSnapshotLocalData snapshotLocalData;
+    private final HierarchicalResourceLock lock;
+    private final HierarchicalResourceLock previousLock;
+    private final UUID previousSnapshotId;
+
+    private LockDataProviderInitResult(HierarchicalResourceLock lock, 
OmSnapshotLocalData snapshotLocalData,
+        HierarchicalResourceLock previousLock, UUID previousSnapshotId) {
+      this.lock = lock;
+      this.snapshotLocalData = snapshotLocalData;
+      this.previousLock = previousLock;
+      this.previousSnapshotId = previousSnapshotId;
+    }
+
+    private HierarchicalResourceLock getLock() {
+      return lock;
+    }
+
+    private HierarchicalResourceLock getPreviousLock() {
+      return previousLock;
+    }
+
+    private UUID getPreviousSnapshotId() {
+      return previousSnapshotId;
+    }
+
+    private OmSnapshotLocalData getSnapshotLocalData() {
+      return snapshotLocalData;
+    }
+  }
+
+  /**
+   * The ReadableOmSnapshotLocalDataProvider class is responsible for managing 
the
+   * access and initialization of local snapshot data in a thread-safe manner.
+   * It provides mechanisms to handle snapshot data, retrieve associated 
previous
+   * snapshot data, and manage lock synchronization for safe concurrent 
operations.
+   *
+   * This class works with snapshot identifiers and ensures that the 
appropriate
+   * local data for a given snapshot is loaded and accessible. Additionally, it
+   * maintains locking mechanisms to ensure thread-safe initialization and 
access
+   * to both the current and previous snapshot local data. The implementation 
also
+   * supports handling errors in the snapshot data initialization process.
+   *
+   * Key Functionalities:
+   * - Initializes and provides access to snapshot local data associated with a
+   *   given snapshot identifier.
+   * - Resolves and retrieves data for the previous snapshot if applicable.
+   * - Ensures safe concurrent read operations using locking mechanisms.
+   * - Validates the integrity and consistency of snapshot data during 
initialization.
+   * - Ensures that appropriate locks are released upon closing.
+   *
+   * Thread-Safety:
+   * This class utilizes locks to guarantee thread-safe operations when 
accessing
+   * or modifying snapshot data. State variables relating to snapshot data are
+   * properly synchronized to ensure consistency during concurrent operations.
+   *
+   * Usage Considerations:
+   * - Ensure proper handling of exceptions while interacting with this class,
+   *   particularly during initialization and cleanup.
+   * - Always invoke the {@code close()} method after usage to release 
acquired locks
+   *   and avoid potential deadlocks.
+   */
+  public class ReadableOmSnapshotLocalDataProvider implements AutoCloseable {
+
+    private final UUID snapshotId;
+    private final HierarchicalResourceLock lock;
+    private final HierarchicalResourceLock previousLock;
+    private final OmSnapshotLocalData snapshotLocalData;
+    private OmSnapshotLocalData previousSnapshotLocalData;
+    private volatile boolean isPreviousSnapshotLoaded = false;
+    private final UUID resolvedPreviousSnapshotId;
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId) throws 
IOException {
+      this(snapshotId, true);
+    }
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, UUID 
snapIdToResolve) throws IOException {
+      this(snapshotId, true, null, snapIdToResolve, true);
+    }
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean 
readLock) throws IOException {
+      this(snapshotId, readLock, null, null, false);
+    }
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean 
readLock,
+        CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException> 
snapshotLocalDataSupplier,
+        UUID snapshotIdToBeResolved, boolean isSnapshotToBeResolvedNullable) 
throws IOException {
+      this.snapshotId = snapshotId;
+      LockDataProviderInitResult result = initialize(readLock, snapshotId, 
snapshotIdToBeResolved,
+          isSnapshotToBeResolvedNullable, snapshotLocalDataSupplier);
+      this.snapshotLocalData = result.getSnapshotLocalData();
+      this.lock = result.getLock();
+      this.previousLock = result.getPreviousLock();
+      this.resolvedPreviousSnapshotId = result.getPreviousSnapshotId();
+      this.previousSnapshotLocalData = null;
+      this.isPreviousSnapshotLoaded = false;
+    }
+
+    public OmSnapshotLocalData getSnapshotLocalData() {
+      return snapshotLocalData;
+    }
+
+    public synchronized OmSnapshotLocalData getPreviousSnapshotLocalData() 
throws IOException {
+      if (!isPreviousSnapshotLoaded) {
+        if (resolvedPreviousSnapshotId != null) {
+          File previousSnapshotLocalDataFile = new 
File(getSnapshotLocalPropertyYamlPath(resolvedPreviousSnapshotId));
+          this.previousSnapshotLocalData = 
snapshotLocalDataSerializer.load(previousSnapshotLocalDataFile);
+        }
+        this.isPreviousSnapshotLoaded = true;
+      }
+      return previousSnapshotLocalData;
+    }
+
+    private HierarchicalResourceLock acquireLock(UUID snapId, boolean 
readLock) throws IOException {
+      HierarchicalResourceLock acquiredLock = readLock ? 
locks.acquireReadLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK,
+          snapId.toString()) : 
locks.acquireWriteLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK, 
snapId.toString());
+      if (!acquiredLock.isLockAcquired()) {
+        throw new IOException("Unable to acquire lock for snapshotId: " + 
snapId);
+      }
+      return acquiredLock;
+    }
+
+    /**
+     * Intializer the snapshot local data by acquiring the lock on the 
snapshot and also acquires a read lock on the

Review Comment:
   Fix the typo 'Intializer' in this Javadoc; it should read 'Initializes'.
   ```suggestion
        * Initializes the snapshot local data by acquiring the lock on the 
snapshot and also acquires a read lock on the
   ```



##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java:
##########
@@ -135,6 +165,376 @@ public void tearDown() throws Exception {
     }
   }
 
+  private String getReadLockMessageAcquire(UUID snapshotId) {
+    return READ_LOCK_MESSAGE_ACQUIRE + " " + 
FlatResource.SNAPSHOT_LOCAL_DATA_LOCK + " " + snapshotId;
+  }
+
+  private String getReadLockMessageRelease(UUID snapshotId) {
+    return READ_LOCK_MESSAGE_UNLOCK + " " + 
FlatResource.SNAPSHOT_LOCAL_DATA_LOCK + " " + snapshotId;
+  }
+
+  private String getWriteLockMessageAcquire(UUID snapshotId) {
+    return WRITE_LOCK_MESSAGE_ACQUIRE + " " + 
FlatResource.SNAPSHOT_LOCAL_DATA_LOCK + " " + snapshotId;
+  }
+
+  private String getWriteLockMessageRelease(UUID snapshotId) {
+    return WRITE_LOCK_MESSAGE_UNLOCK + " " + 
FlatResource.SNAPSHOT_LOCAL_DATA_LOCK + " " + snapshotId;
+  }
+
+  private HierarchicalResourceLock getHierarchicalResourceLock(FlatResource 
resource, String key, boolean isWriteLock) {
+    return new HierarchicalResourceLock() {
+      @Override
+      public boolean isLockAcquired() {
+        return true;
+      }
+
+      @Override
+      public void close() {
+        if (isWriteLock) {
+          lockCapturor.add(WRITE_LOCK_MESSAGE_UNLOCK + " " + resource + " " + 
key);
+        } else {
+          lockCapturor.add(READ_LOCK_MESSAGE_UNLOCK + " " + resource + " " + 
key);
+        }
+      }
+    };
+  }
+
+  private void mockLockManager() throws IOException {
+    lockCapturor.clear();
+    reset(lockManager);
+    when(lockManager.acquireReadLock(any(FlatResource.class), anyString()))
+        .thenAnswer(i -> {
+          lockCapturor.add(READ_LOCK_MESSAGE_ACQUIRE + " " + i.getArgument(0) 
+ " " + i.getArgument(1));
+          return getHierarchicalResourceLock(i.getArgument(0), 
i.getArgument(1), false);
+        });
+    when(lockManager.acquireWriteLock(any(FlatResource.class), anyString()))
+        .thenAnswer(i -> {
+          lockCapturor.add(WRITE_LOCK_MESSAGE_ACQUIRE + " " + i.getArgument(0) 
+ " " + i.getArgument(1));
+          return getHierarchicalResourceLock(i.getArgument(0), 
i.getArgument(1), true);
+        });
+  }
+
+  private List<UUID> createSnapshotLocalData(OmSnapshotLocalDataManager 
snapshotLocalDataManager,
+      int numberOfSnapshots) throws IOException {
+    SnapshotInfo previousSnapshotInfo = null;
+    int counter = 0;
+    Map<String, List<LiveFileMetaData>> liveFileMetaDataMap = new HashMap<>();
+    liveFileMetaDataMap.put(KEY_TABLE,
+        Lists.newArrayList(createMockLiveFileMetaData("file1.sst", KEY_TABLE, 
"key1", "key2")));
+    liveFileMetaDataMap.put(FILE_TABLE, 
Lists.newArrayList(createMockLiveFileMetaData("file2.sst", FILE_TABLE, "key1",
+        "key2")));
+    liveFileMetaDataMap.put(DIRECTORY_TABLE, 
Lists.newArrayList(createMockLiveFileMetaData("file2.sst",
+        DIRECTORY_TABLE, "key1", "key2")));
+    liveFileMetaDataMap.put("col1", 
Lists.newArrayList(createMockLiveFileMetaData("file2.sst", "col1", "key1",
+        "key2")));
+    List<UUID> snapshotIds = new ArrayList<>();
+    for (int i = 0; i < numberOfSnapshots; i++) {
+      UUID snapshotId = UUID.randomUUID();
+      SnapshotInfo snapshotInfo = createMockSnapshotInfo(snapshotId, 
previousSnapshotInfo == null ? null
+          : previousSnapshotInfo.getSnapshotId());
+      mockSnapshotStore(snapshotId, liveFileMetaDataMap.values().stream()
+          .flatMap(Collection::stream).collect(Collectors.toList()));
+      snapshotLocalDataManager.createNewOmSnapshotLocalDataFile(snapshotStore, 
snapshotInfo);
+      previousSnapshotInfo = snapshotInfo;
+      for (Map.Entry<String, List<LiveFileMetaData>> tableEntry : 
liveFileMetaDataMap.entrySet()) {
+        String table = tableEntry.getKey();
+        tableEntry.getValue().add(createMockLiveFileMetaData("file" + 
counter++ + ".sst", table, "key1", "key4"));
+      }
+      snapshotIds.add(snapshotId);
+    }
+    return snapshotIds;
+  }
+
+  private void mockSnapshotStore(UUID snapshotId, List<LiveFileMetaData> 
sstFiles) throws RocksDatabaseException {
+    // Setup snapshot store mock
+    File snapshotDbLocation = 
OmSnapshotManager.getSnapshotPath(omMetadataManager, snapshotId).toFile();
+    assertTrue(snapshotDbLocation.exists() || snapshotDbLocation.mkdirs());
+
+    when(snapshotStore.getDbLocation()).thenReturn(snapshotDbLocation);
+    RocksDatabase rocksDatabase = mock(RocksDatabase.class);
+    when(snapshotStore.getDb()).thenReturn(rocksDatabase);
+    when(rocksDatabase.getLiveFilesMetaData()).thenReturn(sstFiles);
+  }
+
+  /**
+   * Checks the order in which locks are taken while reading a snapshot against 
a previous snapshot.
+   * Depending on the mode, a read or write lock is acquired on the snapshotId and 
a read lock is acquired on the
+   * previous snapshot. Once the instance is closed, the read lock on the previous 
snapshot is released, followed by
+   * the lock on the snapshotId.
+   * @param read
+   * @throws IOException
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testLockOrderingAgainstAnotherSnapshot(boolean read) throws 
IOException {
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    List<UUID> snapshotIds = new ArrayList<>();
+    snapshotIds.add(null);
+    snapshotIds.addAll(createSnapshotLocalData(localDataManager, 20));
+    for (int start = 0; start < snapshotIds.size(); start++) {
+      for (int end = start + 1; end < snapshotIds.size(); end++) {
+        UUID startSnapshotId = snapshotIds.get(start);
+        UUID endSnapshotId = snapshotIds.get(end);
+        lockCapturor.clear();
+        int logCaptorIdx = 0;
+        try (ReadableOmSnapshotLocalDataProvider omSnapshotLocalDataProvider =
+                 read ? localDataManager.getOmSnapshotLocalData(endSnapshotId, 
startSnapshotId) :
+                     
localDataManager.getWritableOmSnapshotLocalData(endSnapshotId, 
startSnapshotId)) {
+          OmSnapshotLocalData snapshotLocalData = 
omSnapshotLocalDataProvider.getSnapshotLocalData();
+          OmSnapshotLocalData previousSnapshot = 
omSnapshotLocalDataProvider.getPreviousSnapshotLocalData();
+          assertEquals(endSnapshotId, snapshotLocalData.getSnapshotId());
+          if (startSnapshotId == null) {
+            assertNull(previousSnapshot);
+            assertNull(snapshotLocalData.getPreviousSnapshotId());
+            continue;
+          }
+          assertEquals(startSnapshotId, previousSnapshot.getSnapshotId());
+          assertEquals(startSnapshotId, 
snapshotLocalData.getPreviousSnapshotId());
+          if (read) {
+            assertEquals(getReadLockMessageAcquire(endSnapshotId), 
lockCapturor.get(logCaptorIdx++));
+          } else {
+            assertEquals(getWriteLockMessageAcquire(endSnapshotId), 
lockCapturor.get(logCaptorIdx++));
+          }
+          int idx = end - 1;
+          UUID previousSnapId = snapshotIds.get(idx--);
+          assertEquals(getReadLockMessageAcquire(previousSnapId), 
lockCapturor.get(logCaptorIdx++));
+          while (idx >= start) {
+            UUID prevPrevSnapId = snapshotIds.get(idx--);
+            assertEquals(getReadLockMessageAcquire(prevPrevSnapId), 
lockCapturor.get(logCaptorIdx++));
+            assertEquals(getReadLockMessageRelease(previousSnapId), 
lockCapturor.get(logCaptorIdx++));
+            previousSnapId = prevPrevSnapId;
+          }
+        }
+        assertEquals(getReadLockMessageRelease(startSnapshotId), 
lockCapturor.get(logCaptorIdx++));
+        if (read) {
+          assertEquals(getReadLockMessageRelease(endSnapshotId), 
lockCapturor.get(logCaptorIdx++));
+        } else {
+          assertEquals(getWriteLockMessageRelease(endSnapshotId), 
lockCapturor.get(logCaptorIdx++));
+        }
+        assertEquals(lockCapturor.size(), logCaptorIdx);
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testVersionLockResolution(boolean read) throws IOException {
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 5);
+    for (int snapIdx = 0; snapIdx < snapshotIds.size(); snapIdx++) {
+      UUID snapId = snapshotIds.get(snapIdx);
+      UUID expectedPreviousSnapId = snapIdx - 1 >= 0 ? snapshotIds.get(snapIdx 
- 1) : null;
+      lockCapturor.clear();
+      int logCaptorIdx = 0;
+      try (ReadableOmSnapshotLocalDataProvider omSnapshotLocalDataProvider =
+               read ? localDataManager.getOmSnapshotLocalData(snapId) :
+                   localDataManager.getWritableOmSnapshotLocalData(snapId)) {
+        OmSnapshotLocalData snapshotLocalData = 
omSnapshotLocalDataProvider.getSnapshotLocalData();
+        OmSnapshotLocalData previousSnapshot = 
omSnapshotLocalDataProvider.getPreviousSnapshotLocalData();
+        assertEquals(snapId, snapshotLocalData.getSnapshotId());
+        assertEquals(expectedPreviousSnapId, previousSnapshot == null ? null :
+            previousSnapshot.getSnapshotId());
+        if (read) {
+          assertEquals(getReadLockMessageAcquire(snapId), 
lockCapturor.get(logCaptorIdx++));
+        } else {
+          assertEquals(getWriteLockMessageAcquire(snapId), 
lockCapturor.get(logCaptorIdx++));
+        }
+        if (expectedPreviousSnapId != null) {
+          assertEquals(getReadLockMessageAcquire(expectedPreviousSnapId), 
lockCapturor.get(logCaptorIdx++));
+        }
+      }
+      if (expectedPreviousSnapId != null) {
+        assertEquals(getReadLockMessageRelease(expectedPreviousSnapId), 
lockCapturor.get(logCaptorIdx++));
+      }
+      if (read) {
+        assertEquals(getReadLockMessageRelease(snapId), 
lockCapturor.get(logCaptorIdx++));
+      } else {
+        assertEquals(getWriteLockMessageRelease(snapId), 
lockCapturor.get(logCaptorIdx++));
+      }
+      assertEquals(lockCapturor.size(), logCaptorIdx);
+    }
+  }
+
+  @Test
+  public void 
testWriteVersionAdditionValidationWithoutPreviousSnapshotVersionExisting() 
throws IOException {
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 2);
+    UUID snapId = snapshotIds.get(1);
+    try (WritableOmSnapshotLocalDataProvider omSnapshotLocalDataProvider =
+             localDataManager.getWritableOmSnapshotLocalData(snapId)) {
+      OmSnapshotLocalData snapshotLocalData = 
omSnapshotLocalDataProvider.getSnapshotLocalData();
+      
snapshotLocalData.addVersionSSTFileInfos(Lists.newArrayList(createMockLiveFileMetaData("file1.sst",
 KEY_TABLE,
+          "key1", "key2")), 3);
+
+      IOException ex = assertThrows(IOException.class, 
omSnapshotLocalDataProvider::commit);
+      System.out.println(ex.getMessage());

Review Comment:
   Avoid printing to stdout in unit tests; it adds noise to test logs. Please 
remove the System.out.println(...) line.
   ```suggestion
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to