This is an automated email from the ASF dual-hosted git repository.

gabota pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4e273a3  HADOOP-16423. S3Guard fsck: Check metadata consistency 
between S3 and metadatastore (log) (#1208). Contributed by Gabor Bota.
4e273a3 is described below

commit 4e273a31f66013b7c20e8114451f5bc6c741f2cc
Author: Gabor Bota <gabor.b...@cloudera.com>
AuthorDate: Thu Sep 12 13:12:46 2019 +0200

    HADOOP-16423. S3Guard fsck: Check metadata consistency between S3 and 
metadatastore (log) (#1208). Contributed by Gabor Bota.
    
    Change-Id: I6bbb331b6c0a41c61043e482b95504fda8a50596
---
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |   2 +-
 .../apache/hadoop/fs/s3a/s3guard/S3GuardFsck.java  | 483 ++++++++++++++++++++
 .../s3a/s3guard/S3GuardFsckViolationHandler.java   | 346 ++++++++++++++
 .../apache/hadoop/fs/s3a/s3guard/S3GuardTool.java  |  97 +++-
 .../fs/s3a/ITestS3GuardOutOfBandOperations.java    |  11 +-
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |  34 ++
 .../hadoop/fs/s3a/s3guard/ITestS3GuardFsck.java    | 504 +++++++++++++++++++++
 .../fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java   |  30 ++
 .../fs/s3a/s3guard/MetadataStoreTestBase.java      |   2 +-
 9 files changed, 1500 insertions(+), 9 deletions(-)

diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 0ce9823..6bdbba3 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -1501,7 +1501,7 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities,
    * is set for this filesystem.
    */
   @VisibleForTesting
-  boolean hasAuthoritativeMetadataStore() {
+  public boolean hasAuthoritativeMetadataStore() {
     return hasMetadataStore() && allowAuthoritativeMetadataStore;
   }
 
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardFsck.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardFsck.java
new file mode 100644
index 0000000..a9925df
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardFsck.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.InvalidParameterException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.base.Stopwatch;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+
+/**
+ * Main class for the FSCK, factored out from S3GuardTool.
+ * The implementation uses a fixed DynamoDBMetadataStore as the backing store
+ * for metadata.
+ *
+ * Functions:
+ * <ul>
+ *   <li>Checking metadata consistency between S3 and metadatastore</li>
+ * </ul>
+ */
+public class S3GuardFsck {
+  private static final Logger LOG = LoggerFactory.getLogger(S3GuardFsck.class);
+  public static final String ROOT_PATH_STRING = "/";
+
+  private final S3AFileSystem rawFS;
+  private final DynamoDBMetadataStore metadataStore;
+
+  private static final long MOD_TIME_RANGE = 2000L;
+
+  /**
+   * Creates an S3GuardFsck.
+   * @param fs the filesystem to compare to
+   * @param ms metadatastore the metadatastore to compare with (dynamo)
+   */
+  public S3GuardFsck(S3AFileSystem fs, MetadataStore ms)
+      throws InvalidParameterException {
+    this.rawFS = fs;
+
+    if (ms == null) {
+      throw new InvalidParameterException("S3A Bucket " + fs.getBucket()
+          + " should be guarded by a "
+          + DynamoDBMetadataStore.class.getCanonicalName());
+    }
+    this.metadataStore = (DynamoDBMetadataStore) ms;
+
+    Preconditions.checkArgument(!rawFS.hasMetadataStore(),
+        "Raw fs should not have a metadatastore.");
+  }
+
+  /**
+   * Compares S3 to MS.
+   * Iterative breadth first walk on the S3 structure from a given root.
+   * Creates a list of pairs (metadata in S3 and in the MetadataStore) where
+ * a consistency rule or invariant is violated.
+   * Uses {@link S3GuardFsckViolationHandler} to handle violations.
+   * The violations are listed in Enums: {@link Violation}
+   *
+   * @param p the root path to start the traversal
+   * @return a list of {@link ComparePair}
+   * @throws IOException
+   */
+  public List<ComparePair> compareS3ToMs(Path p) throws IOException {
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    int scannedItems = 0;
+
+    final Path rootPath = rawFS.qualify(p);
+    S3AFileStatus root = (S3AFileStatus) rawFS.getFileStatus(rootPath);
+    final List<ComparePair> comparePairs = new ArrayList<>();
+    final Queue<S3AFileStatus> queue = new ArrayDeque<>();
+    queue.add(root);
+
+    while (!queue.isEmpty()) {
+      final S3AFileStatus currentDir = queue.poll();
+
+
+      final Path currentDirPath = currentDir.getPath();
+      try {
+        List<FileStatus> s3DirListing = Arrays.asList(
+            rawFS.listStatus(currentDirPath));
+
+        // Check authoritative directory flag.
+        compareAuthoritativeDirectoryFlag(comparePairs, currentDirPath,
+            s3DirListing);
+        // Add all descendant directory to the queue
+        s3DirListing.stream().filter(pm -> pm.isDirectory())
+            .map(S3AFileStatus.class::cast)
+            .forEach(pm -> queue.add(pm));
+
+        // Check file and directory metadata for consistency.
+        final List<S3AFileStatus> children = s3DirListing.stream()
+            .filter(status -> !status.isDirectory())
+            .map(S3AFileStatus.class::cast).collect(toList());
+        final List<ComparePair> compareResult =
+            compareS3DirContentToMs(currentDir, children);
+        comparePairs.addAll(compareResult);
+
+        // Increase the scanned file size.
+        // One for the directory, one for the children.
+        scannedItems++;
+        scannedItems += children.size();
+      } catch (FileNotFoundException e) {
+        LOG.error("The path has been deleted since it was queued: "
+            + currentDirPath, e);
+      }
+
+    }
+    stopwatch.stop();
+
+    // Create a handler and handle each violated pairs
+    S3GuardFsckViolationHandler handler =
+        new S3GuardFsckViolationHandler(rawFS, metadataStore);
+    comparePairs.forEach(handler::handle);
+
+    LOG.info("Total scan time: {}s", stopwatch.elapsed(TimeUnit.SECONDS));
+    LOG.info("Scanned entries: {}", scannedItems);
+
+    return comparePairs;
+  }
+
+  /**
+   * Compare the directory contents if the listing is authoritative.
+   *
+   * @param comparePairs the list of compare pairs to add to
+   *                     if it contains a violation
+   * @param currentDirPath the current directory path
+   * @param s3DirListing the s3 directory listing to compare with
+   * @throws IOException
+   */
+  private void compareAuthoritativeDirectoryFlag(List<ComparePair> 
comparePairs,
+      Path currentDirPath, List<FileStatus> s3DirListing) throws IOException {
+    final DirListingMetadata msDirListing =
+        metadataStore.listChildren(currentDirPath);
+    if (msDirListing != null && msDirListing.isAuthoritative()) {
+      ComparePair cP = new ComparePair(s3DirListing, msDirListing);
+
+      if (s3DirListing.size() != msDirListing.numEntries()) {
+        cP.violations.add(Violation.AUTHORITATIVE_DIRECTORY_CONTENT_MISMATCH);
+      } else {
+        final Set<Path> msPaths = msDirListing.getListing().stream()
+                .map(pm -> pm.getFileStatus().getPath()).collect(toSet());
+        final Set<Path> s3Paths = s3DirListing.stream()
+                .map(pm -> pm.getPath()).collect(toSet());
+        if (!s3Paths.equals(msPaths)) {
+          
cP.violations.add(Violation.AUTHORITATIVE_DIRECTORY_CONTENT_MISMATCH);
+        }
+      }
+
+      if (cP.containsViolation()) {
+        comparePairs.add(cP);
+      }
+    }
+  }
+
+  /**
+   * Compares S3 directory content to the metadata store.
+   *
+   * @param s3CurrentDir file status of the current directory
+   * @param children the contents of the directory
+   * @return the compare pairs with violations of consistency
+   * @throws IOException
+   */
+  protected List<ComparePair> compareS3DirContentToMs(
+      S3AFileStatus s3CurrentDir,
+      List<S3AFileStatus> children) throws IOException {
+    final Path path = s3CurrentDir.getPath();
+    final PathMetadata pathMetadata = metadataStore.get(path);
+    List<ComparePair> violationComparePairs = new ArrayList<>();
+
+    final ComparePair rootComparePair =
+        compareFileStatusToPathMetadata(s3CurrentDir, pathMetadata);
+    if (rootComparePair.containsViolation()) {
+      violationComparePairs.add(rootComparePair);
+    }
+
+    children.forEach(s3ChildMeta -> {
+      try {
+        final PathMetadata msChildMeta =
+            metadataStore.get(s3ChildMeta.getPath());
+        final ComparePair comparePair =
+            compareFileStatusToPathMetadata(s3ChildMeta, msChildMeta);
+        if (comparePair.containsViolation()) {
+          violationComparePairs.add(comparePair);
+        }
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+      }
+    });
+
+    return violationComparePairs;
+  }
+
+  /**
+   * Compares a {@link S3AFileStatus} from S3 to a {@link PathMetadata}
+   * from the metadata store. Finds violated invariants and consistency
+   * issues.
+   *
+   * @param s3FileStatus the file status from S3
+   * @param msPathMetadata the path metadata from metadatastore
+   * @return {@link ComparePair} with the found issues
+   * @throws IOException
+   */
+  protected ComparePair compareFileStatusToPathMetadata(
+      S3AFileStatus s3FileStatus,
+      PathMetadata msPathMetadata) throws IOException {
+    final Path path = s3FileStatus.getPath();
+
+    if (msPathMetadata != null) {
+      LOG.info("Path: {} - Length S3: {}, MS: {} " +
+              "- Etag S3: {}, MS: {} ",
+          path,
+          s3FileStatus.getLen(), msPathMetadata.getFileStatus().getLen(),
+          s3FileStatus.getETag(), msPathMetadata.getFileStatus().getETag());
+    } else {
+      LOG.info("Path: {} - Length S3: {} - Etag S3: {}, no record in MS.",
+              path, s3FileStatus.getLen(), s3FileStatus.getETag());
+    }
+
+    ComparePair comparePair = new ComparePair(s3FileStatus, msPathMetadata);
+
+    if (!path.equals(path(ROOT_PATH_STRING))) {
+      final Path parentPath = path.getParent();
+      final PathMetadata parentPm = metadataStore.get(parentPath);
+
+      if (parentPm == null) {
+        comparePair.violations.add(Violation.NO_PARENT_ENTRY);
+      } else {
+        if (!parentPm.getFileStatus().isDirectory()) {
+          comparePair.violations.add(Violation.PARENT_IS_A_FILE);
+        }
+        if (parentPm.isDeleted()) {
+          comparePair.violations.add(Violation.PARENT_TOMBSTONED);
+        }
+      }
+    } else {
+      LOG.debug("Entry is in the root directory, so there's no parent");
+    }
+
+    // If the msPathMetadata is null, we RETURN because
+    // there is no metadata to compare with
+    if (msPathMetadata == null) {
+      comparePair.violations.add(Violation.NO_METADATA_ENTRY);
+      return comparePair;
+    }
+
+    final S3AFileStatus msFileStatus = msPathMetadata.getFileStatus();
+    if (s3FileStatus.isDirectory() && !msFileStatus.isDirectory()) {
+      comparePair.violations.add(Violation.DIR_IN_S3_FILE_IN_MS);
+    }
+    if (!s3FileStatus.isDirectory() && msFileStatus.isDirectory()) {
+      comparePair.violations.add(Violation.FILE_IN_S3_DIR_IN_MS);
+    }
+
+    if(msPathMetadata.isDeleted()) {
+      comparePair.violations.add(Violation.TOMBSTONED_IN_MS_NOT_DELETED_IN_S3);
+    }
+
+    /**
+     * Attribute check
+     */
+    if (s3FileStatus.getLen() != msFileStatus.getLen()) {
+      comparePair.violations.add(Violation.LENGTH_MISMATCH);
+    }
+
+    // ModTime should be in the accuracy range defined.
+    long modTimeDiff = Math.abs(
+        s3FileStatus.getModificationTime() - msFileStatus.getModificationTime()
+    );
+    if (modTimeDiff > MOD_TIME_RANGE) {
+      comparePair.violations.add(Violation.MOD_TIME_MISMATCH);
+    }
+
+    if(msPathMetadata.getFileStatus().getVersionId() == null
+        || s3FileStatus.getVersionId() == null ) {
+      LOG.debug("Missing versionIDs skipped. A HEAD request is "
+          + "required for each object to get the versionID.");
+    } else 
if(!s3FileStatus.getVersionId().equals(msFileStatus.getVersionId())) {
+      comparePair.violations.add(Violation.VERSIONID_MISMATCH);
+    }
+
+    // check etag only for files, and not directories
+    if (!s3FileStatus.isDirectory()) {
+      if (msPathMetadata.getFileStatus().getETag() == null) {
+        comparePair.violations.add(Violation.NO_ETAG);
+      } else if (s3FileStatus.getETag() != null &&
+          !s3FileStatus.getETag().equals(msFileStatus.getETag())) {
+        comparePair.violations.add(Violation.ETAG_MISMATCH);
+      }
+    }
+
+    return comparePair;
+  }
+
+  private Path path(String s) {
+    return rawFS.makeQualified(new Path(s));
+  }
+
+  /**
+   * A compare pair with the pair of metadata and the list of violations.
+   */
+  public static class ComparePair {
+    private final S3AFileStatus s3FileStatus;
+    private final PathMetadata msPathMetadata;
+
+    private final List<FileStatus> s3DirListing;
+    private final DirListingMetadata msDirListing;
+
+    private final Path path;
+
+    private final Set<Violation> violations = new HashSet<>();
+
+    ComparePair(S3AFileStatus status, PathMetadata pm) {
+      this.s3FileStatus = status;
+      this.msPathMetadata = pm;
+      this.s3DirListing = null;
+      this.msDirListing = null;
+      this.path = status.getPath();
+    }
+
+    ComparePair(List<FileStatus> s3DirListing, DirListingMetadata 
msDirListing) {
+      this.s3DirListing = s3DirListing;
+      this.msDirListing = msDirListing;
+      this.s3FileStatus = null;
+      this.msPathMetadata = null;
+      this.path = msDirListing.getPath();
+    }
+
+    public S3AFileStatus getS3FileStatus() {
+      return s3FileStatus;
+    }
+
+    public PathMetadata getMsPathMetadata() {
+      return msPathMetadata;
+    }
+
+    public Set<Violation> getViolations() {
+      return violations;
+    }
+
+    public boolean containsViolation() {
+      return !violations.isEmpty();
+    }
+
+    public DirListingMetadata getMsDirListing() {
+      return msDirListing;
+    }
+
+    public List<FileStatus> getS3DirListing() {
+      return s3DirListing;
+    }
+
+    public Path getPath() {
+      return path;
+    }
+
+    @Override public String toString() {
+      return "ComparePair{" + "s3FileStatus=" + s3FileStatus
+          + ", msPathMetadata=" + msPathMetadata + ", s3DirListing=" +
+          s3DirListing + ", msDirListing=" + msDirListing + ", path="
+          + path + ", violations=" + violations + '}';
+    }
+  }
+
+  /**
+   * Violation with severity and the handler.
+   * Defines the severity of the violation between 0-2
+   * where 0 is the most severe and 2 is the least severe.
+   */
+  public enum Violation {
+    /**
+     * No entry in metadatastore.
+     */
+    NO_METADATA_ENTRY(1,
+        S3GuardFsckViolationHandler.NoMetadataEntry.class),
+    /**
+     * A file or directory entry does not have a parent entry - excluding
+     * files and directories in the root.
+     */
+    NO_PARENT_ENTRY(0,
+        S3GuardFsckViolationHandler.NoParentEntry.class),
+    /**
+     * An entry’s parent is a file.
+     */
+    PARENT_IS_A_FILE(0,
+        S3GuardFsckViolationHandler.ParentIsAFile.class),
+    /**
+     * A file exists under a path for which there is a
+     * tombstone entry in the MS.
+     */
+    PARENT_TOMBSTONED(0,
+        S3GuardFsckViolationHandler.ParentTombstoned.class),
+    /**
+     * A directory in S3 is a file entry in the MS.
+     */
+    DIR_IN_S3_FILE_IN_MS(0,
+        S3GuardFsckViolationHandler.DirInS3FileInMs.class),
+    /**
+     * A file in S3 is a directory in the MS.
+     */
+    FILE_IN_S3_DIR_IN_MS(0,
+        S3GuardFsckViolationHandler.FileInS3DirInMs.class),
+    AUTHORITATIVE_DIRECTORY_CONTENT_MISMATCH(1,
+        S3GuardFsckViolationHandler.AuthDirContentMismatch.class),
+    /**
+     * An entry in the MS is tombstoned, but the object is not deleted on S3.
+     */
+    TOMBSTONED_IN_MS_NOT_DELETED_IN_S3(0,
+        S3GuardFsckViolationHandler.TombstonedInMsNotDeletedInS3.class),
+    /**
+     * Attribute mismatch.
+     */
+    LENGTH_MISMATCH(0,
+        S3GuardFsckViolationHandler.LengthMismatch.class),
+    MOD_TIME_MISMATCH(2,
+        S3GuardFsckViolationHandler.ModTimeMismatch.class),
+    /**
+     * If there's a versionID the mismatch is severe.
+     */
+    VERSIONID_MISMATCH(0,
+        S3GuardFsckViolationHandler.VersionIdMismatch.class),
+    /**
+     * If there's an etag the mismatch is severe.
+     */
+    ETAG_MISMATCH(0,
+        S3GuardFsckViolationHandler.EtagMismatch.class),
+    /**
+     * Don't worry too much if we don't have an etag.
+     */
+    NO_ETAG(2,
+        S3GuardFsckViolationHandler.NoEtag.class);
+
+    private final int severity;
+    private final Class<? extends 
S3GuardFsckViolationHandler.ViolationHandler> handler;
+
+    Violation(int s,
+        Class<? extends S3GuardFsckViolationHandler.ViolationHandler> h) {
+      this.severity = s;
+      this.handler = h;
+    }
+
+    public int getSeverity() {
+      return severity;
+    }
+
+    public Class<? extends S3GuardFsckViolationHandler.ViolationHandler> 
getHandler() {
+      return handler;
+    }
+  }
+}
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardFsckViolationHandler.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardFsckViolationHandler.java
new file mode 100644
index 0000000..97e6fe6
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardFsckViolationHandler.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+
+/**
+ * Violation handler for the S3Guard's fsck.
+ */
+public class S3GuardFsckViolationHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      S3GuardFsckViolationHandler.class);
+
+  // The rawFS and metadataStore are kept here in preparation for when the
+  // ViolationHandlers will not just log but also fix the violations.
+  private final S3AFileSystem rawFs;
+  private final DynamoDBMetadataStore metadataStore;
+
+  private static String newLine = System.getProperty("line.separator");
+
+  public S3GuardFsckViolationHandler(S3AFileSystem fs,
+      DynamoDBMetadataStore ddbms) {
+
+    this.metadataStore = ddbms;
+    this.rawFs = fs;
+  }
+
+  public void handle(S3GuardFsck.ComparePair comparePair) {
+    if (!comparePair.containsViolation()) {
+      LOG.debug("There is no violation in the compare pair: {}", comparePair);
+      return;
+    }
+
+    StringBuilder sB = new StringBuilder();
+    sB.append(newLine)
+        .append("On path: ").append(comparePair.getPath()).append(newLine);
+
+    handleComparePair(comparePair, sB);
+
+    LOG.error(sB.toString());
+  }
+
+  /**
+   * Create a new instance of the violation handler for all the violations
+   * found in the compare pair and use it.
+   *
+   * @param comparePair the compare pair with violations
+   * @param sB StringBuilder to append error strings from violations.
+   */
+  protected static void handleComparePair(S3GuardFsck.ComparePair comparePair,
+      StringBuilder sB) {
+
+    for (S3GuardFsck.Violation violation : comparePair.getViolations()) {
+      try {
+        ViolationHandler handler = violation.getHandler()
+            .getDeclaredConstructor(S3GuardFsck.ComparePair.class)
+            .newInstance(comparePair);
+        final String errorStr = handler.getError();
+        sB.append(errorStr);
+      } catch (NoSuchMethodException e) {
+        LOG.error("Can not find declared constructor for handler: {}",
+            violation.getHandler());
+      } catch (IllegalAccessException | InstantiationException | 
InvocationTargetException e) {
+        LOG.error("Can not instantiate handler: {}",
+            violation.getHandler());
+      }
+      sB.append(newLine);
+    }
+  }
+
+  /**
+   * Violation handler abstract class.
+   * This class should be extended for violation handlers.
+   */
+  public static abstract class ViolationHandler {
+    private final PathMetadata pathMetadata;
+    private final S3AFileStatus s3FileStatus;
+    private final S3AFileStatus msFileStatus;
+    private final List<FileStatus> s3DirListing;
+    private final DirListingMetadata msDirListing;
+
+    public ViolationHandler(S3GuardFsck.ComparePair comparePair) {
+      pathMetadata = comparePair.getMsPathMetadata();
+      s3FileStatus = comparePair.getS3FileStatus();
+      if (pathMetadata != null) {
+        msFileStatus = pathMetadata.getFileStatus();
+      } else {
+        msFileStatus = null;
+      }
+      s3DirListing = comparePair.getS3DirListing();
+      msDirListing = comparePair.getMsDirListing();
+    }
+
+    public abstract String getError();
+
+    public PathMetadata getPathMetadata() {
+      return pathMetadata;
+    }
+
+    public S3AFileStatus getS3FileStatus() {
+      return s3FileStatus;
+    }
+
+    public S3AFileStatus getMsFileStatus() {
+      return msFileStatus;
+    }
+
+    public List<FileStatus> getS3DirListing() {
+      return s3DirListing;
+    }
+
+    public DirListingMetadata getMsDirListing() {
+      return msDirListing;
+    }
+  }
+
+  /**
+   * The violation handler when there's no matching metadata entry in the MS.
+   */
+  public static class NoMetadataEntry extends ViolationHandler {
+
+    public NoMetadataEntry(S3GuardFsck.ComparePair comparePair) {
+      super(comparePair);
+    }
+
+    @Override
+    public String getError() {
+      return "No PathMetadata for this path in the MS.";
+    }
+  }
+
+  /**
+   * The violation handler when there's no parent entry.
+   */
+  public static class NoParentEntry extends ViolationHandler {
+
+    public NoParentEntry(S3GuardFsck.ComparePair comparePair) {
+      super(comparePair);
+    }
+
+    @Override
+    public String getError() {
+      return "Entry does not have a parent entry (not root)";
+    }
+  }
+
+  /**
+   * The violation handler when the parent of an entry is a file.
+   */
+  public static class ParentIsAFile extends ViolationHandler {
+
+    public ParentIsAFile(S3GuardFsck.ComparePair comparePair) {
+      super(comparePair);
+    }
+
+    @Override
+    public String getError() {
+      return "The entry's parent in the metastore database is a file.";
+    }
+  }
+
+  /**
+   * The violation handler when the parent of an entry is tombstoned.
+   */
+  public static class ParentTombstoned extends ViolationHandler {
+
+    public ParentTombstoned(S3GuardFsck.ComparePair comparePair) {
+      super(comparePair);
+    }
+
+    @Override
+    public String getError() {
+      return "The entry in the metastore database has a parent entry " +
+          "which is a tombstone marker";
+    }
+  }
+
+  /**
+   * The violation handler when a directory in S3 is a file entry in the MS.
+   */
+  public static class DirInS3FileInMs extends ViolationHandler {
+
+    public DirInS3FileInMs(S3GuardFsck.ComparePair comparePair) {
+      super(comparePair);
+    }
+
+    @Override
+    public String getError() {
+      return "A directory in S3 is a file entry in the MS";
+    }
+  }
+
+  /**
+   * The violation handler when a file in S3 is a directory entry in the MS.
+   */
+  public static class FileInS3DirInMs extends ViolationHandler {
+
+    public FileInS3DirInMs(S3GuardFsck.ComparePair comparePair) {
+      super(comparePair);
+    }
+
+    @Override
+    public String getError() {
+      return "A file in S3 is a directory entry in the MS";
+    }
+  }
+
+  /**
+   * The violation handler when there's a directory listing content mismatch.
+   */
+  public static class AuthDirContentMismatch extends ViolationHandler {
+
+    public AuthDirContentMismatch(S3GuardFsck.ComparePair comparePair) {
+      super(comparePair);
+    }
+
+    @Override
+    public String getError() {
+      final String str = String.format(
+          "The content of an authoritative directory listing does "
+              + "not match the content of the S3 listing. S3: %s, MS: %s",
+          Arrays.asList(getS3DirListing()), getMsDirListing().getListing());
+      return str;
+    }
+  }
+
+  /**
+   * The violation handler when there's a length mismatch.
+   */
+  public static class LengthMismatch extends ViolationHandler {
+
+    public LengthMismatch(S3GuardFsck.ComparePair comparePair) {
+      super(comparePair);
+    }
+
+    @Override public String getError() {
+      return String.format("File length mismatch - S3: %s, MS: %s",
+          getS3FileStatus().getLen(), getMsFileStatus().getLen());
+    }
+  }
+
+  /**
+   * The violation handler when there's a modtime mismatch.
+   */
+  public static class ModTimeMismatch extends ViolationHandler {
+
+    public ModTimeMismatch(S3GuardFsck.ComparePair comparePair) {
+      super(comparePair);
+    }
+
+    @Override
+    public String getError() {
+      return String.format("File timestamp mismatch - S3: %s, MS: %s",
+          getS3FileStatus().getModificationTime(),
+          getMsFileStatus().getModificationTime());
+    }
+  }
+
+  /**
+   * The violation handler when there's a version id mismatch.
+   */
+  public static class VersionIdMismatch extends ViolationHandler {
+
+    public VersionIdMismatch(S3GuardFsck.ComparePair comparePair) {
+      super(comparePair);
+    }
+
+    @Override
+    public String getError() {
+      return String.format("getVersionId mismatch - S3: %s, MS: %s",
+          getS3FileStatus().getVersionId(), getMsFileStatus().getVersionId());
+    }
+  }
+
+  /**
+   * The violation handler when there's an etag mismatch.
+   */
+  public static class EtagMismatch extends ViolationHandler {
+
+    public EtagMismatch(S3GuardFsck.ComparePair comparePair) {
+      super(comparePair);
+    }
+
+    @Override
+    public String getError() {
+      return String.format("Etag mismatch - S3: %s, MS: %s",
+        getS3FileStatus().getETag(), getMsFileStatus().getETag());
+    }
+  }
+
+  /**
+   * The violation handler when there's no etag.
+   */
+  public static class NoEtag extends ViolationHandler {
+
+    public NoEtag(S3GuardFsck.ComparePair comparePair) {
+      super(comparePair);
+    }
+
+    @Override
+    public String getError() {
+      return "No etag.";
+    }
+  }
+
+  /**
+   * The violation handler when a tombstoned entry is present in the MS,
+   * but the object is not deleted in S3.
+   */
+  public static class TombstonedInMsNotDeletedInS3 extends ViolationHandler {
+
+    public TombstonedInMsNotDeletedInS3(S3GuardFsck.ComparePair comparePair) {
+      super(comparePair);
+    }
+
+    @Override
+    public String getError() {
+      return "The entry for the path is tombstoned in the MS.";
+    }
+  }
+}
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
index 492c566..25a0cb0 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
@@ -94,7 +94,8 @@ public abstract class S3GuardTool extends Configured 
implements Tool {
       "\t" + Diff.NAME + " - " + Diff.PURPOSE + "\n" +
       "\t" + Prune.NAME + " - " + Prune.PURPOSE + "\n" +
       "\t" + SetCapacity.NAME + " - " + SetCapacity.PURPOSE + "\n" +
-      "\t" + SelectTool.NAME + " - " + SelectTool.PURPOSE + "\n";
+      "\t" + SelectTool.NAME + " - " + SelectTool.PURPOSE + "\n" +
+      "\t" + Fsck.NAME + " - " + Fsck.PURPOSE + "\n";
   private static final String DATA_IN_S3_IS_PRESERVED
       = "(all data in S3 is preserved)";
 
@@ -1485,6 +1486,97 @@ public abstract class S3GuardTool extends Configured 
implements Tool {
     }
   }
 
+  /**
+   * Fsck - check for consistency between S3 and the metadatastore.
+   */
+  static class Fsck extends S3GuardTool {
+    public static final String CHECK_FLAG = "check";
+
+    public static final String NAME = "fsck";
+    public static final String PURPOSE = "Compares S3 with MetadataStore, and "
+        + "returns a failure status if any rules or invariants are violated. "
+        + "Only works with DynamoDB metadata stores.";
+    private static final String USAGE = NAME + " [OPTIONS] [s3a://BUCKET]\n" +
+        "\t" + PURPOSE + "\n\n" +
+        "Common options:\n" +
+        "  -" + CHECK_FLAG + " Check the metadata store for errors, but do "
+        + "not fix any issues.\n";
+
+    Fsck(Configuration conf) {
+      super(conf, CHECK_FLAG);
+    }
+
+    @Override
+    public String getName() {
+      return NAME;
+    }
+
+    @Override
+    public String getUsage() {
+      return USAGE;
+    }
+
+    public int run(String[] args, PrintStream out) throws
+        InterruptedException, IOException {
+      List<String> paths = parseArgs(args);
+      if (paths.isEmpty()) {
+        out.println(USAGE);
+        throw invalidArgs("no arguments");
+      }
+      int exitValue = EXIT_SUCCESS;
+
+      String s3Path = paths.get(0);
+      try {
+        initS3AFileSystem(s3Path);
+      } catch (Exception e) {
+        errorln("Failed to initialize S3AFileSystem from path: " + s3Path);
+        throw e;
+      }
+
+      URI uri = toUri(s3Path);
+      Path root;
+      if (uri.getPath().isEmpty()) {
+        root = new Path("/");
+      } else {
+        root = new Path(uri.getPath());
+      }
+
+      final S3AFileSystem fs = getFilesystem();
+      initMetadataStore(false);
+      final MetadataStore ms = getStore();
+
+      if (ms == null ||
+          !(ms instanceof DynamoDBMetadataStore)) {
+        errorln(s3Path + " path uses MS: " + ms);
+        errorln(NAME + " can be only used with a DynamoDB backed s3a bucket.");
+        errorln(USAGE);
+        return ERROR;
+      }
+
+      final CommandFormat commandFormat = getCommandFormat();
+      if (commandFormat.getOpt(CHECK_FLAG)) {
+        // do the check
+        S3GuardFsck s3GuardFsck = new S3GuardFsck(fs, ms);
+        try {
+          final List<S3GuardFsck.ComparePair> comparePairs
+              = s3GuardFsck.compareS3ToMs(fs.qualify(root));
+          if (comparePairs.size() > 0) {
+            exitValue = EXIT_FAIL;
+          }
+        } catch (IOException e) {
+          throw e;
+        }
+      } else {
+        errorln("No supported operation is selected.");
+        errorln(USAGE);
+        return ERROR;
+      }
+
+      out.flush();
+      return exitValue;
+    }
+  }
+
   private static S3GuardTool command;
 
   /**
@@ -1664,6 +1756,9 @@ public abstract class S3GuardTool extends Configured 
implements Tool {
       // because this is the defacto S3 CLI.
       command = new SelectTool(conf);
       break;
+    case Fsck.NAME:
+      command = new Fsck(conf);
+      break;
     default:
       printHelp();
       throw new ExitUtil.ExitException(E_USAGE,
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
index c9d083e..9e6e232 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
@@ -61,6 +61,11 @@ import static 
org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
 import static org.apache.hadoop.fs.s3a.Constants.RETRY_INTERVAL;
 import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
 import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.PROBE_INTERVAL_MILLIS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.STABILIZATION_TIME;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.TIMESTAMP_SLEEP;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.awaitDeletedFileDisappearance;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.awaitFileStatus;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.checkListingContainsPath;
 import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.checkListingDoesNotContainPath;
 import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.metadataStorePersistsAuthoritativeBit;
@@ -115,12 +120,6 @@ import static org.mockito.Mockito.when;
 @RunWith(Parameterized.class)
 public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
 
-  public static final int TIMESTAMP_SLEEP = 2000;
-
-  public static final int STABILIZATION_TIME = 20_000;
-
-  public static final int PROBE_INTERVAL_MILLIS = 2500;
-
   private S3AFileSystem guardedFs;
   private S3AFileSystem rawFS;
 
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 1a6de9e..e6f32af 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -72,6 +73,7 @@ import static 
org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
+import static org.apache.hadoop.test.LambdaTestUtils.eventually;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static 
org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
 import static org.junit.Assert.*;
@@ -92,6 +94,10 @@ public final class S3ATestUtils {
   public static final String UNSET_PROPERTY = "unset";
   public static final int PURGE_DELAY_SECONDS = 60 * 60;
 
+  public static final int TIMESTAMP_SLEEP = 2000;
+  public static final int STABILIZATION_TIME = 20_000;
+  public static final int PROBE_INTERVAL_MILLIS = 500;
+
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
@@ -1309,4 +1315,32 @@ public final class S3ATestUtils {
           listStatusHasIt);
   }
 
+  /**
+   * Wait for a deleted file to no longer be visible.
+   * @param fs filesystem
+   * @param testFilePath path to query
+   * @throws Exception failure
+   */
+  public static void awaitDeletedFileDisappearance(final S3AFileSystem fs,
+      final Path testFilePath) throws Exception {
+    eventually(
+        STABILIZATION_TIME, PROBE_INTERVAL_MILLIS,
+        () -> intercept(FileNotFoundException.class,
+            () -> fs.getFileStatus(testFilePath)));
+  }
+
+  /**
+   * Wait for a file to be visible.
+   * @param fs filesystem
+   * @param testFilePath path to query
+   * @return the file status.
+   * @throws Exception failure
+   */
+  public static S3AFileStatus awaitFileStatus(S3AFileSystem fs,
+      final Path testFilePath)
+      throws Exception {
+    return (S3AFileStatus) eventually(
+        STABILIZATION_TIME, PROBE_INTERVAL_MILLIS,
+        () -> fs.getFileStatus(testFilePath));
+  }
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardFsck.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardFsck.java
new file mode 100644
index 0000000..ea92f69
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardFsck.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.UUID;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+
+import static org.apache.hadoop.fs.s3a.Constants.AUTHORITATIVE_PATH;
+import static org.junit.Assume.assumeTrue;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
+import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.awaitFileStatus;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.metadataStorePersistsAuthoritativeBit;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+
+/**
+ * Integration tests for the S3Guard Fsck against a dyamodb backed metadata
+ * store.
+ */
+public class ITestS3GuardFsck extends AbstractS3ATestBase {
+
+  private S3AFileSystem guardedFs;
+  private S3AFileSystem rawFs;
+
+  private MetadataStore metadataStore;
+
+  @Before
+  public void setup() throws Exception {
+    super.setup();
+    S3AFileSystem fs = getFileSystem();
+    // These test will fail if no ms
+    assumeTrue("FS needs to have a metadatastore.",
+        fs.hasMetadataStore());
+    assumeTrue("Metadatastore should persist authoritative bit",
+        metadataStorePersistsAuthoritativeBit(fs.getMetadataStore()));
+
+    guardedFs = fs;
+    metadataStore = fs.getMetadataStore();
+
+    // create raw fs without s3guard
+    rawFs = createUnguardedFS();
+    assertFalse("Raw FS still has S3Guard " + rawFs,
+        rawFs.hasMetadataStore());
+  }
+
+  @Override
+  public void teardown() throws Exception {
+    if (guardedFs != null) {
+      IOUtils.cleanupWithLogger(LOG, guardedFs);
+    }
+    IOUtils.cleanupWithLogger(LOG, rawFs);
+    super.teardown();
+  }
+
+  /**
+   * Create a test filesystem which is always unguarded.
+   * This filesystem MUST be closed in test teardown.
+   * @return the new FS
+   */
+  private S3AFileSystem createUnguardedFS() throws Exception {
+    S3AFileSystem testFS = getFileSystem();
+    Configuration config = new Configuration(testFS.getConf());
+    URI uri = testFS.getUri();
+
+    removeBaseAndBucketOverrides(uri.getHost(), config,
+        S3_METADATA_STORE_IMPL, METADATASTORE_AUTHORITATIVE,
+        AUTHORITATIVE_PATH);
+    S3AFileSystem fs2 = new S3AFileSystem();
+    fs2.initialize(uri, config);
+    return fs2;
+  }
+
+  @Test
+  public void testIDetectNoMetadataEntry() throws Exception {
+    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
+    final Path file = new Path(cwd, "file");
+    try {
+      touchRawAndWaitRaw(file);
+
+      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
+      final List<S3GuardFsck.ComparePair> comparePairs =
+          s3GuardFsck.compareS3ToMs(cwd);
+
+      assertComparePairsSize(comparePairs, 2);
+      final S3GuardFsck.ComparePair pair = comparePairs.get(0);
+      checkForViolationInPairs(file, comparePairs,
+          S3GuardFsck.Violation.NO_METADATA_ENTRY);
+    } finally {
+      // delete the working directory with all of its contents
+      cleanup(file, cwd);
+    }
+  }
+
+  @Test
+  public void testIDetectNoParentEntry() throws Exception {
+    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
+    final Path file = new Path(cwd, "file");
+    try {
+      touchGuardedAndWaitRaw(file);
+      // delete the parent from the MS
+      metadataStore.forgetMetadata(cwd);
+
+      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
+      final List<S3GuardFsck.ComparePair> comparePairs =
+          s3GuardFsck.compareS3ToMs(cwd);
+
+      assertComparePairsSize(comparePairs, 2);
+      // check the parent that it does not exist
+      checkForViolationInPairs(cwd, comparePairs,
+          S3GuardFsck.Violation.NO_METADATA_ENTRY);
+      // check the child that there's no parent entry.
+      checkForViolationInPairs(file, comparePairs,
+          S3GuardFsck.Violation.NO_PARENT_ENTRY);
+    } finally {
+      cleanup(file, cwd);
+    }
+  }
+
+  @Test
+  public void testIDetectParentIsAFile() throws Exception {
+    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
+    final Path file = new Path(cwd, "file");
+    try {
+      touchGuardedAndWaitRaw(file);
+      // modify the cwd metadata and set that it's not a directory
+      final S3AFileStatus newParentFile = MetadataStoreTestBase
+          .basicFileStatus(cwd, 1, false, 1);
+      metadataStore.put(new PathMetadata(newParentFile));
+
+      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
+      final List<S3GuardFsck.ComparePair> comparePairs =
+          s3GuardFsck.compareS3ToMs(cwd);
+
+      assertComparePairsSize(comparePairs, 2);
+      // check the parent that it does not exist
+      checkForViolationInPairs(cwd, comparePairs,
+          S3GuardFsck.Violation.DIR_IN_S3_FILE_IN_MS);
+      // check the child that the parent is a file.
+      checkForViolationInPairs(file, comparePairs,
+          S3GuardFsck.Violation.PARENT_IS_A_FILE);
+    } finally {
+      cleanup(file, cwd);
+    }
+  }
+
+  @Test
+  public void testIDetectParentTombstoned() throws Exception {
+    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
+    final Path file = new Path(cwd, "file");
+    try {
+      touchGuardedAndWaitRaw(file);
+      // modify the parent metadata and set that it's not a directory
+      final PathMetadata cwdPmd = metadataStore.get(cwd);
+      cwdPmd.setIsDeleted(true);
+      metadataStore.put(cwdPmd);
+
+      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
+      final List<S3GuardFsck.ComparePair> comparePairs =
+          s3GuardFsck.compareS3ToMs(cwd);
+
+      // check the child that the parent is tombstoned
+      checkForViolationInPairs(file, comparePairs,
+          S3GuardFsck.Violation.PARENT_TOMBSTONED);
+    } finally {
+      cleanup(file, cwd);
+    }
+  }
+
+  @Test
+  public void testIDetectDirInS3FileInMs() throws Exception {
+    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
+    try {
+      // create a file with guarded fs
+      mkdirs(cwd);
+      awaitFileStatus(guardedFs, cwd);
+      // modify the cwd metadata and set that it's not a directory
+      final S3AFileStatus newParentFile = MetadataStoreTestBase
+          .basicFileStatus(cwd, 1, false, 1);
+      metadataStore.put(new PathMetadata(newParentFile));
+
+      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
+      final List<S3GuardFsck.ComparePair> comparePairs =
+          s3GuardFsck.compareS3ToMs(cwd);
+      assertComparePairsSize(comparePairs, 1);
+
+      // check the child that the dir in s3 is a file in the ms
+      checkForViolationInPairs(cwd, comparePairs,
+          S3GuardFsck.Violation.DIR_IN_S3_FILE_IN_MS);
+    } finally {
+      cleanup(cwd);
+    }
+  }
+
+  @Test
+  public void testIDetectFileInS3DirInMs() throws Exception {
+    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
+    final Path file = new Path(cwd, "file");
+    try {
+      touchGuardedAndWaitRaw(file);
+      // modify the cwd metadata and set that it's not a directory
+      final S3AFileStatus newFile = MetadataStoreTestBase
+          .basicFileStatus(file, 1, true, 1);
+      metadataStore.put(new PathMetadata(newFile));
+
+      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
+      final List<S3GuardFsck.ComparePair> comparePairs =
+          s3GuardFsck.compareS3ToMs(cwd);
+
+      assertComparePairsSize(comparePairs, 1);
+      // check the child that the dir in s3 is a file in the ms
+      checkForViolationInPairs(file, comparePairs,
+          S3GuardFsck.Violation.FILE_IN_S3_DIR_IN_MS);
+    } finally {
+      cleanup(file, cwd);
+    }
+  }
+
+  @Test
+  public void testIAuthoritativeDirectoryContentMismatch() throws Exception {
+    assumeTrue("Authoritative directory listings should be enabled for this "
+            + "test", guardedFs.hasAuthoritativeMetadataStore());
+    // first dir listing will be correct
+    final Path cwdCorrect = path("/" + getMethodName() + "-" + 
UUID.randomUUID());
+    final Path fileC1 = new Path(cwdCorrect, "fileC1");
+    final Path fileC2 = new Path(cwdCorrect, "fileC2");
+
+    // second dir listing will be incorrect: missing entry from Dynamo
+    final Path cwdIncorrect = path("/" + getMethodName() + "-" + 
UUID.randomUUID());
+    final Path fileIc1 = new Path(cwdIncorrect, "fileC1");
+    final Path fileIc2 = new Path(cwdIncorrect, "fileC2");
+    try {
+      touchGuardedAndWaitRaw(fileC1);
+      touchGuardedAndWaitRaw(fileC2);
+      touchGuardedAndWaitRaw(fileIc1);
+
+      // get listing from ms and set it authoritative
+      final DirListingMetadata dlmC = metadataStore.listChildren(cwdCorrect);
+      final DirListingMetadata dlmIc = 
metadataStore.listChildren(cwdIncorrect);
+      dlmC.setAuthoritative(true);
+      dlmIc.setAuthoritative(true);
+      metadataStore.put(dlmC, null);
+      metadataStore.put(dlmIc, null);
+
+      // add a file raw so the listing will be different.
+      touchRawAndWaitRaw(fileIc2);
+
+      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
+      final List<S3GuardFsck.ComparePair> pairsCorrect =
+          s3GuardFsck.compareS3ToMs(cwdCorrect);
+      final List<S3GuardFsck.ComparePair> pairsIncorrect =
+          s3GuardFsck.compareS3ToMs(cwdIncorrect);
+
+      // Assert that the correct dir does not contain the violation.
+      assertTrue(pairsCorrect.stream()
+          .noneMatch(p -> p.getPath().equals(cwdCorrect)));
+
+      // Assert that the incorrect listing contains the violation.
+      checkForViolationInPairs(cwdIncorrect, pairsIncorrect,
+          S3GuardFsck.Violation.AUTHORITATIVE_DIRECTORY_CONTENT_MISMATCH);
+    } finally {
+      cleanup(fileC1, fileC2, fileIc1, fileIc2, cwdCorrect, cwdIncorrect);
+    }
+  }
+
+  @Test
+  public void testIDetectLengthMismatch() throws Exception {
+    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
+    final Path file = new Path(cwd, "file");
+    try {
+      // create a file with guarded fs
+      touchGuardedAndWaitRaw(file);
+
+      // modify the file metadata so the length will not match
+      final S3AFileStatus newFile = MetadataStoreTestBase
+          .basicFileStatus(file, 9999, false, 1);
+      metadataStore.put(new PathMetadata(newFile));
+
+      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
+      final List<S3GuardFsck.ComparePair> comparePairs =
+          s3GuardFsck.compareS3ToMs(cwd);
+
+      assertComparePairsSize(comparePairs, 1);
+      // Assert that the correct dir does not contain the violation.
+      assertTrue(comparePairs.stream()
+          .noneMatch(p -> p.getPath().equals(cwd)));
+      // Assert that the incorrect file meta contains the violation.
+      checkForViolationInPairs(file, comparePairs,
+          S3GuardFsck.Violation.LENGTH_MISMATCH);
+    } finally {
+      cleanup(file, cwd);
+    }
+  }
+
+  @Test
+  public void testIDetectModTimeMismatch() throws Exception {
+    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
+    final Path file = new Path(cwd, "file");
+    try {
+      // create a file with guarded fs
+      touchGuardedAndWaitRaw(file);
+      // modify the parent meta entry so the MOD_TIME will surely be up to date
+      final FileStatus oldCwdFileStatus = rawFs.getFileStatus(cwd);
+      final S3AFileStatus newCwdFileStatus = MetadataStoreTestBase
+          .basicFileStatus(cwd, 0, true,
+              oldCwdFileStatus.getModificationTime());
+      metadataStore.put(new PathMetadata(newCwdFileStatus));
+
+      // modify the file metadata so the length will not match
+      final S3AFileStatus newFileStatus = MetadataStoreTestBase
+          .basicFileStatus(file, 0, false, 1);
+      metadataStore.put(new PathMetadata(newFileStatus));
+
+      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
+      final List<S3GuardFsck.ComparePair> comparePairs =
+          s3GuardFsck.compareS3ToMs(cwd);
+
+      assertComparePairsSize(comparePairs, 1);
+      // Assert that the correct dir does not contain the violation.
+      assertTrue(comparePairs.stream()
+          .noneMatch(p -> p.getPath().equals(cwd)));
+      // check the file meta that there's a violation.
+      checkForViolationInPairs(file, comparePairs,
+          S3GuardFsck.Violation.MOD_TIME_MISMATCH);
+    } finally {
+      cleanup(file, cwd);
+    }
+  }
+
+  @Test
+  public void testIEtagMismatch() throws Exception {
+    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
+    final Path file = new Path(cwd, "file");
+    try {
+      touchGuardedAndWaitRaw(file);
+      // modify the file metadata so the etag will not match
+      final S3AFileStatus newFileStatus = new S3AFileStatus(1, 1, file, 1, "",
+          "etag", "versionId");
+      metadataStore.put(new PathMetadata(newFileStatus));
+
+      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
+      final List<S3GuardFsck.ComparePair> comparePairs =
+          s3GuardFsck.compareS3ToMs(cwd);
+
+      assertComparePairsSize(comparePairs, 1);
+      // check the child that there's a BLOCKSIZE_MISMATCH
+      checkForViolationInPairs(file, comparePairs,
+          S3GuardFsck.Violation.ETAG_MISMATCH);
+    } finally {
+      cleanup(file, cwd);
+    }
+  }
+
+  @Test
+  public void testINoEtag() throws Exception {
+    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
+    final Path file1 = new Path(cwd, "file1");
+    final Path file2 = new Path(cwd, "file2");
+    try {
+      // create a file1 with guarded fs
+      touchGuardedAndWaitRaw(file1);
+      touchGuardedAndWaitRaw(file2);
+      // modify the file1 metadata so there's no etag
+      final S3AFileStatus newFile1Status =
+          new S3AFileStatus(1, 1, file1, 1, "", null, "versionId");
+      final S3AFileStatus newFile2Status =
+          new S3AFileStatus(1, 1, file2, 1, "", "etag", "versionId");
+      metadataStore.put(new PathMetadata(newFile1Status));
+      metadataStore.put(new PathMetadata(newFile2Status));
+
+      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
+      final List<S3GuardFsck.ComparePair> comparePairs =
+          s3GuardFsck.compareS3ToMs(cwd);
+
+      assertComparePairsSize(comparePairs, 2);
+
+      // check file 1 that there's NO_ETAG
+      checkForViolationInPairs(file1, comparePairs,
+          S3GuardFsck.Violation.NO_ETAG);
+      // check the child that there's no NO_ETAG violation
+      checkNoViolationInPairs(file2, comparePairs,
+          S3GuardFsck.Violation.NO_ETAG);
+    } finally {
+      cleanup(file1, file2, cwd);
+    }
+  }
+
+  @Test
+  public void testTombstonedInMsNotDeletedInS3() throws Exception {
+    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
+    final Path file = new Path(cwd, "file");
+    try {
+      // create a file with guarded fs
+      touchGuardedAndWaitRaw(file);
+      // set isDeleted flag in ms to true (tombstone item)
+      final PathMetadata fileMeta = metadataStore.get(file);
+      fileMeta.setIsDeleted(true);
+      metadataStore.put(fileMeta);
+
+      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
+      final List<S3GuardFsck.ComparePair> comparePairs =
+          s3GuardFsck.compareS3ToMs(cwd);
+
+      assertComparePairsSize(comparePairs, 1);
+
+      // check fil1 that there's the violation
+      checkForViolationInPairs(file, comparePairs,
+          S3GuardFsck.Violation.TOMBSTONED_IN_MS_NOT_DELETED_IN_S3);
+      // check the child that there's no NO_ETAG violation
+    } finally {
+      cleanup(file, cwd);
+    }
+  }
+
+  protected void assertComparePairsSize(
+      List<S3GuardFsck.ComparePair> comparePairs, int num) {
+    Assertions.assertThat(comparePairs)
+        .describedAs("Number of compare pairs")
+        .hasSize(num);
+  }
+
+  private void touchGuardedAndWaitRaw(Path file) throws Exception {
+    touchAndWait(guardedFs, rawFs, file);
+  }
+
+  private void touchRawAndWaitRaw(Path file) throws Exception {
+    touchAndWait(rawFs, rawFs, file);
+  }
+
+  private void touchAndWait(FileSystem forTouch, FileSystem forWait, Path file)
+      throws IOException {
+    touch(forTouch, file);
+    touch(forWait, file);
+  }
+
+  private void checkForViolationInPairs(Path file,
+      List<S3GuardFsck.ComparePair> comparePairs,
+      S3GuardFsck.Violation violation) {
+    final S3GuardFsck.ComparePair childPair = comparePairs.stream()
+        .filter(p -> p.getPath().equals(file))
+        .findFirst().get();
+    assertNotNull("The pair should not be null.", childPair);
+    assertTrue("The pair must contain a violation.",
+        childPair.containsViolation());
+    Assertions.assertThat(childPair.getViolations())
+        .describedAs("Violations in the pair")
+        .contains(violation);
+  }
+
+  private void checkNoViolationInPairs(Path file2,
+      List<S3GuardFsck.ComparePair> comparePairs,
+      S3GuardFsck.Violation violation) {
+    final S3GuardFsck.ComparePair file2Pair = comparePairs.stream()
+        .filter(p -> p.getPath().equals(file2))
+        .findFirst().get();
+    assertNotNull("The pair should not be null.", file2Pair);
+    Assertions.assertThat(file2Pair.getViolations())
+        .describedAs("Violations in the pair")
+        .doesNotContain(violation);
+  }
+
+  private void cleanup(Path... paths) {
+    for (Path path : paths) {
+      try {
+        metadataStore.forgetMetadata(path);
+        rawFs.delete(path, true);
+      } catch (IOException e) {
+        LOG.error("Error during cleanup.", e);
+      }
+    }
+  }
+}
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java
index 9e0a08b..205eb65 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.s3a.s3guard;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
@@ -32,6 +33,7 @@ import 
com.amazonaws.services.dynamodbv2.model.ListTagsOfResourceRequest;
 import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
 import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
 import com.amazonaws.services.dynamodbv2.model.Tag;
+
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.AssumptionViolatedException;
@@ -42,6 +44,7 @@ import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Destroy;
 import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Init;
+import org.apache.hadoop.util.ExitUtil;
 
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_NAME_KEY;
@@ -289,4 +292,31 @@ public class ITestS3GuardToolDynamoDB extends 
AbstractS3GuardToolTestBase {
         "-meta", "dynamodb://" + getTestTableName(DYNAMODB_TABLE));
   }
 
