hemantk-12 commented on code in PR #4823:
URL: https://github.com/apache/ozone/pull/4823#discussion_r1228754044


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/FSODirectoryPathResolver.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+public class FSODirectoryPathResolver implements ObjectPathResolver {
+
+  private String prefix;
+  private long bucketId;
+  private Table<String, OmDirectoryInfo> dirInfoTable;

Review Comment:
   Please make class variables final.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/FSODirectoryPathResolver.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+public class FSODirectoryPathResolver implements ObjectPathResolver {
+
+  private String prefix;
+  private long bucketId;
+  private Table<String, OmDirectoryInfo> dirInfoTable;
+
+  public FSODirectoryPathResolver(String prefix, long bucketId,
+      Table<String, OmDirectoryInfo> dirInfoTable) {
+    this.prefix = prefix;
+    this.dirInfoTable = dirInfoTable;
+    this.bucketId = bucketId;
+  }
+
+  private void addToPathMap(ObjectIDPathVal objectIDPathVal,
+                            Set<Long> dirObjIds, Map<Long, Path> pathMap) {
+    if (dirObjIds.contains(objectIDPathVal.getObjectId())) {
+      pathMap.put(objectIDPathVal.getObjectId(), objectIDPathVal.getPath());
+      dirObjIds.remove(objectIDPathVal.getObjectId());
+    }
+  }
+
+  /**
+   * Assuming all dirObjIds belong to a bucket this function resolves absolute
+   * path for a given FSO bucket.
+   * @param dirObjIds
+   * @return Map of Path corresponding to provided directory object IDs
+   */
+  @Override
+  public Map<Long, Path> getAbsolutePathForObjectIDs(Set<Long> dirObjIds)
+      throws IOException {
+    // Root of a bucket would always have the key as 
/volumeId/bucketId/bucketId/
+    if (dirObjIds.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    Map<Long, Path> pathMap = new HashMap<>();
+    Queue<ObjectIDPathVal> objectIdPathVals = new LinkedList<>();
+    ObjectIDPathVal root = new ObjectIDPathVal(bucketId, Paths.get("/"));
+    objectIdPathVals.add(root);
+    addToPathMap(root, dirObjIds, pathMap);
+
+    while (!objectIdPathVals.isEmpty() && dirObjIds.size() > 0) {
+      ObjectIDPathVal parent = objectIdPathVals.poll();
+      try(TableIterator<String, ? extends Table.KeyValue<String, 
OmDirectoryInfo>>
+              subDirIter = dirInfoTable.iterator(
+                  prefix + parent.getObjectId())) {
+        while (dirObjIds.size() > 0 && subDirIter.hasNext()) {
+          OmDirectoryInfo dir = subDirIter.next().getValue();

Review Comment:
   Name it to `childDir` since you are using parent above. Parent/child (or 
current/next) will improve readability.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/FSODirectoryPathResolver.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+public class FSODirectoryPathResolver implements ObjectPathResolver {
+
+  private String prefix;
+  private long bucketId;
+  private Table<String, OmDirectoryInfo> dirInfoTable;
+
+  public FSODirectoryPathResolver(String prefix, long bucketId,
+      Table<String, OmDirectoryInfo> dirInfoTable) {
+    this.prefix = prefix;
+    this.dirInfoTable = dirInfoTable;
+    this.bucketId = bucketId;
+  }
+
+  private void addToPathMap(ObjectIDPathVal objectIDPathVal,
+                            Set<Long> dirObjIds, Map<Long, Path> pathMap) {
+    if (dirObjIds.contains(objectIDPathVal.getObjectId())) {
+      pathMap.put(objectIDPathVal.getObjectId(), objectIDPathVal.getPath());
+      dirObjIds.remove(objectIDPathVal.getObjectId());
+    }
+  }
+
+  /**
+   * Assuming all dirObjIds belong to a bucket this function resolves absolute
+   * path for a given FSO bucket.
+   * @param dirObjIds
+   * @return Map of Path corresponding to provided directory object IDs
+   */
+  @Override
+  public Map<Long, Path> getAbsolutePathForObjectIDs(Set<Long> dirObjIds)
+      throws IOException {
+    // Root of a bucket would always have the key as 
/volumeId/bucketId/bucketId/
+    if (dirObjIds.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    Map<Long, Path> pathMap = new HashMap<>();
+    Queue<ObjectIDPathVal> objectIdPathVals = new LinkedList<>();
+    ObjectIDPathVal root = new ObjectIDPathVal(bucketId, Paths.get("/"));
+    objectIdPathVals.add(root);
+    addToPathMap(root, dirObjIds, pathMap);
+
+    while (!objectIdPathVals.isEmpty() && dirObjIds.size() > 0) {
+      ObjectIDPathVal parent = objectIdPathVals.poll();
+      try(TableIterator<String, ? extends Table.KeyValue<String, 
OmDirectoryInfo>>

Review Comment:
   Space after `try`.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/FSODirectoryPathResolver.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+public class FSODirectoryPathResolver implements ObjectPathResolver {
+
+  private String prefix;
+  private long bucketId;
+  private Table<String, OmDirectoryInfo> dirInfoTable;
+
+  public FSODirectoryPathResolver(String prefix, long bucketId,
+      Table<String, OmDirectoryInfo> dirInfoTable) {
+    this.prefix = prefix;
+    this.dirInfoTable = dirInfoTable;
+    this.bucketId = bucketId;
+  }
+
+  private void addToPathMap(ObjectIDPathVal objectIDPathVal,
+                            Set<Long> dirObjIds, Map<Long, Path> pathMap) {
+    if (dirObjIds.contains(objectIDPathVal.getObjectId())) {
+      pathMap.put(objectIDPathVal.getObjectId(), objectIDPathVal.getPath());
+      dirObjIds.remove(objectIDPathVal.getObjectId());
+    }
+  }
+
+  /**
+   * Assuming all dirObjIds belong to a bucket this function resolves absolute
+   * path for a given FSO bucket.
+   * @param dirObjIds
+   * @return Map of Path corresponding to provided directory object IDs
+   */
+  @Override
+  public Map<Long, Path> getAbsolutePathForObjectIDs(Set<Long> dirObjIds)
+      throws IOException {
+    // Root of a bucket would always have the key as 
/volumeId/bucketId/bucketId/

Review Comment:
   Is it supposed to be `/volumeId/bucketId/bucketId/` or just 
`/volumeId/bucketId/`?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/FSODirectoryPathResolver.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+public class FSODirectoryPathResolver implements ObjectPathResolver {

Review Comment:
   Please add java doc comment here and describe what this class does instead 
of 
https://github.com/apache/ozone/pull/4823/files#diff-beda3f6f29a6baf97eb45e09ea060c720f90c2ec2c400d0092cdc22aae7bdddbR58.
 



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/FSODirectoryPathResolver.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+public class FSODirectoryPathResolver implements ObjectPathResolver {
+
+  private String prefix;
+  private long bucketId;
+  private Table<String, OmDirectoryInfo> dirInfoTable;
+
+  public FSODirectoryPathResolver(String prefix, long bucketId,
+      Table<String, OmDirectoryInfo> dirInfoTable) {
+    this.prefix = prefix;
+    this.dirInfoTable = dirInfoTable;
+    this.bucketId = bucketId;
+  }
+
+  private void addToPathMap(ObjectIDPathVal objectIDPathVal,
+                            Set<Long> dirObjIds, Map<Long, Path> pathMap) {
+    if (dirObjIds.contains(objectIDPathVal.getObjectId())) {
+      pathMap.put(objectIDPathVal.getObjectId(), objectIDPathVal.getPath());
+      dirObjIds.remove(objectIDPathVal.getObjectId());

Review Comment:
   Why do you need to remove the element? Is it to not add duplicate values?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java:
##########
@@ -858,26 +871,37 @@ private void addToObjectIdMap(Table<String, ? extends 
WithObjectID> fsTable,
                  : sstFileReader.getKeyStream()) {
       keysToCheck.forEach(key -> {
         try {
-          final WithObjectID oldKey = fsTable.get(key);
-          final WithObjectID newKey = tsTable.get(key);
+          final WithParentObjectId oldKey = fsTable.get(key);
+          final WithParentObjectId newKey = tsTable.get(key);
           if (areKeysEqual(oldKey, newKey) || !isKeyInBucket(key, 
tablePrefixes,
               fsTable.getName())) {
             // We don't have to do anything.
             return;
           }
           if (oldKey != null) {
             byte[] rawObjId = codecRegistry.asRawData(oldKey.getObjectID());
+            // Removing volume bucket info by removing the table bucket Prefix
+            // from the key.
+            // For FSO buckets will be left with the parent id/keyname.
+            // For OBS buckets will be left with the complete path
             byte[] rawValue = codecRegistry.asRawData(
-                getKeyOrDirectoryName(isDirectoryTable, oldKey));
+                key.substring(tablePrefix.length()));

Review Comment:
   You didn't do the same change for `newKey != null` case at line 899. Is it 
intentional or just missed? 



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/FSODirectoryPathResolver.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+public class FSODirectoryPathResolver implements ObjectPathResolver {
+
+  private String prefix;
+  private long bucketId;
+  private Table<String, OmDirectoryInfo> dirInfoTable;
+
+  public FSODirectoryPathResolver(String prefix, long bucketId,
+      Table<String, OmDirectoryInfo> dirInfoTable) {
+    this.prefix = prefix;
+    this.dirInfoTable = dirInfoTable;
+    this.bucketId = bucketId;
+  }
+
+  private void addToPathMap(ObjectIDPathVal objectIDPathVal,
+                            Set<Long> dirObjIds, Map<Long, Path> pathMap) {
+    if (dirObjIds.contains(objectIDPathVal.getObjectId())) {
+      pathMap.put(objectIDPathVal.getObjectId(), objectIDPathVal.getPath());
+      dirObjIds.remove(objectIDPathVal.getObjectId());
+    }
+  }
+
+  /**
+   * Assuming all dirObjIds belong to a bucket this function resolves absolute
+   * path for a given FSO bucket.
+   * @param dirObjIds
+   * @return Map of Path corresponding to provided directory object IDs
+   */
+  @Override
+  public Map<Long, Path> getAbsolutePathForObjectIDs(Set<Long> dirObjIds)
+      throws IOException {
+    // Root of a bucket would always have the key as 
/volumeId/bucketId/bucketId/
+    if (dirObjIds.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    Map<Long, Path> pathMap = new HashMap<>();
+    Queue<ObjectIDPathVal> objectIdPathVals = new LinkedList<>();
+    ObjectIDPathVal root = new ObjectIDPathVal(bucketId, Paths.get("/"));
+    objectIdPathVals.add(root);
+    addToPathMap(root, dirObjIds, pathMap);
+
+    while (!objectIdPathVals.isEmpty() && dirObjIds.size() > 0) {
+      ObjectIDPathVal parent = objectIdPathVals.poll();
+      try(TableIterator<String, ? extends Table.KeyValue<String, 
OmDirectoryInfo>>
+              subDirIter = dirInfoTable.iterator(
+                  prefix + parent.getObjectId())) {
+        while (dirObjIds.size() > 0 && subDirIter.hasNext()) {
+          OmDirectoryInfo dir = subDirIter.next().getValue();
+          ObjectIDPathVal pathVal = new ObjectIDPathVal(dir.getObjectID(),
+              parent.getPath().resolve(dir.getName()));
+          addToPathMap(pathVal, dirObjIds, pathMap);
+        }
+      }
+    }
+
+    if (dirObjIds.size() > 0) {
+      throw new IllegalArgumentException(
+          "Dir object Ids required but not found in bucket: " + dirObjIds);
+    }
+    return pathMap;
+  }
+
+  private static class ObjectIDPathVal {

Review Comment:
   Can we use `Pair` instead?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/FSODirectoryPathResolver.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+public class FSODirectoryPathResolver implements ObjectPathResolver {
+
+  private String prefix;
+  private long bucketId;
+  private Table<String, OmDirectoryInfo> dirInfoTable;
+
+  public FSODirectoryPathResolver(String prefix, long bucketId,
+      Table<String, OmDirectoryInfo> dirInfoTable) {
+    this.prefix = prefix;
+    this.dirInfoTable = dirInfoTable;
+    this.bucketId = bucketId;
+  }
+
+  private void addToPathMap(ObjectIDPathVal objectIDPathVal,
+                            Set<Long> dirObjIds, Map<Long, Path> pathMap) {
+    if (dirObjIds.contains(objectIDPathVal.getObjectId())) {
+      pathMap.put(objectIDPathVal.getObjectId(), objectIDPathVal.getPath());
+      dirObjIds.remove(objectIDPathVal.getObjectId());
+    }
+  }
+
+  /**
+   * Assuming all dirObjIds belong to a bucket this function resolves absolute
+   * path for a given FSO bucket.
+   * @param dirObjIds
+   * @return Map of Path corresponding to provided directory object IDs
+   */
+  @Override
+  public Map<Long, Path> getAbsolutePathForObjectIDs(Set<Long> dirObjIds)
+      throws IOException {
+    // Root of a bucket would always have the key as 
/volumeId/bucketId/bucketId/
+    if (dirObjIds.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    Map<Long, Path> pathMap = new HashMap<>();
+    Queue<ObjectIDPathVal> objectIdPathVals = new LinkedList<>();
+    ObjectIDPathVal root = new ObjectIDPathVal(bucketId, Paths.get("/"));
+    objectIdPathVals.add(root);
+    addToPathMap(root, dirObjIds, pathMap);
+
+    while (!objectIdPathVals.isEmpty() && dirObjIds.size() > 0) {
+      ObjectIDPathVal parent = objectIdPathVals.poll();
+      try(TableIterator<String, ? extends Table.KeyValue<String, 
OmDirectoryInfo>>
+              subDirIter = dirInfoTable.iterator(
+                  prefix + parent.getObjectId())) {
+        while (dirObjIds.size() > 0 && subDirIter.hasNext()) {
+          OmDirectoryInfo dir = subDirIter.next().getValue();
+          ObjectIDPathVal pathVal = new ObjectIDPathVal(dir.getObjectID(),
+              parent.getPath().resolve(dir.getName()));
+          addToPathMap(pathVal, dirObjIds, pathMap);
+        }
+      }
+    }
+
+    if (dirObjIds.size() > 0) {
+      throw new IllegalArgumentException(
+          "Dir object Ids required but not found in bucket: " + dirObjIds);
+    }
+    return pathMap;
+  }
+
+  private static class ObjectIDPathVal {
+    private long objectId;
+    private Path path;

Review Comment:
   Make them final.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/FSODirectoryPathResolver.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+public class FSODirectoryPathResolver implements ObjectPathResolver {
+
+  private String prefix;
+  private long bucketId;
+  private Table<String, OmDirectoryInfo> dirInfoTable;
+
+  public FSODirectoryPathResolver(String prefix, long bucketId,
+      Table<String, OmDirectoryInfo> dirInfoTable) {
+    this.prefix = prefix;
+    this.dirInfoTable = dirInfoTable;
+    this.bucketId = bucketId;
+  }
+
+  private void addToPathMap(ObjectIDPathVal objectIDPathVal,
+                            Set<Long> dirObjIds, Map<Long, Path> pathMap) {
+    if (dirObjIds.contains(objectIDPathVal.getObjectId())) {
+      pathMap.put(objectIDPathVal.getObjectId(), objectIDPathVal.getPath());
+      dirObjIds.remove(objectIDPathVal.getObjectId());
+    }
+  }
+
+  /**
+   * Assuming all dirObjIds belong to a bucket this function resolves absolute
+   * path for a given FSO bucket.
+   * @param dirObjIds
+   * @return Map of Path corresponding to provided directory object IDs
+   */
+  @Override
+  public Map<Long, Path> getAbsolutePathForObjectIDs(Set<Long> dirObjIds)
+      throws IOException {
+    // Root of a bucket would always have the key as 
/volumeId/bucketId/bucketId/
+    if (dirObjIds.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    Map<Long, Path> pathMap = new HashMap<>();
+    Queue<ObjectIDPathVal> objectIdPathVals = new LinkedList<>();
+    ObjectIDPathVal root = new ObjectIDPathVal(bucketId, Paths.get("/"));
+    objectIdPathVals.add(root);
+    addToPathMap(root, dirObjIds, pathMap);
+
+    while (!objectIdPathVals.isEmpty() && dirObjIds.size() > 0) {
+      ObjectIDPathVal parent = objectIdPathVals.poll();
+      try(TableIterator<String, ? extends Table.KeyValue<String, 
OmDirectoryInfo>>
+              subDirIter = dirInfoTable.iterator(
+                  prefix + parent.getObjectId())) {
+        while (dirObjIds.size() > 0 && subDirIter.hasNext()) {
+          OmDirectoryInfo dir = subDirIter.next().getValue();
+          ObjectIDPathVal pathVal = new ObjectIDPathVal(dir.getObjectID(),
+              parent.getPath().resolve(dir.getName()));
+          addToPathMap(pathVal, dirObjIds, pathMap);
+        }
+      }
+    }
+
+    if (dirObjIds.size() > 0) {

Review Comment:
   What is this check doing? You can [removing items from 
dirObjIds](https://github.com/apache/ozone/pull/4823/files#diff-beda3f6f29a6baf97eb45e09ea060c720f90c2ec2c400d0092cdc22aae7bdddbR53)
 and using this as condition in while loop, so I assume it is possible that it 
is empty. It would be great if you add a reasoning behind this.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/FSODirectoryPathResolver.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+public class FSODirectoryPathResolver implements ObjectPathResolver {
+
+  private String prefix;
+  private long bucketId;
+  private Table<String, OmDirectoryInfo> dirInfoTable;
+
+  public FSODirectoryPathResolver(String prefix, long bucketId,
+      Table<String, OmDirectoryInfo> dirInfoTable) {
+    this.prefix = prefix;
+    this.dirInfoTable = dirInfoTable;
+    this.bucketId = bucketId;
+  }
+
+  private void addToPathMap(ObjectIDPathVal objectIDPathVal,
+                            Set<Long> dirObjIds, Map<Long, Path> pathMap) {
+    if (dirObjIds.contains(objectIDPathVal.getObjectId())) {
+      pathMap.put(objectIDPathVal.getObjectId(), objectIDPathVal.getPath());
+      dirObjIds.remove(objectIDPathVal.getObjectId());
+    }
+  }
+
+  /**
+   * Assuming all dirObjIds belong to a bucket this function resolves absolute
+   * path for a given FSO bucket.
+   * @param dirObjIds
+   * @return Map of Path corresponding to provided directory object IDs
+   */
+  @Override
+  public Map<Long, Path> getAbsolutePathForObjectIDs(Set<Long> dirObjIds)
+      throws IOException {
+    // Root of a bucket would always have the key as 
/volumeId/bucketId/bucketId/
+    if (dirObjIds.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    Map<Long, Path> pathMap = new HashMap<>();
+    Queue<ObjectIDPathVal> objectIdPathVals = new LinkedList<>();
+    ObjectIDPathVal root = new ObjectIDPathVal(bucketId, Paths.get("/"));
+    objectIdPathVals.add(root);
+    addToPathMap(root, dirObjIds, pathMap);
+
+    while (!objectIdPathVals.isEmpty() && dirObjIds.size() > 0) {
+      ObjectIDPathVal parent = objectIdPathVals.poll();
+      try(TableIterator<String, ? extends Table.KeyValue<String, 
OmDirectoryInfo>>
+              subDirIter = dirInfoTable.iterator(
+                  prefix + parent.getObjectId())) {
+        while (dirObjIds.size() > 0 && subDirIter.hasNext()) {
+          OmDirectoryInfo dir = subDirIter.next().getValue();
+          ObjectIDPathVal pathVal = new ObjectIDPathVal(dir.getObjectID(),
+              parent.getPath().resolve(dir.getName()));
+          addToPathMap(pathVal, dirObjIds, pathMap);
+        }
+      }
+    }
+
+    if (dirObjIds.size() > 0) {
+      throw new IllegalArgumentException(
+          "Dir object Ids required but not found in bucket: " + dirObjIds);
+    }
+    return pathMap;
+  }
+
+  private static class ObjectIDPathVal {
+    private long objectId;
+    private Path path;
+
+    public ObjectIDPathVal(long objectId, Path path) {

Review Comment:
   I don't think it has to be `public` since it is class private.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java:
##########
@@ -717,12 +722,17 @@ private void generateSnapshotDiffReport(final String 
jobKey,
           .getKeyTable(bucketLayout);
       Table<String, OmKeyInfo> tsKeyTable = toSnapshot.getMetadataManager()
           .getKeyTable(bucketLayout);
-
+      Set<Long> oldParentIds = null;
+      Set<Long> newParentIds = null;

Review Comment:
   nit: This is the case when Optional should be used.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java:
##########
@@ -858,26 +871,37 @@ private void addToObjectIdMap(Table<String, ? extends 
WithObjectID> fsTable,
                  : sstFileReader.getKeyStream()) {
       keysToCheck.forEach(key -> {
         try {
-          final WithObjectID oldKey = fsTable.get(key);
-          final WithObjectID newKey = tsTable.get(key);
+          final WithParentObjectId oldKey = fsTable.get(key);
+          final WithParentObjectId newKey = tsTable.get(key);

Review Comment:
   Not sure if `WithParentObjectId` is only for FSO type. It will cause casting 
exception if `WithObjectID` type is returned for legacy and OBS  bucket type.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/FSODirectoryPathResolver.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;

Review Comment:
   I don't think you are using this anywhere. If no, please remove it.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java:
##########
@@ -737,15 +747,32 @@ private void generateSnapshotDiffReport(final String 
jobKey,
             fromSnapshot, toSnapshot, fsInfo, tsInfo, useFullDiff,
             tablePrefixes, objectIdToKeyNameMapForFromSnapshot,
             objectIdToKeyNameMapForToSnapshot, objectIDsToCheckMap,
-            path.toString());
+            oldParentIds, newParentIds, path.toString());
       }
 
       validateSnapshotsAreActive(volumeName, bucketName, fromSnapshotName,
           toSnapshotName);
-      long totalDiffEntries = generateDiffReport(jobId,
-          objectIDsToCheckMap,
+      Map<Long, Path> oldParentIdPathMap = null;
+      Map<Long, Path> newParentIdPathMap = null;

Review Comment:
   nit: Same as above. Use optional here.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/FSODirectoryPathResolver.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+public class FSODirectoryPathResolver implements ObjectPathResolver {
+
+  private String prefix;
+  private long bucketId;
+  private Table<String, OmDirectoryInfo> dirInfoTable;
+
+  public FSODirectoryPathResolver(String prefix, long bucketId,
+      Table<String, OmDirectoryInfo> dirInfoTable) {
+    this.prefix = prefix;
+    this.dirInfoTable = dirInfoTable;
+    this.bucketId = bucketId;
+  }
+
+  private void addToPathMap(ObjectIDPathVal objectIDPathVal,
+                            Set<Long> dirObjIds, Map<Long, Path> pathMap) {
+    if (dirObjIds.contains(objectIDPathVal.getObjectId())) {
+      pathMap.put(objectIDPathVal.getObjectId(), objectIDPathVal.getPath());
+      dirObjIds.remove(objectIDPathVal.getObjectId());
+    }
+  }
+
+  /**
+   * Assuming all dirObjIds belong to a bucket this function resolves absolute
+   * path for a given FSO bucket.
+   * @param dirObjIds
+   * @return Map of Path corresponding to provided directory object IDs
+   */
+  @Override
+  public Map<Long, Path> getAbsolutePathForObjectIDs(Set<Long> dirObjIds)
+      throws IOException {
+    // Root of a bucket would always have the key as 
/volumeId/bucketId/bucketId/
+    if (dirObjIds.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    Map<Long, Path> pathMap = new HashMap<>();
+    Queue<ObjectIDPathVal> objectIdPathVals = new LinkedList<>();
+    ObjectIDPathVal root = new ObjectIDPathVal(bucketId, Paths.get("/"));
+    objectIdPathVals.add(root);
+    addToPathMap(root, dirObjIds, pathMap);
+
+    while (!objectIdPathVals.isEmpty() && dirObjIds.size() > 0) {
+      ObjectIDPathVal parent = objectIdPathVals.poll();
+      try(TableIterator<String, ? extends Table.KeyValue<String, 
OmDirectoryInfo>>
+              subDirIter = dirInfoTable.iterator(
+                  prefix + parent.getObjectId())) {
+        while (dirObjIds.size() > 0 && subDirIter.hasNext()) {
+          OmDirectoryInfo dir = subDirIter.next().getValue();
+          ObjectIDPathVal pathVal = new ObjectIDPathVal(dir.getObjectID(),

Review Comment:
   Don't you need to add `pathVal` to `objectIdPathVals`? I don't see any 
recursion as well. It won't be doing any traversal without that if I'm not 
wrong.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/FSODirectoryPathResolver.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+public class FSODirectoryPathResolver implements ObjectPathResolver {
+
+  private String prefix;
+  private long bucketId;
+  private Table<String, OmDirectoryInfo> dirInfoTable;
+
+  public FSODirectoryPathResolver(String prefix, long bucketId,
+      Table<String, OmDirectoryInfo> dirInfoTable) {
+    this.prefix = prefix;
+    this.dirInfoTable = dirInfoTable;
+    this.bucketId = bucketId;
+  }
+
+  private void addToPathMap(ObjectIDPathVal objectIDPathVal,
+                            Set<Long> dirObjIds, Map<Long, Path> pathMap) {
+    if (dirObjIds.contains(objectIDPathVal.getObjectId())) {
+      pathMap.put(objectIDPathVal.getObjectId(), objectIDPathVal.getPath());
+      dirObjIds.remove(objectIDPathVal.getObjectId());
+    }
+  }
+
+  /**
+   * Assuming all dirObjIds belong to a bucket this function resolves absolute
+   * path for a given FSO bucket.
+   * @param dirObjIds
+   * @return Map of Path corresponding to provided directory object IDs
+   */
+  @Override
+  public Map<Long, Path> getAbsolutePathForObjectIDs(Set<Long> dirObjIds)

Review Comment:
   I would have written this like and not created and used `addToPathMap` for 
one liner function.
   ```
     @Override
     public Map<Long, Path> getAbsolutePathForObjectIDs(final Set<Long> 
dirObjIds)
         throws IOException {
       // Root of a bucket would always have the key as /volumeId/bucketId/
       if (dirObjIds.isEmpty()) {
         return Collections.emptyMap();
       }
   
       Map<Long, Path> objectIdToPath = new HashMap<>();
       Queue<ObjectIDPathVal> objectIdPathQueue = new LinkedList<>();
       ObjectIDPathVal root = new ObjectIDPathVal(bucketId, Paths.get("/"));
       objectIdPathQueue.add(root);
   
       while (!objectIdPathQueue.isEmpty()) {
         ObjectIDPathVal current = objectIdPathQueue.poll();
         if (dirObjIds.contains(current.getObjectId())) {
           objectIdToPath.put(current.getObjectId(), current.getPath());
         }
         // Throw exception in else { }, if dirObjIds should always contain
         // the objectId.
   
         try (TableIterator<String, ? extends KeyValue<String, OmDirectoryInfo>>
                  subDirIter =
                  dirInfoTable.iterator(prefix + current.getObjectId())) {
           while (subDirIter.hasNext()) {
             OmDirectoryInfo nextDir = subDirIter.next().getValue();
             ObjectIDPathVal next =
                 new ObjectIDPathVal(nextDir.getObjectID(),
                     current.getPath().resolve(nextDir.getName()));
             objectIdPathQueue.add(next);
           }
         }
       }
   
       return objectIdToPath;
     }
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java:
##########
@@ -1033,26 +1075,31 @@ private long generateDiffReport(
             throw new IllegalStateException(
                 "Old and new key name both are null");
           } else if (oldKeyName == null) { // Key Created.
-            String key = codecRegistry.asObject(newKeyName, String.class);
+            String key = resolveAbsolutePath(isFSOBucket, newParentIdPathMap,

Review Comment:
   Shouldn't you be doing `resolveAbsolutePath` outside if/else and compare the 
resolved name? I think renamed of `/vol/bucket/dir1/dir2/key` to 
`/vol/bucket/dir3/dir2/key` will come as Modify diff because `objectId` would 
be same as well as `keyName` but it should be `Rename`.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java:
##########
@@ -858,26 +871,37 @@ private void addToObjectIdMap(Table<String, ? extends 
WithObjectID> fsTable,
                  : sstFileReader.getKeyStream()) {
       keysToCheck.forEach(key -> {
         try {
-          final WithObjectID oldKey = fsTable.get(key);
-          final WithObjectID newKey = tsTable.get(key);
+          final WithParentObjectId oldKey = fsTable.get(key);
+          final WithParentObjectId newKey = tsTable.get(key);
           if (areKeysEqual(oldKey, newKey) || !isKeyInBucket(key, 
tablePrefixes,
               fsTable.getName())) {
             // We don't have to do anything.
             return;
           }
           if (oldKey != null) {
             byte[] rawObjId = codecRegistry.asRawData(oldKey.getObjectID());
+            // Removing volume bucket info by removing the table bucket Prefix
+            // from the key.
+            // For FSO buckets will be left with the parent id/keyname.
+            // For OBS buckets will be left with the complete path
             byte[] rawValue = codecRegistry.asRawData(
-                getKeyOrDirectoryName(isDirectoryTable, oldKey));
+                key.substring(tablePrefix.length()));
             oldObjIdToKeyMap.put(rawObjId, rawValue);
             objectIDsToCheck.add(rawObjId);
+            if (oldParentIds != null) {
+              oldParentIds.add(oldKey.getParentObjectID());
+            }
+

Review Comment:
   nit: Remove this extra line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to