This is an automated email from the ASF dual-hosted git repository.

sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 80c6446a11 HDDS-8101. Add tool to repair broken FSO tree (#7368)
80c6446a11 is described below

commit 80c6446a115bc454ad9f5930f8a1d0e0f45f9368
Author: Sarveksha Yeshavantha Raju <[email protected]>
AuthorDate: Mon Dec 9 20:11:11 2024 +0530

    HDDS-8101. Add tool to repair broken FSO tree (#7368)
---
 .../hadoop/ozone/repair/om/TestFSORepairTool.java  | 569 +++++++++++++++++
 .../hadoop/ozone/repair/om/FSORepairCLI.java       |  78 +++
 .../hadoop/ozone/repair/om/FSORepairTool.java      | 710 +++++++++++++++++++++
 .../apache/hadoop/ozone/repair/om/OMRepair.java    |  47 ++
 4 files changed, 1404 insertions(+)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/repair/om/TestFSORepairTool.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/repair/om/TestFSORepairTool.java
new file mode 100644
index 0000000000..4006ec6e82
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/repair/om/TestFSORepairTool.java
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.repair.om;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.OMStorage;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.repair.OzoneRepair;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * FSORepairTool test cases.
+ */
+public class TestFSORepairTool {
+  public static final Logger LOG = LoggerFactory.getLogger(TestFSORepairTool.class);
+  private final ByteArrayOutputStream out = new ByteArrayOutputStream();
+  private final ByteArrayOutputStream err = new ByteArrayOutputStream();
+  private static final PrintStream OLD_OUT = System.out;
+  private static final PrintStream OLD_ERR = System.err;
+  private static final String DEFAULT_ENCODING = UTF_8.name();
+  private MiniOzoneCluster cluster;
+  private FileSystem fs;
+  private OzoneClient client;
+  private OzoneConfiguration conf = null;
+
+  @BeforeEach
+  public void init() throws Exception {
+    // Set configs.
+    conf = new OzoneConfiguration();
+
+    // Build cluster.
+    cluster = MiniOzoneCluster.newBuilder(conf).build();
+    cluster.waitForClusterToBeReady();
+
+    // Init ofs.
+    final String rootPath = String.format("%s://%s/", OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY));
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+    fs = FileSystem.get(conf);
+    client = OzoneClientFactory.getRpcClient(conf);
+
+    System.setOut(new PrintStream(out, false, DEFAULT_ENCODING));
+    System.setErr(new PrintStream(err, false, DEFAULT_ENCODING));
+  }
+
+  @AfterEach
+  public void reset() throws IOException {
+    // reset stream after each unit test
+    out.reset();
+    err.reset();
+
+    // restore system streams
+    System.setOut(OLD_OUT);
+    System.setErr(OLD_ERR);
+
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    if (client != null) {
+      client.close();
+    }
+    IOUtils.closeQuietly(fs);
+  }
+
+  @Test
+  public void testConnectedTreeOneBucket() throws Exception {
+    CommandLine cmd = new OzoneRepair().getCmd();
+    String dbPath = new File(OMStorage.getOmDbDir(conf) + "/" + OM_DB_NAME).getPath();
+
+    FSORepairTool.Report expectedReport = buildConnectedTree("vol1", "bucket1");
+    String expectedOutput = serializeReport(expectedReport);
+
+    // Test the connected tree in debug mode.
+    cluster.getOzoneManager().stop();
+
+    String[] args = new String[] {"om", "fso-tree", "--db", dbPath};
+    int exitCode = cmd.execute(args);
+    assertEquals(0, exitCode);
+
+    String cliOutput = out.toString(DEFAULT_ENCODING);
+    String reportOutput = extractRelevantSection(cliOutput);
+    Assertions.assertEquals(expectedOutput, reportOutput);
+
+    out.reset();
+    err.reset();
+
+    // Running again in repair mode should give same results since the tree is connected.
+    String[] args1 = new String[] {"om", "fso-tree", "--db", dbPath, "--repair"};
+    int exitCode1 = cmd.execute(args1);
+    assertEquals(0, exitCode1);
+
+    String cliOutput1 = out.toString(DEFAULT_ENCODING);
+    String reportOutput1 = extractRelevantSection(cliOutput1);
+    Assertions.assertEquals(expectedOutput, reportOutput1);
+
+    cluster.getOzoneManager().restart();
+  }
+
+  @Test
+  public void testReportedDataSize() throws Exception {
+    CommandLine cmd = new OzoneRepair().getCmd();
+    String dbPath = new File(OMStorage.getOmDbDir(conf) + "/" + OM_DB_NAME).getPath();
+
+    FSORepairTool.Report report1 = buildDisconnectedTree("vol1", "bucket1", 10);
+    FSORepairTool.Report report2 = buildConnectedTree("vol1", "bucket2", 10);
+    FSORepairTool.Report expectedReport = new FSORepairTool.Report(report1, report2);
+    String expectedOutput = serializeReport(expectedReport);
+
+    cluster.getOzoneManager().stop();
+
+    String[] args = new String[] {"om", "fso-tree", "--db", dbPath, 
"--repair"};
+    int exitCode = cmd.execute(args);
+    assertEquals(0, exitCode);
+
+    String cliOutput = out.toString(DEFAULT_ENCODING);
+    String reportOutput = extractRelevantSection(cliOutput);
+
+    Assertions.assertEquals(expectedOutput, reportOutput);
+    cluster.getOzoneManager().restart();
+  }
+
+  /**
+   * Test to verify how the tool processes the volume and bucket
+   * filters.
+   */
+  @Test
+  public void testVolumeAndBucketFilter() throws Exception {
+    CommandLine cmd = new OzoneRepair().getCmd();
+    String dbPath = new File(OMStorage.getOmDbDir(conf) + "/" + OM_DB_NAME).getPath();
+
+    FSORepairTool.Report report1 = buildDisconnectedTree("vol1", "bucket1", 10);
+    FSORepairTool.Report report2 = buildConnectedTree("vol2", "bucket2", 10);
+    FSORepairTool.Report expectedReport1 = new FSORepairTool.Report(report1);
+    FSORepairTool.Report expectedReport2 = new FSORepairTool.Report(report2);
+
+    cluster.getOzoneManager().stop();
+
+    // When volume filter is passed
+    String[] args1 = new String[]{"om", "fso-tree", "--db", dbPath, 
"--volume", "/vol1"};
+    int exitCode1 = cmd.execute(args1);
+    assertEquals(0, exitCode1);
+
+    String cliOutput1 = out.toString(DEFAULT_ENCODING);
+    String reportOutput1 = extractRelevantSection(cliOutput1);
+    String expectedOutput1 = serializeReport(expectedReport1);
+    Assertions.assertEquals(expectedOutput1, reportOutput1);
+
+    out.reset();
+    err.reset();
+
+    // When both volume and bucket filters are passed
+    String[] args2 = new String[]{"om", "fso-tree", "--db", dbPath, 
"--volume", "/vol2",
+        "--bucket", "bucket2"};
+    int exitCode2 = cmd.execute(args2);
+    assertEquals(0, exitCode2);
+
+    String cliOutput2 = out.toString(DEFAULT_ENCODING);
+    String reportOutput2 = extractRelevantSection(cliOutput2);
+    String expectedOutput2 = serializeReport(expectedReport2);
+    Assertions.assertEquals(expectedOutput2, reportOutput2);
+
+    out.reset();
+    err.reset();
+
+    // When a non-existent bucket filter is passed
+    String[] args3 = new String[]{"om", "fso-tree", "--db", dbPath, 
"--volume", "/vol1",
+        "--bucket", "bucket2"};
+    int exitCode3 = cmd.execute(args3);
+    assertEquals(0, exitCode3);
+    String cliOutput3 = out.toString(DEFAULT_ENCODING);
+    Assertions.assertTrue(cliOutput3.contains("Bucket 'bucket2' does not exist 
in volume '/vol1'."));
+
+    out.reset();
+    err.reset();
+
+    // When a non-existent volume filter is passed
+    String[] args4 = new String[]{"om", "fso-tree", "--db", dbPath, 
"--volume", "/vol3"};
+    int exitCode4 = cmd.execute(args4);
+    assertEquals(0, exitCode4);
+    String cliOutput4 = out.toString(DEFAULT_ENCODING);
+    Assertions.assertTrue(cliOutput4.contains("Volume '/vol3' does not 
exist."));
+
+    out.reset();
+    err.reset();
+
+    // When bucket filter is passed without the volume filter.
+    String[] args5 = new String[]{"om", "fso-tree", "--db", dbPath, 
"--bucket", "bucket1"};
+    int exitCode5 = cmd.execute(args5);
+    assertEquals(0, exitCode5);
+    String cliOutput5 = out.toString(DEFAULT_ENCODING);
+    Assertions.assertTrue(cliOutput5.contains("--bucket flag cannot be used 
without specifying --volume."));
+
+    cluster.getOzoneManager().restart();
+  }
+
+  @Test
+  public void testMultipleBucketsAndVolumes() throws Exception {
+    CommandLine cmd = new OzoneRepair().getCmd();
+    String dbPath = new File(OMStorage.getOmDbDir(conf) + "/" + OM_DB_NAME).getPath();
+
+    FSORepairTool.Report report1 = buildConnectedTree("vol1", "bucket1");
+    FSORepairTool.Report report2 = buildDisconnectedTree("vol2", "bucket2");
+    FSORepairTool.Report expectedAggregateReport = new FSORepairTool.Report(report1, report2);
+    String expectedOutput = serializeReport(expectedAggregateReport);
+
+    cluster.getOzoneManager().stop();
+
+    String[] args = new String[] {"om", "fso-tree", "--db", dbPath, 
"--repair"};
+    int exitCode = cmd.execute(args);
+    assertEquals(0, exitCode);
+
+    String cliOutput = out.toString(DEFAULT_ENCODING);
+    String reportOutput = extractRelevantSection(cliOutput);
+    Assertions.assertEquals(expectedOutput, reportOutput);
+
+    cluster.getOzoneManager().restart();
+  }
+
+  /**
+   * Tests having multiple entries in the deleted file and directory tables
+   * for the same objects.
+   */
+  @Test
+  public void testDeleteOverwrite() throws Exception {
+    CommandLine cmd = new OzoneRepair().getCmd();
+    String dbPath = new File(OMStorage.getOmDbDir(conf) + "/" + OM_DB_NAME).getPath();
+
+    // Create files and dirs under dir1. To make sure they are added to the
+    // delete table, the keys must have data.
+    buildConnectedTree("vol1", "bucket1", 10);
+    // Move soon to be disconnected objects to the deleted table.
+    fs.delete(new Path("/vol1/bucket1/dir1/dir2/file3"), true);
+    fs.delete(new Path("/vol1/bucket1/dir1/dir2"), true);
+    fs.delete(new Path("/vol1/bucket1/dir1/file1"), true);
+    fs.delete(new Path("/vol1/bucket1/dir1/file2"), true);
+
+    // Recreate deleted objects, then disconnect dir1.
+    // This means after the repair runs, these objects will be in
+    // the deleted tables multiple times. Some will have the same dir1 parent ID
+    // in their key name too.
+    ContractTestUtils.touch(fs, new Path("/vol1/bucket1/dir1/dir2/file3"));
+    ContractTestUtils.touch(fs, new Path("/vol1/bucket1/dir1/file1"));
+    ContractTestUtils.touch(fs, new Path("/vol1/bucket1/dir1/file2"));
+    disconnectDirectory("dir1");
+
+    cluster.getOzoneManager().stop();
+
+    String[] args = new String[]{"om", "fso-tree", "--db", dbPath, "--repair"};
+    int exitCode = cmd.execute(args);
+    assertEquals(0, exitCode);
+
+    String cliOutput = out.toString(DEFAULT_ENCODING);
+    Assertions.assertTrue(cliOutput.contains("Unreferenced:\n\tDirectories: 
1\n\tFiles: 3"));
+
+    cluster.getOzoneManager().restart();
+  }
+
+  @Test
+  public void testEmptyFileTrees() throws Exception {
+    CommandLine cmd = new OzoneRepair().getCmd();
+    String dbPath = new File(OMStorage.getOmDbDir(conf) + "/" + OM_DB_NAME).getPath();
+
+    FSORepairTool.Report emptyReport = buildEmptyTree();
+    String expectedOutput = serializeReport(emptyReport);
+
+    cluster.getOzoneManager().stop();
+
+    // Run when there are no file trees.
+    String[] args = new String[] {"om", "fso-tree", "--db", dbPath, 
"--repair"};
+    int exitCode = cmd.execute(args);
+    assertEquals(0, exitCode);
+
+    String cliOutput = out.toString(DEFAULT_ENCODING);
+    String reportOutput = extractRelevantSection(cliOutput);
+    Assertions.assertEquals(expectedOutput, reportOutput);
+
+    out.reset();
+    err.reset();
+    cluster.getOzoneManager().restart();
+
+    // Create an empty volume and bucket.
+    fs.mkdirs(new Path("/vol1"));
+    fs.mkdirs(new Path("/vol2/bucket1"));
+
+    cluster.getOzoneManager().stop();
+
+    // Run on an empty volume and bucket.
+    String[] args1 = new String[] {"om", "fso-tree", "--db", dbPath, 
"--repair"};
+    int exitCode1 = cmd.execute(args1);
+    assertEquals(0, exitCode1);
+
+    String cliOutput2 = out.toString(DEFAULT_ENCODING);
+    String reportOutput2 = extractRelevantSection(cliOutput2);
+    Assertions.assertEquals(expectedOutput, reportOutput2);
+
+    cluster.getOzoneManager().restart();
+  }
+
+  @Test
+  public void testNonFSOBucketsSkipped() throws Exception {
+    ObjectStore store = client.getObjectStore();
+
+    // Create legacy and OBS buckets.
+    store.createVolume("vol1");
+    store.getVolume("vol1").createBucket("obs-bucket",
+        BucketArgs.newBuilder().setBucketLayout(BucketLayout.OBJECT_STORE)
+            .build());
+    store.getVolume("vol1").createBucket("legacy-bucket",
+        BucketArgs.newBuilder().setBucketLayout(BucketLayout.LEGACY)
+            .build());
+
+    // Put a key in the legacy and OBS buckets.
+    OzoneOutputStream obsStream = store.getVolume("vol1")
+        .getBucket("obs-bucket")
+        .createKey("prefix/test-key", 3);
+    obsStream.write(new byte[]{1, 1, 1});
+    obsStream.close();
+
+    OzoneOutputStream legacyStream = store.getVolume("vol1")
+        .getBucket("legacy-bucket")
+        .createKey("prefix/test-key", 3);
+    legacyStream.write(new byte[]{1, 1, 1});
+    legacyStream.close();
+
+    CommandLine cmd = new OzoneRepair().getCmd();
+    String dbPath = new File(OMStorage.getOmDbDir(conf) + "/" + OM_DB_NAME).getPath();
+
+    // Add an FSO bucket with data.
+    FSORepairTool.Report connectReport = buildConnectedTree("vol1", "fso-bucket");
+
+    cluster.getOzoneManager().stop();
+
+    // Even in repair mode there should be no action: the legacy and OBS buckets
+    // will be skipped and the FSO tree is connected.
+    String[] args = new String[] {"om", "fso-tree", "--db", dbPath, 
"--repair"};
+    int exitCode = cmd.execute(args);
+    assertEquals(0, exitCode);
+
+    String cliOutput = out.toString(DEFAULT_ENCODING);
+    String reportOutput = extractRelevantSection(cliOutput);
+    String expectedOutput = serializeReport(connectReport);
+
+    Assertions.assertEquals(expectedOutput, reportOutput);
+    Assertions.assertTrue(cliOutput.contains("Skipping non-FSO bucket 
/vol1/obs-bucket"));
+    Assertions.assertTrue(cliOutput.contains("Skipping non-FSO bucket 
/vol1/legacy-bucket"));
+
+    cluster.getOzoneManager().restart();
+  }
+
+  private FSORepairTool.Report buildConnectedTree(String volume, String bucket) throws Exception {
+    return buildConnectedTree(volume, bucket, 0);
+  }
+
+  private String extractRelevantSection(String cliOutput) {
+    int startIndex = cliOutput.indexOf("Reachable:");
+    if (startIndex == -1) {
+      throw new AssertionError("Output does not contain 'Reachable' section.");
+    }
+    return cliOutput.substring(startIndex).trim();
+  }
+
+  private String serializeReport(FSORepairTool.Report report) {
+    return String.format(
+        "Reachable:%n\tDirectories: %d%n\tFiles: %d%n\tBytes: %d%n" +
+        "Unreachable:%n\tDirectories: %d%n\tFiles: %d%n\tBytes: %d%n" +
+        "Unreferenced:%n\tDirectories: %d%n\tFiles: %d%n\tBytes: %d",
+        report.getReachable().getDirs(),
+        report.getReachable().getFiles(),
+        report.getReachable().getBytes(),
+        report.getUnreachable().getDirs(),
+        report.getUnreachable().getFiles(),
+        report.getUnreachable().getBytes(),
+        report.getUnreferenced().getDirs(),
+        report.getUnreferenced().getFiles(),
+        report.getUnreferenced().getBytes()
+    );
+  }
+
+  /**
+   * Creates a tree with 3 reachable directories and 4 reachable files.
+   */
+  private FSORepairTool.Report buildConnectedTree(String volume, String bucket, int fileSize) throws Exception {
+    Path bucketPath = new Path("/" + volume + "/" + bucket);
+    Path dir1 = new Path(bucketPath, "dir1");
+    Path file1 = new Path(dir1, "file1");
+    Path file2 = new Path(dir1, "file2");
+
+    Path dir2 = new Path(bucketPath, "dir1/dir2");
+    Path file3 = new Path(dir2, "file3");
+
+    Path dir3 = new Path(bucketPath, "dir3");
+    Path file4 = new Path(bucketPath, "file4");
+
+    fs.mkdirs(dir1);
+    fs.mkdirs(dir2);
+    fs.mkdirs(dir3);
+
+    // Content to put in every file.
+    String data = new String(new char[fileSize]);
+
+    FSDataOutputStream stream = fs.create(file1);
+    stream.write(data.getBytes(StandardCharsets.UTF_8));
+    stream.close();
+    stream = fs.create(file2);
+    stream.write(data.getBytes(StandardCharsets.UTF_8));
+    stream.close();
+    stream = fs.create(file3);
+    stream.write(data.getBytes(StandardCharsets.UTF_8));
+    stream.close();
+    stream = fs.create(file4);
+    stream.write(data.getBytes(StandardCharsets.UTF_8));
+    stream.close();
+
+    assertConnectedTreeReadable(volume, bucket);
+
+    FSORepairTool.ReportStatistics reachableCount =
+            new FSORepairTool.ReportStatistics(3, 4, fileSize * 4L);
+    return new FSORepairTool.Report.Builder()
+        .setReachable(reachableCount)
+        .build();
+  }
+
+  private FSORepairTool.Report buildEmptyTree() {
+    FSORepairTool.ReportStatistics reachableCount =
+            new FSORepairTool.ReportStatistics(0, 0, 0);
+    FSORepairTool.ReportStatistics unreachableCount =
+            new FSORepairTool.ReportStatistics(0, 0, 0);
+    FSORepairTool.ReportStatistics unreferencedCount =
+            new FSORepairTool.ReportStatistics(0, 0, 0);
+    return new FSORepairTool.Report.Builder()
+        .setReachable(reachableCount)
+        .setUnreachable(unreachableCount)
+        .setUnreferenced(unreferencedCount)
+        .build();
+  }
+
+  private void assertConnectedTreeReadable(String volume, String bucket) throws IOException {
+    Path bucketPath = new Path("/" + volume + "/" + bucket);
+    Path dir1 = new Path(bucketPath, "dir1");
+    Path file1 = new Path(dir1, "file1");
+    Path file2 = new Path(dir1, "file2");
+
+    Path dir2 = new Path(bucketPath, "dir1/dir2");
+    Path file3 = new Path(dir2, "file3");
+
+    Path dir3 = new Path(bucketPath, "dir3");
+    Path file4 = new Path(bucketPath, "file4");
+
+    Assertions.assertTrue(fs.exists(dir1));
+    Assertions.assertTrue(fs.exists(dir2));
+    Assertions.assertTrue(fs.exists(dir3));
+    Assertions.assertTrue(fs.exists(file1));
+    Assertions.assertTrue(fs.exists(file2));
+    Assertions.assertTrue(fs.exists(file3));
+    Assertions.assertTrue(fs.exists(file4));
+  }
+
+  private FSORepairTool.Report buildDisconnectedTree(String volume, String bucket) throws Exception {
+    return buildDisconnectedTree(volume, bucket, 0);
+  }
+
+  /**
+   * Creates a tree with 1 reachable directory, 1 reachable file, 1
+   * unreachable directory, and 3 unreachable files.
+   */
+  private FSORepairTool.Report buildDisconnectedTree(String volume, String bucket, int fileSize) throws Exception {
+    buildConnectedTree(volume, bucket, fileSize);
+
+    // Manually remove dir1. This should disconnect 3 of the files and 1 of
+    // the directories.
+    disconnectDirectory("dir1");
+
+    assertDisconnectedTreePartiallyReadable(volume, bucket);
+
+    // dir1 does not count towards the unreachable directories the tool
+    // will see. It was deleted completely so the tool will never see it.
+    FSORepairTool.ReportStatistics reachableCount =
+            new FSORepairTool.ReportStatistics(1, 1, fileSize);
+    FSORepairTool.ReportStatistics unreferencedCount =
+            new FSORepairTool.ReportStatistics(1, 3, fileSize * 3L);
+    return new FSORepairTool.Report.Builder()
+        .setReachable(reachableCount)
+        .setUnreferenced(unreferencedCount)
+        .build();
+  }
+
+  private void disconnectDirectory(String dirName) throws Exception {
+    Table<String, OmDirectoryInfo> dirTable = cluster.getOzoneManager().getMetadataManager().getDirectoryTable();
+    try (TableIterator<String, ? extends Table.KeyValue<String, OmDirectoryInfo>> iterator = dirTable.iterator()) {
+      while (iterator.hasNext()) {
+        Table.KeyValue<String, OmDirectoryInfo> entry = iterator.next();
+        String key = entry.getKey();
+        if (key.contains(dirName)) {
+          dirTable.delete(key);
+          break;
+        }
+      }
+    }
+  }
+
+  private void assertDisconnectedTreePartiallyReadable(String volume, String bucket) throws Exception {
+    Path bucketPath = new Path("/" + volume + "/" + bucket);
+    Path dir1 = new Path(bucketPath, "dir1");
+    Path file1 = new Path(dir1, "file1");
+    Path file2 = new Path(dir1, "file2");
+
+    Path dir2 = new Path(bucketPath, "dir1/dir2");
+    Path file3 = new Path(dir2, "file3");
+
+    Path dir3 = new Path(bucketPath, "dir3");
+    Path file4 = new Path(bucketPath, "file4");
+
+    Assertions.assertFalse(fs.exists(dir1));
+    Assertions.assertFalse(fs.exists(dir2));
+    Assertions.assertTrue(fs.exists(dir3));
+    Assertions.assertFalse(fs.exists(file1));
+    Assertions.assertFalse(fs.exists(file2));
+    Assertions.assertFalse(fs.exists(file3));
+    Assertions.assertTrue(fs.exists(file4));
+  }
+}
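
For reference, the report text that serializeReport builds (and that the
assertions above compare against) would look like this for
buildConnectedTree("vol1", "bucket1"); the two-argument overload writes
zero-byte files, so the byte counts here are 0:

    Reachable:
        Directories: 3
        Files: 4
        Bytes: 0
    Unreachable:
        Directories: 0
        Files: 0
        Bytes: 0
    Unreferenced:
        Directories: 0
        Files: 0
        Bytes: 0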
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/FSORepairCLI.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/FSORepairCLI.java
new file mode 100644
index 0000000000..5a217e9f2d
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/FSORepairCLI.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.repair.om;
+
+import picocli.CommandLine;
+
+import java.util.concurrent.Callable;
+
+/**
+ * CLI for identifying and repairing a disconnected FSO tree in the OM DB.
+ */
[email protected](
+    name = "fso-tree",
+    description = "Identify and repair a disconnected FSO tree by marking 
unreferenced entries for deletion. " +
+        "OM should be stopped while this tool is run."
+)
+public class FSORepairCLI implements Callable<Void> {
+
+  @CommandLine.Option(names = {"--db"},
+      required = true,
+      description = "Path to OM RocksDB")
+  private String dbPath;
+
+  @CommandLine.Option(names = {"-r", "--repair"},
+        defaultValue = "false",
+        description = "Run in repair mode to move unreferenced files and 
directories to deleted tables.")
+  private boolean repair;
+
+  @CommandLine.Option(names = {"-v", "--volume"},
+      description = "Filter by volume name. Add '/' before the volume name.")
+  private String volume;
+
+  @CommandLine.Option(names = {"-b", "--bucket"},
+      description = "Filter by bucket name")
+  private String bucket;
+
+  @CommandLine.Option(names = {"--verbose"},
+      description = "Verbose output. Show all intermediate steps and deleted 
keys info.")
+  private boolean verbose;
+
+  @Override
+  public Void call() throws Exception {
+    if (repair) {
+      System.out.println("FSO Repair Tool is running in repair mode");
+    } else {
+      System.out.println("FSO Repair Tool is running in debug mode");
+    }
+    try {
+      FSORepairTool repairTool = new FSORepairTool(dbPath, repair, volume, bucket, verbose);
+      repairTool.run();
+    } catch (Exception ex) {
+      throw new IllegalArgumentException("FSO repair failed: " + ex.getMessage());
+    }
+
+    if (verbose) {
+      System.out.println("FSO repair finished.");
+    }
+
+    return null;
+  }
+}
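
A minimal sketch of driving the new subcommand programmatically through
picocli, mirroring the integration tests above (dbPath stands in for the
om.db location; the same options are reachable from the shell through the
ozone repair launcher):

    CommandLine cmd = new OzoneRepair().getCmd();
    // Debug mode: only report reachable/unreachable/unreferenced entries.
    int exitCode = cmd.execute("om", "fso-tree", "--db", dbPath);
    // Repair mode: additionally move unreferenced entries to the deleted tables.
    int repairExitCode = cmd.execute("om", "fso-tree", "--db", dbPath, "--repair");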
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/FSORepairTool.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/FSORepairTool.java
new file mode 100644
index 0000000000..7e0fb23f5a
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/FSORepairTool.java
@@ -0,0 +1,710 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.repair.om;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.helpers.WithObjectID;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Stack;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * Base Tool to identify and repair disconnected FSO trees across all buckets.
+ * This tool logs information about reachable, unreachable and unreferenced files and directories in debug mode
+ * and moves these unreferenced files and directories to the deleted tables in repair mode.
+
+ * If deletes are still in progress (the deleted directory table is not empty), the tool
+ * reports that the tree is unreachable, even though pending deletes would fix the issue.
+ * If not, the tool reports them as unreferenced and deletes them in repair mode.
+
+ * Before using the tool, make sure all OMs are stopped, and that all Ratis logs have been flushed to the OM DB.
+ * This can be done using `ozone admin prepare` before running the tool, and `ozone admin
+ * cancelprepare` when done.
+
+ * The tool will run a DFS from each bucket, and save all reachable directories as keys in a new temporary RocksDB
+ * instance called "reachable.db" in the same directory as om.db.
+ * It will then scan the entire file and directory tables for each bucket to see if each object's parent is in the
+ * reachable table of reachable.db. The reachable table will be dropped and recreated for each bucket.
+ * The tool is idempotent. reachable.db will not be deleted automatically when the tool finishes,
+ * in case users want to manually inspect it. It can be safely deleted once the tool finishes.
+ */
+public class FSORepairTool {
+  public static final Logger LOG = LoggerFactory.getLogger(FSORepairTool.class);
+
+  private final String omDBPath;
+  private final DBStore store;
+  private final Table<String, OmVolumeArgs> volumeTable;
+  private final Table<String, OmBucketInfo> bucketTable;
+  private final Table<String, OmDirectoryInfo> directoryTable;
+  private final Table<String, OmKeyInfo> fileTable;
+  private final Table<String, OmKeyInfo> deletedDirectoryTable;
+  private final Table<String, RepeatedOmKeyInfo> deletedTable;
+  private final Table<String, SnapshotInfo> snapshotInfoTable;
+  private final String volumeFilter;
+  private final String bucketFilter;
+  private static final String REACHABLE_TABLE = "reachable";
+  private DBStore reachableDB;
+  private final ReportStatistics reachableStats;
+  private final ReportStatistics unreachableStats;
+  private final ReportStatistics unreferencedStats;
+  private final boolean repair;
+  private final boolean verbose;
+
+  public FSORepairTool(String dbPath, boolean repair, String volume, String bucket, boolean verbose)
+      throws IOException {
+    this(getStoreFromPath(dbPath), dbPath, repair, volume, bucket, verbose);
+  }
+
+  /**
+   * Allows passing RocksDB instance from a MiniOzoneCluster directly to this class for testing.
+   */
+  public FSORepairTool(DBStore dbStore, String dbPath, boolean repair, String volume, String bucket, boolean verbose)
+      throws IOException {
+    this.reachableStats = new ReportStatistics(0, 0, 0);
+    this.unreachableStats = new ReportStatistics(0, 0, 0);
+    this.unreferencedStats = new ReportStatistics(0, 0, 0);
+
+    this.store = dbStore;
+    this.omDBPath = dbPath;
+    this.repair = repair;
+    this.volumeFilter = volume;
+    this.bucketFilter = bucket;
+    this.verbose = verbose;
+    volumeTable = store.getTable(OmMetadataManagerImpl.VOLUME_TABLE,
+        String.class,
+        OmVolumeArgs.class);
+    bucketTable = store.getTable(OmMetadataManagerImpl.BUCKET_TABLE,
+        String.class,
+        OmBucketInfo.class);
+    directoryTable = store.getTable(OmMetadataManagerImpl.DIRECTORY_TABLE,
+        String.class,
+        OmDirectoryInfo.class);
+    fileTable = store.getTable(OmMetadataManagerImpl.FILE_TABLE,
+        String.class,
+        OmKeyInfo.class);
+    deletedDirectoryTable = store.getTable(OmMetadataManagerImpl.DELETED_DIR_TABLE,
+        String.class,
+        OmKeyInfo.class);
+    deletedTable = store.getTable(OmMetadataManagerImpl.DELETED_TABLE,
+        String.class,
+        RepeatedOmKeyInfo.class);
+    snapshotInfoTable = store.getTable(OmMetadataManagerImpl.SNAPSHOT_INFO_TABLE,
+        String.class,
+        SnapshotInfo.class);
+  }
+
+  protected static DBStore getStoreFromPath(String dbPath) throws IOException {
+    File omDBFile = new File(dbPath);
+    if (!omDBFile.exists() || !omDBFile.isDirectory()) {
+      throw new IOException(String.format("Specified OM DB instance %s does " +
+          "not exist or is not a RocksDB directory.", dbPath));
+    }
+    // Load RocksDB and tables needed.
+    return OmMetadataManagerImpl.loadDB(new OzoneConfiguration(), new File(dbPath).getParentFile(), -1);
+  }
+
+  public FSORepairTool.Report run() throws Exception {
+    try {
+      if (bucketFilter != null && volumeFilter == null) {
+        System.out.println("--bucket flag cannot be used without specifying 
--volume.");
+        return null;
+      }
+
+      if (volumeFilter != null) {
+        OmVolumeArgs volumeArgs = volumeTable.getIfExist(volumeFilter);
+        if (volumeArgs == null) {
+          System.out.println("Volume '" + volumeFilter + "' does not exist.");
+          return null;
+        }
+      }
+
+      // Iterate all volumes or a specific volume if specified
+      try (TableIterator<String, ? extends Table.KeyValue<String, OmVolumeArgs>>
+               volumeIterator = volumeTable.iterator()) {
+        try {
+          openReachableDB();
+        } catch (IOException e) {
+          System.out.println("Failed to open reachable database: " + 
e.getMessage());
+          throw e;
+        }
+        while (volumeIterator.hasNext()) {
+          Table.KeyValue<String, OmVolumeArgs> volumeEntry = volumeIterator.next();
+          String volumeKey = volumeEntry.getKey();
+
+          if (volumeFilter != null && !volumeFilter.equals(volumeKey)) {
+            continue;
+          }
+
+          System.out.println("Processing volume: " + volumeKey);
+
+          if (bucketFilter != null) {
+            OmBucketInfo bucketInfo = bucketTable.getIfExist(volumeKey + "/" + bucketFilter);
+            if (bucketInfo == null) {
+              //Bucket does not exist in the volume
+              System.out.println("Bucket '" + bucketFilter + "' does not exist 
in volume '" + volumeKey + "'.");
+              return null;
+            }
+
+            if (bucketInfo.getBucketLayout() != BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+              System.out.println("Skipping non-FSO bucket " + bucketFilter);
+              continue;
+            }
+
+            processBucket(volumeEntry.getValue(), bucketInfo);
+          } else {
+
+            // Iterate all buckets in the volume.
+            try (TableIterator<String, ? extends Table.KeyValue<String, OmBucketInfo>>
+                         bucketIterator = bucketTable.iterator()) {
+              bucketIterator.seek(volumeKey);
+              while (bucketIterator.hasNext()) {
+                Table.KeyValue<String, OmBucketInfo> bucketEntry = bucketIterator.next();
+                String bucketKey = bucketEntry.getKey();
+                OmBucketInfo bucketInfo = bucketEntry.getValue();
+
+                if (bucketInfo.getBucketLayout() != BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+                  System.out.println("Skipping non-FSO bucket " + bucketKey);
+                  continue;
+                }
+
+                // Stop this loop once we have seen all buckets in the current
+                // volume.
+                if (!bucketKey.startsWith(volumeKey)) {
+                  break;
+                }
+
+                processBucket(volumeEntry.getValue(), bucketInfo);
+              }
+            }
+          }
+        }
+      }
+    } catch (IOException e) {
+      System.out.println("An error occurred while processing" + 
e.getMessage());
+      throw e;
+    } finally {
+      closeReachableDB();
+      store.close();
+    }
+
+    return buildReportAndLog();
+  }
+
+  private boolean checkIfSnapshotExistsForBucket(String volumeName, String bucketName) throws IOException {
+    if (snapshotInfoTable == null) {
+      return false;
+    }
+
+    try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>> iterator =
+            snapshotInfoTable.iterator()) {
+      while (iterator.hasNext()) {
+        SnapshotInfo snapshotInfo = iterator.next().getValue();
+        String snapshotPath = (volumeName + "/" + bucketName).replaceFirst("^/", "");
+        if (snapshotInfo.getSnapshotPath().equals(snapshotPath)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private void processBucket(OmVolumeArgs volume, OmBucketInfo bucketInfo) throws IOException {
+    System.out.println("Processing bucket: " + volume.getVolume() + "/" + bucketInfo.getBucketName());
+    if (checkIfSnapshotExistsForBucket(volume.getVolume(), bucketInfo.getBucketName())) {
+      if (!repair) {
+        System.out.println(
+            "Snapshot detected in bucket '" + volume.getVolume() + "/" + 
bucketInfo.getBucketName() + "'. ");
+      } else {
+        System.out.println(
+            "Skipping repair for bucket '" + volume.getVolume() + "/" + 
bucketInfo.getBucketName() + "' " +
+            "due to snapshot presence.");
+        return;
+      }
+    }
+    markReachableObjectsInBucket(volume, bucketInfo);
+    handleUnreachableAndUnreferencedObjects(volume, bucketInfo);
+  }
+
+  private Report buildReportAndLog() {
+    Report report = new Report.Builder()
+        .setReachable(reachableStats)
+        .setUnreachable(unreachableStats)
+        .setUnreferenced(unreferencedStats)
+        .build();
+
+    System.out.println("\n" + report);
+    return report;
+  }
+
+  private void markReachableObjectsInBucket(OmVolumeArgs volume, OmBucketInfo bucket) throws IOException {
+    // Only put directories in the stack.
+    // Directory keys should have the form /volumeID/bucketID/parentID/name.
+    Stack<String> dirKeyStack = new Stack<>();
+
+    // Since the tool uses parent directories to check for reachability, add
+    // a reachable entry for the bucket as well.
+    addReachableEntry(volume, bucket, bucket);
+    // Initialize the stack with all immediate child directories of the
+    // bucket, and mark them all as reachable.
+    Collection<String> childDirs = getChildDirectoriesAndMarkAsReachable(volume, bucket, bucket);
+    dirKeyStack.addAll(childDirs);
+
+    while (!dirKeyStack.isEmpty()) {
+      // Get one directory and process its immediate children.
+      String currentDirKey = dirKeyStack.pop();
+      OmDirectoryInfo currentDir = directoryTable.get(currentDirKey);
+      if (currentDir == null) {
+        System.out.println("Directory key" + currentDirKey + "to be processed 
was not found in the directory table.");
+        continue;
+      }
+
+      // TODO revisit this for a more memory efficient implementation,
+      //  possibly making better use of RocksDB iterators.
+      childDirs = getChildDirectoriesAndMarkAsReachable(volume, bucket, currentDir);
+      dirKeyStack.addAll(childDirs);
+    }
+  }
+
+  private boolean isDirectoryInDeletedDirTable(String dirKey) throws IOException {
+    return deletedDirectoryTable.isExist(dirKey);
+  }
+
+  private boolean isFileKeyInDeletedTable(String fileKey) throws IOException {
+    return deletedTable.isExist(fileKey);
+  }
+
+  private void handleUnreachableAndUnreferencedObjects(OmVolumeArgs volume, OmBucketInfo bucket) throws IOException {
+    // Check for unreachable and unreferenced directories in the bucket.
+    String bucketPrefix = OM_KEY_PREFIX +
+        volume.getObjectID() +
+        OM_KEY_PREFIX +
+        bucket.getObjectID();
+
+    try (TableIterator<String, ? extends Table.KeyValue<String, OmDirectoryInfo>> dirIterator =
+             directoryTable.iterator()) {
+      dirIterator.seek(bucketPrefix);
+      while (dirIterator.hasNext()) {
+        Table.KeyValue<String, OmDirectoryInfo> dirEntry = dirIterator.next();
+        String dirKey = dirEntry.getKey();
+
+        // Only search directories in this bucket.
+        if (!dirKey.startsWith(bucketPrefix)) {
+          break;
+        }
+
+        if (!isReachable(dirKey)) {
+          if (!isDirectoryInDeletedDirTable(dirKey)) {
+            System.out.println("Found unreferenced directory: " + dirKey);
+            unreferencedStats.addDir();
+
+            if (!repair) {
+              if (verbose) {
+                System.out.println("Marking unreferenced directory " + dirKey 
+ " for deletion.");
+              }
+            } else {
+              System.out.println("Deleting unreferenced directory " + dirKey);
+              OmDirectoryInfo dirInfo = dirEntry.getValue();
+              markDirectoryForDeletion(volume.getVolume(), bucket.getBucketName(), dirKey, dirInfo);
+            }
+          } else {
+            unreachableStats.addDir();
+          }
+        }
+      }
+    }
+
+    // Check for unreachable and unreferenced files
+    try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
+             fileIterator = fileTable.iterator()) {
+      fileIterator.seek(bucketPrefix);
+      while (fileIterator.hasNext()) {
+        Table.KeyValue<String, OmKeyInfo> fileEntry = fileIterator.next();
+        String fileKey = fileEntry.getKey();
+        // Only search files in this bucket.
+        if (!fileKey.startsWith(bucketPrefix)) {
+          break;
+        }
+
+        OmKeyInfo fileInfo = fileEntry.getValue();
+        if (!isReachable(fileKey)) {
+          if (!isFileKeyInDeletedTable(fileKey)) {
+            System.out.println("Found unreferenced file: " + fileKey);
+            unreferencedStats.addFile(fileInfo.getDataSize());
+
+            if (!repair) {
+              if (verbose) {
+                System.out.println("Marking unreferenced file " + fileKey + " 
for deletion." + fileKey);
+              }
+            } else {
+              System.out.println("Deleting unreferenced file " + fileKey);
+              markFileForDeletion(fileKey, fileInfo);
+            }
+          } else {
+            unreachableStats.addFile(fileInfo.getDataSize());
+          }
+        } else {
+          // NOTE: We are deserializing the proto of every reachable file
+          // just to log its size. If we don't need this information we could
+          // save time by skipping this step.
+          reachableStats.addFile(fileInfo.getDataSize());
+        }
+      }
+    }
+  }
+
+  protected void markFileForDeletion(String fileKey, OmKeyInfo fileInfo) throws IOException {
+    try (BatchOperation batch = store.initBatchOperation()) {
+      fileTable.deleteWithBatch(batch, fileKey);
+
+      RepeatedOmKeyInfo originalRepeatedKeyInfo = deletedTable.get(fileKey);
+      RepeatedOmKeyInfo updatedRepeatedOmKeyInfo = OmUtils.prepareKeyForDelete(
+          fileInfo, fileInfo.getUpdateID(), true);
+      // NOTE: The FSO code seems to write the open key entry with the whole
+      // path, using the object's names instead of their ID. This would only
+      // be possible when the file is deleted explicitly, and not part of a
+      // directory delete. It is also not possible here if the file's parent
+      // is gone. The name of the key does not matter so just use IDs.
+      deletedTable.putWithBatch(batch, fileKey, updatedRepeatedOmKeyInfo);
+      if (verbose) {
+        System.out.println("Added entry " + fileKey + " to open key table: " + 
updatedRepeatedOmKeyInfo);
+      }
+      store.commitBatchOperation(batch);
+    }
+  }
+
+  protected void markDirectoryForDeletion(String volumeName, String bucketName,
+                                        String dirKeyName, OmDirectoryInfo dirInfo) throws IOException {
+    try (BatchOperation batch = store.initBatchOperation()) {
+      directoryTable.deleteWithBatch(batch, dirKeyName);
+      // HDDS-7592: Make directory entries in deleted dir table unique.
+      String deleteDirKeyName = dirKeyName + OM_KEY_PREFIX + dirInfo.getObjectID();
+
+      // Convert the directory to OmKeyInfo for deletion.
+      OmKeyInfo dirAsKeyInfo = OMFileRequest.getOmKeyInfo(volumeName, bucketName, dirInfo, dirInfo.getName());
+      deletedDirectoryTable.putWithBatch(batch, deleteDirKeyName, dirAsKeyInfo);
+
+      store.commitBatchOperation(batch);
+    }
+  }
+
+  private Collection<String> getChildDirectoriesAndMarkAsReachable(OmVolumeArgs volume, OmBucketInfo bucket,
+                                                                   WithObjectID currentDir) throws IOException {
+
+    Collection<String> childDirs = new ArrayList<>();
+
+    try (TableIterator<String, ? extends Table.KeyValue<String, OmDirectoryInfo>>
+             dirIterator = directoryTable.iterator()) {
+      String dirPrefix = buildReachableKey(volume, bucket, currentDir);
+      // Start searching the directory table at the current directory's
+      // prefix to get its immediate children.
+      dirIterator.seek(dirPrefix);
+      while (dirIterator.hasNext()) {
+        Table.KeyValue<String, OmDirectoryInfo> childDirEntry = dirIterator.next();
+        String childDirKey = childDirEntry.getKey();
+        // Stop processing once we have seen all immediate children of this
+        // directory.
+        if (!childDirKey.startsWith(dirPrefix)) {
+          break;
+        }
+        // This directory was reached by search.
+        addReachableEntry(volume, bucket, childDirEntry.getValue());
+        childDirs.add(childDirKey);
+        reachableStats.addDir();
+      }
+    }
+
+    return childDirs;
+  }
+
+  /**
+   * Add the specified object to the reachable table, indicating it is part
+   * of the connected FSO tree.
+   */
+  private void addReachableEntry(OmVolumeArgs volume, OmBucketInfo bucket, WithObjectID object) throws IOException {
+    String reachableKey = buildReachableKey(volume, bucket, object);
+    // No value is needed for this table.
+    reachableDB.getTable(REACHABLE_TABLE, String.class, byte[].class).put(reachableKey, new byte[]{});
+  }
+
+  /**
+   * Build an entry in the reachable table for the current object, which
+   * could be a bucket, file or directory.
+   */
+  private static String buildReachableKey(OmVolumeArgs volume, OmBucketInfo bucket, WithObjectID object) {
+    return OM_KEY_PREFIX +
+        volume.getObjectID() +
+        OM_KEY_PREFIX +
+        bucket.getObjectID() +
+        OM_KEY_PREFIX +
+        object.getObjectID();
+  }
+
+  /**
+   *
+   * @param fileOrDirKey The key of a file or directory in RocksDB.
+   * @return true if the entry's parent is in the reachable table.
+   */
+  protected boolean isReachable(String fileOrDirKey) throws IOException {
+    String reachableParentKey = buildReachableParentKey(fileOrDirKey);
+
+    return reachableDB.getTable(REACHABLE_TABLE, String.class, byte[].class).get(reachableParentKey) != null;
+  }
+
+  /**
+   * Build an entry in the reachable table for the current object's parent
+   * object. The object could be a file or directory.
+   */
+  private static String buildReachableParentKey(String fileOrDirKey) {
+    String[] keyParts = fileOrDirKey.split(OM_KEY_PREFIX);
+    // Should be /volID/bucketID/parentID/name
+    // The first part will be blank since key begins with a slash.
+    Preconditions.assertTrue(keyParts.length >= 4);
+    String volumeID = keyParts[1];
+    String bucketID = keyParts[2];
+    String parentID = keyParts[3];
+
+    return OM_KEY_PREFIX +
+        volumeID +
+        OM_KEY_PREFIX +
+        bucketID +
+        OM_KEY_PREFIX +
+        parentID;
+  }
+
+  private void openReachableDB() throws IOException {
+    File reachableDBFile = new File(new File(omDBPath).getParentFile(), "reachable.db");
+    System.out.println("Creating database of reachable directories at " + reachableDBFile);
+    // Delete the DB from the last run if it exists.
+    if (reachableDBFile.exists()) {
+      FileUtils.deleteDirectory(reachableDBFile);
+    }
+
+    ConfigurationSource conf = new OzoneConfiguration();
+    reachableDB = DBStoreBuilder.newBuilder(conf)
+        .setName("reachable.db")
+        .setPath(reachableDBFile.getParentFile().toPath())
+        .addTable(REACHABLE_TABLE)
+        .build();
+  }
+
+  private void closeReachableDB() throws IOException {
+    if (reachableDB != null) {
+      reachableDB.close();
+    }
+    File reachableDBFile = new File(new File(omDBPath).getParentFile(), "reachable.db");
+    if (reachableDBFile.exists()) {
+      FileUtils.deleteDirectory(reachableDBFile);
+    }
+  }
+
+  /**
+   * Define a Report to be created.
+   */
+  public static class Report {
+    private final ReportStatistics reachable;
+    private final ReportStatistics unreachable;
+    private final ReportStatistics unreferenced;
+
+    /**
+     * Builds one report that is the aggregate of multiple others.
+     */
+    public Report(FSORepairTool.Report... reports) {
+      reachable = new ReportStatistics();
+      unreachable = new ReportStatistics();
+      unreferenced = new ReportStatistics();
+
+      for (FSORepairTool.Report report : reports) {
+        reachable.add(report.reachable);
+        unreachable.add(report.unreachable);
+        unreferenced.add(report.unreferenced);
+      }
+    }
+
+    private Report(FSORepairTool.Report.Builder builder) {
+      this.reachable = builder.reachable;
+      this.unreachable = builder.unreachable;
+      this.unreferenced = builder.unreferenced;
+    }
+
+    public ReportStatistics getReachable() {
+      return reachable;
+    }
+
+    public ReportStatistics getUnreachable() {
+      return unreachable;
+    }
+
+    public ReportStatistics getUnreferenced() {
+      return unreferenced;
+    }
+
+    public String toString() {
+      return "Reachable:" + reachable + "\nUnreachable:" + unreachable + 
"\nUnreferenced:" + unreferenced;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == this) {
+        return true;
+      }
+      if (other == null || getClass() != other.getClass()) {
+        return false;
+      }
+      FSORepairTool.Report report = (FSORepairTool.Report) other;
+
+      // Useful for testing.
+      System.out.println("Comparing reports\nExpect:\n" + this + "\nActual:\n" 
+ report);
+
+      return reachable.equals(report.reachable) && unreachable.equals(report.unreachable) &&
+             unreferenced.equals(report.unreferenced);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(reachable, unreachable, unreferenced);
+    }
+
+    /**
+     * Builder class for a Report.
+     */
+    public static final class Builder {
+      private ReportStatistics reachable = new ReportStatistics();
+      private ReportStatistics unreachable = new ReportStatistics();
+      private ReportStatistics unreferenced = new ReportStatistics();
+
+      public Builder() {
+      }
+
+      public Builder setReachable(ReportStatistics reachable) {
+        this.reachable = reachable;
+        return this;
+      }
+
+      public Builder setUnreachable(ReportStatistics unreachable) {
+        this.unreachable = unreachable;
+        return this;
+      }
+
+      public Builder setUnreferenced(ReportStatistics unreferenced) {
+        this.unreferenced = unreferenced;
+        return this;
+      }
+
+      public Report build() {
+        return new Report(this);
+      }
+    }
+  }
+
+  /**
+   * Represents the statistics of reachable and unreachable data.
+   * This gives the count of dirs, files and bytes.
+   */
+
+  public static class ReportStatistics {
+    private long dirs;
+    private long files;
+    private long bytes;
+
+    public ReportStatistics() { }
+
+    public ReportStatistics(long dirs, long files, long bytes) {
+      this.dirs = dirs;
+      this.files = files;
+      this.bytes = bytes;
+    }
+
+    public void add(ReportStatistics other) {
+      this.dirs += other.dirs;
+      this.files += other.files;
+      this.bytes += other.bytes;
+    }
+
+    public long getDirs() {
+      return dirs;
+    }
+
+    public long getFiles() {
+      return files;
+    }
+
+    public long getBytes() {
+      return bytes;
+    }
+
+    @Override
+    public String toString() {
+      return "\n\tDirectories: " + dirs +
+              "\n\tFiles: " + files +
+              "\n\tBytes: " + bytes;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == this) {
+        return true;
+      }
+      if (other == null || getClass() != other.getClass()) {
+        return false;
+      }
+      ReportStatistics stats = (ReportStatistics) other;
+
+      return bytes == stats.bytes && files == stats.files && dirs == stats.dirs;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(bytes, files, dirs);
+    }
+
+    public void addDir() {
+      dirs++;
+    }
+
+    public void addFile(long size) {
+      files++;
+      bytes += size;
+    }
+  }
+}
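
The reachability check above hinges on the FSO key layout. A hedged
illustration with made-up object IDs: file and directory table keys have the
form /volumeID/bucketID/parentID/name, and buildReachableParentKey keeps only
the first three components, which is exactly the key addReachableEntry writes
for every directory the DFS reaches:

    // Hypothetical IDs, for illustration only.
    String fileKey = "/100/200/300/file1";              // entry in the fileTable
    String parent = buildReachableParentKey(fileKey);   // "/100/200/300"
    // isReachable(fileKey) is true iff that parent key exists in the
    // "reachable" table of reachable.db.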
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/OMRepair.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/OMRepair.java
new file mode 100644
index 0000000000..56d42d23f4
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/OMRepair.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.repair.om;
+
+import org.apache.hadoop.hdds.cli.GenericCli;
+import org.apache.hadoop.hdds.cli.RepairSubcommand;
+import org.kohsuke.MetaInfServices;
+import picocli.CommandLine;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Ozone Repair CLI for OM.
+ */
[email protected](name = "om",
+    subcommands = {
+        FSORepairCLI.class,
+    },
+    description = "Operational tool to repair OM.")
+@MetaInfServices(RepairSubcommand.class)
+public class OMRepair implements Callable<Void>, RepairSubcommand {
+
+  @CommandLine.Spec
+  private CommandLine.Model.CommandSpec spec;
+
+  @Override
+  public Void call() {
+    GenericCli.missingSubcommand(spec);
+    return null;
+  }
+}
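
The @MetaInfServices annotation generates the META-INF/services registration
at compile time, so the repair entry point can discover this "om" subcommand
through the standard JDK ServiceLoader mechanism rather than a hard-coded
list. A rough sketch of that discovery (standard ServiceLoader semantics,
not Ozone's exact loader code):

    // Each registered RepairSubcommand, including OMRepair, shows up here;
    // picocli then exposes it as "om" with "fso-tree" nested under it.
    for (RepairSubcommand subcommand : ServiceLoader.load(RepairSubcommand.class)) {
      System.out.println(subcommand.getClass().getName());
    }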


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
