hemantk-12 commented on code in PR #7200:
URL: https://github.com/apache/ozone/pull/7200#discussion_r1767347509


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java:
##########
@@ -354,7 +355,27 @@ public V getValue() {
       public String toString() {
         return "(key=" + key + ", value=" + value + ")";
       }
+
+      @Override
+      public boolean equals(Object obj) {
+        if (!(obj instanceof KeyValue)) {
+          return false;
+        }
+        KeyValue<?, ?> kv = (KeyValue<?, ?>) obj;
+        try {
+          return getKey().equals(kv.getKey()) && 
getValue().equals(kv.getValue());
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public int hashCode() {
+        return Objects.hash(getKey(), getValue());
+      }

Review Comment:
   qq: Is this for testing?



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java:
##########
@@ -354,7 +355,27 @@ public V getValue() {
       public String toString() {
         return "(key=" + key + ", value=" + value + ")";
       }
+
+      @Override
+      public boolean equals(Object obj) {
+        if (!(obj instanceof KeyValue)) {
+          return false;
+        }
+        KeyValue<?, ?> kv = (KeyValue<?, ?>) obj;
+        try {
+          return getKey().equals(kv.getKey()) && 
getValue().equals(kv.getValue());
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public int hashCode() {
+        return Objects.hash(getKey(), getValue());
+      }
     };
+
+

Review Comment:
   nit: remove extra line.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveTableKeysRequest.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.request.snapshot;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.TransactionInfo;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.SnapshotChainManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import 
org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotMoveTableKeysResponse;
+import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
+import org.apache.hadoop.ozone.om.upgrade.DisallowedUntilLayoutVersion;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveTableKeysRequest;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.HddsUtils.fromProtobuf;
+import static 
org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.FILESYSTEM_SNAPSHOT;
+
+/**
+ * Handles OMSnapshotMoveTableKeysRequest Request.
+ * This is an OM internal request. Does not need @RequireSnapshotFeatureState.
+ */
+public class OMSnapshotMoveTableKeysRequest extends OMClientRequest {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(OMSnapshotMoveTableKeysRequest.class);
+
+  public OMSnapshotMoveTableKeysRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl) 
ozoneManager.getMetadataManager();
+    SnapshotChainManager snapshotChainManager = 
omMetadataManager.getSnapshotChainManager();
+    SnapshotMoveTableKeysRequest moveTableKeysRequest = 
getOmRequest().getSnapshotMoveTableKeysRequest();
+    SnapshotInfo fromSnapshot = SnapshotUtils.getSnapshotInfo(ozoneManager,
+        snapshotChainManager, 
fromProtobuf(moveTableKeysRequest.getFromSnapshotID()));
+    String bucketKeyPrefix = 
omMetadataManager.getBucketKeyPrefix(fromSnapshot.getVolumeName(),
+        fromSnapshot.getBucketName());
+    String bucketKeyPrefixFSO = 
omMetadataManager.getBucketKeyPrefixFSO(fromSnapshot.getVolumeName(),
+        fromSnapshot.getBucketName());
+
+

Review Comment:
   nit: remove empty line.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java:
##########
@@ -838,6 +838,28 @@ public String getBucketKey(String volume, String bucket) {
     return builder.toString();
   }
 
+  /**
+   * Given a volume and bucket, return the corresponding DB key prefix.
+   *
+   * @param volume - Volume name
+   * @param bucket - Bucket name
+   */

Review Comment:
   nit/fyi: you can also use this if there is no change in the Javadoc comment from
the interface.
   ```suggestion
     /**
      * {@inheritDoc}
      */
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java:
##########
@@ -662,6 +664,53 @@ public PendingKeysDeletion getPendingDeletionKeys(final 
int count)
         .getPendingDeletionKeys(count, ozoneManager.getOmSnapshotManager());
   }
 
+  private <V, R> List<Table.KeyValue<String, R>> getTableEntries(String 
startKey,
+          TableIterator<String, ? extends Table.KeyValue<String, V>> 
tableIterator,
+          Function<V, R> valueFunction, int count) throws IOException {
+    List<Table.KeyValue<String, R>> entries = new ArrayList<>();
+    /* Seeking to the start key if it not null. The next key picked up would 
be ensured to start with the bucket
+         prefix, {@link 
org.apache.hadoop.hdds.utils.db.Table#iterator(bucketPrefix)} would ensure this.
+    */
+    if (startKey != null) {
+      tableIterator.seek(startKey);
+    }
+    int currentCount = 0;
+    while (tableIterator.hasNext() && currentCount < count) {
+      Table.KeyValue<String, V> kv = tableIterator.next();
+      if (kv != null) {

Review Comment:
   Yes, there is no harm, but it isn't how an iterator is supposed to be used
unless null is an allowed value in the collection.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveTableKeysRequest.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.request.snapshot;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.TransactionInfo;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.SnapshotChainManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import 
org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotMoveTableKeysResponse;
+import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
+import org.apache.hadoop.ozone.om.upgrade.DisallowedUntilLayoutVersion;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveTableKeysRequest;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.HddsUtils.fromProtobuf;
+import static 
org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.FILESYSTEM_SNAPSHOT;
+
+/**
+ * Handles OMSnapshotMoveTableKeysRequest Request.
+ * This is an OM internal request. Does not need @RequireSnapshotFeatureState.
+ */
+public class OMSnapshotMoveTableKeysRequest extends OMClientRequest {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(OMSnapshotMoveTableKeysRequest.class);
+
+  public OMSnapshotMoveTableKeysRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl) 
ozoneManager.getMetadataManager();
+    SnapshotChainManager snapshotChainManager = 
omMetadataManager.getSnapshotChainManager();
+    SnapshotMoveTableKeysRequest moveTableKeysRequest = 
getOmRequest().getSnapshotMoveTableKeysRequest();
+    SnapshotInfo fromSnapshot = SnapshotUtils.getSnapshotInfo(ozoneManager,
+        snapshotChainManager, 
fromProtobuf(moveTableKeysRequest.getFromSnapshotID()));
+    String bucketKeyPrefix = 
omMetadataManager.getBucketKeyPrefix(fromSnapshot.getVolumeName(),
+        fromSnapshot.getBucketName());
+    String bucketKeyPrefixFSO = 
omMetadataManager.getBucketKeyPrefixFSO(fromSnapshot.getVolumeName(),
+        fromSnapshot.getBucketName());
+
+
+    Set<String> keys = new HashSet<>();
+    List<SnapshotMoveKeyInfos> deletedKeys = new 
ArrayList<>(moveTableKeysRequest.getDeletedKeysList().size());
+
+    //validate deleted key starts with bucket prefix.[/<volName>/<bucketName>/]
+    for (SnapshotMoveKeyInfos deletedKey : 
moveTableKeysRequest.getDeletedKeysList()) {
+      // Filter only deleted keys with atleast one keyInfo per key.
+      if (!deletedKey.getKeyInfosList().isEmpty()) {
+        deletedKeys.add(deletedKey);
+        if (!deletedKey.getKey().startsWith(bucketKeyPrefix)) {
+          throw new OMException("Deleted Key: " + deletedKey + " doesn't start 
with prefix " + bucketKeyPrefix,
+              OMException.ResultCodes.INVALID_KEY_NAME);
+        }
+        if (keys.contains(deletedKey.getKey())) {
+          throw new OMException("Duplicate Deleted Key: " + deletedKey + " in 
request",
+              OMException.ResultCodes.INVALID_REQUEST);
+        } else {
+          keys.add(deletedKey.getKey());
+        }
+      }
+    }
+
+    keys.clear();
+    List<HddsProtos.KeyValue> renamedKeysList = new 
ArrayList<>(moveTableKeysRequest.getRenamedKeysList().size());
+    //validate rename key starts with bucket prefix.[/<volName>/<bucketName>/]
+    for (HddsProtos.KeyValue renamedKey : 
moveTableKeysRequest.getRenamedKeysList()) {
+      if (renamedKey.hasKey() && renamedKey.hasValue()) {
+        renamedKeysList.add(renamedKey);
+        if (!renamedKey.getKey().startsWith(bucketKeyPrefix)) {
+          throw new OMException("Rename Key: " + renamedKey + " doesn't start 
with prefix " + bucketKeyPrefix,
+              OMException.ResultCodes.INVALID_KEY_NAME);
+        }
+        if (keys.contains(renamedKey.getKey())) {
+          throw new OMException("Duplicate rename Key: " + renamedKey + " in 
request",
+              OMException.ResultCodes.INVALID_REQUEST);
+        } else {
+          keys.add(renamedKey.getKey());
+        }
+      }
+    }
+    keys.clear();
+
+    // Filter only deleted dirs with only one keyInfo per key.
+    List<SnapshotMoveKeyInfos> deletedDirs = new 
ArrayList<>(moveTableKeysRequest.getDeletedDirsList().size());
+    //validate deleted key starts with bucket FSO path 
prefix.[/<volId>/<bucketId>/]
+    for (SnapshotMoveKeyInfos deletedDir : 
moveTableKeysRequest.getDeletedDirsList()) {
+      // Filter deleted directories with exactly one keyInfo per key.

Review Comment:
   Isn't it the same as line 120?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java:
##########
@@ -136,316 +134,108 @@ public BackgroundTaskResult call() throws 
InterruptedException {
 
       getRunCount().incrementAndGet();
 
-      ReferenceCounted<OmSnapshot> rcOmSnapshot = null;
-      ReferenceCounted<OmSnapshot> rcOmPreviousSnapshot = null;
-
-      Table<String, SnapshotInfo> snapshotInfoTable =
-          ozoneManager.getMetadataManager().getSnapshotInfoTable();
-      List<String> purgeSnapshotKeys = new ArrayList<>();
-      try (TableIterator<String, ? extends Table.KeyValue
-          <String, SnapshotInfo>> iterator = snapshotInfoTable.iterator()) {
-
+      try {
+        int remaining = keyLimitPerTask;
+        Iterator<UUID> iterator = snapshotChainManager.iterator(true);
+        List<SnapshotInfo> snapshotsToBePurged = new ArrayList<>();
         long snapshotLimit = snapshotDeletionPerTask;
-
         while (iterator.hasNext() && snapshotLimit > 0) {
-          SnapshotInfo snapInfo = iterator.next().getValue();
-
-          // Only Iterate in deleted snapshot
+          SnapshotInfo snapInfo = SnapshotUtils.getSnapshotInfo(ozoneManager, 
snapshotChainManager, iterator.next());
+          // Only Iterate in deleted snapshot & only if all the changes have 
been flushed into disk.
           if (shouldIgnoreSnapshot(snapInfo)) {
             continue;
           }
-
-          // Note: Can refactor this to use try-with-resources.
-          // Handling RC decrements manually for now to minimize conflicts.
-          rcOmSnapshot = omSnapshotManager.getSnapshot(
-              snapInfo.getVolumeName(),
-              snapInfo.getBucketName(),
-              snapInfo.getName());
-          OmSnapshot omSnapshot = rcOmSnapshot.get();
-
-          Table<String, RepeatedOmKeyInfo> snapshotDeletedTable =
-              omSnapshot.getMetadataManager().getDeletedTable();
-          Table<String, OmKeyInfo> snapshotDeletedDirTable =
-              omSnapshot.getMetadataManager().getDeletedDirTable();
-
-          Table<String, String> renamedTable =
-              omSnapshot.getMetadataManager().getSnapshotRenamedTable();
-
-          long volumeId = ozoneManager.getMetadataManager()
-              .getVolumeId(snapInfo.getVolumeName());
-          // Get bucketInfo for the snapshot bucket to get bucket layout.
-          String dbBucketKey = ozoneManager.getMetadataManager().getBucketKey(
-              snapInfo.getVolumeName(), snapInfo.getBucketName());
-          OmBucketInfo bucketInfo = ozoneManager.getMetadataManager()
-              .getBucketTable().get(dbBucketKey);
-
-          if (bucketInfo == null) {
-            // Decrement ref count
-            rcOmSnapshot.close();
-            rcOmSnapshot = null;
-            throw new IllegalStateException("Bucket " + "/" +
-                snapInfo.getVolumeName() + "/" + snapInfo.getBucketName() +
-                " is not found. BucketInfo should not be null for snapshotted" 
+
-                " bucket. The OM is in unexpected state.");
-          }
-
-          String snapshotBucketKey = dbBucketKey + OzoneConsts.OM_KEY_PREFIX;
-          String dbBucketKeyForDir = ozoneManager.getMetadataManager()
-              .getBucketKey(Long.toString(volumeId),
-                  Long.toString(bucketInfo.getObjectID())) + OM_KEY_PREFIX;
-
-          if (isSnapshotReclaimable(snapshotDeletedTable,
-              snapshotDeletedDirTable, snapshotBucketKey, dbBucketKeyForDir)) {
-            purgeSnapshotKeys.add(snapInfo.getTableKey());
-            // Decrement ref count
-            rcOmSnapshot.close();
-            rcOmSnapshot = null;
+          SnapshotInfo nextSnapshot = 
SnapshotUtils.getNextSnapshot(ozoneManager, snapshotChainManager, snapInfo);
+          // Continue if the next snapshot is not active. This is to avoid 
unnecessary copies from one snapshot to
+          // another.
+          if (nextSnapshot != null &&
+              nextSnapshot.getSnapshotStatus() != 
SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE) {
             continue;
           }
 
-          //TODO: [SNAPSHOT] Add lock to deletedTable and Active DB.
-          SnapshotInfo previousSnapshot = getPreviousActiveSnapshot(snapInfo, 
chainManager);
-          Table<String, OmKeyInfo> previousKeyTable = null;
-          Table<String, OmDirectoryInfo> previousDirTable = null;
-          OmSnapshot omPreviousSnapshot = null;
-
-          // Split RepeatedOmKeyInfo and update current snapshot 
deletedKeyTable
-          // and next snapshot deletedKeyTable.
-          if (previousSnapshot != null) {
-            rcOmPreviousSnapshot = omSnapshotManager.getSnapshot(
-                previousSnapshot.getVolumeName(),
-                previousSnapshot.getBucketName(),
-                previousSnapshot.getName());
-            omPreviousSnapshot = rcOmPreviousSnapshot.get();
-
-            previousKeyTable = omPreviousSnapshot
-                
.getMetadataManager().getKeyTable(bucketInfo.getBucketLayout());
-            previousDirTable = omPreviousSnapshot
-                .getMetadataManager().getDirectoryTable();
-          }
-
-          // Move key to either next non deleted snapshot's deletedTable
-          // or keep it in current snapshot deleted table.
-          List<SnapshotMoveKeyInfos> toReclaimList = new ArrayList<>();
-          List<SnapshotMoveKeyInfos> toNextDBList = new ArrayList<>();
-          // A list of renamed keys/files/dirs
-          List<HddsProtos.KeyValue> renamedList = new ArrayList<>();
-          List<String> dirsToMove = new ArrayList<>();
-
-          long remainNum = handleDirectoryCleanUp(snapshotDeletedDirTable,
-              previousDirTable, renamedTable, dbBucketKeyForDir, snapInfo,
-              omSnapshot, dirsToMove, renamedList);
-          int deletionCount = 0;
-
-          try (TableIterator<String, ? extends Table.KeyValue<String,
-              RepeatedOmKeyInfo>> deletedIterator = snapshotDeletedTable
-              .iterator()) {
-
-            List<BlockGroup> keysToPurge = new ArrayList<>();
-            deletedIterator.seek(snapshotBucketKey);
-
-            while (deletedIterator.hasNext() &&
-                deletionCount < remainNum) {
-              Table.KeyValue<String, RepeatedOmKeyInfo>
-                  deletedKeyValue = deletedIterator.next();
-              String deletedKey = deletedKeyValue.getKey();
-
-              // Exit if it is out of the bucket scope.
-              if (!deletedKey.startsWith(snapshotBucketKey)) {
-                // If snapshot deletedKeyTable doesn't have any
-                // entry in the snapshot scope it can be reclaimed
-                break;
-              }
-
-              RepeatedOmKeyInfo repeatedOmKeyInfo = deletedKeyValue.getValue();
-
-              SnapshotMoveKeyInfos.Builder toReclaim = SnapshotMoveKeyInfos
-                  .newBuilder()
-                  .setKey(deletedKey);
-              SnapshotMoveKeyInfos.Builder toNextDb = SnapshotMoveKeyInfos
-                  .newBuilder()
-                  .setKey(deletedKey);
-              HddsProtos.KeyValue.Builder renamedKey = HddsProtos.KeyValue
-                  .newBuilder();
-
-              for (OmKeyInfo keyInfo : repeatedOmKeyInfo.getOmKeyInfoList()) {
-                splitRepeatedOmKeyInfo(toReclaim, toNextDb, renamedKey,
-                    keyInfo, previousKeyTable, renamedTable,
-                    bucketInfo, volumeId);
+          // nextSnapshot = null means entries would be moved to AOS, hence 
ensure that KeyDeletingService &
+          // DirectoryDeletingService is not running while the entries are 
moving.

Review Comment:
   Do we still need this comment? If so, can you please reword it to something
like
   ```
           // If nextSnapshot is null, check and wait for KeyDeletingService 
and DirectoryDeletingService
           // if they are running and moving entries on AOS."
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotChainManager.java:
##########
@@ -382,6 +389,41 @@ public UUID getLatestGlobalSnapshotId() throws IOException 
{
     return latestGlobalSnapshotId;
   }
 
+  /**
+   * Get oldest of global snapshot in snapshot chain.
+   */
+  public UUID getOldestGlobalSnapshotId() throws IOException {
+    validateSnapshotChain();
+    return oldestGlobalSnapshotId;
+  }
+
+  public Iterator<UUID> iterator(final boolean reverse) throws IOException {
+    validateSnapshotChain();
+    return new Iterator<UUID>() {
+      private UUID currentSnapshotId = reverse ? getLatestGlobalSnapshotId() : 
getOldestGlobalSnapshotId();
+      @Override
+      public boolean hasNext() {
+        try {
+          return reverse ? hasPreviousGlobalSnapshot(currentSnapshotId) : 
hasNextGlobalSnapshot(currentSnapshotId);
+        } catch (IOException e) {
+          return false;
+        }
+      }
+
+      @Override
+      public UUID next() {
+        try {
+          UUID prevSnapshotId = currentSnapshotId;
+          currentSnapshotId =
+              reverse ? previousGlobalSnapshot(currentSnapshotId) : 
nextGlobalSnapshot(currentSnapshotId);
+          return prevSnapshotId;
+        } catch (IOException e) {
+          throw new UncheckedIOException("Error while getting next snapshot 
for " + currentSnapshotId, e);

Review Comment:
   Oh, I thought it was about not having the next element.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java:
##########
@@ -463,92 +253,36 @@ private void submitSnapshotPurgeRequest(List<String> 
purgeSnapshotKeys) {
       }
     }
 
-    @SuppressWarnings("checkstyle:ParameterNumber")
-    private void splitRepeatedOmKeyInfo(SnapshotMoveKeyInfos.Builder toReclaim,
-        SnapshotMoveKeyInfos.Builder toNextDb,
-        HddsProtos.KeyValue.Builder renamedKey, OmKeyInfo keyInfo,
-        Table<String, OmKeyInfo> previousKeyTable,
-        Table<String, String> renamedTable,
-        OmBucketInfo bucketInfo, long volumeId) throws IOException {
-
-      if (isKeyReclaimable(previousKeyTable, renamedTable,
-          keyInfo, bucketInfo, volumeId, renamedKey)) {
-        // Update in current db's deletedKeyTable
-        toReclaim.addKeyInfos(keyInfo
-            .getProtobuf(ClientVersion.CURRENT_VERSION));
-      } else {
-        // Move to next non deleted snapshot's deleted table
-        toNextDb.addKeyInfos(keyInfo.getProtobuf(
-            ClientVersion.CURRENT_VERSION));
-      }
-    }
-
-    private boolean isDirReclaimable(
-        Table.KeyValue<String, OmKeyInfo> deletedDir,
-        Table<String, OmDirectoryInfo> previousDirTable,
-        Table<String, String> renamedTable,
-        List<HddsProtos.KeyValue> renamedList) throws IOException {
-
-      if (previousDirTable == null) {
-        return true;
-      }
-
-      String deletedDirDbKey = deletedDir.getKey();
-      OmKeyInfo deletedDirInfo = deletedDir.getValue();
-      String dbRenameKey = ozoneManager.getMetadataManager().getRenameKey(
-          deletedDirInfo.getVolumeName(), deletedDirInfo.getBucketName(),
-          deletedDirInfo.getObjectID());
-
-      /*
-      snapshotRenamedTable: /volumeName/bucketName/objectID ->
-          /volumeId/bucketId/parentId/dirName
-       */
-      String dbKeyBeforeRename = renamedTable.getIfExist(dbRenameKey);
-      String prevDbKey = null;
-
-      if (dbKeyBeforeRename != null) {
-        prevDbKey = dbKeyBeforeRename;
-        HddsProtos.KeyValue renamedDir = HddsProtos.KeyValue
-            .newBuilder()
-            .setKey(dbRenameKey)
-            .setValue(dbKeyBeforeRename)
-            .build();
-        renamedList.add(renamedDir);
-      } else {
-        // In OMKeyDeleteResponseWithFSO OzonePathKey is converted to
-        // OzoneDeletePathKey. Changing it back to check the previous DirTable.
-        prevDbKey = ozoneManager.getMetadataManager()
-            .getOzoneDeletePathDirKey(deletedDirDbKey);
-      }
-
-      OmDirectoryInfo prevDirectoryInfo = previousDirTable.get(prevDbKey);
-      if (prevDirectoryInfo == null) {
-        return true;
-      }
-
-      return prevDirectoryInfo.getObjectID() != deletedDirInfo.getObjectID();
-    }
-
-    public void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo,
-        List<SnapshotMoveKeyInfos> toReclaimList,
-        List<SnapshotMoveKeyInfos> toNextDBList,
-        List<HddsProtos.KeyValue> renamedList,
-        List<String> dirsToMove) throws InterruptedException {
+    private void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo,
+                                              List<SnapshotMoveKeyInfos> 
deletedKeys,
+                                              List<HddsProtos.KeyValue> 
renamedList,
+                                              List<SnapshotMoveKeyInfos> 
dirsToMove) {
 
-      SnapshotMoveDeletedKeysRequest.Builder moveDeletedKeysBuilder =
-          SnapshotMoveDeletedKeysRequest.newBuilder()
-              .setFromSnapshot(snapInfo.getProtobuf());
+      SnapshotMoveTableKeysRequest.Builder moveDeletedKeysBuilder = 
SnapshotMoveTableKeysRequest.newBuilder()
+          .setFromSnapshotID(toProtobuf(snapInfo.getSnapshotId()));
 
-      SnapshotMoveDeletedKeysRequest moveDeletedKeys = moveDeletedKeysBuilder
-          .addAllReclaimKeys(toReclaimList)
-          .addAllNextDBKeys(toNextDBList)
+      SnapshotMoveTableKeysRequest moveDeletedKeys = moveDeletedKeysBuilder
+          .addAllDeletedKeys(deletedKeys)
           .addAllRenamedKeys(renamedList)
-          .addAllDeletedDirsToMove(dirsToMove)
+          .addAllDeletedDirs(dirsToMove)
           .build();
+      if (isBufferLimitCrossed(ratisByteLimit, 0, 
moveDeletedKeys.getSerializedSize())) {
+        int remaining = MIN_ERR_LIMIT_PER_TASK;
+        deletedKeys = deletedKeys.subList(0, Math.min(remaining, 
deletedKeys.size()));

Review Comment:
   Is it limited because we will move `1000` entries at max?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java:
##########
@@ -119,6 +122,29 @@ ListKeysResult listKeys(String volumeName, String 
bucketName, String startKey,
    */
   PendingKeysDeletion getPendingDeletionKeys(int count) throws IOException;
 
+  /**
+   * Returns a list rename entries from the snapshotRenamedTable.
+   *
+   * @param count max number of keys to return.
+   * @return a Pair of list of {@link 
org.apache.hadoop.hdds.utils.db.Table.KeyValue} representing the keys in the
+   * underlying metadataManager.
+   * @throws IOException
+   */
+  List<Table.KeyValue<String, String>> getRenamesKeyEntries(

Review Comment:
   I'm not asking to create another map. I'm saying the return type could be a 
map rather than a list of pairs.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to