aswinshakil commented on code in PR #7200:
URL: https://github.com/apache/ozone/pull/7200#discussion_r1767550817


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java:
##########
@@ -242,4 +242,44 @@ public static String 
getOzonePathKeyForFso(OMMetadataManager metadataManager,
     final long bucketId = metadataManager.getBucketId(volumeName, bucketName);
     return OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId + OM_KEY_PREFIX;
   }
+
+  /**
+   * Returns a merged RepeatedOmKeyInfo entry, combining the given keyInfos with the existing deleted entry in 
the table.
+   * @param snapshotMoveKeyInfos keyInfos to be added.
+   * @param metadataManager metadataManager for a store.
+   * @return the merged RepeatedOmKeyInfo entry.
+   * @throws IOException if the deletedTable lookup fails.
+   */
+  public static RepeatedOmKeyInfo 
createMergedRepeatedOmKeyInfoFromDeletedTableEntry(
+      OzoneManagerProtocolProtos.SnapshotMoveKeyInfos snapshotMoveKeyInfos, 
OMMetadataManager metadataManager) throws
+      IOException {
+    String dbKey = snapshotMoveKeyInfos.getKey();
+    List<OmKeyInfo> keyInfoList = new ArrayList<>();
+    for (OzoneManagerProtocolProtos.KeyInfo info : 
snapshotMoveKeyInfos.getKeyInfosList()) {
+      OmKeyInfo fromProtobuf = OmKeyInfo.getFromProtobuf(info);
+      keyInfoList.add(fromProtobuf);
+    }
+    // When older version of keys are moved to the next snapshot's deletedTable
+    // The newer version might also be in the next snapshot's deletedTable and
+    // it might overwrite the existing value which in turn could lead to orphan 
blocks in the system.
+    // Checking the keyInfoList with the last n versions of the omKeyInfo 
versions would ensure all versions are
+    // present in the list and would also avoid redundant additions to the 
list if the last n versions match, which
+    // can happen on om transaction replay on snapshotted rocksdb.
+    RepeatedOmKeyInfo result = metadataManager.getDeletedTable().get(dbKey);
+    if (result == null) {
+      result = new RepeatedOmKeyInfo(keyInfoList);
+    } else if (!isSameAsLatestOmKeyInfo(keyInfoList, result)) {
+      keyInfoList.forEach(result::addOmKeyInfo);
+    }
+    return result;
+  }
+
+  private static boolean isSameAsLatestOmKeyInfo(List<OmKeyInfo> omKeyInfos,

Review Comment:
   I'm not exactly sure what this is doing. Can you let me know? 



##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotChain.java:
##########
@@ -305,6 +307,34 @@ public void testChainFromLoadFromTable(boolean 
increasingTIme)
         () -> chainManager.nextGlobalSnapshot(snapshotID1));
   }
 
