This is an automated email from the ASF dual-hosted git repository.
swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b39bac05eca HDDS-13783. Implement locks for OmSnapshotLocalDataManager
(#9140)
b39bac05eca is described below
commit b39bac05eca3f78a77b7efe8cbb5487fc00d48c2
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Wed Oct 29 02:19:30 2025 -0400
HDDS-13783. Implement locks for OmSnapshotLocalDataManager (#9140)
---
.../apache/hadoop/ozone/om/lock/FlatResource.java | 4 +-
.../hadoop/ozone/om/OmSnapshotLocalData.java | 17 +-
.../hadoop/ozone/om/SnapshotDefragService.java | 8 +-
.../om/snapshot/OmSnapshotLocalDataManager.java | 573 ++++++++++++++++++++-
.../ozone/om/TestOmSnapshotLocalDataYaml.java | 8 +-
.../hadoop/ozone/om/TestOmSnapshotManager.java | 6 +-
.../snapshot/TestOmSnapshotLocalDataManager.java | 462 ++++++++++++++++-
7 files changed, 1018 insertions(+), 60 deletions(-)
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/FlatResource.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/FlatResource.java
index 73f8357252f..f4d7e72ece3 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/FlatResource.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/FlatResource.java
@@ -26,7 +26,9 @@ public enum FlatResource implements Resource {
// Background services lock on a Snapshot.
SNAPSHOT_GC_LOCK("SNAPSHOT_GC_LOCK"),
// Lock acquired on a Snapshot's RocksDB Handle.
- SNAPSHOT_DB_LOCK("SNAPSHOT_DB_LOCK");
+ SNAPSHOT_DB_LOCK("SNAPSHOT_DB_LOCK"),
+ // Lock acquired on a Snapshot's Local Data.
+ SNAPSHOT_LOCAL_DATA_LOCK("SNAPSHOT_LOCAL_DATA_LOCK");
private String name;
private IOzoneManagerLock.ResourceManager resourceManager;
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalData.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalData.java
index 83ad02fb14b..1c840a1cd2e 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalData.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalData.java
@@ -163,7 +163,7 @@ public Map<Integer, VersionMeta> getVersionSstFileInfos() {
* Sets the defragged SST file list.
* @param versionSstFileInfos Map of version to defragged SST file list
*/
- public void setVersionSstFileInfos(Map<Integer, VersionMeta>
versionSstFileInfos) {
+ void setVersionSstFileInfos(Map<Integer, VersionMeta> versionSstFileInfos) {
this.versionSstFileInfos.clear();
this.versionSstFileInfos.putAll(versionSstFileInfos);
}
@@ -184,9 +184,14 @@ public void setPreviousSnapshotId(UUID previousSnapshotId)
{
* Adds an entry to the defragged SST file list.
* @param sstFiles SST file name
*/
- public void addVersionSSTFileInfos(List<SstFileInfo> sstFiles, int
previousSnapshotVersion) {
+ public void addVersionSSTFileInfos(List<LiveFileMetaData> sstFiles, int
previousSnapshotVersion) {
version++;
- this.versionSstFileInfos.put(version, new
VersionMeta(previousSnapshotVersion, sstFiles));
+ this.versionSstFileInfos.put(version, new
VersionMeta(previousSnapshotVersion, sstFiles.stream()
+ .map(SstFileInfo::new).collect(Collectors.toList())));
+ }
+
+ public void removeVersionSSTFileInfos(int snapshotVersion) {
+ this.versionSstFileInfos.remove(snapshotVersion);
}
/**
@@ -274,7 +279,7 @@ public OmSnapshotLocalData copyObject() {
* maintain immutability.
*/
public static class VersionMeta implements CopyObject<VersionMeta> {
- private final int previousSnapshotVersion;
+ private int previousSnapshotVersion;
private final List<SstFileInfo> sstFiles;
public VersionMeta(int previousSnapshotVersion, List<SstFileInfo>
sstFiles) {
@@ -286,6 +291,10 @@ public int getPreviousSnapshotVersion() {
return previousSnapshotVersion;
}
+ public void setPreviousSnapshotVersion(int previousSnapshotVersion) {
+ this.previousSnapshotVersion = previousSnapshotVersion;
+ }
+
public List<SstFileInfo> getSstFiles() {
return sstFiles;
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
index 9747bb7c894..87f6ff55bb7 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,11 +131,10 @@ private boolean needsDefragmentation(SnapshotInfo
snapshotInfo) {
String snapshotPath = OmSnapshotManager.getSnapshotPath(
ozoneManager.getConfiguration(), snapshotInfo);
- try {
+ try (OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider
readableOmSnapshotLocalDataProvider =
+
ozoneManager.getOmSnapshotManager().getSnapshotLocalDataManager().getOmSnapshotLocalData(snapshotInfo))
{
// Read snapshot local metadata from YAML
- OmSnapshotLocalData snapshotLocalData =
ozoneManager.getOmSnapshotManager()
- .getSnapshotLocalDataManager()
- .getOmSnapshotLocalData(snapshotInfo);
+ OmSnapshotLocalData snapshotLocalData =
readableOmSnapshotLocalDataProvider.getSnapshotLocalData();
// Check if snapshot needs compaction (defragmentation)
boolean needsDefrag = snapshotLocalData.getNeedsDefrag();
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java
index 3c529abaf3c..3e92eb6748c 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java
@@ -27,15 +27,23 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.Stack;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -44,8 +52,15 @@
import org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.FlatResource;
+import org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager;
+import
org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager.HierarchicalResourceLock;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
import org.apache.hadoop.ozone.util.ObjectSerializer;
import org.apache.hadoop.ozone.util.YamlSerializer;
+import org.apache.ratis.util.function.CheckedSupplier;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+import org.rocksdb.LiveFileMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
@@ -59,9 +74,32 @@ public class OmSnapshotLocalDataManager implements
AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(OmSnapshotLocalDataManager.class);
private final ObjectSerializer<OmSnapshotLocalData>
snapshotLocalDataSerializer;
+ // In-memory DAG of snapshot-version dependencies. Each node represents a
+ // specific (snapshotId, version) pair, and a directed edge points to the
+ // corresponding (previousSnapshotId, previousSnapshotVersion) it depends on.
+ // The durable state is stored in each snapshot's YAML (previousSnapshotId
and
+ // VersionMeta.previousSnapshotVersion). This graph mirrors that persisted
+ // structure to validate adds/removes and to resolve versions across chains.
+ // This graph is maintained only in memory and is not persisted to disk.
+ // Example (linear chain, arrows point to previous):
+ // (S0, v1) <- (S1, v4) <- (S2, v5) <- (S3, v7)
+ // where each node is (snapshotId, version) and each arrow points to its
+ // corresponding (previousSnapshotId, previousSnapshotVersion) dependency.
+ //
+ // Example (multiple versions for a single snapshotId S2):
+ // (S1, v4) <- (S2, v6) <- (S3, v8)
+ // (S1, v3) <- (S2, v5)
+ // Here S2 has two distinct versions (v6 and v5), each represented as its own
+ // node, and each version can depend on a different previousSnapshotVersion
on S1.
private final MutableGraph<LocalDataVersionNode> localDataGraph;
private final Map<UUID, SnapshotVersionsMeta> versionNodeMap;
private final OMMetadataManager omMetadataManager;
+ // Used for acquiring locks on the entire data structure.
+ private final ReadWriteLock fullLock;
+ // Used for taking a lock on internal data structure Map and Graph to ensure
thread safety;
+ private final ReadWriteLock internalLock;
+ // Locks should be always acquired by iterating through the snapshot chain
to avoid deadlocks.
+ private HierarchicalResourceLockManager locks;
public OmSnapshotLocalDataManager(OMMetadataManager omMetadataManager)
throws IOException {
this.localDataGraph = GraphBuilder.directed().build();
@@ -74,7 +112,9 @@ public void computeAndSetChecksum(Yaml yaml,
OmSnapshotLocalData data) throws IO
data.computeAndSetChecksum(yaml);
}
};
- this.versionNodeMap = new HashMap<>();
+ this.versionNodeMap = new ConcurrentHashMap<>();
+ this.fullLock = new ReentrantReadWriteLock();
+ this.internalLock = new ReentrantReadWriteLock();
init();
}
@@ -114,26 +154,41 @@ public String getSnapshotLocalPropertyYamlPath(UUID
snapshotId) {
* @param snapshotInfo snapshot info instance corresponding to snapshot.
*/
public void createNewOmSnapshotLocalDataFile(RDBStore snapshotStore,
SnapshotInfo snapshotInfo) throws IOException {
- Path snapshotLocalDataPath = Paths.get(
-
getSnapshotLocalPropertyYamlPath(snapshotStore.getDbLocation().toPath()));
- Files.deleteIfExists(snapshotLocalDataPath);
- OmSnapshotLocalData snapshotLocalDataYaml = new
OmSnapshotLocalData(snapshotInfo.getSnapshotId(),
- OmSnapshotManager.getSnapshotSSTFileList(snapshotStore),
snapshotInfo.getPathPreviousSnapshotId());
- snapshotLocalDataSerializer.save(snapshotLocalDataPath.toFile(),
snapshotLocalDataYaml);
+ try (WritableOmSnapshotLocalDataProvider snapshotLocalData =
+ new WritableOmSnapshotLocalDataProvider(snapshotInfo.getSnapshotId(),
+ () -> Pair.of(new OmSnapshotLocalData(snapshotInfo.getSnapshotId(),
+ OmSnapshotManager.getSnapshotSSTFileList(snapshotStore),
snapshotInfo.getPathPreviousSnapshotId()),
+ null))) {
+ snapshotLocalData.commit();
+ }
}
- public OmSnapshotLocalData getOmSnapshotLocalData(SnapshotInfo snapshotInfo)
throws IOException {
+ public ReadableOmSnapshotLocalDataProvider
getOmSnapshotLocalData(SnapshotInfo snapshotInfo) throws IOException {
return getOmSnapshotLocalData(snapshotInfo.getSnapshotId());
}
- public OmSnapshotLocalData getOmSnapshotLocalData(UUID snapshotId) throws
IOException {
- Path snapshotLocalDataPath =
Paths.get(getSnapshotLocalPropertyYamlPath(snapshotId));
- OmSnapshotLocalData snapshotLocalData =
snapshotLocalDataSerializer.load(snapshotLocalDataPath.toFile());
- if (!Objects.equals(snapshotLocalData.getSnapshotId(), snapshotId)) {
- throw new IOException("SnapshotId in path : " + snapshotLocalDataPath +
" contains snapshotLocalData " +
- "corresponding to snapshotId " + snapshotLocalData.getSnapshotId() +
". Expected snapshotId " + snapshotId);
- }
- return snapshotLocalData;
+ public ReadableOmSnapshotLocalDataProvider getOmSnapshotLocalData(UUID
snapshotId) throws IOException {
+ return new ReadableOmSnapshotLocalDataProvider(snapshotId);
+ }
+
+ public ReadableOmSnapshotLocalDataProvider getOmSnapshotLocalData(UUID
snapshotId, UUID previousSnapshotID)
+ throws IOException {
+ return new ReadableOmSnapshotLocalDataProvider(snapshotId,
previousSnapshotID);
+ }
+
+ public WritableOmSnapshotLocalDataProvider
getWritableOmSnapshotLocalData(SnapshotInfo snapshotInfo)
+ throws IOException {
+ return getWritableOmSnapshotLocalData(snapshotInfo.getSnapshotId(),
snapshotInfo.getPathPreviousSnapshotId());
+ }
+
+ public WritableOmSnapshotLocalDataProvider
getWritableOmSnapshotLocalData(UUID snapshotId, UUID previousSnapshotId)
+ throws IOException {
+ return new WritableOmSnapshotLocalDataProvider(snapshotId,
previousSnapshotId);
+ }
+
+ public WritableOmSnapshotLocalDataProvider
getWritableOmSnapshotLocalData(UUID snapshotId)
+ throws IOException {
+ return new WritableOmSnapshotLocalDataProvider(snapshotId);
}
public OmSnapshotLocalData getOmSnapshotLocalData(File snapshotDataPath)
throws IOException {
@@ -141,7 +196,7 @@ public OmSnapshotLocalData getOmSnapshotLocalData(File
snapshotDataPath) throws
}
private LocalDataVersionNode getVersionNode(UUID snapshotId, int version) {
- if (!versionNodeMap.containsKey(snapshotId)) {
+ if (snapshotId == null || !versionNodeMap.containsKey(snapshotId)) {
return null;
}
return versionNodeMap.get(snapshotId).getVersionNode(version);
@@ -151,15 +206,9 @@ private void addSnapshotVersionMeta(UUID snapshotId,
SnapshotVersionsMeta snapsh
throws IOException {
if (!versionNodeMap.containsKey(snapshotId)) {
for (LocalDataVersionNode versionNode :
snapshotVersionsMeta.getSnapshotVersions().values()) {
- if (getVersionNode(versionNode.snapshotId, versionNode.version) !=
null) {
- throw new IOException("Unable to add " + versionNode + " since it
already exists");
- }
- LocalDataVersionNode previousVersionNode =
versionNode.previousSnapshotId == null ? null :
+ validateVersionAddition(versionNode);
+ LocalDataVersionNode previousVersionNode =
getVersionNode(versionNode.previousSnapshotId,
versionNode.previousSnapshotVersion);
- if (versionNode.previousSnapshotId != null && previousVersionNode ==
null) {
- throw new IOException("Unable to add " + versionNode + " since
previous snapshot with version hasn't been " +
- "loaded");
- }
localDataGraph.addNode(versionNode);
if (previousVersionNode != null) {
localDataGraph.putEdge(versionNode, previousVersionNode);
@@ -186,7 +235,13 @@ void addVersionNodeWithDependents(OmSnapshotLocalData
snapshotLocalData) throws
} else {
UUID prevSnapId = snapshotVersionsMeta.getPreviousSnapshotId();
if (prevSnapId != null && !versionNodeMap.containsKey(prevSnapId)) {
- OmSnapshotLocalData prevSnapshotLocalData =
getOmSnapshotLocalData(prevSnapId);
+ File previousSnapshotLocalDataFile = new
File(getSnapshotLocalPropertyYamlPath(prevSnapId));
+ OmSnapshotLocalData prevSnapshotLocalData =
snapshotLocalDataSerializer.load(previousSnapshotLocalDataFile);
+ if (!prevSnapId.equals(prevSnapshotLocalData.getSnapshotId())) {
+ throw new IOException("SnapshotId mismatch: expected " +
prevSnapId +
+ " but found " + prevSnapshotLocalData.getSnapshotId() +
+ " in file " + previousSnapshotLocalDataFile.getAbsolutePath());
+ }
stack.push(Pair.of(prevSnapshotLocalData.getSnapshotId(), new
SnapshotVersionsMeta(prevSnapshotLocalData)));
}
visitedSnapshotIds.add(snapId);
@@ -195,6 +250,7 @@ void addVersionNodeWithDependents(OmSnapshotLocalData
snapshotLocalData) throws
}
private void init() throws IOException {
+ this.locks = omMetadataManager.getHierarchicalLockManager();
RDBStore store = (RDBStore) omMetadataManager.getStore();
String checkpointPrefix = store.getDbLocation().getName();
File snapshotDir = new File(store.getSnapshotsParentDir());
@@ -217,6 +273,46 @@ private void init() throws IOException {
}
}
+ /**
+ * Acquires a write lock and provides an auto-closeable supplier for
specifying details
+ * of the lock acquisition. The lock is released when the returned supplier
is closed.
+ *
+ * @return an instance of {@code
UncheckedAutoCloseableSupplier<OMLockDetails>} representing
+ * the acquired lock details, where the lock will automatically be
released on close.
+ */
+ public UncheckedAutoCloseableSupplier<OMLockDetails> lock() {
+ this.fullLock.writeLock().lock();
+ return new UncheckedAutoCloseableSupplier<OMLockDetails>() {
+ @Override
+ public OMLockDetails get() {
+ return OMLockDetails.EMPTY_DETAILS_LOCK_ACQUIRED;
+ }
+
+ @Override
+ public void close() {
+ fullLock.writeLock().unlock();
+ }
+ };
+ }
+
+ private void validateVersionRemoval(UUID snapshotId, int version) throws
IOException {
+ LocalDataVersionNode versionNode = getVersionNode(snapshotId, version);
+ if (versionNode != null && localDataGraph.inDegree(versionNode) != 0) {
+ Set<LocalDataVersionNode> versionNodes =
localDataGraph.predecessors(versionNode);
+ throw new IOException(String.format("Cannot remove Snapshot %s with
version : %d since it still has " +
+ "predecessors : %s", snapshotId, version, versionNodes));
+ }
+ }
+
+ private void validateVersionAddition(LocalDataVersionNode versionNode)
throws IOException {
+ LocalDataVersionNode previousVersionNode =
getVersionNode(versionNode.previousSnapshotId,
+ versionNode.previousSnapshotVersion);
+ if (versionNode.previousSnapshotId != null && previousVersionNode == null)
{
+ throw new IOException("Unable to add " + versionNode + " since previous
snapshot with version hasn't been " +
+ "loaded");
+ }
+ }
+
@Override
public void close() {
if (snapshotLocalDataSerializer != null) {
@@ -228,6 +324,413 @@ public void close() {
}
}
+ private static final class LockDataProviderInitResult {
+ private final OmSnapshotLocalData snapshotLocalData;
+ private final HierarchicalResourceLock lock;
+ private final HierarchicalResourceLock previousLock;
+ private final UUID previousSnapshotId;
+
+ private LockDataProviderInitResult(HierarchicalResourceLock lock,
OmSnapshotLocalData snapshotLocalData,
+ HierarchicalResourceLock previousLock, UUID previousSnapshotId) {
+ this.lock = lock;
+ this.snapshotLocalData = snapshotLocalData;
+ this.previousLock = previousLock;
+ this.previousSnapshotId = previousSnapshotId;
+ }
+
+ private HierarchicalResourceLock getLock() {
+ return lock;
+ }
+
+ private HierarchicalResourceLock getPreviousLock() {
+ return previousLock;
+ }
+
+ private UUID getPreviousSnapshotId() {
+ return previousSnapshotId;
+ }
+
+ private OmSnapshotLocalData getSnapshotLocalData() {
+ return snapshotLocalData;
+ }
+ }
+
+ /**
+ * The ReadableOmSnapshotLocalDataProvider class is responsible for managing
the
+ * access and initialization of local snapshot data in a thread-safe manner.
+ * It provides mechanisms to handle snapshot data, retrieve associated
previous
+ * snapshot data, and manage lock synchronization for safe concurrent
operations.
+ *
+ * This class works with snapshot identifiers and ensures that the
appropriate
+ * local data for a given snapshot is loaded and accessible. Additionally, it
+ * maintains locking mechanisms to ensure thread-safe initialization and
access
+ * to both the current and previous snapshot local data. The implementation
also
+ * supports handling errors in the snapshot data initialization process.
+ *
+ * Key Functionalities:
+ * - Initializes and provides access to snapshot local data associated with a
+ * given snapshot identifier.
+ * - Resolves and retrieves data for the previous snapshot if applicable.
+ * - Ensures safe concurrent read operations using locking mechanisms.
+ * - Validates the integrity and consistency of snapshot data during
initialization.
+ * - Ensures that appropriate locks are released upon closing.
+ *
+ * Thread-Safety:
+ * This class utilizes locks to guarantee thread-safe operations when
accessing
+ * or modifying snapshot data. State variables relating to snapshot data are
+ * properly synchronized to ensure consistency during concurrent operations.
+ *
+ * Usage Considerations:
+ * - Ensure proper handling of exceptions while interacting with this class,
+ * particularly during initialization and cleanup.
+ * - Always invoke the {@code close()} method after usage to release
acquired locks
+ * and avoid potential deadlocks.
+ */
+ public class ReadableOmSnapshotLocalDataProvider implements AutoCloseable {
+
+ private final UUID snapshotId;
+ private final HierarchicalResourceLock lock;
+ private final HierarchicalResourceLock previousLock;
+ private final OmSnapshotLocalData snapshotLocalData;
+ private OmSnapshotLocalData previousSnapshotLocalData;
+ private volatile boolean isPreviousSnapshotLoaded = false;
+ private final UUID resolvedPreviousSnapshotId;
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId) throws
IOException {
+ this(snapshotId, true);
+ }
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, UUID
snapIdToResolve) throws IOException {
+ this(snapshotId, true, null, snapIdToResolve, true);
+ }
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean
readLock) throws IOException {
+ this(snapshotId, readLock, null, null, false);
+ }
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean
readLock,
+ CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException>
snapshotLocalDataSupplier,
+ UUID snapshotIdToBeResolved, boolean isSnapshotToBeResolvedNullable)
throws IOException {
+ this.snapshotId = snapshotId;
+ LockDataProviderInitResult result = initialize(readLock, snapshotId,
snapshotIdToBeResolved,
+ isSnapshotToBeResolvedNullable, snapshotLocalDataSupplier);
+ this.snapshotLocalData = result.getSnapshotLocalData();
+ this.lock = result.getLock();
+ this.previousLock = result.getPreviousLock();
+ this.resolvedPreviousSnapshotId = result.getPreviousSnapshotId();
+ this.previousSnapshotLocalData = null;
+ this.isPreviousSnapshotLoaded = false;
+ }
+
+ public OmSnapshotLocalData getSnapshotLocalData() {
+ return snapshotLocalData;
+ }
+
+ public synchronized OmSnapshotLocalData getPreviousSnapshotLocalData()
throws IOException {
+ if (!isPreviousSnapshotLoaded) {
+ if (resolvedPreviousSnapshotId != null) {
+ File previousSnapshotLocalDataFile = new
File(getSnapshotLocalPropertyYamlPath(resolvedPreviousSnapshotId));
+ this.previousSnapshotLocalData =
snapshotLocalDataSerializer.load(previousSnapshotLocalDataFile);
+ }
+ this.isPreviousSnapshotLoaded = true;
+ }
+ return previousSnapshotLocalData;
+ }
+
+ private HierarchicalResourceLock acquireLock(UUID snapId, boolean
readLock) throws IOException {
+ HierarchicalResourceLock acquiredLock = readLock ?
locks.acquireReadLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK,
+ snapId.toString()) :
locks.acquireWriteLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK,
snapId.toString());
+ if (!acquiredLock.isLockAcquired()) {
+ throw new IOException("Unable to acquire lock for snapshotId: " +
snapId);
+ }
+ return acquiredLock;
+ }
+
+ /**
+ * Intializes the snapshot local data by acquiring the lock on the
snapshot and also acquires a read lock on the
+ * snapshotId to be resolved by iterating through the chain of previous
snapshot ids.
+ */
+ private LockDataProviderInitResult initialize(
+ boolean readLock, UUID snapId, UUID toResolveSnapshotId, boolean
isSnapshotToBeResolvedNullable,
+ CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException>
snapshotLocalDataSupplier) throws IOException {
+ HierarchicalResourceLock snapIdLock = null;
+ HierarchicalResourceLock previousReadLockAcquired = null;
+ try {
+ snapIdLock = acquireLock(snapId, readLock);
+ snapshotLocalDataSupplier = snapshotLocalDataSupplier == null ? () -> {
+ File snapshotLocalDataFile = new
File(getSnapshotLocalPropertyYamlPath(snapId));
+ return
Pair.of(snapshotLocalDataSerializer.load(snapshotLocalDataFile),
snapshotLocalDataFile);
+ } : snapshotLocalDataSupplier;
+ Pair<OmSnapshotLocalData, File> pair = snapshotLocalDataSupplier.get();
+ OmSnapshotLocalData ssLocalData = pair.getKey();
+ if (!Objects.equals(ssLocalData.getSnapshotId(), snapId)) {
+ String loadPath = pair.getValue() == null ? null :
pair.getValue().getAbsolutePath();
+ throw new IOException("SnapshotId in path : " + loadPath + "
contains snapshotLocalData corresponding " +
+ "to snapshotId " + ssLocalData.getSnapshotId() + ". Expected
snapshotId " + snapId);
+ }
+ // Get previous snapshotId and acquire read lock on the id. We need to
do this outside the loop instead of a
+ // do while loop since the nodes that need be added may not be present
in the graph so it may not be possible
+ // to iterate through the chain.
+ UUID previousSnapshotId = ssLocalData.getPreviousSnapshotId();
+ // if flag toResolveSnapshotIdIsNull is true or toResolveSnapshotId is
not null, then we resolve snapshot
+ // with previous snapshot id as null, which would mean if the snapshot
local data is committed the snapshot
+ // local data would become first snapshot in the chain with no
previous snapshot id.
+ toResolveSnapshotId = (isSnapshotToBeResolvedNullable ||
toResolveSnapshotId != null) ? toResolveSnapshotId :
+ ssLocalData.getPreviousSnapshotId();
+ if (toResolveSnapshotId != null && previousSnapshotId != null) {
+ previousReadLockAcquired = acquireLock(previousSnapshotId, true);
+ if (!versionNodeMap.containsKey(previousSnapshotId)) {
+ throw new IOException(String.format("Operating on snapshot id : %s
with previousSnapshotId: %s invalid " +
+ "since previousSnapshotId is not loaded.", snapId,
previousSnapshotId));
+ }
+ // Create a copy of the previous versionMap to get the previous
versions corresponding to the previous
+ // snapshot. This map would mutated to resolve the previous
snapshot's version corresponding to the
+ // toResolveSnapshotId by iterating through the chain of previous
snapshot ids.
+ Map<Integer, LocalDataVersionNode> previousVersionNodeMap =
+ new
HashMap<>(versionNodeMap.get(previousSnapshotId).getSnapshotVersions());
+ UUID currentIteratedSnapshotId = previousSnapshotId;
+ // Iterate through the chain of previous snapshot ids until the
snapshot id to be resolved is found.
+ while (!Objects.equals(currentIteratedSnapshotId,
toResolveSnapshotId)) {
+ // All versions for the snapshot should point to the same previous
snapshot id. Otherwise this is a sign
+ // of corruption.
+ Set<UUID> previousIds =
+
previousVersionNodeMap.values().stream().map(LocalDataVersionNode::getPreviousSnapshotId)
+ .collect(Collectors.toSet());
+ if (previousIds.size() > 1) {
+ throw new IOException(String.format("Snapshot %s versions has
multiple previous snapshotIds %s",
+ currentIteratedSnapshotId, previousIds));
+ }
+ if (previousIds.isEmpty()) {
+ throw new IOException(String.format("Snapshot %s versions
doesn't have previous Id thus snapshot " +
+ "%s cannot be resolved against id %s",
+ currentIteratedSnapshotId, snapId, toResolveSnapshotId));
+ }
+ UUID previousId = previousIds.iterator().next();
+ HierarchicalResourceLock previousToPreviousReadLockAcquired =
acquireLock(previousId, true);
+ try {
+ // Get the version node for the snapshot and update the version
node to the successor to point to the
+ // previous node.
+ for (Map.Entry<Integer, LocalDataVersionNode> entry :
previousVersionNodeMap.entrySet()) {
+ internalLock.readLock().lock();
+ try {
+ Set<LocalDataVersionNode> versionNode =
localDataGraph.successors(entry.getValue());
+ if (versionNode.size() > 1) {
+ throw new IOException(String.format("Snapshot %s version
%d has multiple successors %s",
+ currentIteratedSnapshotId,
entry.getValue().getVersion(), versionNode));
+ }
+ if (versionNode.isEmpty()) {
+ throw new IOException(String.format("Snapshot %s version
%d doesn't have successor",
+ currentIteratedSnapshotId,
entry.getValue().getVersion()));
+ }
+ // Set the version node for iterated version to the
successor corresponding to the previous snapshot
+ // id.
+ entry.setValue(versionNode.iterator().next());
+ } finally {
+ internalLock.readLock().unlock();
+ }
+ }
+ } finally {
+ // Release the read lock acquired on the previous snapshot id
acquired. Now that the instance
+ // is no longer needed we can release the read lock for the
snapshot iterated in the previous snapshot.
+ // Make previousToPrevious previous for next iteration.
+ previousReadLockAcquired.close();
+ previousReadLockAcquired = previousToPreviousReadLockAcquired;
+ currentIteratedSnapshotId = previousId;
+ }
+ }
+ ssLocalData.setPreviousSnapshotId(toResolveSnapshotId);
+ Map<Integer, OmSnapshotLocalData.VersionMeta> versionMetaMap =
ssLocalData.getVersionSstFileInfos();
+ for (Map.Entry<Integer, OmSnapshotLocalData.VersionMeta> entry :
versionMetaMap.entrySet()) {
+ OmSnapshotLocalData.VersionMeta versionMeta = entry.getValue();
+ // Get the relative version node which corresponds to the
toResolveSnapshotId corresponding to the
+ // versionMeta which points to a particular version in the
previous snapshot
+ LocalDataVersionNode relativePreviousVersionNode =
+
previousVersionNodeMap.get(versionMeta.getPreviousSnapshotVersion());
+ if (relativePreviousVersionNode == null) {
+ throw new IOException(String.format("Unable to resolve previous
version node for snapshot: %s" +
+ " with version : %d against previous snapshot %s previous
version : %d",
+ snapId, entry.getKey(), toResolveSnapshotId,
versionMeta.getPreviousSnapshotVersion()));
+ }
+ // Set the previous snapshot version to the
relativePreviousVersionNode which was captured.
+
versionMeta.setPreviousSnapshotVersion(relativePreviousVersionNode.getVersion());
+ }
+ } else {
+ toResolveSnapshotId = null;
+ ssLocalData.setPreviousSnapshotId(null);
+ }
+ return new LockDataProviderInitResult(snapIdLock, ssLocalData,
previousReadLockAcquired, toResolveSnapshotId);
+ } catch (IOException e) {
+ // Release all the locks in case of an exception and rethrow the
exception.
+ if (previousReadLockAcquired != null) {
+ previousReadLockAcquired.close();
+ }
+ if (snapIdLock != null) {
+ snapIdLock.close();
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (previousLock != null) {
+ previousLock.close();
+ }
+ if (lock != null) {
+ lock.close();
+ }
+ }
+ }
+
+ /**
+ * This class represents a writable provider for managing local data of
+ * OmSnapshot. It extends the functionality of {@code
ReadableOmSnapshotLocalDataProvider}
+ * and provides support for write operations, such as committing changes.
+ *
+ * The writable snapshot data provider interacts with version nodes and
+ * facilitates atomic updates to snapshot properties and files.
+ *
+ * This class is designed to ensure thread-safe operations and uses locks to
+ * guarantee consistent state across concurrent activities.
+ *
+ * The default usage includes creating an instance of this provider with
+ * specific snapshot identifiers and optionally handling additional
parameters
+ * such as data resolution or a supplier for snapshot data.
+ */
+ public final class WritableOmSnapshotLocalDataProvider extends
ReadableOmSnapshotLocalDataProvider {
+
+ private boolean dirty;
+
+ private WritableOmSnapshotLocalDataProvider(UUID snapshotId) throws
IOException {
+ super(snapshotId, false);
+ fullLock.readLock().lock();
+ }
+
+ private WritableOmSnapshotLocalDataProvider(UUID snapshotId, UUID
snapshotIdToBeResolved) throws IOException {
+ super(snapshotId, false, null, snapshotIdToBeResolved, true);
+ fullLock.readLock().lock();
+ }
+
+ private WritableOmSnapshotLocalDataProvider(UUID snapshotId,
+ CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException>
snapshotLocalDataSupplier) throws IOException {
+ super(snapshotId, false, snapshotLocalDataSupplier, null, false);
+ fullLock.readLock().lock();
+ }
+
+ private SnapshotVersionsMeta validateModification(OmSnapshotLocalData
snapshotLocalData)
+ throws IOException {
+ internalLock.readLock().lock();
+ try {
+ SnapshotVersionsMeta versionsToBeAdded = new
SnapshotVersionsMeta(snapshotLocalData);
+ SnapshotVersionsMeta existingVersionsMeta =
getVersionNodeMap().get(snapshotLocalData.getSnapshotId());
+ for (LocalDataVersionNode node :
versionsToBeAdded.getSnapshotVersions().values()) {
+ validateVersionAddition(node);
+ }
+ UUID snapshotId = snapshotLocalData.getSnapshotId();
+ Map<Integer, LocalDataVersionNode> existingVersions =
getVersionNodeMap().containsKey(snapshotId) ?
+ getVersionNodeMap().get(snapshotId).getSnapshotVersions() :
Collections.emptyMap();
+ for (Map.Entry<Integer, LocalDataVersionNode> entry :
existingVersions.entrySet()) {
+ if
(!versionsToBeAdded.getSnapshotVersions().containsKey(entry.getKey())) {
+ validateVersionRemoval(snapshotId, entry.getKey());
+ }
+ }
+ // Set Dirty if the snapshot doesn't exist or previousSnapshotId has
changed.
+ if (existingVersionsMeta == null ||
!Objects.equals(versionsToBeAdded.getPreviousSnapshotId(),
+ existingVersionsMeta.getPreviousSnapshotId())) {
+ setDirty();
+ }
+ return versionsToBeAdded;
+ } finally {
+ internalLock.readLock().unlock();
+ }
+ }
+
+ public void addSnapshotVersion(RDBStore snapshotStore) throws IOException {
+ List<LiveFileMetaData> sstFiles =
OmSnapshotManager.getSnapshotSSTFileList(snapshotStore);
+ OmSnapshotLocalData previousSnapshotLocalData =
getPreviousSnapshotLocalData();
+ this.getSnapshotLocalData().addVersionSSTFileInfos(sstFiles,
previousSnapshotLocalData == null ? 0 :
+ previousSnapshotLocalData.getVersion());
+ // Set Dirty if a version is added.
+ setDirty();
+ }
+
+ public void removeVersion(int version) {
+ this.getSnapshotLocalData().removeVersionSSTFileInfos(version);
+ // Set Dirty if a version is removed.
+ setDirty();
+ }
+
+ public synchronized void commit() throws IOException {
+ // Validate modification and commit the changes.
+ SnapshotVersionsMeta localDataVersionNodes =
validateModification(super.snapshotLocalData);
+ // Need to update the disk state if and only if the dirty bit is set.
+ if (isDirty()) {
+ String filePath = getSnapshotLocalPropertyYamlPath(super.snapshotId);
+ String tmpFilePath = filePath + ".tmp";
+ File tmpFile = new File(tmpFilePath);
+ boolean tmpFileExists = tmpFile.exists();
+ if (tmpFileExists) {
+ tmpFileExists = !tmpFile.delete();
+ }
+ if (tmpFileExists) {
+ throw new IOException("Unable to delete tmp file " + tmpFilePath);
+ }
+ snapshotLocalDataSerializer.save(new File(tmpFilePath),
super.snapshotLocalData);
+ Files.move(tmpFile.toPath(), Paths.get(filePath),
StandardCopyOption.ATOMIC_MOVE,
+ StandardCopyOption.REPLACE_EXISTING);
+ upsertNode(super.snapshotId, localDataVersionNodes);
+ // Reset dirty bit
+ resetDirty();
+ }
+ }
+
+ private void upsertNode(UUID snapshotId, SnapshotVersionsMeta
snapshotVersions) throws IOException {
+ internalLock.writeLock().lock();
+ try {
+ SnapshotVersionsMeta existingSnapVersions =
getVersionNodeMap().remove(snapshotId);
+ Map<Integer, LocalDataVersionNode> existingVersions =
existingSnapVersions == null ? Collections.emptyMap() :
+ existingSnapVersions.getSnapshotVersions();
+ Map<Integer, List<LocalDataVersionNode>> predecessors = new
HashMap<>();
+ // Track all predecessors of the existing versions and remove the node
from the graph.
+ for (Map.Entry<Integer, LocalDataVersionNode> existingVersion :
existingVersions.entrySet()) {
+ LocalDataVersionNode existingVersionNode =
existingVersion.getValue();
+ // Create a copy of predecessors since the list of nodes returned
would be a mutable set and it changes as the
+ // nodes in the graph would change.
+ predecessors.put(existingVersion.getKey(), new
ArrayList<>(localDataGraph.predecessors(existingVersionNode)));
+ localDataGraph.removeNode(existingVersionNode);
+ }
+ // Add the nodes to be added in the graph and map.
+ addSnapshotVersionMeta(snapshotId, snapshotVersions);
+ // Reconnect all the predecessors for existing nodes.
+ for (Map.Entry<Integer, LocalDataVersionNode> entry :
snapshotVersions.getSnapshotVersions().entrySet()) {
+ for (LocalDataVersionNode predecessor :
predecessors.getOrDefault(entry.getKey(), Collections.emptyList())) {
+ localDataGraph.putEdge(predecessor, entry.getValue());
+ }
+ }
+ } finally {
+ internalLock.writeLock().unlock();
+ }
+ }
+
+ private void setDirty() {
+ dirty = true;
+ }
+
+ private void resetDirty() {
+ dirty = false;
+ }
+
+ private boolean isDirty() {
+ return dirty;
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ fullLock.readLock().unlock();
+ }
+ }
+
static final class LocalDataVersionNode {
private final UUID snapshotId;
private final int version;
@@ -241,6 +744,14 @@ private LocalDataVersionNode(UUID snapshotId, int version,
UUID previousSnapshot
this.version = version;
}
+ private UUID getPreviousSnapshotId() {
+ return previousSnapshotId;
+ }
+
+ private int getVersion() {
+ return version;
+ }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof LocalDataVersionNode)) {
@@ -255,6 +766,16 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(snapshotId, version, previousSnapshotId,
previousSnapshotVersion);
}
+
+ @Override
+ public String toString() {
+ return "LocalDataVersionNode{" +
+ "snapshotId=" + snapshotId +
+ ", version=" + version +
+ ", previousSnapshotId=" + previousSnapshotId +
+ ", previousSnapshotVersion=" + previousSnapshotVersion +
+ '}';
+ }
}
static final class SnapshotVersionsMeta {
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotLocalDataYaml.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotLocalDataYaml.java
index 23d332ae75b..b234014ebbc 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotLocalDataYaml.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotLocalDataYaml.java
@@ -130,11 +130,11 @@ private Pair<File, UUID> writeToYaml(UUID snapshotId,
String snapshotName) throw
// Add some defragged SST files
dataYaml.addVersionSSTFileInfos(ImmutableList.of(
- new SstFileInfo("defragged-sst1", "k1", "k2", "table1"),
- new SstFileInfo("defragged-sst2", "k3", "k4", "table2")),
+ createLiveFileMetaData("defragged-sst1", "table1", "k1", "k2"),
+ createLiveFileMetaData("defragged-sst2", "table2", "k3", "k4")),
1);
dataYaml.addVersionSSTFileInfos(Collections.singletonList(
- new SstFileInfo("defragged-sst3", "k4", "k5", "table1")), 3);
+ createLiveFileMetaData("defragged-sst3", "table1", "k4", "k5")), 3);
File yamlFile = new File(testRoot, yamlFilePath);
@@ -202,7 +202,7 @@ public void testUpdateSnapshotDataFile() throws IOException
{
dataYaml.setSstFiltered(false);
dataYaml.setNeedsDefrag(false);
dataYaml.addVersionSSTFileInfos(
- singletonList(new SstFileInfo("defragged-sst4", "k5", "k6",
"table3")), 5);
+ singletonList(createLiveFileMetaData("defragged-sst4", "table3", "k5",
"k6")), 5);
// Write updated data back to file
omSnapshotLocalDataSerializer.save(yamlFile, dataYaml);
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
index 7f808df3f97..6ec49935b35 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
@@ -774,6 +774,7 @@ public void testCreateSnapshotIdempotent() throws Exception
{
when(bucketTable.get(dbBucketKey)).thenReturn(omBucketInfo);
SnapshotInfo first = createSnapshotInfo(volumeName, bucketName);
+ first.setPathPreviousSnapshotId(null);
when(snapshotInfoTable.get(first.getTableKey())).thenReturn(first);
// Create first checkpoint for the snapshot checkpoint
@@ -797,10 +798,13 @@ public void testCreateSnapshotIdempotent() throws
Exception {
private SnapshotInfo createSnapshotInfo(String volumeName,
String bucketName) {
- return SnapshotInfo.newInstance(volumeName,
+ SnapshotInfo snapshotInfo = SnapshotInfo.newInstance(volumeName,
bucketName,
UUID.randomUUID().toString(),
UUID.randomUUID(),
Time.now());
+ snapshotInfo.setPathPreviousSnapshotId(null);
+ snapshotInfo.setGlobalPreviousSnapshotId(null);
+ return snapshotInfo;
}
}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java
index 34bde4814a6..947c1a4b7f4 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java
@@ -24,41 +24,63 @@
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.when;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import org.apache.commons.compress.utils.Sets;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.hdds.utils.db.RocksDatabase;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmSnapshotLocalData;
import org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.FlatResource;
+import org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager;
+import
org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager.HierarchicalResourceLock;
+import
org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider;
+import
org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.WritableOmSnapshotLocalDataProvider;
import org.apache.hadoop.ozone.util.YamlSerializer;
import org.apache.ozone.compaction.log.SstFileInfo;
+import org.assertj.core.util.Lists;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.rocksdb.LiveFileMetaData;
@@ -67,13 +89,18 @@
/**
* Test class for OmSnapshotLocalDataManager.
*/
+@Timeout(value = 30, unit = TimeUnit.SECONDS)
public class TestOmSnapshotLocalDataManager {
private static YamlSerializer<OmSnapshotLocalData>
snapshotLocalDataYamlSerializer;
+ private static List<String> lockCapturor;
@Mock
private OMMetadataManager omMetadataManager;
+ @Mock
+ private HierarchicalResourceLockManager lockManager;
+
@Mock
private RDBStore rdbStore;
@@ -88,6 +115,11 @@ public class TestOmSnapshotLocalDataManager {
private File snapshotsDir;
+ private static final String READ_LOCK_MESSAGE_ACQUIRE = "readLock acquire";
+ private static final String READ_LOCK_MESSAGE_UNLOCK = "readLock unlock";
+ private static final String WRITE_LOCK_MESSAGE_ACQUIRE = "writeLock acquire";
+ private static final String WRITE_LOCK_MESSAGE_UNLOCK = "writeLock unlock";
+
@BeforeAll
public static void setupClass() {
snapshotLocalDataYamlSerializer = new YamlSerializer<OmSnapshotLocalData>(
@@ -98,10 +130,11 @@ public void computeAndSetChecksum(Yaml yaml,
OmSnapshotLocalData data) throws IO
data.computeAndSetChecksum(yaml);
}
};
+ lockCapturor = new ArrayList<>();
}
@AfterAll
- public static void teardownClass() throws IOException {
+ public static void teardownClass() {
snapshotLocalDataYamlSerializer.close();
snapshotLocalDataYamlSerializer = null;
}
@@ -112,15 +145,15 @@ public void setUp() throws IOException {
// Setup mock behavior
when(omMetadataManager.getStore()).thenReturn(rdbStore);
-
+
when(omMetadataManager.getHierarchicalLockManager()).thenReturn(lockManager);
this.snapshotsDir = tempDir.resolve("snapshots").toFile();
FileUtils.deleteDirectory(snapshotsDir);
assertTrue(snapshotsDir.exists() || snapshotsDir.mkdirs());
File dbLocation = tempDir.resolve("db").toFile();
FileUtils.deleteDirectory(dbLocation);
assertTrue(dbLocation.exists() || dbLocation.mkdirs());
+ mockLockManager();
-
when(rdbStore.getSnapshotsParentDir()).thenReturn(snapshotsDir.getAbsolutePath());
when(rdbStore.getDbLocation()).thenReturn(dbLocation);
}
@@ -135,6 +168,392 @@ public void tearDown() throws Exception {
}
}
+ private String getReadLockMessageAcquire(UUID snapshotId) {
+ return READ_LOCK_MESSAGE_ACQUIRE + " " +
FlatResource.SNAPSHOT_LOCAL_DATA_LOCK + " " + snapshotId;
+ }
+
+ private String getReadLockMessageRelease(UUID snapshotId) {
+ return READ_LOCK_MESSAGE_UNLOCK + " " +
FlatResource.SNAPSHOT_LOCAL_DATA_LOCK + " " + snapshotId;
+ }
+
+ private String getWriteLockMessageAcquire(UUID snapshotId) {
+ return WRITE_LOCK_MESSAGE_ACQUIRE + " " +
FlatResource.SNAPSHOT_LOCAL_DATA_LOCK + " " + snapshotId;
+ }
+
+ private String getWriteLockMessageRelease(UUID snapshotId) {
+ return WRITE_LOCK_MESSAGE_UNLOCK + " " +
FlatResource.SNAPSHOT_LOCAL_DATA_LOCK + " " + snapshotId;
+ }
+
+ private HierarchicalResourceLock getHierarchicalResourceLock(FlatResource
resource, String key, boolean isWriteLock) {
+ return new HierarchicalResourceLock() {
+ @Override
+ public boolean isLockAcquired() {
+ return true;
+ }
+
+ @Override
+ public void close() {
+ if (isWriteLock) {
+ lockCapturor.add(WRITE_LOCK_MESSAGE_UNLOCK + " " + resource + " " +
key);
+ } else {
+ lockCapturor.add(READ_LOCK_MESSAGE_UNLOCK + " " + resource + " " +
key);
+ }
+ }
+ };
+ }
+
+ private void mockLockManager() throws IOException {
+ lockCapturor.clear();
+ reset(lockManager);
+ when(lockManager.acquireReadLock(any(FlatResource.class), anyString()))
+ .thenAnswer(i -> {
+ lockCapturor.add(READ_LOCK_MESSAGE_ACQUIRE + " " + i.getArgument(0)
+ " " + i.getArgument(1));
+ return getHierarchicalResourceLock(i.getArgument(0),
i.getArgument(1), false);
+ });
+ when(lockManager.acquireWriteLock(any(FlatResource.class), anyString()))
+ .thenAnswer(i -> {
+ lockCapturor.add(WRITE_LOCK_MESSAGE_ACQUIRE + " " + i.getArgument(0)
+ " " + i.getArgument(1));
+ return getHierarchicalResourceLock(i.getArgument(0),
i.getArgument(1), true);
+ });
+ }
+
+ private List<UUID> createSnapshotLocalData(OmSnapshotLocalDataManager
snapshotLocalDataManager,
+ int numberOfSnapshots) throws IOException {
+ SnapshotInfo previousSnapshotInfo = null;
+ int counter = 0;
+ Map<String, List<LiveFileMetaData>> liveFileMetaDataMap = new HashMap<>();
+ liveFileMetaDataMap.put(KEY_TABLE,
+ Lists.newArrayList(createMockLiveFileMetaData("file1.sst", KEY_TABLE,
"key1", "key2")));
+ liveFileMetaDataMap.put(FILE_TABLE,
Lists.newArrayList(createMockLiveFileMetaData("file2.sst", FILE_TABLE, "key1",
+ "key2")));
+ liveFileMetaDataMap.put(DIRECTORY_TABLE,
Lists.newArrayList(createMockLiveFileMetaData("file2.sst",
+ DIRECTORY_TABLE, "key1", "key2")));
+ liveFileMetaDataMap.put("col1",
Lists.newArrayList(createMockLiveFileMetaData("file2.sst", "col1", "key1",
+ "key2")));
+ List<UUID> snapshotIds = new ArrayList<>();
+ for (int i = 0; i < numberOfSnapshots; i++) {
+ UUID snapshotId = UUID.randomUUID();
+ SnapshotInfo snapshotInfo = createMockSnapshotInfo(snapshotId,
previousSnapshotInfo == null ? null
+ : previousSnapshotInfo.getSnapshotId());
+ mockSnapshotStore(snapshotId, liveFileMetaDataMap.values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList()));
+ snapshotLocalDataManager.createNewOmSnapshotLocalDataFile(snapshotStore,
snapshotInfo);
+ previousSnapshotInfo = snapshotInfo;
+ for (Map.Entry<String, List<LiveFileMetaData>> tableEntry :
liveFileMetaDataMap.entrySet()) {
+ String table = tableEntry.getKey();
+ tableEntry.getValue().add(createMockLiveFileMetaData("file" +
counter++ + ".sst", table, "key1", "key4"));
+ }
+ snapshotIds.add(snapshotId);
+ }
+ return snapshotIds;
+ }
+
+ private void mockSnapshotStore(UUID snapshotId, List<LiveFileMetaData>
sstFiles) throws RocksDatabaseException {
+ // Setup snapshot store mock
+ File snapshotDbLocation =
OmSnapshotManager.getSnapshotPath(omMetadataManager, snapshotId).toFile();
+ assertTrue(snapshotDbLocation.exists() || snapshotDbLocation.mkdirs());
+
+ when(snapshotStore.getDbLocation()).thenReturn(snapshotDbLocation);
+ RocksDatabase rocksDatabase = mock(RocksDatabase.class);
+ when(snapshotStore.getDb()).thenReturn(rocksDatabase);
+ when(rocksDatabase.getLiveFilesMetaData()).thenReturn(sstFiles);
+ }
+
+ /**
+ * Checks lock orders taken i.e. while reading a snapshot against the
previous snapshot.
+ * Depending on read or write locks are acquired on the snapshotId and read
lock is acquired on the previous
+ * snapshot. Once the instance is closed the read lock on previous snapshot
is released followed by releasing the
+ * lock on the snapshotId.
+ * @param read
+ * @throws IOException
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testLockOrderingAgainstAnotherSnapshot(boolean read) throws
IOException {
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+ List<UUID> snapshotIds = new ArrayList<>();
+ snapshotIds.add(null);
+ snapshotIds.addAll(createSnapshotLocalData(localDataManager, 20));
+ for (int start = 0; start < snapshotIds.size(); start++) {
+ for (int end = start + 1; end < snapshotIds.size(); end++) {
+ UUID startSnapshotId = snapshotIds.get(start);
+ UUID endSnapshotId = snapshotIds.get(end);
+ lockCapturor.clear();
+ int logCaptorIdx = 0;
+ try (ReadableOmSnapshotLocalDataProvider omSnapshotLocalDataProvider =
+ read ? localDataManager.getOmSnapshotLocalData(endSnapshotId,
startSnapshotId) :
+
localDataManager.getWritableOmSnapshotLocalData(endSnapshotId,
startSnapshotId)) {
+ OmSnapshotLocalData snapshotLocalData =
omSnapshotLocalDataProvider.getSnapshotLocalData();
+ OmSnapshotLocalData previousSnapshot =
omSnapshotLocalDataProvider.getPreviousSnapshotLocalData();
+ assertEquals(endSnapshotId, snapshotLocalData.getSnapshotId());
+ if (startSnapshotId == null) {
+ assertNull(previousSnapshot);
+ assertNull(snapshotLocalData.getPreviousSnapshotId());
+ continue;
+ }
+ assertEquals(startSnapshotId, previousSnapshot.getSnapshotId());
+ assertEquals(startSnapshotId,
snapshotLocalData.getPreviousSnapshotId());
+ if (read) {
+ assertEquals(getReadLockMessageAcquire(endSnapshotId),
lockCapturor.get(logCaptorIdx++));
+ } else {
+ assertEquals(getWriteLockMessageAcquire(endSnapshotId),
lockCapturor.get(logCaptorIdx++));
+ }
+ int idx = end - 1;
+ UUID previousSnapId = snapshotIds.get(idx--);
+ assertEquals(getReadLockMessageAcquire(previousSnapId),
lockCapturor.get(logCaptorIdx++));
+ while (idx >= start) {
+ UUID prevPrevSnapId = snapshotIds.get(idx--);
+ assertEquals(getReadLockMessageAcquire(prevPrevSnapId),
lockCapturor.get(logCaptorIdx++));
+ assertEquals(getReadLockMessageRelease(previousSnapId),
lockCapturor.get(logCaptorIdx++));
+ previousSnapId = prevPrevSnapId;
+ }
+ }
+ assertEquals(getReadLockMessageRelease(startSnapshotId),
lockCapturor.get(logCaptorIdx++));
+ if (read) {
+ assertEquals(getReadLockMessageRelease(endSnapshotId),
lockCapturor.get(logCaptorIdx++));
+ } else {
+ assertEquals(getWriteLockMessageRelease(endSnapshotId),
lockCapturor.get(logCaptorIdx++));
+ }
+ assertEquals(lockCapturor.size(), logCaptorIdx);
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testVersionLockResolution(boolean read) throws IOException {
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+ List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 5);
+ for (int snapIdx = 0; snapIdx < snapshotIds.size(); snapIdx++) {
+ UUID snapId = snapshotIds.get(snapIdx);
+ UUID expectedPreviousSnapId = snapIdx - 1 >= 0 ? snapshotIds.get(snapIdx
- 1) : null;
+ lockCapturor.clear();
+ int logCaptorIdx = 0;
+ try (ReadableOmSnapshotLocalDataProvider omSnapshotLocalDataProvider =
+ read ? localDataManager.getOmSnapshotLocalData(snapId) :
+ localDataManager.getWritableOmSnapshotLocalData(snapId)) {
+ OmSnapshotLocalData snapshotLocalData =
omSnapshotLocalDataProvider.getSnapshotLocalData();
+ OmSnapshotLocalData previousSnapshot =
omSnapshotLocalDataProvider.getPreviousSnapshotLocalData();
+ assertEquals(snapId, snapshotLocalData.getSnapshotId());
+ assertEquals(expectedPreviousSnapId, previousSnapshot == null ? null :
+ previousSnapshot.getSnapshotId());
+ if (read) {
+ assertEquals(getReadLockMessageAcquire(snapId),
lockCapturor.get(logCaptorIdx++));
+ } else {
+ assertEquals(getWriteLockMessageAcquire(snapId),
lockCapturor.get(logCaptorIdx++));
+ }
+ if (expectedPreviousSnapId != null) {
+ assertEquals(getReadLockMessageAcquire(expectedPreviousSnapId),
lockCapturor.get(logCaptorIdx++));
+ }
+ }
+ if (expectedPreviousSnapId != null) {
+ assertEquals(getReadLockMessageRelease(expectedPreviousSnapId),
lockCapturor.get(logCaptorIdx++));
+ }
+ if (read) {
+ assertEquals(getReadLockMessageRelease(snapId),
lockCapturor.get(logCaptorIdx++));
+ } else {
+ assertEquals(getWriteLockMessageRelease(snapId),
lockCapturor.get(logCaptorIdx++));
+ }
+ assertEquals(lockCapturor.size(), logCaptorIdx);
+ }
+ }
+
+ @Test
+ public void
testWriteVersionAdditionValidationWithoutPreviousSnapshotVersionExisting()
throws IOException {
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+ List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 2);
+ UUID snapId = snapshotIds.get(1);
+ try (WritableOmSnapshotLocalDataProvider omSnapshotLocalDataProvider =
+ localDataManager.getWritableOmSnapshotLocalData(snapId)) {
+ OmSnapshotLocalData snapshotLocalData =
omSnapshotLocalDataProvider.getSnapshotLocalData();
+
snapshotLocalData.addVersionSSTFileInfos(Lists.newArrayList(createMockLiveFileMetaData("file1.sst",
KEY_TABLE,
+ "key1", "key2")), 3);
+
+ IOException ex = assertThrows(IOException.class,
omSnapshotLocalDataProvider::commit);
+ assertTrue(ex.getMessage().contains("since previous snapshot with
version hasn't been loaded"));
+ }
+ }
+
+ @Test
+ public void testAddVersionFromRDB() throws IOException {
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+ List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 2);
+ addVersionsToLocalData(localDataManager, snapshotIds.get(0),
ImmutableMap.of(4, 5, 6, 8));
+ UUID snapId = snapshotIds.get(1);
+ List<LiveFileMetaData> newVersionSstFiles =
+ Lists.newArrayList(createMockLiveFileMetaData("file5.sst", KEY_TABLE,
"key1", "key2"),
+ createMockLiveFileMetaData("file6.sst", FILE_TABLE, "key1", "key2"),
+ createMockLiveFileMetaData("file7.sst", KEY_TABLE, "key1", "key2"),
+ createMockLiveFileMetaData("file1.sst", "col1", "key1", "key2"));
+ try (WritableOmSnapshotLocalDataProvider snap =
+ localDataManager.getWritableOmSnapshotLocalData(snapId)) {
+ mockSnapshotStore(snapId, newVersionSstFiles);
+ snap.addSnapshotVersion(snapshotStore);
+ snap.commit();
+ }
+ validateVersions(localDataManager, snapId, 1, Sets.newHashSet(0, 1));
+ try (ReadableOmSnapshotLocalDataProvider snap =
localDataManager.getOmSnapshotLocalData(snapId)) {
+ OmSnapshotLocalData snapshotLocalData = snap.getSnapshotLocalData();
+ OmSnapshotLocalData.VersionMeta versionMeta =
snapshotLocalData.getVersionSstFileInfos().get(1);
+ assertEquals(6, versionMeta.getPreviousSnapshotVersion());
+ List<SstFileInfo> expectedLiveFileMetaData =
+ newVersionSstFiles.subList(0,
3).stream().map(SstFileInfo::new).collect(Collectors.toList());
+ assertEquals(expectedLiveFileMetaData, versionMeta.getSstFiles());
+ }
+ }
+
+ private void validateVersions(OmSnapshotLocalDataManager
snapshotLocalDataManager, UUID snapId, int expectedVersion,
+ Set<Integer> expectedVersions) throws IOException {
+ try (ReadableOmSnapshotLocalDataProvider snap =
snapshotLocalDataManager.getOmSnapshotLocalData(snapId)) {
+ assertEquals(expectedVersion, snap.getSnapshotLocalData().getVersion());
+ assertEquals(expectedVersions,
snap.getSnapshotLocalData().getVersionSstFileInfos().keySet());
+ }
+ }
+
+ /**
+ * Validates write-time version propagation and removal rules when the
previous
+ * snapshot already has a concrete version recorded.
+ *
+ * Test flow:
+ * 1) Create two snapshots in a chain: {@code prevSnapId -> snapId}.
+ * 2) For {@code prevSnapId}: set {@code version=3} and add SST metadata for
version {@code 0}; commit.
+ * 3) For {@code snapId}: set {@code version=4} and add SST metadata for
version {@code 4}; commit.
+ * After commit, versions resolve to {@code prev.version=4} and {@code
snap.version=5}, and their
+ * version maps are {@code {0,4}} and {@code {0,5}} respectively (base
version 0 plus the current one).
+ * 4) If {@code nextVersionExisting} is {@code true}:
+ * - Attempt to remove version {@code 4} from {@code prevSnapId}; expect
{@link IOException} because
+ * the successor snapshot still exists at version {@code 5} and depends
on {@code prevSnapId}.
+ * - Validate that versions and version maps remain unchanged.
+ * Else ({@code false}):
+ * - Remove version {@code 5} from {@code snapId} and commit, then remove
version {@code 4} from
+ * {@code prevSnapId} and commit.
+ * - Validate that both snapshots now only contain the base version
{@code 0}.
+ *
+ * This ensures a snapshot cannot drop a version that still has a dependent
successor, and that removals
+ * are allowed only after dependents are cleared.
+ *
+ * @param nextVersionExisting whether the successor snapshot's version still
exists ({@code true}) or is
+ * removed first ({@code false})
+ * @throws IOException if commit validation fails as expected in the
protected case
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testWriteVersionValidation(boolean nextVersionExisting) throws
IOException {
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+ List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 3);
+ UUID prevSnapId = snapshotIds.get(0);
+ UUID snapId = snapshotIds.get(1);
+ UUID nextSnapId = snapshotIds.get(2);
+ addVersionsToLocalData(localDataManager, prevSnapId, ImmutableMap.of(4,
1));
+ addVersionsToLocalData(localDataManager, snapId, ImmutableMap.of(5, 4));
+ addVersionsToLocalData(localDataManager, nextSnapId, ImmutableMap.of(6,
0));
+
+ validateVersions(localDataManager, snapId, 5, Sets.newHashSet(0, 5));
+ validateVersions(localDataManager, prevSnapId, 4, Sets.newHashSet(0, 4));
+
+ if (nextVersionExisting) {
+ try (WritableOmSnapshotLocalDataProvider prevSnap =
localDataManager.getWritableOmSnapshotLocalData(prevSnapId)) {
+ prevSnap.removeVersion(4);
+ IOException ex = assertThrows(IOException.class, prevSnap::commit);
+ assertTrue(ex.getMessage().contains("Cannot remove Snapshot " +
prevSnapId + " with version : 4 since it " +
+ "still has predecessors"));
+ }
+ validateVersions(localDataManager, snapId, 5, Sets.newHashSet(0, 5));
+ validateVersions(localDataManager, prevSnapId, 4, Sets.newHashSet(0, 4));
+ } else {
+ try (WritableOmSnapshotLocalDataProvider snap =
localDataManager.getWritableOmSnapshotLocalData(snapId)) {
+ snap.removeVersion(5);
+ snap.commit();
+ }
+
+ try (WritableOmSnapshotLocalDataProvider prevSnap =
localDataManager.getWritableOmSnapshotLocalData(prevSnapId)) {
+ prevSnap.removeVersion(4);
+ prevSnap.commit();
+ }
+ validateVersions(localDataManager, snapId, 5, Sets.newHashSet(0));
+ validateVersions(localDataManager, prevSnapId, 4, Sets.newHashSet(0));
+ // Check next snapshot is able to resolve to previous snapshot.
+ try (ReadableOmSnapshotLocalDataProvider nextSnap =
localDataManager.getOmSnapshotLocalData(nextSnapId,
+ prevSnapId)) {
+ OmSnapshotLocalData snapshotLocalData =
nextSnap.getSnapshotLocalData();
+ assertEquals(prevSnapId, snapshotLocalData.getPreviousSnapshotId());
+ snapshotLocalData.getVersionSstFileInfos()
+ .forEach((version, versionMeta) -> {
+ assertEquals(0, versionMeta.getPreviousSnapshotVersion());
+ });
+ }
+ }
+ }
+
+ private void addVersionsToLocalData(OmSnapshotLocalDataManager
snapshotLocalDataManager,
+ UUID snapId, Map<Integer, Integer> versionMap) throws IOException {
+ try (WritableOmSnapshotLocalDataProvider snap =
snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapId)) {
+ OmSnapshotLocalData snapshotLocalData = snap.getSnapshotLocalData();
+ for (Map.Entry<Integer, Integer> version : versionMap.entrySet().stream()
+ .sorted(Map.Entry.comparingByKey()).collect(Collectors.toList())) {
+ snapshotLocalData.setVersion(version.getKey() - 1);
+
snapshotLocalData.addVersionSSTFileInfos(ImmutableList.of(createMockLiveFileMetaData("file"
+ version +
+ ".sst", KEY_TABLE, "key1", "key2")), version.getValue());
+ }
+ mockSnapshotStore(snapId,
ImmutableList.of(createMockLiveFileMetaData("file"
+ + snapshotLocalData.getVersion() + 1 + ".sst", KEY_TABLE, "key1",
"key2")));
+ snap.addSnapshotVersion(snapshotStore);
+ snap.removeVersion(snapshotLocalData.getVersion());
+ snapshotLocalData.setVersion(snapshotLocalData.getVersion() - 1);
+ snap.commit();
+ }
+ try (ReadableOmSnapshotLocalDataProvider snap =
snapshotLocalDataManager.getOmSnapshotLocalData(snapId)) {
+ OmSnapshotLocalData snapshotLocalData = snap.getSnapshotLocalData();
+ for (int version : versionMap.keySet()) {
+
assertTrue(snapshotLocalData.getVersionSstFileInfos().containsKey(version));
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testVersionResolution(boolean read) throws IOException {
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+ List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 5);
+ List<Map<Integer, Integer>> versionMaps = Arrays.asList(
+ ImmutableMap.of(4, 1, 6, 3, 8, 9, 11, 15),
+ ImmutableMap.of(5, 4, 6, 8, 10, 11),
+ ImmutableMap.of(1, 5, 3, 5, 8, 10),
+ ImmutableMap.of(1, 1, 2, 3, 5, 8),
+ ImmutableMap.of(1, 1, 11, 2, 20, 5, 30, 2)
+ );
+ for (int i = 0; i < snapshotIds.size(); i++) {
+ addVersionsToLocalData(localDataManager, snapshotIds.get(i),
versionMaps.get(i));
+ }
+ for (int start = 0; start < snapshotIds.size(); start++) {
+ for (int end = start + 1; end < snapshotIds.size(); end++) {
+ UUID prevSnapId = snapshotIds.get(start);
+ UUID snapId = snapshotIds.get(end);
+ Map<Integer, Integer> versionMap = new HashMap<>(versionMaps.get(end));
+ versionMap.put(0, 0);
+ for (int idx = end - 1; idx > start; idx--) {
+ for (Map.Entry<Integer, Integer> version : versionMap.entrySet()) {
+
version.setValue(versionMaps.get(idx).getOrDefault(version.getValue(), 0));
+ }
+ }
+ try (ReadableOmSnapshotLocalDataProvider snap = read ?
+ localDataManager.getOmSnapshotLocalData(snapId, prevSnapId) :
+ localDataManager.getWritableOmSnapshotLocalData(snapId,
prevSnapId)) {
+ OmSnapshotLocalData snapshotLocalData = snap.getSnapshotLocalData();
+ OmSnapshotLocalData prevSnapshotLocalData =
snap.getPreviousSnapshotLocalData();
+ assertEquals(prevSnapshotLocalData.getSnapshotId(),
snapshotLocalData.getPreviousSnapshotId());
+ assertEquals(prevSnapId, snapshotLocalData.getPreviousSnapshotId());
+ assertEquals(snapId, snapshotLocalData.getSnapshotId());
+ assertTrue(snapshotLocalData.getVersionSstFileInfos().size() > 1);
+ snapshotLocalData.getVersionSstFileInfos()
+ .forEach((version, versionMeta) -> {
+ assertEquals(versionMap.get(version),
versionMeta.getPreviousSnapshotVersion());
+ });
+ }
+ }
+ }
+ }
+
@Test
public void testConstructor() throws IOException {
localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
@@ -147,7 +566,7 @@ public void
testGetSnapshotLocalPropertyYamlPathWithSnapshotInfo() throws IOExce
SnapshotInfo snapshotInfo = createMockSnapshotInfo(snapshotId, null);
localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
-
+
File yamlPath = new
File(localDataManager.getSnapshotLocalPropertyYamlPath(snapshotInfo));
assertNotNull(yamlPath);
Path expectedYamlPath = Paths.get(snapshotsDir.getAbsolutePath(), "db" +
OM_SNAPSHOT_SEPARATOR + snapshotId
@@ -178,15 +597,18 @@ public void testCreateNewOmSnapshotLocalDataFile() throws
IOException {
when(snapshotStore.getDb()).thenReturn(rocksDatabase);
when(rocksDatabase.getLiveFilesMetaData()).thenReturn(sstFiles);
localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
-
+
localDataManager.createNewOmSnapshotLocalDataFile(snapshotStore,
snapshotInfo);
// Verify file was created
- OmSnapshotLocalData snapshotLocalData =
localDataManager.getOmSnapshotLocalData(snapshotId);
- assertEquals(1, snapshotLocalData.getVersionSstFileInfos().size());
- OmSnapshotLocalData.VersionMeta versionMeta =
snapshotLocalData.getVersionSstFileInfos().get(0);
- OmSnapshotLocalData.VersionMeta expectedVersionMeta = new
OmSnapshotLocalData.VersionMeta(0, sstFileInfos);
- assertEquals(expectedVersionMeta, versionMeta);
+ OmSnapshotLocalData.VersionMeta versionMeta;
+ try (ReadableOmSnapshotLocalDataProvider snapshotLocalData =
localDataManager.getOmSnapshotLocalData(snapshotId)) {
+ assertEquals(1,
snapshotLocalData.getSnapshotLocalData().getVersionSstFileInfos().size());
+ versionMeta =
snapshotLocalData.getSnapshotLocalData().getVersionSstFileInfos().get(0);
+ OmSnapshotLocalData.VersionMeta expectedVersionMeta =
+ new OmSnapshotLocalData.VersionMeta(0, sstFileInfos);
+ assertEquals(expectedVersionMeta, versionMeta);
+ }
}
@Test
@@ -198,16 +620,16 @@ public void testGetOmSnapshotLocalDataWithSnapshotInfo()
throws IOException {
OmSnapshotLocalData localData = createMockLocalData(snapshotId, null);
localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
-
+
// Write the file manually for testing
Path yamlPath =
Paths.get(localDataManager.getSnapshotLocalPropertyYamlPath(snapshotInfo.getSnapshotId()));
writeLocalDataToFile(localData, yamlPath);
// Test retrieval
- OmSnapshotLocalData retrieved =
localDataManager.getOmSnapshotLocalData(snapshotInfo);
-
- assertNotNull(retrieved);
- assertEquals(snapshotId, retrieved.getSnapshotId());
+ try (ReadableOmSnapshotLocalDataProvider retrieved =
localDataManager.getOmSnapshotLocalData(snapshotInfo)) {
+ assertNotNull(retrieved.getSnapshotLocalData());
+ assertEquals(snapshotId,
retrieved.getSnapshotLocalData().getSnapshotId());
+ }
}
@Test
@@ -219,7 +641,7 @@ public void
testGetOmSnapshotLocalDataWithMismatchedSnapshotId() throws IOExcept
OmSnapshotLocalData localData = createMockLocalData(wrongSnapshotId, null);
localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
-
+
Path yamlPath =
Paths.get(localDataManager.getSnapshotLocalPropertyYamlPath(snapshotId));
writeLocalDataToFile(localData, yamlPath);
// Should throw IOException due to mismatched IDs
@@ -235,7 +657,7 @@ public void testGetOmSnapshotLocalDataWithFile() throws
IOException {
OmSnapshotLocalData localData = createMockLocalData(snapshotId, null);
localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
-
+
Path yamlPath = tempDir.resolve("test-snapshot.yaml");
writeLocalDataToFile(localData, yamlPath);
@@ -269,7 +691,7 @@ public void testAddVersionNodeWithDependentsAlreadyExists()
throws IOException {
createSnapshotLocalDataFile(snapshotId, null);
localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
-
+
OmSnapshotLocalData localData = createMockLocalData(snapshotId, null);
// First addition
@@ -291,7 +713,7 @@ public void testInitWithExistingYamlFiles() throws
IOException {
// Initialize - should load existing files
localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
-
+
assertNotNull(localDataManager);
Map<UUID, OmSnapshotLocalDataManager.SnapshotVersionsMeta> versionMap =
localDataManager.getVersionNodeMap();
@@ -317,7 +739,7 @@ public void testInitWithInvalidPathThrowsException() throws
IOException {
@Test
public void testClose() throws IOException {
localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
-
+
// Should not throw exception
localDataManager.close();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]