errose28 commented on code in PR #5724:
URL: https://github.com/apache/ozone/pull/5724#discussion_r1430679517


##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/debug/TestContainerKeyScanner.java:
##########
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.debug;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import picocli.CommandLine;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * This class tests `ozone debug ldb ckscanner` CLI that reads from RocksDB
+ * and gets keys for container ids.
+ */
+public class TestContainerKeyScanner {
+  private static final String KEY_TABLE = "keyTable";
+  private static final String FILE_TABLE = "fileTable";
+  private static final String DIRECTORY_TABLE = "directoryTable";
+  private DBStore dbStore;
+  @TempDir
+  private File tempDir;
+  private StringWriter stdout, stderr;
+  private PrintWriter pstdout, pstderr;
+  private CommandLine cmd;
+
+  private static final String KEYS_FOUND_OUTPUT = "{\n" +
+      "  \"keysProcessed\": 3,\n" +
+      "  \"containerKeys\": {\n" +
+      "    \"1\": [\n" +
+      "      {\n" +
+      "        \"containerID\": 1,\n" +
+      "        \"volumeName\": \"vol1\",\n" +
+      "        \"volumeId\": -123,\n" +
+      "        \"bucketName\": \"bucket1\",\n" +
+      "        \"bucketId\": -456,\n" +
+      "        \"keyName\": \"dir1/key1\",\n" +
+      "        \"parentId\": -789\n" +
+      "      }\n" +
+      "    ],\n" +
+      "    \"2\": [\n" +
+      "      {\n" +
+      "        \"containerID\": 2,\n" +
+      "        \"volumeName\": \"vol1\",\n" +
+      "        \"volumeId\": 0,\n" +
+      "        \"bucketName\": \"bucket1\",\n" +
+      "        \"bucketId\": 0,\n" +
+      "        \"keyName\": \"key2\",\n" +
+      "        \"parentId\": 0\n" +
+      "      }\n" +
+      "    ],\n" +
+      "    \"3\": [\n" +
+      "      {\n" +
+      "        \"containerID\": 3,\n" +
+      "        \"volumeName\": \"vol1\",\n" +
+      "        \"volumeId\": 0,\n" +
+      "        \"bucketName\": \"bucket1\",\n" +
+      "        \"bucketId\": 0,\n" +
+      "        \"keyName\": \"key3\",\n" +
+      "        \"parentId\": 0\n" +
+      "      }\n" +
+      "    ]\n" +
+      "  }\n" +
+      "}\n";
+
+  private static final String KEYS_NOT_FOUND_OUTPUT =
+      "No keys were found for container IDs: [1, 2, 3]\n" +
+          "Keys processed: 3\n";
+
+  @BeforeEach
+  public void setup() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    stdout = new StringWriter();
+    pstdout = new PrintWriter(stdout);
+    stderr = new StringWriter();
+    pstderr = new PrintWriter(stderr);
+
+    cmd = new CommandLine(new RDBParser())
+        .addSubcommand(new ContainerKeyScanner())
+        .setOut(pstdout)
+        .setErr(pstderr);
+
+    dbStore = DBStoreBuilder.newBuilder(conf).setName("om.db")
+        .setPath(tempDir.toPath()).addTable(KEY_TABLE).addTable(FILE_TABLE)
+        .addTable(DIRECTORY_TABLE)
+        .build();
+  }
+
+  @AfterEach
+  public void shutdown() throws IOException {
+    if (dbStore != null) {
+      dbStore.close();
+    }
+    pstderr.close();
+    stderr.close();
+    pstdout.close();
+    stdout.close();
+  }
+
+  @Test
+  void testWhenThereAreKeysForConatainerIds() throws IOException {

Review Comment:
   nit.
   ```suggestion
     void testWhenThereAreKeysForContainerIds() throws IOException {
   ```



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/debug/TestContainerKeyScanner.java:
##########
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.debug;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import picocli.CommandLine;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * This class tests `ozone debug ldb ckscanner` CLI that reads from RocksDB
+ * and gets keys for container ids.
+ */
+public class TestContainerKeyScanner {
+  private static final String KEY_TABLE = "keyTable";
+  private static final String FILE_TABLE = "fileTable";
+  private static final String DIRECTORY_TABLE = "directoryTable";
+  private DBStore dbStore;
+  @TempDir
+  private File tempDir;
+  private StringWriter stdout, stderr;
+  private PrintWriter pstdout, pstderr;
+  private CommandLine cmd;
+
+  private static final String KEYS_FOUND_OUTPUT = "{\n" +

Review Comment:
   Can we store the expected output in a gson or jackson object instead? IIRC
gson is what's being used for client side json in other places. Then the string
written to stdout can be deserialized into an object of the same type and the
two can be compared directly. This makes sure both are valid json, is more
readable, and does not require exact whitespace matching.
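
   Something along these lines, as an untested sketch (keeping the expected
output as a string just for illustration):
   ```java
   import com.google.gson.Gson;
   import com.google.gson.JsonObject;

   Gson gson = new Gson();
   // JsonObject#equals is deep, so this compares structure and values
   // rather than exact whitespace.
   JsonObject expected = gson.fromJson(KEYS_FOUND_OUTPUT, JsonObject.class);
   JsonObject actual = gson.fromJson(stdout.toString(), JsonObject.class);
   Assertions.assertEquals(expected, actual);
   ```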



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/debug/TestContainerKeyScanner.java:
##########
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.debug;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import picocli.CommandLine;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * This class tests `ozone debug ldb ckscanner` CLI that reads from RocksDB
+ * and gets keys for container ids.
+ */
+public class TestContainerKeyScanner {

Review Comment:
   Let's add more tests for all 3 bucket layout types, buckets with multiple
keys at different levels of the directory tree, keys with multiple blocks and
containers, and a mix of inputs where some containers have keys and some do
not.
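
   For the layout coverage, a parameterized test could keep this manageable.
Rough, untested sketch (assumes layout-aware variants of the existing
`createKey`/`createFile` helpers):
   ```java
   import org.apache.hadoop.ozone.om.helpers.BucketLayout;
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.EnumSource;

   @ParameterizedTest
   @EnumSource(BucketLayout.class)
   void testContainerKeysForEachBucketLayout(BucketLayout layout)
       throws IOException {
     // Populate the table matching the layout (fileTable for FSO,
     // keyTable otherwise) with keys at several directory levels and
     // blocks spread across containers, run the scanner, and assert on
     // the per-container key sets.
   }
   ```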



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ContainerKeyInfoWrapper.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.debug;
+
+import java.util.List;
+
+/**
+ * Class for aggregation of collected data.
+ */
+public class ContainerKeyInfoWrapper {

Review Comment:
   Can we make this an inner class of `ContainerKeyScanner` since it is only
used in one place to wrap a method return? Also, can we combine this class with
`ContainerKeyInfoResponse` by just doing the json processing on this object?
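
   Roughly this shape (a sketch, field names taken from the current classes):
   ```java
   public class ContainerKeyScanner implements Callable<Void>,
       SubcommandWithParent {

     // Combined wrapper/response: gson can serialize this directly,
     // so the separate ContainerKeyInfoResponse class goes away.
     private static class ContainerKeyInfoWrapper {
       private final long keysProcessed;
       private final List<ContainerKeyInfo> containerKeyInfos;

       ContainerKeyInfoWrapper(long keysProcessed,
           List<ContainerKeyInfo> containerKeyInfos) {
         this.keysProcessed = keysProcessed;
         this.containerKeyInfos = containerKeyInfos;
       }
     }
   }
   ```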



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ContainerKeyScanner.java:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.debug;
+
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.cli.SubcommandWithParent;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
+import org.apache.hadoop.hdds.utils.db.DBDefinition;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.kohsuke.MetaInfServices;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+import picocli.CommandLine;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.ROOT_PATH;
+
+/**
+ * Parser for a list of container IDs, to scan for keys.
+ */
[email protected](
+    name = "ckscanner",
+    description = "Find keys that reference a container"
+)
+@MetaInfServices(SubcommandWithParent.class)
+public class ContainerKeyScanner implements Callable<Void>,
+    SubcommandWithParent {
+
+  private static final String FILE_TABLE = "fileTable";
+  private static final String KEY_TABLE = "keyTable";
+  private static final String DIRECTORY_TABLE = "directoryTable";
+
+  @CommandLine.Spec
+  private static CommandLine.Model.CommandSpec spec;
+
+  @CommandLine.ParentCommand
+  private RDBParser parent;
+
+  @CommandLine.Option(names = {"-ids", "--container-ids"},
+      split = ",",
+      paramLabel = "containerIDs",
+      required = true,
+      description = "Set of container IDs to be used for getting all " +
+          "their keys. Example-usage: 1,11,2 (Separated by ',').")
+  private Set<Long> containerIds;
+
+  private static Map<String, OmDirectoryInfo> directoryTable;
+  private static boolean isDirTableLoaded = false;
+
+  @Override
+  public Void call() throws Exception {
+    ContainerKeyInfoWrapper containerKeyInfoWrapper =
+        scanDBForContainerKeys(parent.getDbPath());
+
+    printOutput(containerKeyInfoWrapper);
+
+    closeStdChannels();
+
+    return null;
+  }
+
+  private void closeStdChannels() {
+    out().close();
+    err().close();
+  }
+
+  private Map<String, OmDirectoryInfo> getDirectoryTableData(String dbPath)
+      throws RocksDBException, IOException {
+    Map<String, OmDirectoryInfo> directoryTableData = new HashMap<>();
+
+    // Get all tables from RocksDB
+    List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+        RocksDBUtils.getColumnFamilyDescriptors(dbPath);
+    final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+
+    // Get all table handles
+    try (ManagedRocksDB db = ManagedRocksDB.openReadOnly(dbPath,
+        columnFamilyDescriptors, columnFamilyHandles)) {
+      dbPath = removeTrailingSlashIfNeeded(dbPath);
+      DBDefinition dbDefinition = DBDefinitionFactory.getDefinition(
+          Paths.get(dbPath), new OzoneConfiguration());
+      if (dbDefinition == null) {
+        throw new IllegalStateException("Incorrect DB Path");
+      }
+
+      // Get directory table
+      DBColumnFamilyDefinition<?, ?> columnFamilyDefinition =
+          dbDefinition.getColumnFamily(DIRECTORY_TABLE);
+      if (columnFamilyDefinition == null) {
+        throw new IllegalStateException(
+            "Table with name" + DIRECTORY_TABLE + " not found");
+      }
+
+      // Get directory table handle
+      ColumnFamilyHandle columnFamilyHandle = getColumnFamilyHandle(
+          columnFamilyDefinition.getName().getBytes(UTF_8),
+          columnFamilyHandles);
+      if (columnFamilyHandle == null) {
+        throw new IllegalStateException("columnFamilyHandle is null");
+      }
+
+      // Get iterator for directory table
+      try (ManagedRocksIterator iterator = new ManagedRocksIterator(
+          db.get().newIterator(columnFamilyHandle))) {
+        iterator.get().seekToFirst();
+        while (iterator.get().isValid()) {
+          directoryTableData.put(new String(iterator.get().key(), UTF_8),
+              ((OmDirectoryInfo) columnFamilyDefinition.getValueCodec()
+                  .fromPersistedFormat(iterator.get().value())));
+          iterator.get().next();
+        }
+      }
+    }
+
+    return directoryTableData;
+  }
+
+  @Override
+  public Class<?> getParentType() {
+    return RDBParser.class;
+  }
+
+  private static PrintWriter err() {
+    return spec.commandLine().getErr();
+  }
+
+  private static PrintWriter out() {
+    return spec.commandLine().getOut();
+  }
+
+  public Map<Long, Path> getAbsolutePathForObjectIDs(
+      long bucketId, String prefix, Optional<Set<Long>> dirObjIds) {
+    // Root of a bucket would always have the
+    // key as /volumeId/bucketId/bucketId/
+    if (!dirObjIds.isPresent() || dirObjIds.get().isEmpty()) {
+      return Collections.emptyMap();
+    }
+    Set<Long> objIds = Sets.newHashSet(dirObjIds.get());
+    Map<Long, Path> objectIdPathMap = new HashMap<>();
+    Queue<Pair<Long, Path>> objectIdPathVals = new LinkedList<>();
+    Pair<Long, Path> root = Pair.of(bucketId, ROOT_PATH);
+    objectIdPathVals.add(root);
+    addToPathMap(root, objIds, objectIdPathMap);
+
+    while (!objectIdPathVals.isEmpty() && !objIds.isEmpty()) {
+      Pair<Long, Path> parentPair = objectIdPathVals.poll();
+      String subDir = prefix + parentPair.getKey() + OM_KEY_PREFIX;
+
+      Iterator<String> subDirIterator =
+          directoryTable.keySet().stream()
+              .filter(k -> k.startsWith(subDir))
+              .collect(Collectors.toList()).iterator();
+      while (!objIds.isEmpty() && subDirIterator.hasNext()) {
+        OmDirectoryInfo childDir =
+            directoryTable.get(subDirIterator.next());
+        Pair<Long, Path> pathVal = Pair.of(childDir.getObjectID(),
+            parentPair.getValue().resolve(childDir.getName()));
+        addToPathMap(pathVal, objIds, objectIdPathMap);
+        objectIdPathVals.add(pathVal);
+      }
+    }
+    // Invalid directory objectId which does not exist in the given bucket.
+    if (!objIds.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Dir object Ids required but not found in bucket: " + objIds);
+    }
+    return objectIdPathMap;
+  }
+
+  private void addToPathMap(Pair<Long, Path> objectIDPath,
+                            Set<Long> dirObjIds, Map<Long, Path> pathMap) {
+    if (dirObjIds.contains(objectIDPath.getKey())) {
+      pathMap.put(objectIDPath.getKey(), objectIDPath.getValue());
+      dirObjIds.remove(objectIDPath.getKey());
+    }
+  }
+
+  private ContainerKeyInfoWrapper scanDBForContainerKeys(String dbPath)
+      throws RocksDBException, IOException {
+    List<ContainerKeyInfo> containerKeyInfos = new ArrayList<>();
+
+    List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+        RocksDBUtils.getColumnFamilyDescriptors(dbPath);
+    final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+    long keysProcessed = 0;
+
+    try (ManagedRocksDB db = ManagedRocksDB.openReadOnly(dbPath,
+        columnFamilyDescriptors, columnFamilyHandles)) {
+      dbPath = removeTrailingSlashIfNeeded(dbPath);
+      DBDefinition dbDefinition = DBDefinitionFactory.getDefinition(
+          Paths.get(dbPath), new OzoneConfiguration());
+      if (dbDefinition == null) {
+        throw new IllegalStateException("Incorrect DB Path");
+      }
+
+      keysProcessed +=
+          processTable(dbDefinition, columnFamilyHandles, db,
+              containerKeyInfos, FILE_TABLE);
+      keysProcessed +=
+          processTable(dbDefinition, columnFamilyHandles, db,
+              containerKeyInfos, KEY_TABLE);
+    }
+    return new ContainerKeyInfoWrapper(keysProcessed, containerKeyInfos);
+  }
+
+  private long processTable(DBDefinition dbDefinition,
+                            List<ColumnFamilyHandle> columnFamilyHandles,
+                            ManagedRocksDB db,
+                            List<ContainerKeyInfo> containerKeyInfos,
+                            String tableName)
+      throws IOException {
+    long keysProcessed = 0;
+    DBColumnFamilyDefinition<?, ?> columnFamilyDefinition =
+        dbDefinition.getColumnFamily(tableName);
+    if (columnFamilyDefinition == null) {
+      throw new IllegalStateException(
+          "Table with name" + tableName + " not found");
+    }
+
+    ColumnFamilyHandle columnFamilyHandle = getColumnFamilyHandle(
+        columnFamilyDefinition.getName().getBytes(UTF_8),
+        columnFamilyHandles);
+    if (columnFamilyHandle == null) {
+      throw new IllegalStateException("columnFamilyHandle is null");
+    }
+
+    try (ManagedRocksIterator iterator = new ManagedRocksIterator(
+        db.get().newIterator(columnFamilyHandle))) {
+      iterator.get().seekToFirst();
+      while (iterator.get().isValid()) {
+        OmKeyInfo value = ((OmKeyInfo) columnFamilyDefinition.getValueCodec()
+            .fromPersistedFormat(iterator.get().value()));
+        List<OmKeyLocationInfoGroup> keyLocationVersions =
+            value.getKeyLocationVersions();
+        if (Objects.isNull(keyLocationVersions)) {
+          iterator.get().next();
+          keysProcessed++;
+          continue;
+        }
+
+        long volumeId = 0;
+        long bucketId = 0;
+        // volumeId and bucketId are only applicable to file table
+        if (tableName.equals(FILE_TABLE)) {
+          String key = new String(iterator.get().key(), UTF_8);
+          String[] keyParts = key.split(OM_KEY_PREFIX);
+          volumeId = Long.parseLong(keyParts[1]);
+          bucketId = Long.parseLong(keyParts[2]);
+        }
+
+        for (OmKeyLocationInfoGroup locationInfoGroup : keyLocationVersions) {
+          for (List<OmKeyLocationInfo> locationInfos :
+              locationInfoGroup.getLocationVersionMap().values()) {
+            for (OmKeyLocationInfo locationInfo : locationInfos) {
+              if (containerIds.contains(locationInfo.getContainerID())) {
+                // Generate asbolute key path for FSO keys
+                StringBuilder keyName = new StringBuilder();
+                if (tableName.equals(FILE_TABLE)) {
+                  // Load directory table only after the first fso key is found
+                  // to reduce necessary load if there are not fso keys
+                  if (!isDirTableLoaded) {
+                    long start = System.currentTimeMillis();
+                    directoryTable = getDirectoryTableData(parent.getDbPath());
+                    long end = System.currentTimeMillis();
+                    out().println(
+                        "directoryTable loaded in " + (end - start) + " ms.");
+                    isDirTableLoaded = true;
+                  }
+                  keyName.append(getFsoKeyPrefix(volumeId, bucketId, value));
+                }
+                keyName.append(value.getKeyName());
+                containerKeyInfos.add(
+                    new ContainerKeyInfo(locationInfo.getContainerID(),
+                        value.getVolumeName(), volumeId, value.getBucketName(),
+                        bucketId, keyName.toString(),
+                        value.getParentObjectID()));
+              }
+            }
+          }
+        }
+        iterator.get().next();
+        keysProcessed++;
+      }
+      return keysProcessed;
+    } catch (RocksDBException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static String removeBeginningSlash(String path) {
+    if (path.startsWith(OM_KEY_PREFIX)) {
+      return path.substring(1);
+    }
+
+    return path;
+  }
+
+  private String getFsoKeyPrefix(long volumeId, long bucketId,
+                                 OmKeyInfo value) {
+    String prefix =
+        OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId +
+            OM_KEY_PREFIX;
+    Set<Long> dirObjIds = new HashSet<>();
+    dirObjIds.add(value.getParentObjectID());
+    Map<Long, Path> absolutePaths =
+        getAbsolutePathForObjectIDs(bucketId, prefix, Optional.of(dirObjIds));
+    Path path = absolutePaths.get(value.getParentObjectID());
+    String keyPath;
+    if (path.toString().equals(OM_KEY_PREFIX)) {
+      keyPath = path.toString();
+    } else {
+      keyPath = path + OM_KEY_PREFIX;
+    }
+
+    return removeBeginningSlash(keyPath);
+  }
+
+
+  private ColumnFamilyHandle getColumnFamilyHandle(
+      byte[] name, List<ColumnFamilyHandle> columnFamilyHandles) {
+    return columnFamilyHandles
+        .stream()
+        .filter(
+            handle -> {
+              try {
+                return Arrays.equals(handle.getName(), name);
+              } catch (Exception ex) {
+                throw new RuntimeException(ex);
+              }
+            })
+        .findAny()
+        .orElse(null);
+  }
+
+  private String removeTrailingSlashIfNeeded(String dbPath) {
+    if (dbPath.endsWith(OzoneConsts.OZONE_URI_DELIMITER)) {
+      dbPath = dbPath.substring(0, dbPath.length() - 1);
+    }
+    return dbPath;
+  }
+
+  private void printOutput(ContainerKeyInfoWrapper containerKeyInfoWrapper) {
+    List<ContainerKeyInfo> containerKeyInfos =
+        containerKeyInfoWrapper.getContainerKeyInfos();
+    if (containerKeyInfos.isEmpty()) {
+      out().println("No keys were found for container IDs: " + containerIds);
+      out().println(

Review Comment:
   For json tools we should only print json to stdout and print everything else
to stderr; otherwise it will break when piping to tools like `jq`.
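
   Sketch of what I mean (getter name assumed):
   ```java
   if (containerKeyInfos.isEmpty()) {
     // Human-readable diagnostics go to stderr...
     err().println("No keys were found for container IDs: " + containerIds);
     err().println(
         "Keys processed: " + containerKeyInfoWrapper.getKeysProcessed());
     return;
   }
   // ...so stdout carries nothing but the JSON document.
   out().println(new GsonBuilder().setPrettyPrinting().create()
       .toJson(containerKeyInfoWrapper));
   ```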



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/OMAdmin.java:
##########
@@ -43,6 +40,9 @@
 
 import java.util.Collection;
 
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
+

Review Comment:
   nit. Let's avoid import re-ordering for otherwise unaffected files.



##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java:
##########
@@ -34,6 +28,8 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;

Review Comment:
   Please turn off the auto-import reordering in your IDE. It makes it 
difficult to figure out which imports actually changed during reviews.



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ContainerKeyScanner.java:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.debug;
+
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.cli.SubcommandWithParent;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
+import org.apache.hadoop.hdds.utils.db.DBDefinition;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.kohsuke.MetaInfServices;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+import picocli.CommandLine;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.ROOT_PATH;
+
+/**
+ * Parser for a list of container IDs, to scan for keys.
+ */
[email protected](
+    name = "ckscanner",
+    description = "Find keys that reference a container"
+)
+@MetaInfServices(SubcommandWithParent.class)
+public class ContainerKeyScanner implements Callable<Void>,
+    SubcommandWithParent {
+
+  private static final String FILE_TABLE = "fileTable";
+  private static final String KEY_TABLE = "keyTable";
+  private static final String DIRECTORY_TABLE = "directoryTable";
+
+  @CommandLine.Spec
+  private static CommandLine.Model.CommandSpec spec;
+
+  @CommandLine.ParentCommand
+  private RDBParser parent;
+
+  @CommandLine.Option(names = {"-ids", "--container-ids"},
+      split = ",",
+      paramLabel = "containerIDs",
+      required = true,
+      description = "Set of container IDs to be used for getting all " +
+          "their keys. Example-usage: 1,11,2 (Separated by ',').")
+  private Set<Long> containerIds;
+
+  private static Map<String, OmDirectoryInfo> directoryTable;
+  private static boolean isDirTableLoaded = false;
+
+  @Override
+  public Void call() throws Exception {
+    ContainerKeyInfoWrapper containerKeyInfoWrapper =
+        scanDBForContainerKeys(parent.getDbPath());
+
+    printOutput(containerKeyInfoWrapper);
+
+    closeStdChannels();
+
+    return null;
+  }
+
+  private void closeStdChannels() {
+    out().close();
+    err().close();
+  }
+
+  private Map<String, OmDirectoryInfo> getDirectoryTableData(String dbPath)
+      throws RocksDBException, IOException {
+    Map<String, OmDirectoryInfo> directoryTableData = new HashMap<>();
+
+    // Get all tables from RocksDB
+    List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+        RocksDBUtils.getColumnFamilyDescriptors(dbPath);

Review Comment:
   Can we use `OmMetadataManagerImpl` as a wrapper over the RocksDB instance? I
don't think `OmMetadataManagerImpl` should actually need an `OzoneManager`
instance; that looks like a mistake introduced in the snapshot code.



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ContainerKeyScanner.java:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.debug;
+
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.cli.SubcommandWithParent;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
+import org.apache.hadoop.hdds.utils.db.DBDefinition;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.kohsuke.MetaInfServices;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+import picocli.CommandLine;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.ROOT_PATH;
+
+/**
+ * Parser for a list of container IDs, to scan for keys.
+ */
[email protected](
+    name = "ckscanner",
+    description = "Find keys that reference a container"
+)
+@MetaInfServices(SubcommandWithParent.class)
+public class ContainerKeyScanner implements Callable<Void>,
+    SubcommandWithParent {
+
+  private static final String FILE_TABLE = "fileTable";
+  private static final String KEY_TABLE = "keyTable";
+  private static final String DIRECTORY_TABLE = "directoryTable";
+
+  @CommandLine.Spec
+  private static CommandLine.Model.CommandSpec spec;
+
+  @CommandLine.ParentCommand
+  private RDBParser parent;
+
+  @CommandLine.Option(names = {"-ids", "--container-ids"},
+      split = ",",

Review Comment:
   Can we implement the container ID parsing similarly to how it was done in
#5659 for consistency?



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ContainerKeyScanner.java:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.debug;
+
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.cli.SubcommandWithParent;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
+import org.apache.hadoop.hdds.utils.db.DBDefinition;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.kohsuke.MetaInfServices;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+import picocli.CommandLine;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.ROOT_PATH;
+
+/**
+ * Parser for a list of container IDs, to scan for keys.
+ */
[email protected](
+    name = "ckscanner",
+    description = "Find keys that reference a container"
+)
+@MetaInfServices(SubcommandWithParent.class)
+public class ContainerKeyScanner implements Callable<Void>,
+    SubcommandWithParent {
+
+  private static final String FILE_TABLE = "fileTable";
+  private static final String KEY_TABLE = "keyTable";
+  private static final String DIRECTORY_TABLE = "directoryTable";
+
+  @CommandLine.Spec
+  private static CommandLine.Model.CommandSpec spec;
+
+  @CommandLine.ParentCommand
+  private RDBParser parent;
+
+  @CommandLine.Option(names = {"-ids", "--container-ids"},

Review Comment:
   I think there's been a move to make Ozone flag names more standard,
including not using multi-character flags with a single dash.
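
   For example, keeping only the long form:
   ```java
   @CommandLine.Option(names = {"--container-ids"},
       split = ",",
       paramLabel = "<containerID>",
       required = true,
       description = "Comma-separated list of container IDs, e.g. 1,11,2.")
   private Set<Long> containerIds;
   ```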



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/debug/TestContainerKeyScanner.java:
##########
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.debug;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import picocli.CommandLine;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * This class tests `ozone debug ldb ckscanner` CLI that reads from RocksDB
+ * and gets keys for container ids.
+ */
+public class TestContainerKeyScanner {
+  private static final String KEY_TABLE = "keyTable";
+  private static final String FILE_TABLE = "fileTable";
+  private static final String DIRECTORY_TABLE = "directoryTable";
+  private DBStore dbStore;
+  @TempDir
+  private File tempDir;
+  private StringWriter stdout, stderr;
+  private PrintWriter pstdout, pstderr;
+  private CommandLine cmd;
+
+  private static final String KEYS_FOUND_OUTPUT = "{\n" +
+      "  \"keysProcessed\": 3,\n" +
+      "  \"containerKeys\": {\n" +
+      "    \"1\": [\n" +
+      "      {\n" +
+      "        \"containerID\": 1,\n" +
+      "        \"volumeName\": \"vol1\",\n" +
+      "        \"volumeId\": -123,\n" +
+      "        \"bucketName\": \"bucket1\",\n" +
+      "        \"bucketId\": -456,\n" +
+      "        \"keyName\": \"dir1/key1\",\n" +
+      "        \"parentId\": -789\n" +
+      "      }\n" +
+      "    ],\n" +
+      "    \"2\": [\n" +
+      "      {\n" +
+      "        \"containerID\": 2,\n" +
+      "        \"volumeName\": \"vol1\",\n" +
+      "        \"volumeId\": 0,\n" +
+      "        \"bucketName\": \"bucket1\",\n" +
+      "        \"bucketId\": 0,\n" +
+      "        \"keyName\": \"key2\",\n" +
+      "        \"parentId\": 0\n" +
+      "      }\n" +
+      "    ],\n" +
+      "    \"3\": [\n" +
+      "      {\n" +
+      "        \"containerID\": 3,\n" +
+      "        \"volumeName\": \"vol1\",\n" +
+      "        \"volumeId\": 0,\n" +
+      "        \"bucketName\": \"bucket1\",\n" +
+      "        \"bucketId\": 0,\n" +
+      "        \"keyName\": \"key3\",\n" +
+      "        \"parentId\": 0\n" +
+      "      }\n" +
+      "    ]\n" +
+      "  }\n" +
+      "}\n";
+
+  private static final String KEYS_NOT_FOUND_OUTPUT =
+      "No keys were found for container IDs: [1, 2, 3]\n" +
+          "Keys processed: 3\n";
+
+  @BeforeEach
+  public void setup() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    stdout = new StringWriter();
+    pstdout = new PrintWriter(stdout);
+    stderr = new StringWriter();
+    pstderr = new PrintWriter(stderr);
+
+    cmd = new CommandLine(new RDBParser())
+        .addSubcommand(new ContainerKeyScanner())
+        .setOut(pstdout)
+        .setErr(pstderr);
+
+    dbStore = DBStoreBuilder.newBuilder(conf).setName("om.db")
+        .setPath(tempDir.toPath()).addTable(KEY_TABLE).addTable(FILE_TABLE)
+        .addTable(DIRECTORY_TABLE)
+        .build();
+  }
+
+  @AfterEach
+  public void shutdown() throws IOException {
+    if (dbStore != null) {
+      dbStore.close();
+    }
+    pstderr.close();
+    stderr.close();
+    pstdout.close();
+    stdout.close();
+  }
+
+  @Test
+  void testWhenThereAreKeysForConatainerIds() throws IOException {

Review Comment:
   I think the same typo is in the other test name too.



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ContainerKeyScanner.java:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.debug;
+
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.cli.SubcommandWithParent;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
+import org.apache.hadoop.hdds.utils.db.DBDefinition;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.kohsuke.MetaInfServices;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+import picocli.CommandLine;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.ROOT_PATH;
+
+/**
+ * Parser for a list of container IDs, to scan for keys.
+ */
[email protected](
+    name = "ckscanner",
+    description = "Find keys that reference a container"
+)
+@MetaInfServices(SubcommandWithParent.class)
+public class ContainerKeyScanner implements Callable<Void>,
+    SubcommandWithParent {
+
+  private static final String FILE_TABLE = "fileTable";
+  private static final String KEY_TABLE = "keyTable";
+  private static final String DIRECTORY_TABLE = "directoryTable";
+
+  @CommandLine.Spec
+  private static CommandLine.Model.CommandSpec spec;
+
+  @CommandLine.ParentCommand
+  private RDBParser parent;

Review Comment:
   I'm not sure this command belongs under the `ldb` subcommand. Right now that
command has generic utils that work on any Ozone RocksDB instance, and this is
specific to OM. Maybe it should be its own top-level debug subcommand?
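
   Something like this, if we go that way (a sketch; using `OzoneDebug` as the
parent is an assumption based on the other debug tools):
   ```java
   @CommandLine.Command(
       name = "container-key-scanner",
       description = "Find OM keys that reference the given containers")
   @MetaInfServices(SubcommandWithParent.class)
   public class ContainerKeyScanner implements Callable<Void>,
       SubcommandWithParent {

     @Override
     public Class<?> getParentType() {
       return OzoneDebug.class; // instead of RDBParser.class
     }
   }
   ```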



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/debug/TestContainerKeyScanner.java:
##########
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.debug;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import picocli.CommandLine;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * This class tests `ozone debug ldb ckscanner` CLI that reads from RocksDB
+ * and gets keys for container ids.
+ */
+public class TestContainerKeyScanner {
+  private static final String KEY_TABLE = "keyTable";
+  private static final String FILE_TABLE = "fileTable";
+  private static final String DIRECTORY_TABLE = "directoryTable";
+  private DBStore dbStore;
+  @TempDir
+  private File tempDir;
+  private StringWriter stdout, stderr;
+  private PrintWriter pstdout, pstderr;
+  private CommandLine cmd;
+
+  private static final String KEYS_FOUND_OUTPUT = "{\n" +
+      "  \"keysProcessed\": 3,\n" +
+      "  \"containerKeys\": {\n" +
+      "    \"1\": [\n" +
+      "      {\n" +
+      "        \"containerID\": 1,\n" +
+      "        \"volumeName\": \"vol1\",\n" +
+      "        \"volumeId\": -123,\n" +
+      "        \"bucketName\": \"bucket1\",\n" +
+      "        \"bucketId\": -456,\n" +
+      "        \"keyName\": \"dir1/key1\",\n" +
+      "        \"parentId\": -789\n" +
+      "      }\n" +
+      "    ],\n" +
+      "    \"2\": [\n" +
+      "      {\n" +
+      "        \"containerID\": 2,\n" +
+      "        \"volumeName\": \"vol1\",\n" +
+      "        \"volumeId\": 0,\n" +
+      "        \"bucketName\": \"bucket1\",\n" +
+      "        \"bucketId\": 0,\n" +
+      "        \"keyName\": \"key2\",\n" +
+      "        \"parentId\": 0\n" +
+      "      }\n" +
+      "    ],\n" +
+      "    \"3\": [\n" +
+      "      {\n" +
+      "        \"containerID\": 3,\n" +
+      "        \"volumeName\": \"vol1\",\n" +
+      "        \"volumeId\": 0,\n" +
+      "        \"bucketName\": \"bucket1\",\n" +
+      "        \"bucketId\": 0,\n" +
+      "        \"keyName\": \"key3\",\n" +
+      "        \"parentId\": 0\n" +
+      "      }\n" +
+      "    ]\n" +
+      "  }\n" +
+      "}\n";
+
+  private static final String KEYS_NOT_FOUND_OUTPUT =
+      "No keys were found for container IDs: [1, 2, 3]\n" +
+          "Keys processed: 3\n";
+
+  @BeforeEach
+  public void setup() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    stdout = new StringWriter();
+    pstdout = new PrintWriter(stdout);
+    stderr = new StringWriter();
+    pstderr = new PrintWriter(stderr);
+
+    cmd = new CommandLine(new RDBParser())
+        .addSubcommand(new ContainerKeyScanner())
+        .setOut(pstdout)
+        .setErr(pstderr);
+
+    dbStore = DBStoreBuilder.newBuilder(conf).setName("om.db")
+        .setPath(tempDir.toPath()).addTable(KEY_TABLE).addTable(FILE_TABLE)
+        .addTable(DIRECTORY_TABLE)
+        .build();
+  }
+
+  @AfterEach
+  public void shutdown() throws IOException {
+    if (dbStore != null) {
+      dbStore.close();
+    }
+    pstderr.close();
+    stderr.close();
+    pstdout.close();
+    stdout.close();
+  }
+
+  @Test
+  void testWhenThereAreKeysForConatainerIds() throws IOException {
+
+    // create keys for tables
+    long volumeId = -123L;
+    long bucketId = -456L;
+    long dirObjectId = -789L;
+    createDirectory(volumeId, bucketId, bucketId, dirObjectId, "dir1");
+    createFile(volumeId, bucketId, "key1", -987L, dirObjectId, 1L);
+    createKey("key2", 2L);
+    createKey("key3", 3L);
+
+    String[] cmdArgs =
+        {"--db", dbStore.getDbLocation().getAbsolutePath(), "ckscanner",
+            "-ids", "1,2,3"};
+
+    int exitCode = cmd.execute(cmdArgs);
+    Assertions.assertEquals(0, exitCode);
+
+    Assertions.assertTrue(stdout.toString().contains(KEYS_FOUND_OUTPUT));
+
+    Assertions.assertTrue(stderr.toString().isEmpty());
+  }
+
+  @Test
+  void testWhenThereAreNotKeysForConatainerIds() throws IOException {
+
+    // create keys for tables
+    long volumeId = -123L;
+    long bucketId = -456L;
+    createFile(volumeId, bucketId, "key1", -987L, bucketId, 4L);
+    createKey("key2", 5L);
+    createKey("key3", 6L);
+
+    String[] cmdArgs =
+        {"--db", dbStore.getDbLocation().getAbsolutePath(), "ckscanner",
+            "-ids", "1,2,3"};
+
+    int exitCode = cmd.execute(cmdArgs);
+    Assertions.assertEquals(0, exitCode);
+
+    Assertions.assertTrue(stdout.toString().contains(KEYS_NOT_FOUND_OUTPUT));
+
+    Assertions.assertTrue(stderr.toString().isEmpty());
+  }
+
+  private void createFile(long volumeId, long bucketId, String keyName,
+                          long objectId, long parentId, long containerId)
+      throws IOException {
+    Table<byte[], byte[]> table = dbStore.getTable(FILE_TABLE);
+
+    // format: /volumeId/bucketId/parentId(bucketId)/keyName
+    String key =
+        "/" + volumeId + "/" + bucketId + "/" + parentId + "/" + keyName;
+
+    OmKeyInfo value =
+        getOmKeyInfo("vol1", "bucket1", keyName, containerId, objectId,
+            parentId);
+
+    table.put(key.getBytes(UTF_8),
+        value.getProtobuf(ClientVersion.CURRENT_VERSION).toByteArray());
+  }
+
+  private void createKey(String keyName, long containerId) throws IOException {
+    Table<byte[], byte[]> table = dbStore.getTable(KEY_TABLE);
+
+    String volumeName = "vol1";
+    String bucketName = "bucket1";
+    // format: /volumeName/bucketName/keyName
+    String key = "/" + volumeName + "/" + bucketName + "/" + keyName;

Review Comment:
   Can we use `OzoneConsts.OM_KEY_PREFIX` here instead? Unfortunately the usual
builder for these types of things, `OmMetadataManagerImpl#getOzoneKey`, is not
static.
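
   i.e. something like:
   ```java
   // Same layout, without hard-coded "/" separators.
   String key = OzoneConsts.OM_KEY_PREFIX + volumeName
       + OzoneConsts.OM_KEY_PREFIX + bucketName
       + OzoneConsts.OM_KEY_PREFIX + keyName;
   ```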



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ContainerKeyScanner.java:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.debug;
+
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.cli.SubcommandWithParent;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
+import org.apache.hadoop.hdds.utils.db.DBDefinition;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.kohsuke.MetaInfServices;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+import picocli.CommandLine;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.ROOT_PATH;
+
+/**
+ * Parser for a list of container IDs, to scan for keys.
+ */
[email protected](
+    name = "ckscanner",
+    description = "Find keys that reference a container"
+)
+@MetaInfServices(SubcommandWithParent.class)
+public class ContainerKeyScanner implements Callable<Void>,
+    SubcommandWithParent {
+
+  private static final String FILE_TABLE = "fileTable";
+  private static final String KEY_TABLE = "keyTable";
+  private static final String DIRECTORY_TABLE = "directoryTable";
+
+  @CommandLine.Spec
+  private static CommandLine.Model.CommandSpec spec;
+
+  @CommandLine.ParentCommand
+  private RDBParser parent;
+
+  @CommandLine.Option(names = {"-ids", "--container-ids"},
+      split = ",",
+      paramLabel = "containerIDs",
+      required = true,
+      description = "Set of container IDs to be used for getting all " +
+          "their keys. Example-usage: 1,11,2 (Separated by ',').")
+  private Set<Long> containerIds;
+
+  private static Map<String, OmDirectoryInfo> directoryTable;

Review Comment:
   I think the directory table may be too large to load completely into memory.
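   As a rough sketch (not a drop-in change), the same iterator pattern this class already uses could stream the table and keep only the directories that are actually needed, instead of materializing everything in a `HashMap`. `wantedParentIds` and `resolvedDirs` below are hypothetical names:

```java
// Sketch: stream directoryTable entries rather than caching the whole table.
// Reuses db, columnFamilyHandle, and columnFamilyDefinition from the
// surrounding method; keeps only entries whose objectID is referenced.
try (ManagedRocksIterator iterator = new ManagedRocksIterator(
    db.get().newIterator(columnFamilyHandle))) {
  for (iterator.get().seekToFirst(); iterator.get().isValid();
       iterator.get().next()) {
    OmDirectoryInfo dir = (OmDirectoryInfo) columnFamilyDefinition
        .getValueCodec().fromPersistedFormat(iterator.get().value());
    if (wantedParentIds.contains(dir.getObjectID())) {
      resolvedDirs.put(dir.getObjectID(), dir);
    }
  }
}
```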



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ContainerKeyScanner.java:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.debug;
+
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.cli.SubcommandWithParent;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
+import org.apache.hadoop.hdds.utils.db.DBDefinition;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.kohsuke.MetaInfServices;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+import picocli.CommandLine;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.ROOT_PATH;
+
+/**
+ * Parser for a list of container IDs, to scan for keys.
+ */
[email protected](
+    name = "ckscanner",
+    description = "Find keys that reference a container"
+)
+@MetaInfServices(SubcommandWithParent.class)
+public class ContainerKeyScanner implements Callable<Void>,
+    SubcommandWithParent {
+
+  private static final String FILE_TABLE = "fileTable";
+  private static final String KEY_TABLE = "keyTable";
+  private static final String DIRECTORY_TABLE = "directoryTable";
+
+  @CommandLine.Spec
+  private static CommandLine.Model.CommandSpec spec;
+
+  @CommandLine.ParentCommand
+  private RDBParser parent;
+
+  @CommandLine.Option(names = {"-ids", "--container-ids"},
+      split = ",",
+      paramLabel = "containerIDs",
+      required = true,
+      description = "Set of container IDs to be used for getting all " +
+          "their keys. Example-usage: 1,11,2 (Separated by ',').")
+  private Set<Long> containerIds;
+
+  private static Map<String, OmDirectoryInfo> directoryTable;
+  private static boolean isDirTableLoaded = false;
+
+  @Override
+  public Void call() throws Exception {
+    ContainerKeyInfoWrapper containerKeyInfoWrapper =
+        scanDBForContainerKeys(parent.getDbPath());
+
+    printOutput(containerKeyInfoWrapper);
+
+    closeStdChannels();
+
+    return null;
+  }
+
+  private void closeStdChannels() {
+    out().close();
+    err().close();
+  }
+
+  private Map<String, OmDirectoryInfo> getDirectoryTableData(String dbPath)
+      throws RocksDBException, IOException {
+    Map<String, OmDirectoryInfo> directoryTableData = new HashMap<>();
+
+    // Get all tables from RocksDB
+    List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+        RocksDBUtils.getColumnFamilyDescriptors(dbPath);
+    final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+
+    // Get all table handles
+    try (ManagedRocksDB db = ManagedRocksDB.openReadOnly(dbPath,
+        columnFamilyDescriptors, columnFamilyHandles)) {
+      dbPath = removeTrailingSlashIfNeeded(dbPath);
+      DBDefinition dbDefinition = DBDefinitionFactory.getDefinition(
+          Paths.get(dbPath), new OzoneConfiguration());
+      if (dbDefinition == null) {
+        throw new IllegalStateException("Incorrect DB Path");
+      }
+
+      // Get directory table
+      DBColumnFamilyDefinition<?, ?> columnFamilyDefinition =
+          dbDefinition.getColumnFamily(DIRECTORY_TABLE);
+      if (columnFamilyDefinition == null) {
+        throw new IllegalStateException(
+            "Table with name" + DIRECTORY_TABLE + " not found");
+      }
+
+      // Get directory table handle
+      ColumnFamilyHandle columnFamilyHandle = getColumnFamilyHandle(
+          columnFamilyDefinition.getName().getBytes(UTF_8),
+          columnFamilyHandles);
+      if (columnFamilyHandle == null) {
+        throw new IllegalStateException("columnFamilyHandle is null");
+      }
+
+      // Get iterator for directory table
+      try (ManagedRocksIterator iterator = new ManagedRocksIterator(
+          db.get().newIterator(columnFamilyHandle))) {
+        iterator.get().seekToFirst();
+        while (iterator.get().isValid()) {
+          directoryTableData.put(new String(iterator.get().key(), UTF_8),
+              ((OmDirectoryInfo) columnFamilyDefinition.getValueCodec()
+                  .fromPersistedFormat(iterator.get().value())));
+          iterator.get().next();
+        }
+      }
+    }
+
+    return directoryTableData;
+  }
+
+  @Override
+  public Class<?> getParentType() {
+    return RDBParser.class;
+  }
+
+  private static PrintWriter err() {
+    return spec.commandLine().getErr();
+  }
+
+  private static PrintWriter out() {
+    return spec.commandLine().getOut();
+  }
+
+  public Map<Long, Path> getAbsolutePathForObjectIDs(
+      long bucketId, String prefix, Optional<Set<Long>> dirObjIds) {
+    // Root of a bucket would always have the
+    // key as /volumeId/bucketId/bucketId/
+    if (!dirObjIds.isPresent() || dirObjIds.get().isEmpty()) {
+      return Collections.emptyMap();
+    }
+    Set<Long> objIds = Sets.newHashSet(dirObjIds.get());
+    Map<Long, Path> objectIdPathMap = new HashMap<>();
+    Queue<Pair<Long, Path>> objectIdPathVals = new LinkedList<>();
+    Pair<Long, Path> root = Pair.of(bucketId, ROOT_PATH);
+    objectIdPathVals.add(root);
+    addToPathMap(root, objIds, objectIdPathMap);
+
+    while (!objectIdPathVals.isEmpty() && !objIds.isEmpty()) {
+      Pair<Long, Path> parentPair = objectIdPathVals.poll();
+      String subDir = prefix + parentPair.getKey() + OM_KEY_PREFIX;
+
+      Iterator<String> subDirIterator =
+          directoryTable.keySet().stream()
+              .filter(k -> k.startsWith(subDir))
+              .collect(Collectors.toList()).iterator();
+      while (!objIds.isEmpty() && subDirIterator.hasNext()) {
+        OmDirectoryInfo childDir =
+            directoryTable.get(subDirIterator.next());
+        Pair<Long, Path> pathVal = Pair.of(childDir.getObjectID(),
+            parentPair.getValue().resolve(childDir.getName()));
+        addToPathMap(pathVal, objIds, objectIdPathMap);
+        objectIdPathVals.add(pathVal);
+      }
+    }
+    // Invalid directory objectId which does not exist in the given bucket.
+    if (!objIds.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Dir object Ids required but not found in bucket: " + objIds);
+    }
+    return objectIdPathMap;
+  }
+
+  private void addToPathMap(Pair<Long, Path> objectIDPath,
+                            Set<Long> dirObjIds, Map<Long, Path> pathMap) {
+    if (dirObjIds.contains(objectIDPath.getKey())) {
+      pathMap.put(objectIDPath.getKey(), objectIDPath.getValue());
+      dirObjIds.remove(objectIDPath.getKey());
+    }
+  }
+
+  private ContainerKeyInfoWrapper scanDBForContainerKeys(String dbPath)
+      throws RocksDBException, IOException {
+    List<ContainerKeyInfo> containerKeyInfos = new ArrayList<>();
+
+    List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+        RocksDBUtils.getColumnFamilyDescriptors(dbPath);
+    final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+    long keysProcessed = 0;
+
+    try (ManagedRocksDB db = ManagedRocksDB.openReadOnly(dbPath,
+        columnFamilyDescriptors, columnFamilyHandles)) {
+      dbPath = removeTrailingSlashIfNeeded(dbPath);
+      DBDefinition dbDefinition = DBDefinitionFactory.getDefinition(
+          Paths.get(dbPath), new OzoneConfiguration());
+      if (dbDefinition == null) {
+        throw new IllegalStateException("Incorrect DB Path");
+      }
+
+      keysProcessed +=
+          processTable(dbDefinition, columnFamilyHandles, db,
+              containerKeyInfos, FILE_TABLE);
+      keysProcessed +=
+          processTable(dbDefinition, columnFamilyHandles, db,
+              containerKeyInfos, KEY_TABLE);
+    }
+    return new ContainerKeyInfoWrapper(keysProcessed, containerKeyInfos);
+  }
+
+  private long processTable(DBDefinition dbDefinition,
+                            List<ColumnFamilyHandle> columnFamilyHandles,
+                            ManagedRocksDB db,
+                            List<ContainerKeyInfo> containerKeyInfos,
+                            String tableName)
+      throws IOException {
+    long keysProcessed = 0;
+    DBColumnFamilyDefinition<?, ?> columnFamilyDefinition =
+        dbDefinition.getColumnFamily(tableName);
+    if (columnFamilyDefinition == null) {
+      throw new IllegalStateException(
+          "Table with name" + tableName + " not found");
+    }
+
+    ColumnFamilyHandle columnFamilyHandle = getColumnFamilyHandle(
+        columnFamilyDefinition.getName().getBytes(UTF_8),
+        columnFamilyHandles);
+    if (columnFamilyHandle == null) {
+      throw new IllegalStateException("columnFamilyHandle is null");
+    }
+
+    try (ManagedRocksIterator iterator = new ManagedRocksIterator(
+        db.get().newIterator(columnFamilyHandle))) {
+      iterator.get().seekToFirst();
+      while (iterator.get().isValid()) {
+        OmKeyInfo value = ((OmKeyInfo) columnFamilyDefinition.getValueCodec()
+            .fromPersistedFormat(iterator.get().value()));
+        List<OmKeyLocationInfoGroup> keyLocationVersions =
+            value.getKeyLocationVersions();
+        if (Objects.isNull(keyLocationVersions)) {
+          iterator.get().next();
+          keysProcessed++;
+          continue;
+        }
+
+        long volumeId = 0;
+        long bucketId = 0;
+        // volumeId and bucketId are only applicable to file table
+        if (tableName.equals(FILE_TABLE)) {
+          String key = new String(iterator.get().key(), UTF_8);
+          String[] keyParts = key.split(OM_KEY_PREFIX);
+          volumeId = Long.parseLong(keyParts[1]);
+          bucketId = Long.parseLong(keyParts[2]);
+        }
+
+        for (OmKeyLocationInfoGroup locationInfoGroup : keyLocationVersions) {
+          for (List<OmKeyLocationInfo> locationInfos :
+              locationInfoGroup.getLocationVersionMap().values()) {
+            for (OmKeyLocationInfo locationInfo : locationInfos) {
+              if (containerIds.contains(locationInfo.getContainerID())) {
+                // Generate absolute key path for FSO keys
+                StringBuilder keyName = new StringBuilder();
+                if (tableName.equals(FILE_TABLE)) {
+                  // Load the directory table only after the first FSO key is
+                  // found, to avoid loading it when there are no FSO keys.
+                  if (!isDirTableLoaded) {
+                    long start = System.currentTimeMillis();
+                    directoryTable = getDirectoryTableData(parent.getDbPath());
+                    long end = System.currentTimeMillis();
+                    out().println(
+                        "directoryTable loaded in " + (end - start) + " ms.");
+                    isDirTableLoaded = true;
+                  }
+                  keyName.append(getFsoKeyPrefix(volumeId, bucketId, value));

Review Comment:
   Looks like this is doing a full directory walk for every key found. I think this approach would be better (see the sketch after this comment):
   1. Scan the file table and save all the matching keys.
   2. If no keys in the file table match, we are done.
   3. Else, do one walk through the directory table to build the full paths for all of those keys.

   As it is, the tool stores all matches in memory either way.

   A more robust approach would be to have the tool print JSON matches as it goes, so that it works with input and output of any size. Then we would walk the directory and file tables in a single iteration to check for matches.
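   A minimal sketch of the two-pass idea, under stated assumptions: `scanFileTableForMatches` and `buildPathsForParents` are hypothetical helpers, not part of this PR, and `ContainerKeyInfo` is assumed to expose the `parentId` it already serializes to JSON:

```java
// Pass 1 (hypothetical helper): scan the file table once, keeping only
// keys that reference one of the requested container IDs.
List<ContainerKeyInfo> matches = scanFileTableForMatches(db, containerIds);
if (matches.isEmpty()) {
  return;  // no matches, so the directory table never needs to be read
}

// Pass 2 (hypothetical helper): a single walk through the directory table
// resolves absolute paths for every matched key at once.
Set<Long> parentIds = matches.stream()
    .map(ContainerKeyInfo::getParentId)
    .collect(Collectors.toSet());
Map<Long, Path> paths = buildPathsForParents(db, parentIds);
// The more robust streaming variant would print a JSON line per match
// during pass 1 instead of holding `matches` in memory at all.
```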



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