+  @ParameterizedTest
+  @ValueSource(ints = {0, 1, 2, 5, 10})
+  public void testSnapshotChainIterator(int numberOfSnapshots) throws 
IOException {

Review Comment:
   Can we also add tests for `getOldestGlobalSnapshotId`? Just a line should be 
enough, similar to `testChainFromLoadFromTable` and `testAddSnapshot`.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveTableKeysRequest.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.request.snapshot;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.TransactionInfo;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.SnapshotChainManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import 
org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotMoveTableKeysResponse;
+import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
+import org.apache.hadoop.ozone.om.upgrade.DisallowedUntilLayoutVersion;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveTableKeysRequest;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.HddsUtils.fromProtobuf;
+import static 
org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.FILESYSTEM_SNAPSHOT;
+
+/**
+ * Handles OMSnapshotMoveTableKeysRequest Request.
+ * This is an OM internal request. Does not need @RequireSnapshotFeatureState.
+ */
+public class OMSnapshotMoveTableKeysRequest extends OMClientRequest {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(OMSnapshotMoveTableKeysRequest.class);
+
+  public OMSnapshotMoveTableKeysRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  @DisallowedUntilLayoutVersion(FILESYSTEM_SNAPSHOT)
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, 
TermIndex termIndex) {
+    OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl) 
ozoneManager.getMetadataManager();
+    SnapshotChainManager snapshotChainManager = 
omMetadataManager.getSnapshotChainManager();
+
+    SnapshotMoveTableKeysRequest moveTableKeysRequest = 
getOmRequest().getSnapshotMoveTableKeysRequest();
+
+    OMClientResponse omClientResponse;
+    OzoneManagerProtocolProtos.OMResponse.Builder omResponse = 
OmResponseUtil.getOMResponseBuilder(getOmRequest());
+    try {
+      SnapshotInfo fromSnapshot = SnapshotUtils.getSnapshotInfo(ozoneManager,
+          snapshotChainManager, 
fromProtobuf(moveTableKeysRequest.getFromSnapshotID()));
+      // If there is no non-deleted snapshot, move the
+      // keys to the Active Object Store.
+      SnapshotInfo nextSnapshot = SnapshotUtils.getNextSnapshot(ozoneManager, 
snapshotChainManager, fromSnapshot);
+
+      // If the next snapshot is not active then ignore the move, since this could be 
a redundant operation.
+      if (nextSnapshot != null && nextSnapshot.getSnapshotStatus() != 
SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE) {
+        throw new OMException("Next snapshot : " + nextSnapshot + " in chain 
is not active.",
+            OMException.ResultCodes.INVALID_SNAPSHOT_ERROR);
+      }
+
+      String expectedBucketKeyPrefix = 
omMetadataManager.getBucketKeyPrefix(fromSnapshot.getVolumeName(),
+          fromSnapshot.getBucketName());
+      String expectedBucketKeyPrefixFSO = 
omMetadataManager.getBucketKeyPrefixFSO(fromSnapshot.getVolumeName(),
+          fromSnapshot.getBucketName());
+
+      // Filter only deleted keys with at least one keyInfo per key.
+      Set<String> keys = new HashSet<>();
+      List<SnapshotMoveKeyInfos> deletedKeys =
+          moveTableKeysRequest.getDeletedKeysList().stream()
+              .filter(snapshotMoveKeyInfos -> 
!snapshotMoveKeyInfos.getKeyInfosList().isEmpty())
+              .collect(Collectors.toList());
+      //validate deleted key starts with bucket 
prefix.[/<volName>/<bucketName>/]
+      for (SnapshotMoveKeyInfos deletedKey : deletedKeys) {
+        if (!deletedKey.getKey().startsWith(expectedBucketKeyPrefix)) {
+          throw new OMException("Deleted Key: " + deletedKey + " doesn't start 
with prefix " + expectedBucketKeyPrefix,
+              OMException.ResultCodes.INVALID_KEY_NAME);
+        }
+        if (keys.contains(deletedKey.getKey())) {
+          throw new OMException("Duplicate Deleted Key: " + deletedKey + " in 
request",
+              OMException.ResultCodes.INVALID_REQUEST);
+        } else {
+          keys.add(deletedKey.getKey());
+        }
+      }
+      keys.clear();
+      List<HddsProtos.KeyValue> renamedKeysList = 
moveTableKeysRequest.getRenamedKeysList().stream()
+          .filter(keyValue -> keyValue.hasKey() && 
keyValue.hasValue()).collect(Collectors.toList());
+      //validate rename key starts with bucket 
prefix.[/<volName>/<bucketName>/]
+      for (HddsProtos.KeyValue renamedKey : renamedKeysList) {
+        if (!renamedKey.getKey().startsWith(expectedBucketKeyPrefix)) {
+          throw new OMException("Rename Key: " + renamedKey + " doesn't start 
with prefix " + expectedBucketKeyPrefix,
+              OMException.ResultCodes.INVALID_KEY_NAME);
+        }
+        if (keys.contains(renamedKey.getKey())) {
+          throw new OMException("Duplicate rename Key: " + renamedKey + " in 
request",
+              OMException.ResultCodes.INVALID_REQUEST);
+        } else {
+          keys.add(renamedKey.getKey());
+        }
+      }
+      keys.clear();
+      // Filter only deleted dirs with only one keyInfo per key.
+      List<SnapshotMoveKeyInfos> deletedDirs = 
moveTableKeysRequest.getDeletedDirsList().stream()
+          .filter(snapshotMoveKeyInfos -> 
snapshotMoveKeyInfos.getKeyInfosList().size() == 1)

Review Comment:
   We would only commit the wrong change if we send a wrong change from 
`SnapshotDeletingService` itself. Otherwise we know deletedDir will only have 
one keyInfo. 



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java:
##########
@@ -136,316 +157,90 @@ public BackgroundTaskResult call() throws 
InterruptedException {
 
       getRunCount().incrementAndGet();
 
-      ReferenceCounted<OmSnapshot> rcOmSnapshot = null;
-      ReferenceCounted<OmSnapshot> rcOmPreviousSnapshot = null;
-
-      Table<String, SnapshotInfo> snapshotInfoTable =
-          ozoneManager.getMetadataManager().getSnapshotInfoTable();
-      List<String> purgeSnapshotKeys = new ArrayList<>();
-      try (TableIterator<String, ? extends Table.KeyValue
-          <String, SnapshotInfo>> iterator = snapshotInfoTable.iterator()) {
-
+      try {
+        int remaining = keyLimitPerTask;
+        Iterator<UUID> iterator = chainManager.iterator(true);
+        List<String> snapshotsToBePurged = new ArrayList<>();
         long snapshotLimit = snapshotDeletionPerTask;
-
         while (iterator.hasNext() && snapshotLimit > 0) {
-          SnapshotInfo snapInfo = iterator.next().getValue();
-
-          // Only Iterate in deleted snapshot
+          SnapshotInfo snapInfo = SnapshotUtils.getSnapshotInfo(ozoneManager, 
chainManager, iterator.next());
           if (shouldIgnoreSnapshot(snapInfo)) {
             continue;
           }
-
-          // Note: Can refactor this to use try-with-resources.
-          // Handling RC decrements manually for now to minimize conflicts.
-          rcOmSnapshot = omSnapshotManager.getSnapshot(
-              snapInfo.getVolumeName(),
-              snapInfo.getBucketName(),
-              snapInfo.getName());
-          OmSnapshot omSnapshot = rcOmSnapshot.get();
-
-          Table<String, RepeatedOmKeyInfo> snapshotDeletedTable =
-              omSnapshot.getMetadataManager().getDeletedTable();
-          Table<String, OmKeyInfo> snapshotDeletedDirTable =
-              omSnapshot.getMetadataManager().getDeletedDirTable();
-
-          Table<String, String> renamedTable =
-              omSnapshot.getMetadataManager().getSnapshotRenamedTable();
-
-          long volumeId = ozoneManager.getMetadataManager()
-              .getVolumeId(snapInfo.getVolumeName());
-          // Get bucketInfo for the snapshot bucket to get bucket layout.
-          String dbBucketKey = ozoneManager.getMetadataManager().getBucketKey(
-              snapInfo.getVolumeName(), snapInfo.getBucketName());
-          OmBucketInfo bucketInfo = ozoneManager.getMetadataManager()
-              .getBucketTable().get(dbBucketKey);
-
-          if (bucketInfo == null) {
-            // Decrement ref count
-            rcOmSnapshot.close();
-            rcOmSnapshot = null;
-            throw new IllegalStateException("Bucket " + "/" +
-                snapInfo.getVolumeName() + "/" + snapInfo.getBucketName() +
-                " is not found. BucketInfo should not be null for snapshotted" 
+
-                " bucket. The OM is in unexpected state.");
-          }
-
-          String snapshotBucketKey = dbBucketKey + OzoneConsts.OM_KEY_PREFIX;
-          String dbBucketKeyForDir = ozoneManager.getMetadataManager()
-              .getBucketKey(Long.toString(volumeId),
-                  Long.toString(bucketInfo.getObjectID())) + OM_KEY_PREFIX;
-
-          if (isSnapshotReclaimable(snapshotDeletedTable,
-              snapshotDeletedDirTable, snapshotBucketKey, dbBucketKeyForDir)) {
-            purgeSnapshotKeys.add(snapInfo.getTableKey());
-            // Decrement ref count
-            rcOmSnapshot.close();
-            rcOmSnapshot = null;
+          LOG.info("Started Snapshot Deletion Processing for snapshot : {}", 
snapInfo.getTableKey());
+          SnapshotInfo nextSnapshot = 
SnapshotUtils.getNextSnapshot(ozoneManager, chainManager, snapInfo);
+          // Continue if the next snapshot is not active. This is to avoid 
unnecessary copies from one snapshot to
+          // another.
+          if (nextSnapshot != null &&
+              nextSnapshot.getSnapshotStatus() != 
SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE) {
             continue;
           }
 
-          //TODO: [SNAPSHOT] Add lock to deletedTable and Active DB.
-          SnapshotInfo previousSnapshot = getPreviousActiveSnapshot(snapInfo, 
chainManager);
-          Table<String, OmKeyInfo> previousKeyTable = null;
-          Table<String, OmDirectoryInfo> previousDirTable = null;
-          OmSnapshot omPreviousSnapshot = null;
-
-          // Split RepeatedOmKeyInfo and update current snapshot 
deletedKeyTable
-          // and next snapshot deletedKeyTable.
-          if (previousSnapshot != null) {
-            rcOmPreviousSnapshot = omSnapshotManager.getSnapshot(
-                previousSnapshot.getVolumeName(),
-                previousSnapshot.getBucketName(),
-                previousSnapshot.getName());
-            omPreviousSnapshot = rcOmPreviousSnapshot.get();
-
-            previousKeyTable = omPreviousSnapshot
-                
.getMetadataManager().getKeyTable(bucketInfo.getBucketLayout());
-            previousDirTable = omPreviousSnapshot
-                .getMetadataManager().getDirectoryTable();
+          // nextSnapshot = null means entries would be moved to AOS, hence 
ensure that KeyDeletingService &
+          // DirectoryDeletingService are not running while the entries are 
moving.
+          if (nextSnapshot == null) {
+            waitForKeyDeletingService();

Review Comment:
   Quick question: why do we need to wait for KDS and DDS to stop? They will 
run again anyway — the next run is 60 sec later by default. So even within this 
iteration of `SnapshotDeletingService` it is possible that KDS and DDS will run 
again. 



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java:
##########
@@ -662,6 +663,61 @@ public PendingKeysDeletion getPendingDeletionKeys(final 
int count)
         .getPendingDeletionKeys(count, ozoneManager.getOmSnapshotManager());
   }
 
+  @Override
+  public List<Table.KeyValue<String, String>> getRenamesKeyEntries(
+      String volume, String bucket, String startKey, int count) throws 
IOException {
+    // Bucket prefix would be empty if volume is empty i.e. either null or "".
+    Optional<String> bucketPrefix = Optional.ofNullable(volume).map(vol -> 
vol.isEmpty() ? null : vol)

Review Comment:
   We should still do this, makes it more readable. 



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java:
##########
@@ -136,316 +157,90 @@ public BackgroundTaskResult call() throws 
InterruptedException {
 
       getRunCount().incrementAndGet();
 
-      ReferenceCounted<OmSnapshot> rcOmSnapshot = null;
-      ReferenceCounted<OmSnapshot> rcOmPreviousSnapshot = null;
-
-      Table<String, SnapshotInfo> snapshotInfoTable =
-          ozoneManager.getMetadataManager().getSnapshotInfoTable();
-      List<String> purgeSnapshotKeys = new ArrayList<>();
-      try (TableIterator<String, ? extends Table.KeyValue
-          <String, SnapshotInfo>> iterator = snapshotInfoTable.iterator()) {
-
+      try {
+        int remaining = keyLimitPerTask;
+        Iterator<UUID> iterator = chainManager.iterator(true);
+        List<String> snapshotsToBePurged = new ArrayList<>();
         long snapshotLimit = snapshotDeletionPerTask;
-
         while (iterator.hasNext() && snapshotLimit > 0) {

Review Comment:
   We can exit early if `remaining <= 0`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to