+  @Test
+  public void testCLIFsckWithoutParam() throws Exception {
+    intercept(ExitUtil.ExitException.class, () -> run(Fsck.NAME));
+  }
+
+  @Test
+  public void testCLIFsckWithParam() throws Exception {
+    final int result = run(S3GuardTool.Fsck.NAME, "-check",
+        "s3a://" + getFileSystem().getBucket());
+    LOG.info("This test serves the purpose to run fsck with the correct " +
+        "parameters, so there will be no exception thrown. " +
+        "The return value of the run: {}", result);
+  }
+
+  @Test
+  public void testCLIFsckWithParamParentOfRoot() throws Exception {
+    intercept(IOException.class, "Invalid URI",
+        () -> run(S3GuardTool.Fsck.NAME, "-check",
+            "s3a://" + getFileSystem().getBucket() + "/.."));
+  }
+
+  @Test
+  public void testCLIFsckFailInitializeFs() throws Exception {
+    intercept(FileNotFoundException.class, "does not exist",
+        () -> run(S3GuardTool.Fsck.NAME, "-check",
+            "s3a://this-bucket-does-not-exist-" + UUID.randomUUID()));
+  }
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
index 198a2de..e65ff8a 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
@@ -1199,7 +1199,7 @@ public abstract class MetadataStoreTestBase extends 
HadoopTestBase {
     return basicFileStatus(path, size, isDir, modTime);
   }
 
-  protected S3AFileStatus basicFileStatus(int size, boolean isDir,
+  public static S3AFileStatus basicFileStatus(int size, boolean isDir,
       long blockSize, long modificationTime, Path path) {
     if (isDir) {
       return new S3AFileStatus(Tristate.UNKNOWN, path, null);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to