Copilot commented on code in PR #9140:
URL: https://github.com/apache/ozone/pull/9140#discussion_r2440388023
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java:
##########
@@ -228,6 +297,362 @@ public void close() {
}
}
+ private static final class LockDataProviderInitResult {
+ private final OmSnapshotLocalData snapshotLocalData;
+ private final HierarchicalResourceLock lock;
+ private final HierarchicalResourceLock previousLock;
+ private final UUID previousSnapshotId;
+
+ private LockDataProviderInitResult(HierarchicalResourceLock lock,
OmSnapshotLocalData snapshotLocalData,
+ HierarchicalResourceLock previousLock, UUID previousSnapshotId) {
+ this.lock = lock;
+ this.snapshotLocalData = snapshotLocalData;
+ this.previousLock = previousLock;
+ this.previousSnapshotId = previousSnapshotId;
+ }
+
+ private HierarchicalResourceLock getLock() {
+ return lock;
+ }
+
+ private HierarchicalResourceLock getPreviousLock() {
+ return previousLock;
+ }
+
+ private UUID getPreviousSnapshotId() {
+ return previousSnapshotId;
+ }
+
+ private OmSnapshotLocalData getSnapshotLocalData() {
+ return snapshotLocalData;
+ }
+ }
+
+ /**
+ * The ReadableOmSnapshotLocalDataProvider class is responsible for managing
the
+ * access and initialization of local snapshot data in a thread-safe manner.
+ * It provides mechanisms to handle snapshot data, retrieve associated
previous
+ * snapshot data, and manage lock synchronization for safe concurrent
operations.
+ *
+ * This class works with snapshot identifiers and ensures that the
appropriate
+ * local data for a given snapshot is loaded and accessible. Additionally, it
+ * maintains locking mechanisms to ensure thread-safe initialization and
access
+ * to both the current and previous snapshot local data. The implementation
also
+ * supports handling errors in the snapshot data initialization process.
+ *
+ * Key Functionalities:
+ * - Initializes and provides access to snapshot local data associated with a
+ * given snapshot identifier.
+ * - Resolves and retrieves data for the previous snapshot if applicable.
+ * - Ensures safe concurrent read operations using locking mechanisms.
+ * - Validates the integrity and consistency of snapshot data during
initialization.
+ * - Ensures that appropriate locks are released upon closing.
+ *
+ * Thread-Safety:
+ * This class utilizes locks to guarantee thread-safe operations when
accessing
+ * or modifying snapshot data. State variables relating to snapshot data are
+ * properly synchronized to ensure consistency during concurrent operations.
+ *
+ * Usage Considerations:
+ * - Ensure proper handling of exceptions while interacting with this class,
+ * particularly during initialization and cleanup.
+ * - Always invoke the {@code close()} method after usage to release
acquired locks
+ * and avoid potential deadlocks.
+ */
+ public class ReadableOmSnapshotLocalDataProvider implements AutoCloseable {
+
+ private final UUID snapshotId;
+ private final HierarchicalResourceLock lock;
+ private final HierarchicalResourceLock previousLock;
+ private final OmSnapshotLocalData snapshotLocalData;
+ private OmSnapshotLocalData previousSnapshotLocalData;
+ private volatile boolean isPreviousSnapshotLoaded = false;
+ private final UUID resolvedPreviousSnapshotId;
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId) throws
IOException {
+ this(snapshotId, true);
+ }
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, UUID
snapIdToResolve) throws IOException {
+ this(snapshotId, true, null, snapIdToResolve, true);
+ }
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean
readLock) throws IOException {
+ this(snapshotId, readLock, null, null, false);
+ }
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean
readLock,
+ CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException>
snapshotLocalDataSupplier,
+ UUID snapshotIdToBeResolved, boolean isSnapshotToBeResolvedNullable)
throws IOException {
+ this.snapshotId = snapshotId;
+ LockDataProviderInitResult result = initialize(readLock, snapshotId,
snapshotIdToBeResolved,
+ isSnapshotToBeResolvedNullable, snapshotLocalDataSupplier);
+ this.snapshotLocalData = result.getSnapshotLocalData();
+ this.lock = result.getLock();
+ this.previousLock = result.getPreviousLock();
+ this.resolvedPreviousSnapshotId = result.getPreviousSnapshotId();
+ this.previousSnapshotLocalData = null;
+ this.isPreviousSnapshotLoaded = false;
+ }
+
+ public OmSnapshotLocalData getSnapshotLocalData() {
+ return snapshotLocalData;
+ }
+
+ public synchronized OmSnapshotLocalData getPreviousSnapshotLocalData()
throws IOException {
+ if (!isPreviousSnapshotLoaded) {
+ if (resolvedPreviousSnapshotId != null) {
+ File previousSnapshotLocalDataFile = new
File(getSnapshotLocalPropertyYamlPath(resolvedPreviousSnapshotId));
+ this.previousSnapshotLocalData =
snapshotLocalDataSerializer.load(previousSnapshotLocalDataFile);
+ }
+ this.isPreviousSnapshotLoaded = true;
+ }
+ return previousSnapshotLocalData;
+ }
+
+ private HierarchicalResourceLock acquireLock(UUID snapId, boolean
readLock) throws IOException {
+ HierarchicalResourceLock acquiredLock = readLock ?
locks.acquireReadLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK,
+ snapId.toString()) :
locks.acquireWriteLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK,
snapId.toString());
+ if (!acquiredLock.isLockAcquired()) {
+ throw new IOException("Unable to acquire lock for snapshotId: " +
snapId);
+ }
+ return acquiredLock;
+ }
+
+ /**
+ * Intializer the snapshot local data by acquiring the lock on the
snapshot and also acquires a read lock on the
+ * snapshotId to be resolved by iterating through the chain of previous
snapshot ids.
+ */
+ private LockDataProviderInitResult initialize(
+ boolean readLock, UUID snapId, UUID toResolveSnapshotId, boolean
isSnapshotToBeResolvedNullable,
+ CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException>
snapshotLocalDataSupplier) throws IOException {
+ HierarchicalResourceLock snapIdLock = null;
+ HierarchicalResourceLock previousReadLockAcquired = null;
+ try {
+ snapIdLock = acquireLock(snapId, readLock);
+ snapshotLocalDataSupplier = snapshotLocalDataSupplier == null ? () -> {
+ File snapshotLocalDataFile = new
File(getSnapshotLocalPropertyYamlPath(snapId));
+ return
Pair.of(snapshotLocalDataSerializer.load(snapshotLocalDataFile),
snapshotLocalDataFile);
+ } : snapshotLocalDataSupplier;
+ Pair<OmSnapshotLocalData, File> pair = snapshotLocalDataSupplier.get();
+ OmSnapshotLocalData ssLocalData = pair.getKey();
+ if (!Objects.equals(ssLocalData.getSnapshotId(), snapId)) {
+ String loadPath = pair.getValue() == null ? null :
pair.getValue().getAbsolutePath();
+ throw new IOException("SnapshotId in path : " + loadPath + "
contains snapshotLocalData corresponding " +
+ "to snapshotId " + ssLocalData.getSnapshotId() + ". Expected
snapshotId " + snapId);
+ }
+ // Get previous snapshotId and acquire read lock on the id. We need to
do this outside the loop instead of a
+ // do while loop since the nodes that need be added may not be present
in the graph so it may not be possible
+ // to iterate through the chain.
+ UUID previousSnapshotId = ssLocalData.getPreviousSnapshotId();
+ // if flag toResolveSnapshotIdIsNull is true or toResolveSnapshotId is
not null, then we resolve snapshot
+ // with previous snapshot id as null, which would mean if the snapshot
local data is committed the snapshot
+ // local data would become first snapshot in the chain with no
previous snapshot id.
+ toResolveSnapshotId = (isSnapshotToBeResolvedNullable ||
toResolveSnapshotId != null) ? toResolveSnapshotId :
+ ssLocalData.getPreviousSnapshotId();
+ if (toResolveSnapshotId != null && previousSnapshotId != null) {
+ if (!versionNodeMap.containsKey(previousSnapshotId)) {
+ throw new IOException(String.format("Operating on snapshot id : %s
with previousSnapshotId: %s invalid " +
+ "since previousSnapshotId is not loaded.", snapId,
previousSnapshotId));
+ }
+ previousReadLockAcquired = acquireLock(previousSnapshotId, true);
+ // Create a copy of the previous versionMap to get the previous
versions corresponding to the previous
+ // snapshot. This map would mutated to resolve the previous
snapshot's version corresponding to the
+ // toResolveSnapshotId by iterating through the chain of previous
snapshot ids.
+ Map<Integer, LocalDataVersionNode> previousVersionNodeMap =
+ new
HashMap<>(versionNodeMap.get(previousSnapshotId).getSnapshotVersions());
+ UUID currentIteratedSnapshotId = previousSnapshotId;
+ // Iterate through the chain of previous snapshot ids until the
snapshot id to be resolved is found.
+ while (!Objects.equals(currentIteratedSnapshotId,
toResolveSnapshotId)) {
+ // All versions for the snapshot should point to the same previous
snapshot id. Otherwise this is a sign
+ // of corruption.
+ Set<UUID> previousIds =
+
previousVersionNodeMap.values().stream().map(LocalDataVersionNode::getPreviousSnapshotId)
+ .collect(Collectors.toSet());
+ if (previousIds.size() > 1) {
+ throw new IOException(String.format("Snapshot %s versions has
multiple previous snapshotIds %s",
+ currentIteratedSnapshotId, previousIds));
+ }
+ if (previousIds.isEmpty()) {
+ throw new IOException(String.format("Snapshot %s versions
doesn't have previous Id thus snapshot " +
+ "%s cannot be resolved against id %s",
+ currentIteratedSnapshotId, snapId, toResolveSnapshotId));
+ }
+ UUID previousId = previousIds.iterator().next();
+ HierarchicalResourceLock previousToPreviousReadLockAcquired =
acquireLock(previousId, true);
+
+ try {
+ // Get the version node for the snapshot and update the version
node to the successor to point to the
+ // previous node.
+ for (Map.Entry<Integer, LocalDataVersionNode> entry :
previousVersionNodeMap.entrySet()) {
+ Set<LocalDataVersionNode> versionNode =
localDataGraph.successors(entry.getValue());
+ if (versionNode.size() > 1) {
+ throw new IOException(String.format("Snapshot %s version %d
has multiple successors %s",
+ currentIteratedSnapshotId, entry.getValue(),
versionNode));
+ }
+ if (versionNode.isEmpty()) {
+ throw new IOException(String.format("Snapshot %s version %d
doesn't have successor",
+ currentIteratedSnapshotId, entry.getValue()));
+ }
+ // Set the version node for iterated version to the successor
corresponding to the previous snapshot id.
+ entry.setValue(versionNode.iterator().next());
Review Comment:
The graph is built with edges from a snapshot version to its previous
version (current -> previous). To walk 'forward' along the chain, i.e. from a
previous snapshot's version node to the version node of the snapshot that
depends on it, you must use predecessors(), not successors(). Using
successors() here will return either an empty set or the wrong node, breaking
version resolution. Replace successors(...) with predecessors(...), and update
the error messages accordingly.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java:
##########
@@ -228,6 +297,362 @@ public void close() {
}
}
+ private static final class LockDataProviderInitResult {
+ private final OmSnapshotLocalData snapshotLocalData;
+ private final HierarchicalResourceLock lock;
+ private final HierarchicalResourceLock previousLock;
+ private final UUID previousSnapshotId;
+
+ private LockDataProviderInitResult(HierarchicalResourceLock lock,
OmSnapshotLocalData snapshotLocalData,
+ HierarchicalResourceLock previousLock, UUID previousSnapshotId) {
+ this.lock = lock;
+ this.snapshotLocalData = snapshotLocalData;
+ this.previousLock = previousLock;
+ this.previousSnapshotId = previousSnapshotId;
+ }
+
+ private HierarchicalResourceLock getLock() {
+ return lock;
+ }
+
+ private HierarchicalResourceLock getPreviousLock() {
+ return previousLock;
+ }
+
+ private UUID getPreviousSnapshotId() {
+ return previousSnapshotId;
+ }
+
+ private OmSnapshotLocalData getSnapshotLocalData() {
+ return snapshotLocalData;
+ }
+ }
+
+ /**
+ * The ReadableOmSnapshotLocalDataProvider class is responsible for managing
the
+ * access and initialization of local snapshot data in a thread-safe manner.
+ * It provides mechanisms to handle snapshot data, retrieve associated
previous
+ * snapshot data, and manage lock synchronization for safe concurrent
operations.
+ *
+ * This class works with snapshot identifiers and ensures that the
appropriate
+ * local data for a given snapshot is loaded and accessible. Additionally, it
+ * maintains locking mechanisms to ensure thread-safe initialization and
access
+ * to both the current and previous snapshot local data. The implementation
also
+ * supports handling errors in the snapshot data initialization process.
+ *
+ * Key Functionalities:
+ * - Initializes and provides access to snapshot local data associated with a
+ * given snapshot identifier.
+ * - Resolves and retrieves data for the previous snapshot if applicable.
+ * - Ensures safe concurrent read operations using locking mechanisms.
+ * - Validates the integrity and consistency of snapshot data during
initialization.
+ * - Ensures that appropriate locks are released upon closing.
+ *
+ * Thread-Safety:
+ * This class utilizes locks to guarantee thread-safe operations when
accessing
+ * or modifying snapshot data. State variables relating to snapshot data are
+ * properly synchronized to ensure consistency during concurrent operations.
+ *
+ * Usage Considerations:
+ * - Ensure proper handling of exceptions while interacting with this class,
+ * particularly during initialization and cleanup.
+ * - Always invoke the {@code close()} method after usage to release
acquired locks
+ * and avoid potential deadlocks.
+ */
+ public class ReadableOmSnapshotLocalDataProvider implements AutoCloseable {
+
+ private final UUID snapshotId;
+ private final HierarchicalResourceLock lock;
+ private final HierarchicalResourceLock previousLock;
+ private final OmSnapshotLocalData snapshotLocalData;
+ private OmSnapshotLocalData previousSnapshotLocalData;
+ private volatile boolean isPreviousSnapshotLoaded = false;
+ private final UUID resolvedPreviousSnapshotId;
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId) throws
IOException {
+ this(snapshotId, true);
+ }
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, UUID
snapIdToResolve) throws IOException {
+ this(snapshotId, true, null, snapIdToResolve, true);
+ }
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean
readLock) throws IOException {
+ this(snapshotId, readLock, null, null, false);
+ }
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean
readLock,
+ CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException>
snapshotLocalDataSupplier,
+ UUID snapshotIdToBeResolved, boolean isSnapshotToBeResolvedNullable)
throws IOException {
+ this.snapshotId = snapshotId;
+ LockDataProviderInitResult result = initialize(readLock, snapshotId,
snapshotIdToBeResolved,
+ isSnapshotToBeResolvedNullable, snapshotLocalDataSupplier);
+ this.snapshotLocalData = result.getSnapshotLocalData();
+ this.lock = result.getLock();
+ this.previousLock = result.getPreviousLock();
+ this.resolvedPreviousSnapshotId = result.getPreviousSnapshotId();
+ this.previousSnapshotLocalData = null;
+ this.isPreviousSnapshotLoaded = false;
+ }
+
+ public OmSnapshotLocalData getSnapshotLocalData() {
+ return snapshotLocalData;
+ }
+
+ public synchronized OmSnapshotLocalData getPreviousSnapshotLocalData()
throws IOException {
+ if (!isPreviousSnapshotLoaded) {
+ if (resolvedPreviousSnapshotId != null) {
+ File previousSnapshotLocalDataFile = new
File(getSnapshotLocalPropertyYamlPath(resolvedPreviousSnapshotId));
+ this.previousSnapshotLocalData =
snapshotLocalDataSerializer.load(previousSnapshotLocalDataFile);
+ }
+ this.isPreviousSnapshotLoaded = true;
+ }
+ return previousSnapshotLocalData;
+ }
+
+ private HierarchicalResourceLock acquireLock(UUID snapId, boolean
readLock) throws IOException {
+ HierarchicalResourceLock acquiredLock = readLock ?
locks.acquireReadLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK,
+ snapId.toString()) :
locks.acquireWriteLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK,
snapId.toString());
+ if (!acquiredLock.isLockAcquired()) {
+ throw new IOException("Unable to acquire lock for snapshotId: " +
snapId);
+ }
+ return acquiredLock;
+ }
+
+ /**
+ * Intializer the snapshot local data by acquiring the lock on the
snapshot and also acquires a read lock on the
+ * snapshotId to be resolved by iterating through the chain of previous
snapshot ids.
+ */
+ private LockDataProviderInitResult initialize(
+ boolean readLock, UUID snapId, UUID toResolveSnapshotId, boolean
isSnapshotToBeResolvedNullable,
+ CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException>
snapshotLocalDataSupplier) throws IOException {
+ HierarchicalResourceLock snapIdLock = null;
+ HierarchicalResourceLock previousReadLockAcquired = null;
+ try {
+ snapIdLock = acquireLock(snapId, readLock);
+ snapshotLocalDataSupplier = snapshotLocalDataSupplier == null ? () -> {
+ File snapshotLocalDataFile = new
File(getSnapshotLocalPropertyYamlPath(snapId));
+ return
Pair.of(snapshotLocalDataSerializer.load(snapshotLocalDataFile),
snapshotLocalDataFile);
+ } : snapshotLocalDataSupplier;
+ Pair<OmSnapshotLocalData, File> pair = snapshotLocalDataSupplier.get();
+ OmSnapshotLocalData ssLocalData = pair.getKey();
+ if (!Objects.equals(ssLocalData.getSnapshotId(), snapId)) {
+ String loadPath = pair.getValue() == null ? null :
pair.getValue().getAbsolutePath();
+ throw new IOException("SnapshotId in path : " + loadPath + "
contains snapshotLocalData corresponding " +
+ "to snapshotId " + ssLocalData.getSnapshotId() + ". Expected
snapshotId " + snapId);
+ }
+ // Get previous snapshotId and acquire read lock on the id. We need to
do this outside the loop instead of a
+ // do while loop since the nodes that need be added may not be present
in the graph so it may not be possible
+ // to iterate through the chain.
+ UUID previousSnapshotId = ssLocalData.getPreviousSnapshotId();
+ // if flag toResolveSnapshotIdIsNull is true or toResolveSnapshotId is
not null, then we resolve snapshot
+ // with previous snapshot id as null, which would mean if the snapshot
local data is committed the snapshot
+ // local data would become first snapshot in the chain with no
previous snapshot id.
+ toResolveSnapshotId = (isSnapshotToBeResolvedNullable ||
toResolveSnapshotId != null) ? toResolveSnapshotId :
+ ssLocalData.getPreviousSnapshotId();
+ if (toResolveSnapshotId != null && previousSnapshotId != null) {
+ if (!versionNodeMap.containsKey(previousSnapshotId)) {
+ throw new IOException(String.format("Operating on snapshot id : %s
with previousSnapshotId: %s invalid " +
+ "since previousSnapshotId is not loaded.", snapId,
previousSnapshotId));
+ }
+ previousReadLockAcquired = acquireLock(previousSnapshotId, true);
+ // Create a copy of the previous versionMap to get the previous
versions corresponding to the previous
+ // snapshot. This map would mutated to resolve the previous
snapshot's version corresponding to the
+ // toResolveSnapshotId by iterating through the chain of previous
snapshot ids.
+ Map<Integer, LocalDataVersionNode> previousVersionNodeMap =
+ new
HashMap<>(versionNodeMap.get(previousSnapshotId).getSnapshotVersions());
+ UUID currentIteratedSnapshotId = previousSnapshotId;
+ // Iterate through the chain of previous snapshot ids until the
snapshot id to be resolved is found.
+ while (!Objects.equals(currentIteratedSnapshotId,
toResolveSnapshotId)) {
+ // All versions for the snapshot should point to the same previous
snapshot id. Otherwise this is a sign
+ // of corruption.
+ Set<UUID> previousIds =
+
previousVersionNodeMap.values().stream().map(LocalDataVersionNode::getPreviousSnapshotId)
+ .collect(Collectors.toSet());
+ if (previousIds.size() > 1) {
+ throw new IOException(String.format("Snapshot %s versions has
multiple previous snapshotIds %s",
+ currentIteratedSnapshotId, previousIds));
+ }
+ if (previousIds.isEmpty()) {
+ throw new IOException(String.format("Snapshot %s versions
doesn't have previous Id thus snapshot " +
+ "%s cannot be resolved against id %s",
+ currentIteratedSnapshotId, snapId, toResolveSnapshotId));
+ }
+ UUID previousId = previousIds.iterator().next();
+ HierarchicalResourceLock previousToPreviousReadLockAcquired =
acquireLock(previousId, true);
+
+ try {
+ // Get the version node for the snapshot and update the version
node to the successor to point to the
+ // previous node.
+ for (Map.Entry<Integer, LocalDataVersionNode> entry :
previousVersionNodeMap.entrySet()) {
+ Set<LocalDataVersionNode> versionNode =
localDataGraph.successors(entry.getValue());
+ if (versionNode.size() > 1) {
+ throw new IOException(String.format("Snapshot %s version %d
has multiple successors %s",
+ currentIteratedSnapshotId, entry.getValue(),
versionNode));
+ }
+ if (versionNode.isEmpty()) {
+ throw new IOException(String.format("Snapshot %s version %d
doesn't have successor",
+ currentIteratedSnapshotId, entry.getValue()));
+ }
+ // Set the version node for iterated version to the successor
corresponding to the previous snapshot id.
+ entry.setValue(versionNode.iterator().next());
+ }
+ } finally {
+ // Release the read lock acquired on the previous snapshot id
acquired. Now that the instance
+ // is no longer needed we can release the read lock for the
snapshot iterated in the previous snapshot.
+ // Make previousToPrevious previous for next iteration.
+ previousReadLockAcquired.close();
+ previousReadLockAcquired = previousToPreviousReadLockAcquired;
+ currentIteratedSnapshotId = previousId;
+ }
+ }
+ ssLocalData.setPreviousSnapshotId(toResolveSnapshotId);
+ Map<Integer, OmSnapshotLocalData.VersionMeta> versionMetaMap =
ssLocalData.getVersionSstFileInfos();
+ for (Map.Entry<Integer, OmSnapshotLocalData.VersionMeta> entry :
versionMetaMap.entrySet()) {
+ OmSnapshotLocalData.VersionMeta versionMeta = entry.getValue();
+ // Get the relative version node which corresponds to the
toResolveSnapshotId corresponding to the
+ // versionMeta which points to a particular version in the
previous snapshot
+ LocalDataVersionNode relativePreviousVersionNode =
+
previousVersionNodeMap.get(versionMeta.getPreviousSnapshotVersion());
+ if (relativePreviousVersionNode == null) {
+ throw new IOException(String.format("Unable to resolve previous
version node for snapshot: %s" +
+ " with version : %d against previous snapshot %s previous
version : %d",
+ snapId, entry.getKey(), toResolveSnapshotId,
versionMeta.getPreviousSnapshotVersion()));
+ }
+ // Set the previous snapshot version to the
relativePreviousVersionNode which was captured.
+
versionMeta.setPreviousSnapshotVersion(relativePreviousVersionNode.getVersion());
+ }
+ } else {
+ toResolveSnapshotId = null;
+ ssLocalData.setPreviousSnapshotId(null);
+ }
+ return new LockDataProviderInitResult(snapIdLock, ssLocalData,
previousReadLockAcquired, toResolveSnapshotId);
+ } catch (IOException e) {
+ // Release all the locks in case of an exception and rethrow the
exception.
+ if (previousReadLockAcquired != null) {
+ previousReadLockAcquired.close();
+ }
+ if (snapIdLock != null) {
+ snapIdLock.close();
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (previousLock != null) {
+ previousLock.close();
+ }
+ if (lock != null) {
+ lock.close();
+ }
+ }
+ }
+
+ /**
+ * This class represents a writable provider for managing local data of
+ * OmSnapshot. It extends the functionality of {@code
ReadableOmSnapshotLocalDataProvider}
+ * and provides support for write operations, such as committing changes.
+ *
+ * The writable snapshot data provider interacts with version nodes and
+ * facilitates atomic updates to snapshot properties and files.
+ *
+ * This class is designed to ensure thread-safe operations and uses locks to
+ * guarantee consistent state across concurrent activities.
+ *
+ * The default usage includes creating an instance of this provider with
+ * specific snapshot identifiers and optionally handling additional
parameters
+ * such as data resolution or a supplier for snapshot data.
+ */
+ public final class WritableOmSnapshotLocalDataProvider extends
ReadableOmSnapshotLocalDataProvider {
+
+ private WritableOmSnapshotLocalDataProvider(UUID snapshotId) throws
IOException {
+ super(snapshotId, false);
+ fullLock.readLock().lock();
+ }
+
+ private WritableOmSnapshotLocalDataProvider(UUID snapshotId, UUID
snapshotIdToBeResolved) throws IOException {
+ super(snapshotId, false, null, snapshotIdToBeResolved, true);
+ fullLock.readLock().lock();
+ }
Review Comment:
WritableOmSnapshotLocalDataProvider acquires fullLock.readLock() while
performing mutations (commit/upsert) on versionNodeMap/localDataGraph. This can
race with readers and with other writers. Use fullLock.writeLock() for writes,
and hold fullLock.readLock() in ReadableOmSnapshotLocalDataProvider during graph
reads. Specifically:
- Change readLock() to writeLock() in these constructors and in close().
- Acquire and release fullLock.readLock() in ReadableOmSnapshotLocalDataProvider
  to protect versionNodeMap/localDataGraph access.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java:
##########
@@ -228,6 +297,362 @@ public void close() {
}
}
+ private static final class LockDataProviderInitResult {
+ private final OmSnapshotLocalData snapshotLocalData;
+ private final HierarchicalResourceLock lock;
+ private final HierarchicalResourceLock previousLock;
+ private final UUID previousSnapshotId;
+
+ private LockDataProviderInitResult(HierarchicalResourceLock lock,
OmSnapshotLocalData snapshotLocalData,
+ HierarchicalResourceLock previousLock, UUID previousSnapshotId) {
+ this.lock = lock;
+ this.snapshotLocalData = snapshotLocalData;
+ this.previousLock = previousLock;
+ this.previousSnapshotId = previousSnapshotId;
+ }
+
+ private HierarchicalResourceLock getLock() {
+ return lock;
+ }
+
+ private HierarchicalResourceLock getPreviousLock() {
+ return previousLock;
+ }
+
+ private UUID getPreviousSnapshotId() {
+ return previousSnapshotId;
+ }
+
+ private OmSnapshotLocalData getSnapshotLocalData() {
+ return snapshotLocalData;
+ }
+ }
+
+ /**
+ * The ReadableOmSnapshotLocalDataProvider class is responsible for managing
the
+ * access and initialization of local snapshot data in a thread-safe manner.
+ * It provides mechanisms to handle snapshot data, retrieve associated
previous
+ * snapshot data, and manage lock synchronization for safe concurrent
operations.
+ *
+ * This class works with snapshot identifiers and ensures that the
appropriate
+ * local data for a given snapshot is loaded and accessible. Additionally, it
+ * maintains locking mechanisms to ensure thread-safe initialization and
access
+ * to both the current and previous snapshot local data. The implementation
also
+ * supports handling errors in the snapshot data initialization process.
+ *
+ * Key Functionalities:
+ * - Initializes and provides access to snapshot local data associated with a
+ * given snapshot identifier.
+ * - Resolves and retrieves data for the previous snapshot if applicable.
+ * - Ensures safe concurrent read operations using locking mechanisms.
+ * - Validates the integrity and consistency of snapshot data during
initialization.
+ * - Ensures that appropriate locks are released upon closing.
+ *
+ * Thread-Safety:
+ * This class utilizes locks to guarantee thread-safe operations when
accessing
+ * or modifying snapshot data. State variables relating to snapshot data are
+ * properly synchronized to ensure consistency during concurrent operations.
+ *
+ * Usage Considerations:
+ * - Ensure proper handling of exceptions while interacting with this class,
+ * particularly during initialization and cleanup.
+ * - Always invoke the {@code close()} method after usage to release
acquired locks
+ * and avoid potential deadlocks.
+ */
+ public class ReadableOmSnapshotLocalDataProvider implements AutoCloseable {
+
+ private final UUID snapshotId;
+ private final HierarchicalResourceLock lock;
+ private final HierarchicalResourceLock previousLock;
+ private final OmSnapshotLocalData snapshotLocalData;
+ private OmSnapshotLocalData previousSnapshotLocalData;
+ private volatile boolean isPreviousSnapshotLoaded = false;
+ private final UUID resolvedPreviousSnapshotId;
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId) throws
IOException {
+ this(snapshotId, true);
+ }
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, UUID
snapIdToResolve) throws IOException {
+ this(snapshotId, true, null, snapIdToResolve, true);
+ }
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean
readLock) throws IOException {
+ this(snapshotId, readLock, null, null, false);
+ }
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean
readLock,
+ CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException>
snapshotLocalDataSupplier,
+ UUID snapshotIdToBeResolved, boolean isSnapshotToBeResolvedNullable)
throws IOException {
+ this.snapshotId = snapshotId;
+ LockDataProviderInitResult result = initialize(readLock, snapshotId,
snapshotIdToBeResolved,
+ isSnapshotToBeResolvedNullable, snapshotLocalDataSupplier);
+ this.snapshotLocalData = result.getSnapshotLocalData();
+ this.lock = result.getLock();
+ this.previousLock = result.getPreviousLock();
+ this.resolvedPreviousSnapshotId = result.getPreviousSnapshotId();
+ this.previousSnapshotLocalData = null;
+ this.isPreviousSnapshotLoaded = false;
+ }
+
+ public OmSnapshotLocalData getSnapshotLocalData() {
+ return snapshotLocalData;
+ }
+
+ public synchronized OmSnapshotLocalData getPreviousSnapshotLocalData()
throws IOException {
+ if (!isPreviousSnapshotLoaded) {
+ if (resolvedPreviousSnapshotId != null) {
+ File previousSnapshotLocalDataFile = new
File(getSnapshotLocalPropertyYamlPath(resolvedPreviousSnapshotId));
+ this.previousSnapshotLocalData =
snapshotLocalDataSerializer.load(previousSnapshotLocalDataFile);
+ }
+ this.isPreviousSnapshotLoaded = true;
+ }
+ return previousSnapshotLocalData;
+ }
+
+ private HierarchicalResourceLock acquireLock(UUID snapId, boolean
readLock) throws IOException {
+ HierarchicalResourceLock acquiredLock = readLock ?
locks.acquireReadLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK,
+ snapId.toString()) :
locks.acquireWriteLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK,
snapId.toString());
+ if (!acquiredLock.isLockAcquired()) {
+ throw new IOException("Unable to acquire lock for snapshotId: " +
snapId);
+ }
+ return acquiredLock;
+ }
+
+ /**
+ * Intializer the snapshot local data by acquiring the lock on the
snapshot and also acquires a read lock on the
+ * snapshotId to be resolved by iterating through the chain of previous
snapshot ids.
+ */
+ private LockDataProviderInitResult initialize(
+ boolean readLock, UUID snapId, UUID toResolveSnapshotId, boolean
isSnapshotToBeResolvedNullable,
+ CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException>
snapshotLocalDataSupplier) throws IOException {
+ HierarchicalResourceLock snapIdLock = null;
+ HierarchicalResourceLock previousReadLockAcquired = null;
+ try {
+ snapIdLock = acquireLock(snapId, readLock);
+ snapshotLocalDataSupplier = snapshotLocalDataSupplier == null ? () -> {
+ File snapshotLocalDataFile = new
File(getSnapshotLocalPropertyYamlPath(snapId));
+ return
Pair.of(snapshotLocalDataSerializer.load(snapshotLocalDataFile),
snapshotLocalDataFile);
+ } : snapshotLocalDataSupplier;
+ Pair<OmSnapshotLocalData, File> pair = snapshotLocalDataSupplier.get();
+ OmSnapshotLocalData ssLocalData = pair.getKey();
+ if (!Objects.equals(ssLocalData.getSnapshotId(), snapId)) {
+ String loadPath = pair.getValue() == null ? null :
pair.getValue().getAbsolutePath();
+ throw new IOException("SnapshotId in path : " + loadPath + "
contains snapshotLocalData corresponding " +
+ "to snapshotId " + ssLocalData.getSnapshotId() + ". Expected
snapshotId " + snapId);
+ }
+ // Get previous snapshotId and acquire read lock on the id. We need to
do this outside the loop instead of a
+ // do while loop since the nodes that need be added may not be present
in the graph so it may not be possible
+ // to iterate through the chain.
+ UUID previousSnapshotId = ssLocalData.getPreviousSnapshotId();
+ // if flag toResolveSnapshotIdIsNull is true or toResolveSnapshotId is
not null, then we resolve snapshot
+ // with previous snapshot id as null, which would mean if the snapshot
local data is committed the snapshot
+ // local data would become first snapshot in the chain with no
previous snapshot id.
+ toResolveSnapshotId = (isSnapshotToBeResolvedNullable ||
toResolveSnapshotId != null) ? toResolveSnapshotId :
+ ssLocalData.getPreviousSnapshotId();
+ if (toResolveSnapshotId != null && previousSnapshotId != null) {
+ if (!versionNodeMap.containsKey(previousSnapshotId)) {
+ throw new IOException(String.format("Operating on snapshot id : %s
with previousSnapshotId: %s invalid " +
+ "since previousSnapshotId is not loaded.", snapId,
previousSnapshotId));
+ }
+ previousReadLockAcquired = acquireLock(previousSnapshotId, true);
+ // Create a copy of the previous versionMap to get the previous
versions corresponding to the previous
+ // snapshot. This map would mutated to resolve the previous
snapshot's version corresponding to the
+ // toResolveSnapshotId by iterating through the chain of previous
snapshot ids.
+ Map<Integer, LocalDataVersionNode> previousVersionNodeMap =
+ new
HashMap<>(versionNodeMap.get(previousSnapshotId).getSnapshotVersions());
+ UUID currentIteratedSnapshotId = previousSnapshotId;
+ // Iterate through the chain of previous snapshot ids until the
snapshot id to be resolved is found.
+ while (!Objects.equals(currentIteratedSnapshotId,
toResolveSnapshotId)) {
+ // All versions for the snapshot should point to the same previous
snapshot id. Otherwise this is a sign
+ // of corruption.
+ Set<UUID> previousIds =
+
previousVersionNodeMap.values().stream().map(LocalDataVersionNode::getPreviousSnapshotId)
+ .collect(Collectors.toSet());
+ if (previousIds.size() > 1) {
+ throw new IOException(String.format("Snapshot %s versions has
multiple previous snapshotIds %s",
+ currentIteratedSnapshotId, previousIds));
+ }
+ if (previousIds.isEmpty()) {
+ throw new IOException(String.format("Snapshot %s versions
doesn't have previous Id thus snapshot " +
+ "%s cannot be resolved against id %s",
+ currentIteratedSnapshotId, snapId, toResolveSnapshotId));
+ }
+ UUID previousId = previousIds.iterator().next();
+ HierarchicalResourceLock previousToPreviousReadLockAcquired =
acquireLock(previousId, true);
+
+ try {
+ // Get the version node for the snapshot and update the version
node to the successor to point to the
+ // previous node.
+ for (Map.Entry<Integer, LocalDataVersionNode> entry :
previousVersionNodeMap.entrySet()) {
+ Set<LocalDataVersionNode> versionNode =
localDataGraph.successors(entry.getValue());
+ if (versionNode.size() > 1) {
+ throw new IOException(String.format("Snapshot %s version %d
has multiple successors %s",
+ currentIteratedSnapshotId, entry.getValue(),
versionNode));
+ }
+ if (versionNode.isEmpty()) {
+ throw new IOException(String.format("Snapshot %s version %d
doesn't have successor",
+ currentIteratedSnapshotId, entry.getValue()));
+ }
+ // Set the version node for iterated version to the successor
corresponding to the previous snapshot id.
+ entry.setValue(versionNode.iterator().next());
+ }
+ } finally {
+ // Release the read lock acquired on the previous snapshot id
acquired. Now that the instance
+ // is no longer needed we can release the read lock for the
snapshot iterated in the previous snapshot.
+ // Make previousToPrevious previous for next iteration.
+ previousReadLockAcquired.close();
+ previousReadLockAcquired = previousToPreviousReadLockAcquired;
+ currentIteratedSnapshotId = previousId;
+ }
+ }
+ ssLocalData.setPreviousSnapshotId(toResolveSnapshotId);
+ Map<Integer, OmSnapshotLocalData.VersionMeta> versionMetaMap =
ssLocalData.getVersionSstFileInfos();
+ for (Map.Entry<Integer, OmSnapshotLocalData.VersionMeta> entry :
versionMetaMap.entrySet()) {
+ OmSnapshotLocalData.VersionMeta versionMeta = entry.getValue();
+ // Get the relative version node which corresponds to the
toResolveSnapshotId corresponding to the
+ // versionMeta which points to a particular version in the
previous snapshot
+ LocalDataVersionNode relativePreviousVersionNode =
+
previousVersionNodeMap.get(versionMeta.getPreviousSnapshotVersion());
+ if (relativePreviousVersionNode == null) {
+ throw new IOException(String.format("Unable to resolve previous
version node for snapshot: %s" +
+ " with version : %d against previous snapshot %s previous
version : %d",
+ snapId, entry.getKey(), toResolveSnapshotId,
versionMeta.getPreviousSnapshotVersion()));
+ }
+ // Set the previous snapshot version to the
relativePreviousVersionNode which was captured.
+
versionMeta.setPreviousSnapshotVersion(relativePreviousVersionNode.getVersion());
+ }
+ } else {
+ toResolveSnapshotId = null;
+ ssLocalData.setPreviousSnapshotId(null);
+ }
+ return new LockDataProviderInitResult(snapIdLock, ssLocalData,
previousReadLockAcquired, toResolveSnapshotId);
+ } catch (IOException e) {
+ // Release all the locks in case of an exception and rethrow the
exception.
+ if (previousReadLockAcquired != null) {
+ previousReadLockAcquired.close();
+ }
+ if (snapIdLock != null) {
+ snapIdLock.close();
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (previousLock != null) {
+ previousLock.close();
+ }
+ if (lock != null) {
+ lock.close();
+ }
+ }
+ }
+
+ /**
+ * This class represents a writable provider for managing local data of
+ * OmSnapshot. It extends the functionality of {@code
ReadableOmSnapshotLocalDataProvider}
+ * and provides support for write operations, such as committing changes.
+ *
+ * The writable snapshot data provider interacts with version nodes and
+ * facilitates atomic updates to snapshot properties and files.
+ *
+ * This class is designed to ensure thread-safe operations and uses locks to
+ * guarantee consistent state across concurrent activities.
+ *
+ * The default usage includes creating an instance of this provider with
+ * specific snapshot identifiers and optionally handling additional
parameters
+ * such as data resolution or a supplier for snapshot data.
+ */
+ public final class WritableOmSnapshotLocalDataProvider extends
ReadableOmSnapshotLocalDataProvider {
+
+ private WritableOmSnapshotLocalDataProvider(UUID snapshotId) throws
IOException {
+ super(snapshotId, false);
+ fullLock.readLock().lock();
+ }
+
+ private WritableOmSnapshotLocalDataProvider(UUID snapshotId, UUID
snapshotIdToBeResolved) throws IOException {
+ super(snapshotId, false, null, snapshotIdToBeResolved, true);
+ fullLock.readLock().lock();
+ }
+
+ private WritableOmSnapshotLocalDataProvider(UUID snapshotId,
+ CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException>
snapshotLocalDataSupplier) throws IOException {
+ super(snapshotId, false, snapshotLocalDataSupplier, null, false);
+ fullLock.readLock().lock();
+ }
+
+ private SnapshotVersionsMeta validateModification(OmSnapshotLocalData
snapshotLocalData)
+ throws IOException {
+ SnapshotVersionsMeta versionsToBeAdded = new
SnapshotVersionsMeta(snapshotLocalData);
+ for (LocalDataVersionNode node :
versionsToBeAdded.getSnapshotVersions().values()) {
+ validateVersionAddition(node);
+ }
+ UUID snapshotId = snapshotLocalData.getSnapshotId();
+ Map<Integer, LocalDataVersionNode> existingVersions =
getVersionNodeMap().containsKey(snapshotId) ?
+ getVersionNodeMap().get(snapshotId).getSnapshotVersions() :
Collections.emptyMap();
+ for (Map.Entry<Integer, LocalDataVersionNode> entry :
existingVersions.entrySet()) {
+ if
(!versionsToBeAdded.getSnapshotVersions().containsKey(entry.getKey())) {
+ validateVersionRemoval(snapshotId, entry.getKey());
+ }
+ }
+ return versionsToBeAdded;
+ }
+
+ private synchronized void upsertNode(UUID snapshotId, SnapshotVersionsMeta
snapshotVersions) throws IOException {
+ SnapshotVersionsMeta existingSnapVersions =
getVersionNodeMap().remove(snapshotId);
+ Map<Integer, LocalDataVersionNode> existingVersions =
existingSnapVersions == null ? Collections.emptyMap() :
+ existingSnapVersions.getSnapshotVersions();
+ Map<Integer, Set<LocalDataVersionNode>> predecessors = new HashMap<>();
+ // Track all predecessors of the existing versions and remove the node
from the graph.
+ for (Map.Entry<Integer, LocalDataVersionNode> existingVersion :
existingVersions.entrySet()) {
+ LocalDataVersionNode existingVersionNode = existingVersion.getValue();
+ predecessors.put(existingVersion.getKey(),
localDataGraph.predecessors(existingVersionNode));
+ localDataGraph.removeNode(existingVersionNode);
+ }
+ // Add the nodes to be added in the graph and map.
+ addSnapshotVersionMeta(snapshotId, snapshotVersions);
+ // Reconnect all the predecessors for existing nodes.
+ for (Map.Entry<Integer, LocalDataVersionNode> entry :
snapshotVersions.getSnapshotVersions().entrySet()) {
+ for (LocalDataVersionNode predecessor :
predecessors.getOrDefault(entry.getKey(), Collections.emptySet())) {
+ localDataGraph.putEdge(predecessor, entry.getValue());
+ }
+ }
+ }
+
+ public void addSnapshotVersion(RDBStore snapshotStore) throws IOException {
+ List<LiveFileMetaData> sstFiles =
OmSnapshotManager.getSnapshotSSTFileList(snapshotStore);
+ OmSnapshotLocalData previousSnapshotLocalData =
getPreviousSnapshotLocalData();
+ this.getSnapshotLocalData().addVersionSSTFileInfos(sstFiles,
previousSnapshotLocalData == null ? 0 :
+ previousSnapshotLocalData.getVersion());
+ }
+
+ public synchronized void commit() throws IOException {
+ SnapshotVersionsMeta localDataVersionNodes =
validateModification(super.snapshotLocalData);
+ String filePath = getSnapshotLocalPropertyYamlPath(super.snapshotId);
+ String tmpFilePath = filePath + ".tmp";
+ File tmpFile = new File(tmpFilePath);
+ boolean tmpFileExists = tmpFile.exists();
+ if (tmpFileExists) {
+ tmpFileExists = !tmpFile.delete();
+ }
+ if (tmpFileExists) {
+ throw new IOException("Unable to delete tmp file " + tmpFilePath);
+ }
+ snapshotLocalDataSerializer.save(new File(tmpFilePath),
super.snapshotLocalData);
+ Files.move(tmpFile.toPath(), Paths.get(filePath),
StandardCopyOption.ATOMIC_MOVE,
+ StandardCopyOption.REPLACE_EXISTING);
+ upsertNode(super.snapshotId, localDataVersionNodes);
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ fullLock.readLock().unlock();
Review Comment:
WritableOmSnapshotLocalDataProvider acquires fullLock.readLock() while
performing mutations (commit/upsert) to versionNodeMap/localDataGraph. This can
race with readers and other writers. Use fullLock.writeLock() for writes and
hold fullLock.readLock() in ReadableOmSnapshotLocalDataProvider during graph
reads. Specifically: change readLock() to writeLock() in these constructors and
in close(); and acquire/release fullLock.readLock() in
ReadableOmSnapshotLocalDataProvider to protect versionNodeMap/localDataGraph
access.
```suggestion
fullLock.writeLock().unlock();
```
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalData.java:
##########
@@ -184,9 +184,14 @@ public void setPreviousSnapshotId(UUID previousSnapshotId)
{
* Adds an entry to the defragged SST file list.
* @param sstFiles SST file name
*/
- public void addVersionSSTFileInfos(List<SstFileInfo> sstFiles, int
previousSnapshotVersion) {
+ public void addVersionSSTFileInfos(List<LiveFileMetaData> sstFiles, int
previousSnapshotVersion) {
version++;
- this.versionSstFileInfos.put(version, new
VersionMeta(previousSnapshotVersion, sstFiles));
+ this.versionSstFileInfos.put(version, new
VersionMeta(previousSnapshotVersion, sstFiles.stream()
+ .map(SstFileInfo::new).collect(Collectors.toList())));
+ }
Review Comment:
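Changing addVersionSSTFileInfos to accept List<LiveFileMetaData> is a
breaking API change for callers that pass List<SstFileInfo>. To preserve
compatibility, consider keeping the original List<SstFileInfo> method
alongside the new LiveFileMetaData-based one. Note that the two cannot be
plain overloads of the same name: List<SstFileInfo> and
List<LiveFileMetaData> have the same erasure, so the compiler rejects the
pair with a name clash. Give the LiveFileMetaData-based variant a distinct
name and have it convert each element with SstFileInfo::new before
delegating to the original method.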
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java:
##########
@@ -186,7 +213,8 @@ void addVersionNodeWithDependents(OmSnapshotLocalData
snapshotLocalData) throws
} else {
UUID prevSnapId = snapshotVersionsMeta.getPreviousSnapshotId();
if (prevSnapId != null && !versionNodeMap.containsKey(prevSnapId)) {
- OmSnapshotLocalData prevSnapshotLocalData =
getOmSnapshotLocalData(prevSnapId);
+ File previousSnapshotLocalDataFile = new
File(getSnapshotLocalPropertyYamlPath(prevSnapId));
+ OmSnapshotLocalData prevSnapshotLocalData =
snapshotLocalDataSerializer.load(previousSnapshotLocalDataFile);
Review Comment:
This bypasses the existing validation that the snapshotId embedded in the
YAML matches the snapshotId the file was looked up for (previously done in
getOmSnapshotLocalData(UUID)). To keep the integrity check, either call
getOmSnapshotLocalData(previousSnapshotLocalDataFile) (if it performs the
validation) or add an explicit equality check and throw if mismatched.
```suggestion
OmSnapshotLocalData prevSnapshotLocalData =
snapshotLocalDataSerializer.load(previousSnapshotLocalDataFile);
if (!prevSnapId.equals(prevSnapshotLocalData.getSnapshotId())) {
throw new IOException("SnapshotId mismatch: expected " +
prevSnapId +
" but found " + prevSnapshotLocalData.getSnapshotId() +
" in file " + previousSnapshotLocalDataFile.getAbsolutePath());
}
```
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java:
##########
@@ -228,6 +297,362 @@ public void close() {
}
}
+ private static final class LockDataProviderInitResult {
+ private final OmSnapshotLocalData snapshotLocalData;
+ private final HierarchicalResourceLock lock;
+ private final HierarchicalResourceLock previousLock;
+ private final UUID previousSnapshotId;
+
+ private LockDataProviderInitResult(HierarchicalResourceLock lock,
OmSnapshotLocalData snapshotLocalData,
+ HierarchicalResourceLock previousLock, UUID previousSnapshotId) {
+ this.lock = lock;
+ this.snapshotLocalData = snapshotLocalData;
+ this.previousLock = previousLock;
+ this.previousSnapshotId = previousSnapshotId;
+ }
+
+ private HierarchicalResourceLock getLock() {
+ return lock;
+ }
+
+ private HierarchicalResourceLock getPreviousLock() {
+ return previousLock;
+ }
+
+ private UUID getPreviousSnapshotId() {
+ return previousSnapshotId;
+ }
+
+ private OmSnapshotLocalData getSnapshotLocalData() {
+ return snapshotLocalData;
+ }
+ }
+
+ /**
+ * The ReadableOmSnapshotLocalDataProvider class is responsible for managing
the
+ * access and initialization of local snapshot data in a thread-safe manner.
+ * It provides mechanisms to handle snapshot data, retrieve associated
previous
+ * snapshot data, and manage lock synchronization for safe concurrent
operations.
+ *
+ * This class works with snapshot identifiers and ensures that the
appropriate
+ * local data for a given snapshot is loaded and accessible. Additionally, it
+ * maintains locking mechanisms to ensure thread-safe initialization and
access
+ * to both the current and previous snapshot local data. The implementation
also
+ * supports handling errors in the snapshot data initialization process.
+ *
+ * Key Functionalities:
+ * - Initializes and provides access to snapshot local data associated with a
+ * given snapshot identifier.
+ * - Resolves and retrieves data for the previous snapshot if applicable.
+ * - Ensures safe concurrent read operations using locking mechanisms.
+ * - Validates the integrity and consistency of snapshot data during
initialization.
+ * - Ensures that appropriate locks are released upon closing.
+ *
+ * Thread-Safety:
+ * This class utilizes locks to guarantee thread-safe operations when
accessing
+ * or modifying snapshot data. State variables relating to snapshot data are
+ * properly synchronized to ensure consistency during concurrent operations.
+ *
+ * Usage Considerations:
+ * - Ensure proper handling of exceptions while interacting with this class,
+ * particularly during initialization and cleanup.
+ * - Always invoke the {@code close()} method after usage to release
acquired locks
+ * and avoid potential deadlocks.
+ */
+ public class ReadableOmSnapshotLocalDataProvider implements AutoCloseable {
+
+ private final UUID snapshotId;
+ private final HierarchicalResourceLock lock;
+ private final HierarchicalResourceLock previousLock;
+ private final OmSnapshotLocalData snapshotLocalData;
+ private OmSnapshotLocalData previousSnapshotLocalData;
+ private volatile boolean isPreviousSnapshotLoaded = false;
+ private final UUID resolvedPreviousSnapshotId;
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId) throws
IOException {
+ this(snapshotId, true);
+ }
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, UUID
snapIdToResolve) throws IOException {
+ this(snapshotId, true, null, snapIdToResolve, true);
+ }
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean
readLock) throws IOException {
+ this(snapshotId, readLock, null, null, false);
+ }
+
+ protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean
readLock,
+ CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException>
snapshotLocalDataSupplier,
+ UUID snapshotIdToBeResolved, boolean isSnapshotToBeResolvedNullable)
throws IOException {
+ this.snapshotId = snapshotId;
+ LockDataProviderInitResult result = initialize(readLock, snapshotId,
snapshotIdToBeResolved,
+ isSnapshotToBeResolvedNullable, snapshotLocalDataSupplier);
+ this.snapshotLocalData = result.getSnapshotLocalData();
+ this.lock = result.getLock();
+ this.previousLock = result.getPreviousLock();
+ this.resolvedPreviousSnapshotId = result.getPreviousSnapshotId();
+ this.previousSnapshotLocalData = null;
+ this.isPreviousSnapshotLoaded = false;
+ }
+
+ public OmSnapshotLocalData getSnapshotLocalData() {
+ return snapshotLocalData;
+ }
+
+ public synchronized OmSnapshotLocalData getPreviousSnapshotLocalData()
throws IOException {
+ if (!isPreviousSnapshotLoaded) {
+ if (resolvedPreviousSnapshotId != null) {
+ File previousSnapshotLocalDataFile = new
File(getSnapshotLocalPropertyYamlPath(resolvedPreviousSnapshotId));
+ this.previousSnapshotLocalData =
snapshotLocalDataSerializer.load(previousSnapshotLocalDataFile);
+ }
+ this.isPreviousSnapshotLoaded = true;
+ }
+ return previousSnapshotLocalData;
+ }
+
+ private HierarchicalResourceLock acquireLock(UUID snapId, boolean
readLock) throws IOException {
+ HierarchicalResourceLock acquiredLock = readLock ?
locks.acquireReadLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK,
+ snapId.toString()) :
locks.acquireWriteLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK,
snapId.toString());
+ if (!acquiredLock.isLockAcquired()) {
+ throw new IOException("Unable to acquire lock for snapshotId: " +
snapId);
+ }
+ return acquiredLock;
+ }
+
+ /**
+ * Intializer the snapshot local data by acquiring the lock on the
snapshot and also acquires a read lock on the
Review Comment:
Corrected spelling of 'Intializer' to 'Initializes'.
```suggestion
* Initializes the snapshot local data by acquiring the lock on the
snapshot and also acquires a read lock on the
```
##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java:
##########
@@ -135,6 +165,376 @@ public void tearDown() throws Exception {
}
}
+ private String getReadLockMessageAcquire(UUID snapshotId) {
+ return READ_LOCK_MESSAGE_ACQUIRE + " " +
FlatResource.SNAPSHOT_LOCAL_DATA_LOCK + " " + snapshotId;
+ }
+
+ private String getReadLockMessageRelease(UUID snapshotId) {
+ return READ_LOCK_MESSAGE_UNLOCK + " " +
FlatResource.SNAPSHOT_LOCAL_DATA_LOCK + " " + snapshotId;
+ }
+
+ private String getWriteLockMessageAcquire(UUID snapshotId) {
+ return WRITE_LOCK_MESSAGE_ACQUIRE + " " +
FlatResource.SNAPSHOT_LOCAL_DATA_LOCK + " " + snapshotId;
+ }
+
+ private String getWriteLockMessageRelease(UUID snapshotId) {
+ return WRITE_LOCK_MESSAGE_UNLOCK + " " +
FlatResource.SNAPSHOT_LOCAL_DATA_LOCK + " " + snapshotId;
+ }
+
+ private HierarchicalResourceLock getHierarchicalResourceLock(FlatResource
resource, String key, boolean isWriteLock) {
+ return new HierarchicalResourceLock() {
+ @Override
+ public boolean isLockAcquired() {
+ return true;
+ }
+
+ @Override
+ public void close() {
+ if (isWriteLock) {
+ lockCapturor.add(WRITE_LOCK_MESSAGE_UNLOCK + " " + resource + " " +
key);
+ } else {
+ lockCapturor.add(READ_LOCK_MESSAGE_UNLOCK + " " + resource + " " +
key);
+ }
+ }
+ };
+ }
+
+ private void mockLockManager() throws IOException {
+ lockCapturor.clear();
+ reset(lockManager);
+ when(lockManager.acquireReadLock(any(FlatResource.class), anyString()))
+ .thenAnswer(i -> {
+ lockCapturor.add(READ_LOCK_MESSAGE_ACQUIRE + " " + i.getArgument(0)
+ " " + i.getArgument(1));
+ return getHierarchicalResourceLock(i.getArgument(0),
i.getArgument(1), false);
+ });
+ when(lockManager.acquireWriteLock(any(FlatResource.class), anyString()))
+ .thenAnswer(i -> {
+ lockCapturor.add(WRITE_LOCK_MESSAGE_ACQUIRE + " " + i.getArgument(0)
+ " " + i.getArgument(1));
+ return getHierarchicalResourceLock(i.getArgument(0),
i.getArgument(1), true);
+ });
+ }
+
+ private List<UUID> createSnapshotLocalData(OmSnapshotLocalDataManager
snapshotLocalDataManager,
+ int numberOfSnapshots) throws IOException {
+ SnapshotInfo previousSnapshotInfo = null;
+ int counter = 0;
+ Map<String, List<LiveFileMetaData>> liveFileMetaDataMap = new HashMap<>();
+ liveFileMetaDataMap.put(KEY_TABLE,
+ Lists.newArrayList(createMockLiveFileMetaData("file1.sst", KEY_TABLE,
"key1", "key2")));
+ liveFileMetaDataMap.put(FILE_TABLE,
Lists.newArrayList(createMockLiveFileMetaData("file2.sst", FILE_TABLE, "key1",
+ "key2")));
+ liveFileMetaDataMap.put(DIRECTORY_TABLE,
Lists.newArrayList(createMockLiveFileMetaData("file2.sst",
+ DIRECTORY_TABLE, "key1", "key2")));
+ liveFileMetaDataMap.put("col1",
Lists.newArrayList(createMockLiveFileMetaData("file2.sst", "col1", "key1",
+ "key2")));
+ List<UUID> snapshotIds = new ArrayList<>();
+ for (int i = 0; i < numberOfSnapshots; i++) {
+ UUID snapshotId = UUID.randomUUID();
+ SnapshotInfo snapshotInfo = createMockSnapshotInfo(snapshotId,
previousSnapshotInfo == null ? null
+ : previousSnapshotInfo.getSnapshotId());
+ mockSnapshotStore(snapshotId, liveFileMetaDataMap.values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList()));
+ snapshotLocalDataManager.createNewOmSnapshotLocalDataFile(snapshotStore,
snapshotInfo);
+ previousSnapshotInfo = snapshotInfo;
+ for (Map.Entry<String, List<LiveFileMetaData>> tableEntry :
liveFileMetaDataMap.entrySet()) {
+ String table = tableEntry.getKey();
+ tableEntry.getValue().add(createMockLiveFileMetaData("file" +
counter++ + ".sst", table, "key1", "key4"));
+ }
+ snapshotIds.add(snapshotId);
+ }
+ return snapshotIds;
+ }
+
+ private void mockSnapshotStore(UUID snapshotId, List<LiveFileMetaData>
sstFiles) throws RocksDatabaseException {
+ // Setup snapshot store mock
+ File snapshotDbLocation =
OmSnapshotManager.getSnapshotPath(omMetadataManager, snapshotId).toFile();
+ assertTrue(snapshotDbLocation.exists() || snapshotDbLocation.mkdirs());
+
+ when(snapshotStore.getDbLocation()).thenReturn(snapshotDbLocation);
+ RocksDatabase rocksDatabase = mock(RocksDatabase.class);
+ when(snapshotStore.getDb()).thenReturn(rocksDatabase);
+ when(rocksDatabase.getLiveFilesMetaData()).thenReturn(sstFiles);
+ }
+
+ /**
+ * Checks lock orders taken i.e. while reading a snapshot against the
previous snapshot.
+ * Depending on read or write locks are acquired on the snapshotId and read
lock is acquired on the previous
+ * snapshot. Once the instance is closed the read lock on previous snapshot
is released followed by releasing the
+ * lock on the snapshotId.
+ * @param read
+ * @throws IOException
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testLockOrderingAgainstAnotherSnapshot(boolean read) throws
IOException {
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+ List<UUID> snapshotIds = new ArrayList<>();
+ snapshotIds.add(null);
+ snapshotIds.addAll(createSnapshotLocalData(localDataManager, 20));
+ for (int start = 0; start < snapshotIds.size(); start++) {
+ for (int end = start + 1; end < snapshotIds.size(); end++) {
+ UUID startSnapshotId = snapshotIds.get(start);
+ UUID endSnapshotId = snapshotIds.get(end);
+ lockCapturor.clear();
+ int logCaptorIdx = 0;
+ try (ReadableOmSnapshotLocalDataProvider omSnapshotLocalDataProvider =
+ read ? localDataManager.getOmSnapshotLocalData(endSnapshotId,
startSnapshotId) :
+
localDataManager.getWritableOmSnapshotLocalData(endSnapshotId,
startSnapshotId)) {
+ OmSnapshotLocalData snapshotLocalData =
omSnapshotLocalDataProvider.getSnapshotLocalData();
+ OmSnapshotLocalData previousSnapshot =
omSnapshotLocalDataProvider.getPreviousSnapshotLocalData();
+ assertEquals(endSnapshotId, snapshotLocalData.getSnapshotId());
+ if (startSnapshotId == null) {
+ assertNull(previousSnapshot);
+ assertNull(snapshotLocalData.getPreviousSnapshotId());
+ continue;
+ }
+ assertEquals(startSnapshotId, previousSnapshot.getSnapshotId());
+ assertEquals(startSnapshotId,
snapshotLocalData.getPreviousSnapshotId());
+ if (read) {
+ assertEquals(getReadLockMessageAcquire(endSnapshotId),
lockCapturor.get(logCaptorIdx++));
+ } else {
+ assertEquals(getWriteLockMessageAcquire(endSnapshotId),
lockCapturor.get(logCaptorIdx++));
+ }
+ int idx = end - 1;
+ UUID previousSnapId = snapshotIds.get(idx--);
+ assertEquals(getReadLockMessageAcquire(previousSnapId),
lockCapturor.get(logCaptorIdx++));
+ while (idx >= start) {
+ UUID prevPrevSnapId = snapshotIds.get(idx--);
+ assertEquals(getReadLockMessageAcquire(prevPrevSnapId),
lockCapturor.get(logCaptorIdx++));
+ assertEquals(getReadLockMessageRelease(previousSnapId),
lockCapturor.get(logCaptorIdx++));
+ previousSnapId = prevPrevSnapId;
+ }
+ }
+ assertEquals(getReadLockMessageRelease(startSnapshotId),
lockCapturor.get(logCaptorIdx++));
+ if (read) {
+ assertEquals(getReadLockMessageRelease(endSnapshotId),
lockCapturor.get(logCaptorIdx++));
+ } else {
+ assertEquals(getWriteLockMessageRelease(endSnapshotId),
lockCapturor.get(logCaptorIdx++));
+ }
+ assertEquals(lockCapturor.size(), logCaptorIdx);
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testVersionLockResolution(boolean read) throws IOException {
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+ List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 5);
+ for (int snapIdx = 0; snapIdx < snapshotIds.size(); snapIdx++) {
+ UUID snapId = snapshotIds.get(snapIdx);
+ UUID expectedPreviousSnapId = snapIdx - 1 >= 0 ? snapshotIds.get(snapIdx
- 1) : null;
+ lockCapturor.clear();
+ int logCaptorIdx = 0;
+ try (ReadableOmSnapshotLocalDataProvider omSnapshotLocalDataProvider =
+ read ? localDataManager.getOmSnapshotLocalData(snapId) :
+ localDataManager.getWritableOmSnapshotLocalData(snapId)) {
+ OmSnapshotLocalData snapshotLocalData =
omSnapshotLocalDataProvider.getSnapshotLocalData();
+ OmSnapshotLocalData previousSnapshot =
omSnapshotLocalDataProvider.getPreviousSnapshotLocalData();
+ assertEquals(snapId, snapshotLocalData.getSnapshotId());
+ assertEquals(expectedPreviousSnapId, previousSnapshot == null ? null :
+ previousSnapshot.getSnapshotId());
+ if (read) {
+ assertEquals(getReadLockMessageAcquire(snapId),
lockCapturor.get(logCaptorIdx++));
+ } else {
+ assertEquals(getWriteLockMessageAcquire(snapId),
lockCapturor.get(logCaptorIdx++));
+ }
+ if (expectedPreviousSnapId != null) {
+ assertEquals(getReadLockMessageAcquire(expectedPreviousSnapId),
lockCapturor.get(logCaptorIdx++));
+ }
+ }
+ if (expectedPreviousSnapId != null) {
+ assertEquals(getReadLockMessageRelease(expectedPreviousSnapId),
lockCapturor.get(logCaptorIdx++));
+ }
+ if (read) {
+ assertEquals(getReadLockMessageRelease(snapId),
lockCapturor.get(logCaptorIdx++));
+ } else {
+ assertEquals(getWriteLockMessageRelease(snapId),
lockCapturor.get(logCaptorIdx++));
+ }
+ assertEquals(lockCapturor.size(), logCaptorIdx);
+ }
+ }
+
+ @Test
+ public void
testWriteVersionAdditionValidationWithoutPreviousSnapshotVersionExisting()
throws IOException {
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+ List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 2);
+ UUID snapId = snapshotIds.get(1);
+ try (WritableOmSnapshotLocalDataProvider omSnapshotLocalDataProvider =
+ localDataManager.getWritableOmSnapshotLocalData(snapId)) {
+ OmSnapshotLocalData snapshotLocalData =
omSnapshotLocalDataProvider.getSnapshotLocalData();
+
snapshotLocalData.addVersionSSTFileInfos(Lists.newArrayList(createMockLiveFileMetaData("file1.sst",
KEY_TABLE,
+ "key1", "key2")), 3);
+
+ IOException ex = assertThrows(IOException.class,
omSnapshotLocalDataProvider::commit);
+ System.out.println(ex.getMessage());
Review Comment:
Avoid printing to stdout in unit tests; it adds noise to test logs. Please
remove the System.out.println(...) line.
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]