bgaborg commented on a change in pull request #1208: HADOOP-16423. S3Guard 
fsck: Check metadata consistency between S3 and metadatastore (log)
URL: https://github.com/apache/hadoop/pull/1208#discussion_r322595234
 
 

 ##########
 File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardFsck.java
 ##########
 @@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AWSBadRequestException;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+
+import com.google.common.base.Stopwatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.InvalidParameterException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+
+/**
+ * Main class for the FSCK, factored out from S3GuardTool.
+ * The implementation uses a fixed DynamoDBMetadataStore as the backing store
+ * for metadata.
+ *
+ * Functions:
+ * <ul>
+ *   <li>Checking metadata consistency between S3 and the metadatastore</li>
+ * </ul>
+ */
+public class S3GuardFsck {
+  private static final Logger LOG = LoggerFactory.getLogger(S3GuardFsck.class);
+  public static final String ROOT_PATH_STRING = "/";
+
+  private S3AFileSystem rawFS;
+  private DynamoDBMetadataStore metadataStore;
+
+  /**
+   * Creates an S3GuardFsck.
+   * @param fs the filesystem to compare to
+   * @param ms metadatastore the metadatastore to compare with (dynamo)
+   */
+  S3GuardFsck(S3AFileSystem fs, MetadataStore ms)
+      throws InvalidParameterException {
+    this.rawFS = fs;
+
+    if (ms == null) {
+      throw new InvalidParameterException("S3AFileSystem should be guarded by"
+          + " a " + DynamoDBMetadataStore.class.getCanonicalName());
+    }
+    this.metadataStore = (DynamoDBMetadataStore) ms;
+
+    if (rawFS.hasMetadataStore()) {
+      throw new InvalidParameterException("Raw fs should not have a "
+          + "metadatastore.");
+    }
+  }
+
+  /**
+   * Compares S3 to MS.
+   * Iterative breadth first walk on the S3 structure from a given root.
+   * Creates a list of pairs (metadata in S3 and in the MetadataStore) where
+   * the consistency or any rule is violated.
+   * Uses {@link S3GuardFsckViolationHandler} to handle violations.
+   * The violations are listed in Enums: {@link Violation}
+   *
+   * @param p the root path to start the traversal
+   * @throws IOException
+   * @return a list of {@link ComparePair}
+   */
+  public List<ComparePair> compareS3ToMs(Path p) throws IOException {
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    int scannedItems = 0;
+
+    final Path rootPath = rawFS.qualify(p);
+    S3AFileStatus root = null;
+    try {
+      root = (S3AFileStatus) rawFS.getFileStatus(rootPath);
+    } catch (AWSBadRequestException e) {
+      throw new IOException(e.getMessage());
+    }
+    final List<ComparePair> comparePairs = new ArrayList<>();
+    final Queue<S3AFileStatus> queue = new ArrayDeque<>();
+    queue.add(root);
+
+    while (!queue.isEmpty()) {
+      final S3AFileStatus currentDir = queue.poll();
+      scannedItems++;
+
+      final Path currentDirPath = currentDir.getPath();
+      List<FileStatus> s3DirListing = 
Arrays.asList(rawFS.listStatus(currentDirPath));
+
+      // DIRECTORIES
+      // Check directory authoritativeness consistency
+      compareAuthoritativeDirectoryFlag(comparePairs, currentDirPath, 
s3DirListing);
+      // Add all descendant directory to the queue
+      s3DirListing.stream().filter(pm -> pm.isDirectory())
+              .map(S3AFileStatus.class::cast)
+              .forEach(pm -> queue.add(pm));
+
+      // FILES
+      // check files for consistency
+      final List<S3AFileStatus> children = s3DirListing.stream()
+              .filter(status -> !status.isDirectory())
+              .map(S3AFileStatus.class::cast).collect(toList());
+      final List<ComparePair> compareResult =
+          compareS3DirToMs(currentDir, children).stream()
+              .filter(comparePair -> comparePair.containsViolation())
+              .collect(toList());
+      comparePairs.addAll(compareResult);
+      scannedItems += children.size();
+    }
+    stopwatch.stop();
+
+    // Create a handler and handle each violated pairs
+    S3GuardFsckViolationHandler handler =
+        new S3GuardFsckViolationHandler(rawFS, metadataStore);
+    comparePairs.forEach(handler::handle);
+
+    LOG.info("Total scan time: {}s", stopwatch.elapsed(TimeUnit.SECONDS));
+    LOG.info("Scanned entries: {}", scannedItems);
+
+    return comparePairs;
+  }
+
+  /**
+   * Compares the directory contents if the listing is authoritative.
+   * @param comparePairs the list of compare pairs to add to if a violation
+   * is found
+   * @param currentDirPath the current directory path
+   * @param s3DirListing the S3 directory listing to compare with
+   * @throws IOException if listing the metadata store children fails
+   */
+  private void compareAuthoritativeDirectoryFlag(
+      List<ComparePair> comparePairs, Path currentDirPath,
+      List<FileStatus> s3DirListing) throws IOException {
+    final DirListingMetadata msDirListing =
+        metadataStore.listChildren(currentDirPath);
+    if (msDirListing != null && msDirListing.isAuthoritative()) {
+      ComparePair cP = new ComparePair(s3DirListing, msDirListing);
+
+      if (s3DirListing.size() != msDirListing.numEntries()) {
+        cP.violations.add(Violation.AUTHORITATIVE_DIRECTORY_CONTENT_MISMATCH);
 
 Review comment:
   ```java 
    /**
      * The violation handler when there's a directory listing content mismatch.
      */
     public static class AuthDirContentMismatch extends ViolationHandler {
   
       public AuthDirContentMismatch(S3GuardFsck.ComparePair comparePair) {
         super(comparePair);
       }
   
       @Override
       public String getError() {
         final String str = String.format(
             "The content of an authoritative directory listing does "
                 + "not match the content of the S3 listing. S3: %s, MS: %s",
             Arrays.asList(getS3DirListing()), getMsDirListing().getListing());
         return str;
       }
     }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to