adoroszlai commented on code in PR #7368:
URL: https://github.com/apache/ozone/pull/7368#discussion_r1852315868


##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/FSORepairTool.java:
##########
@@ -0,0 +1,757 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.repair.om;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.TableConfig;
+import org.apache.hadoop.hdds.utils.db.DBProfile;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.helpers.WithObjectID;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.ratis.util.Preconditions;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.Stack;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
+import static 
org.apache.hadoop.hdds.utils.db.DBStoreBuilder.HDDS_DEFAULT_DB_PROFILE;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * Tool to identify and repair disconnected FSO trees across all buckets.
+ *
+ * <p>This tool logs information about reachable, unreachable and unreferenced files and
+ * directories in debug mode, and moves the unreferenced files and directories to the deleted
+ * tables in repair mode.
+ *
+ * <p>If deletes are still in progress (the deleted directory table is not empty), the tool
+ * reports that the tree is unreachable, even though pending deletes would fix the issue.
+ * Otherwise, the tool reports the objects as unreferenced and deletes them in repair mode.
+ *
+ * <p>Before using the tool, make sure all OMs are stopped and that all Ratis logs have been
+ * flushed to the OM DB. This can be done by running {@code ozone admin prepare} before the tool,
+ * and {@code ozone admin cancelprepare} when done.
+ *
+ * <p>The tool runs a DFS from each bucket and saves all reachable directories as keys in a new
+ * temporary RocksDB instance called "reachable.db" in the same directory as om.db.
+ * It then scans the entire file and directory tables for each bucket to check whether each
+ * object's parent is in the reachable table of reachable.db. The reachable table is dropped and
+ * recreated for each bucket.
+ *
+ * <p>The tool is idempotent. reachable.db is not deleted automatically when the tool finishes,
+ * in case users want to inspect it manually. It can be safely deleted once the tool finishes.
+ */
+public class FSORepairTool {
+  public static final Logger LOG = LoggerFactory.getLogger(FSORepairTool.class);
+
+  // Name of the reachable table inside reachable.db; the byte form is presumably used as the
+  // RocksDB column family name (see createReachableTable) - TODO confirm.
+  private static final String REACHABLE_TABLE = "reachable";
+  private static final byte[] REACHABLE_TABLE_BYTES = REACHABLE_TABLE.getBytes(UTF_8);
+
+  // Options handed in by the caller.
+  private final String omDBPath;
+  private final boolean repair;
+  private final boolean verbose;
+  private final String volumeFilter;
+  private final String bucketFilter;
+
+  // The OM DB and the tables the tool inspects.
+  private final DBStore store;
+  private final Table<String, OmVolumeArgs> volumeTable;
+  private final Table<String, OmBucketInfo> bucketTable;
+  private final Table<String, OmDirectoryInfo> directoryTable;
+  private final Table<String, OmKeyInfo> fileTable;
+  private final Table<String, OmKeyInfo> deletedDirectoryTable;
+  private final Table<String, RepeatedOmKeyInfo> deletedTable;
+  private final Table<String, SnapshotInfo> snapshotInfoTable;
+
+  // Temporary DB that records which items have been seen as reachable. Its usage is simple
+  // enough that it is accessed directly through RocksDB.
+  private String reachableDBPath;
+  private RocksDatabase reachableDB;
+  private ColumnFamilyHandle reachableCFHandle;
+
+  // Counters collected for the final report.
+  private final ReportStatistics reachableStats;
+  private final ReportStatistics unreachableStats;
+  private final ReportStatistics unreferencedStats;
+
+  /**
+   * Creates a repair tool that opens the OM DB located at {@code dbPath}.
+   *
+   * @param dbPath  path to the OM DB directory
+   * @param repair  whether unreferenced objects should be repaired rather than only reported
+   * @param volume  volume to restrict the run to, or {@code null} to process all volumes
+   * @param bucket  bucket to restrict the run to (requires {@code volume}), or {@code null}
+   * @param verbose whether to print additional per-object messages
+   * @throws IOException if the DB path is invalid or the DB cannot be loaded
+   */
+  public FSORepairTool(String dbPath, boolean repair, String volume, String bucket, boolean verbose)
+      throws IOException {
+    this(FSORepairTool.getStoreFromPath(dbPath), dbPath, repair, volume, bucket, verbose);
+  }
+
+  /**
+   * Creates the tool on top of an already opened {@link DBStore}. Tests use this to hand in the
+   * DB of a MiniOzoneCluster directly.
+   *
+   * @param dbStore opened OM DB to inspect
+   * @param dbPath  path of the OM DB directory, kept for later use by the tool
+   * @param repair  whether unreferenced objects should be repaired rather than only reported
+   * @param volume  volume to restrict the run to, or {@code null} for all volumes
+   * @param bucket  bucket to restrict the run to (requires {@code volume}), or {@code null}
+   * @param verbose whether to print additional per-object messages
+   * @throws IOException if one of the required tables cannot be obtained
+   */
+  public FSORepairTool(DBStore dbStore, String dbPath, boolean repair, String volume,
+      String bucket, boolean verbose) throws IOException {
+    this.store = dbStore;
+    this.omDBPath = dbPath;
+    this.repair = repair;
+    this.verbose = verbose;
+    this.volumeFilter = volume;
+    this.bucketFilter = bucket;
+
+    // All counters start at zero and are incremented while buckets are processed.
+    this.reachableStats = new ReportStatistics(0, 0, 0);
+    this.unreachableStats = new ReportStatistics(0, 0, 0);
+    this.unreferencedStats = new ReportStatistics(0, 0, 0);
+
+    // Resolve every OM table the tool reads or modifies up front.
+    this.volumeTable =
+        dbStore.getTable(OmMetadataManagerImpl.VOLUME_TABLE, String.class, OmVolumeArgs.class);
+    this.bucketTable =
+        dbStore.getTable(OmMetadataManagerImpl.BUCKET_TABLE, String.class, OmBucketInfo.class);
+    this.directoryTable =
+        dbStore.getTable(OmMetadataManagerImpl.DIRECTORY_TABLE, String.class, OmDirectoryInfo.class);
+    this.fileTable =
+        dbStore.getTable(OmMetadataManagerImpl.FILE_TABLE, String.class, OmKeyInfo.class);
+    this.deletedDirectoryTable =
+        dbStore.getTable(OmMetadataManagerImpl.DELETED_DIR_TABLE, String.class, OmKeyInfo.class);
+    this.deletedTable =
+        dbStore.getTable(OmMetadataManagerImpl.DELETED_TABLE, String.class, RepeatedOmKeyInfo.class);
+    this.snapshotInfoTable =
+        dbStore.getTable(OmMetadataManagerImpl.SNAPSHOT_INFO_TABLE, String.class, SnapshotInfo.class);
+  }
+
+  /**
+   * Opens the OM DB found at {@code dbPath}.
+   *
+   * @param dbPath path to the OM DB directory; relative paths are resolved against the current
+   *               working directory
+   * @return the loaded DB store
+   * @throws IOException if {@code dbPath} is not an existing directory, or if loading the DB fails
+   */
+  protected static DBStore getStoreFromPath(String dbPath) throws IOException {
+    File omDBFile = new File(dbPath);
+    // isDirectory() is false for a path that does not exist, so one check covers both cases.
+    if (!omDBFile.isDirectory()) {
+      throw new IOException(String.format("Specified OM DB instance %s does " +
+          "not exist or is not a RocksDB directory.", dbPath));
+    }
+    // loadDB is given the directory that contains the DB. Resolve to an absolute path first:
+    // for a bare relative name such as "om.db", getParentFile() would otherwise return null.
+    File metaDir = omDBFile.getAbsoluteFile().getParentFile();
+    // Load RocksDB and tables needed.
+    return OmMetadataManagerImpl.loadDB(new OzoneConfiguration(), metaDir, -1);
+  }
+
+  /**
+   * Checks the selected volumes and FSO buckets for disconnected trees and builds the report.
+   *
+   * <p>In repair mode, unreferenced objects are fixed while each bucket is processed.
+   *
+   * @return the aggregated report, or {@code null} if the volume/bucket filters are invalid or
+   *         refer to objects that do not exist
+   * @throws Exception if reading or updating the DBs fails
+   */
+  public FSORepairTool.Report run() throws Exception {
+
+    if (bucketFilter != null && volumeFilter == null) {
+      System.out.println("--bucket flag cannot be used without specifying --volume.");
+      return null;
+    }
+
+    if (volumeFilter != null) {
+      OmVolumeArgs volumeArgs = volumeTable.getIfExist(volumeFilter);
+      if (volumeArgs == null) {
+        System.out.println("Volume '" + volumeFilter + "' does not exist.");
+        return null;
+      }
+    }
+
+    // Iterate all volumes or a specific volume if specified
+    try (TableIterator<String, ? extends Table.KeyValue<String, OmVolumeArgs>>
+             volumeIterator = volumeTable.iterator()) {
+      openReachableDB();
+
+      while (volumeIterator.hasNext()) {
+        Table.KeyValue<String, OmVolumeArgs> volumeEntry = volumeIterator.next();
+        String volumeKey = volumeEntry.getKey();
+
+        if (volumeFilter != null && !volumeFilter.equals(volumeKey)) {
+          continue;
+        }
+
+        System.out.println("Processing volume: " + volumeKey);
+
+        if (bucketFilter != null) {
+          OmBucketInfo bucketInfo = bucketTable.getIfExist(volumeKey + "/" + bucketFilter);
+          if (bucketInfo == null) {
+            //Bucket does not exist in the volume
+            System.out.println("Bucket '" + bucketFilter + "' does not exist in volume '" + volumeKey + "'.");
+            return null;
+          }
+
+          if (bucketInfo.getBucketLayout() != BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+            System.out.println("Skipping non-FSO bucket " + bucketFilter);
+            continue;
+          }
+
+          processBucket(volumeEntry.getValue(), bucketInfo);
+        } else {
+          processAllBucketsInVolume(volumeEntry.getValue(), volumeKey);
+        }
+      }
+    } finally {
+      closeReachableDB();
+    }
+
+    return buildReportAndLog();
+  }
+
+  /**
+   * Processes every FSO bucket of one volume and skips buckets with any other layout.
+   *
+   * @param volume    the volume whose buckets are processed
+   * @param volumeKey the volume's key in the volume table; its bucket keys are
+   *                  {@code volumeKey + "/" + bucketName}
+   * @throws IOException if iterating the bucket table or processing a bucket fails
+   */
+  private void processAllBucketsInVolume(OmVolumeArgs volume, String volumeKey) throws IOException {
+    // Include the separator. A bare volumeKey prefix would also match sibling volumes
+    // ("vol" vs "vol2"), and seeking to it could first land on volumes such as "vol-a", whose
+    // keys sort before "vol/" bytewise because '-' and '.' sort before '/'.
+    String bucketPrefix = volumeKey + "/";
+    try (TableIterator<String, ? extends Table.KeyValue<String, OmBucketInfo>>
+             bucketIterator = bucketTable.iterator()) {
+      bucketIterator.seek(bucketPrefix);
+      while (bucketIterator.hasNext()) {
+        Table.KeyValue<String, OmBucketInfo> bucketEntry = bucketIterator.next();
+        String bucketKey = bucketEntry.getKey();
+
+        // Check the volume boundary first, so buckets of later volumes are neither reported
+        // as skipped nor scanned.
+        if (!bucketKey.startsWith(bucketPrefix)) {
+          break;
+        }
+
+        OmBucketInfo bucketInfo = bucketEntry.getValue();
+        if (bucketInfo.getBucketLayout() != BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+          System.out.println("Skipping non-FSO bucket " + bucketKey);
+          continue;
+        }
+
+        processBucket(volume, bucketInfo);
+      }
+    }
+  }
+
+  /**
+   * Checks whether the snapshot info table holds any snapshot of the given bucket.
+   *
+   * @param volumeName name of the volume (as returned by {@code OmVolumeArgs#getVolume()})
+   * @param bucketName name of the bucket
+   * @return {@code true} if some snapshot's path equals {@code volumeName/bucketName} (with any
+   *         leading '/' removed); {@code false} otherwise or if there is no snapshot info table
+   * @throws IOException if iterating the snapshot info table fails
+   */
+  private boolean checkIfSnapshotExistsForBucket(String volumeName, String bucketName) throws IOException {
+    if (snapshotInfoTable == null) {
+      return false;
+    }
+
+    // The expected path does not depend on the entry being inspected, so build it (and run the
+    // regex) once instead of once per snapshot.
+    String snapshotPath = (volumeName + "/" + bucketName).replaceFirst("^/", "");
+    try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>> iterator =
+             snapshotInfoTable.iterator()) {
+      while (iterator.hasNext()) {
+        SnapshotInfo snapshotInfo = iterator.next().getValue();
+        // Compare from the known non-null side so an entry without a path cannot cause an NPE.
+        if (snapshotPath.equals(snapshotInfo.getSnapshotPath())) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private void processBucket(OmVolumeArgs volume, OmBucketInfo bucketInfo) 
throws IOException {
+    System.out.println("Processing bucket: " + volume.getVolume() + "/" + 
bucketInfo.getBucketName());
+    if (checkIfSnapshotExistsForBucket(volume.getVolume(), 
bucketInfo.getBucketName())) {
+      if (!repair) {
+        System.out.println(
+            "Snapshot detected in bucket '" + volume.getVolume() + "/" + 
bucketInfo.getBucketName() + "'. ");
+      } else {
+        System.out.println(
+            "Skipping repair for bucket '" + volume.getVolume() + "/" + 
bucketInfo.getBucketName() + "' " +
+            "due to snapshot presence.");
+        return;
+      }
+    }
+    dropReachableTableIfExists();
+    createReachableTable();
+    markReachableObjectsInBucket(volume, bucketInfo);
+    handleUnreachableAndUnreferencedObjects(volume, bucketInfo);
+    dropReachableTableIfExists();
+  }
+
+  private Report buildReportAndLog() {
+    Report report = new Report.Builder()
+        .setReachable(reachableStats)
+        .setUnreachable(unreachableStats)
+        .setUnreferenced(unreferencedStats)
+        .build();
+
+    System.out.println("\n" + report);
+    return report;
+  }
+
+  /**
+   * Marks the bucket and every directory reachable from it, walking the tree depth-first with
+   * an explicit stack.
+   *
+   * @param volume the volume that owns the bucket
+   * @param bucket the bucket whose tree is walked
+   * @throws IOException if any of the underlying DB operations fail
+   */
+  private void markReachableObjectsInBucket(OmVolumeArgs volume, OmBucketInfo bucket) throws IOException {
+    // Only put directories in the stack.
+    // Directory keys should have the form /volumeID/bucketID/parentID/name.
+    Stack<String> dirKeyStack = new Stack<>();
+
+    // Since the tool uses parent directories to check for reachability, add
+    // a reachable entry for the bucket as well.
+    addReachableEntry(volume, bucket, bucket);
+    // Initialize the stack with all immediate child directories of the
+    // bucket, and mark them all as reachable.
+    dirKeyStack.addAll(getChildDirectoriesAndMarkAsReachable(volume, bucket, bucket));
+
+    while (!dirKeyStack.isEmpty()) {
+      // Get one directory and process its immediate children.
+      String currentDirKey = dirKeyStack.pop();
+      OmDirectoryInfo currentDir = directoryTable.get(currentDirKey);
+      if (currentDir == null) {
+        System.out.println("Directory key " + currentDirKey +
+            " to be processed was not found in the directory table.");
+        continue;
+      }
+
+      // TODO revisit this for a more memory efficient implementation,
+      //  possibly making better use of RocksDB iterators.
+      dirKeyStack.addAll(getChildDirectoriesAndMarkAsReachable(volume, bucket, currentDir));
+    }
+  }
+
+  private boolean isDirectoryInDeletedDirTable(String dirKey) throws 
IOException {
+    return deletedDirectoryTable.isExist(dirKey);
+  }
+
+  private boolean isFileKeyInDeletedTable(String fileKey) throws IOException {
+    return deletedTable.isExist(fileKey);
+  }
+
+  private void handleUnreachableAndUnreferencedObjects(OmVolumeArgs volume, 
OmBucketInfo bucket) throws IOException {
+    // Check for unreachable and unreferenced directories in the bucket.
+    String bucketPrefix = OM_KEY_PREFIX +
+        volume.getObjectID() +
+        OM_KEY_PREFIX +
+        bucket.getObjectID();
+
+    try (TableIterator<String, ? extends Table.KeyValue<String, 
OmDirectoryInfo>> dirIterator =
+             directoryTable.iterator()) {
+      dirIterator.seek(bucketPrefix);
+      while (dirIterator.hasNext()) {
+        Table.KeyValue<String, OmDirectoryInfo> dirEntry = dirIterator.next();
+        String dirKey = dirEntry.getKey();
+
+        // Only search directories in this bucket.
+        if (!dirKey.startsWith(bucketPrefix)) {
+          break;
+        }
+
+        if (!isReachable(dirKey)) {
+          if (!isDirectoryInDeletedDirTable(dirKey)) {
+            System.out.println("Found unreferenced directory: " + dirKey);
+            unreferencedStats.addDir();
+
+            if (!repair) {
+              if (verbose) {
+                System.out.println("Marking unreferenced directory " + dirKey 
+ " for deletion.");
+                }

Review Comment:
   ```
   
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/FSORepairTool.java
    355: 'if rcurly' has incorrect indentation level 16, expected level should 
be 14
   ```
   
   ```suggestion
                 }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to