This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 43b0c0b  HDFS-15607. Create trash dir when allowing snapshottable dir (#2352)
43b0c0b is described below

commit 43b0c0b0546d79a1ab4df5fe02310d007f6bc178
Author: Siyao Meng <50227127+smen...@users.noreply.github.com>
AuthorDate: Mon Oct 5 05:02:00 2020 -0700

    HDFS-15607. Create trash dir when allowing snapshottable dir (#2352)
---
 .../apache/hadoop/hdfs/DistributedFileSystem.java  | 132 +++++++++++++++++++--
 .../hadoop/hdfs/ViewDistributedFileSystem.java     |  14 +++
 .../org/apache/hadoop/hdfs/client/HdfsAdmin.java   |  23 +++-
 .../org/apache/hadoop/hdfs/tools/DFSAdmin.java     |  46 ++++++-
 .../hadoop/hdfs/TestDistributedFileSystem.java     |  17 ++-
 .../org/apache/hadoop/hdfs/tools/TestDFSAdmin.java |  55 +++++++++
 6 files changed, 272 insertions(+), 15 deletions(-)
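
Before the diff itself, here is a minimal, hedged sketch of the client flow
this commit enables; the NameNode URI and directory below are hypothetical,
while HdfsAdmin#allowSnapshot and the HdfsAdmin#provisionSnapshotTrash(Path)
API are the ones touched/added by this patch:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsAdmin;

    public class AllowSnapshotExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        HdfsAdmin admin = new HdfsAdmin(new URI("hdfs://nn1:8020"), conf);
        Path dir = new Path("/data/projects");
        // allowSnapshot now also provisions <dir>/.Trash (mode 1777) when
        // dfs.namenode.snapshot.trashroot.enabled is true on the NameNode.
        admin.allowSnapshot(dir);
        // The trash root can also be provisioned explicitly:
        Path trashRoot = admin.provisionSnapshotTrash(dir);
        System.out.println("Provisioned trash root: " + trashRoot);
      }
    }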

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index f4fb621..9b08434 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -2032,6 +2032,19 @@ public class DistributedFileSystem extends FileSystem
     return setSafeMode(SafeModeAction.SAFEMODE_GET, true);
   }
 
+  /**
+   * HDFS only.
+   *
+   * Returns whether the NameNode has enabled the snapshot trash root
+   * configuration (dfs.namenode.snapshot.trashroot.enabled).
+   * @return true if the NameNode has enabled the snapshot trash root.
+   * @throws IOException
+   *           when there is an issue communicating with the NameNode
+   */
+  public boolean isSnapshotTrashRootEnabled() throws IOException {
+    return dfs.isSnapshotTrashRootEnabled();
+  }
+
   /** @see org.apache.hadoop.hdfs.client.HdfsAdmin#allowSnapshot(Path) */
   public void allowSnapshot(final Path path) throws IOException {
     statistics.incrementWriteOps(1);
@@ -2068,12 +2081,7 @@ public class DistributedFileSystem extends FileSystem
     new FileSystemLinkResolver<Void>() {
       @Override
       public Void doCall(final Path p) throws IOException {
-        String ssTrashRoot =
-            new Path(p, FileSystem.TRASH_PREFIX).toUri().getPath();
-        if (dfs.exists(ssTrashRoot)) {
-          throw new IOException("Found trash root under path " + p + ". "
-              + "Please remove or move the trash root and then try again.");
-        }
+        checkTrashRootAndRemoveIfEmpty(p);
         dfs.disallowSnapshot(getPathName(p));
         return null;
       }
@@ -2083,6 +2091,7 @@ public class DistributedFileSystem extends FileSystem
           throws IOException {
         if (fs instanceof DistributedFileSystem) {
           DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+          myDfs.checkTrashRootAndRemoveIfEmpty(p);
           myDfs.disallowSnapshot(p);
         } else {
           throw new UnsupportedOperationException("Cannot perform snapshot"
@@ -2094,6 +2103,41 @@ public class DistributedFileSystem extends FileSystem
     }.resolve(this, absF);
   }
 
+  /**
+   * Helper function to check if a trash root exists in the given directory,
+   * remove the trash root if it is empty, or throw an IOException if it is
+   * not empty.
+   * @param p Path to a directory.
+   */
+  private void checkTrashRootAndRemoveIfEmpty(final Path p) throws IOException {
+    Path trashRoot = new Path(p, FileSystem.TRASH_PREFIX);
+    try {
+      // listStatus has 4 possible outcomes here:
+      // 1) throws FileNotFoundException: the trash root doesn't exist.
+      // 2) returns empty array: the trash path is an empty directory.
+      // 3) returns non-empty array, len >= 2: the trash root is not empty.
+      // 4) returns non-empty array, len == 1:
+      //    i) if the element's path is exactly trashRoot, the trash path is
+      //       not a directory, e.g. a file named .Trash. Ignore.
+      //   ii) if the element's path isn't trashRoot, the trash root is not
+      //       empty.
+      FileStatus[] fileStatuses = listStatus(trashRoot);
+      if (fileStatuses.length == 0) {
+        DFSClient.LOG.debug("Removing empty trash root {}", trashRoot);
+        delete(trashRoot, false);
+      } else {
+        if (fileStatuses.length == 1
+            && !fileStatuses[0].isDirectory()
+            && fileStatuses[0].getPath().toUri().getPath().equals(
+                trashRoot.toUri().getPath())) {
+          // Ignore the trash path because it is not a directory.
+          DFSClient.LOG.warn("{} is not a directory.", trashRoot);
+        } else {
+          throw new IOException("Found non-empty trash root at " +
+              trashRoot + ". Rename or delete it, then try again.");
+        }
+      }
+    } catch (FileNotFoundException ignored) {
+    }
+  }
+
   @Override
   public Path createSnapshot(final Path path, final String snapshotName)
       throws IOException {
@@ -2901,6 +2945,80 @@ public class DistributedFileSystem extends FileSystem
     setPermission(trashPath, trashPermission);
   }
 
+  /**
+   * HDFS only.
+   *
+   * Provision snapshottable directory trash.
+   * @param path Path to a snapshottable directory.
+   * @param trashPermission Expected FsPermission of the trash root.
+   * @return Path of the provisioned trash root.
+   * @throws IOException if the trash root cannot be provisioned.
+   */
+  public Path provisionSnapshotTrash(final Path path,
+      final FsPermission trashPermission) throws IOException {
+    Path absF = fixRelativePart(path);
+    return new FileSystemLinkResolver<Path>() {
+      @Override
+      public Path doCall(Path p) throws IOException {
+        return provisionSnapshotTrash(getPathName(p), trashPermission);
+      }
+
+      @Override
+      public Path next(FileSystem fs, Path p) throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+          return myDfs.provisionSnapshotTrash(p, trashPermission);
+        }
+        throw new UnsupportedOperationException(
+            "Cannot provisionSnapshotTrash through a symlink to" +
+            " a non-DistributedFileSystem: " + fs + " -> " + p);
+      }
+    }.resolve(this, absF);
+  }
+
+  private Path provisionSnapshotTrash(
+      String pathStr, FsPermission trashPermission) throws IOException {
+    Path path = new Path(pathStr);
+    // Given path must be a snapshottable directory
+    FileStatus fileStatus = getFileStatus(path);
+    if (!fileStatus.isSnapshotEnabled()) {
+      throw new IllegalArgumentException(
+          path + " is not a snapshottable directory.");
+    }
+
+    // Check if trash root already exists
+    Path trashPath = new Path(path, FileSystem.TRASH_PREFIX);
+    try {
+      FileStatus trashFileStatus = getFileStatus(trashPath);
+      String errMessage = "Can't provision trash for snapshottable directory " +
+          pathStr + " because trash path " + trashPath.toString() +
+          " already exists.";
+      if (!trashFileStatus.isDirectory()) {
+        errMessage += "\r\n" +
+            "WARNING: " + trashPath.toString() + " is not a directory.";
+      }
+      if (!trashFileStatus.getPermission().equals(trashPermission)) {
+        errMessage += "\r\n" +
+            "WARNING: Permission of " + trashPath.toString() +
+            " differs from provided permission " + trashPermission;
+      }
+      throw new FileAlreadyExistsException(errMessage);
+    } catch (FileNotFoundException ignored) {
+      // Trash path doesn't exist. Continue
+    }
+
+    // Create trash root and set the permission
+    mkdir(trashPath, trashPermission);
+    setPermission(trashPath, trashPermission);
+
+    // Print a warning if snapshot trash root feature is not enabled
+    if (!isSnapshotTrashRootEnabled()) {
+      DFSClient.LOG.warn("New trash is provisioned, but the snapshot trash root"
+          + " feature is disabled. This new trash won't be automatically"
+          + " utilized unless the feature is enabled on the NameNode.");
+    }
+    return trashPath;
+  }
+
   @Override
   public void setXAttr(Path path, final String name, final byte[] value,
       final EnumSet<XAttrSetFlag> flag) throws IOException {
@@ -3124,7 +3242,7 @@ public class DistributedFileSystem extends FileSystem
   }
 
   /**
-   * Get erasure coding policy information for the specified path
+   * Get erasure coding policy information for the specified path.
    *
    * @param path The path of the file or directory
    * @return Returns the policy information if file or directory on the path
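
Before moving on to the ViewFS changes, here is a standalone sketch (not part
of the patch) of the decision table that checkTrashRootAndRemoveIfEmpty
implements over the listStatus(trashRoot) result; the helper class and enum
names are illustrative only:

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;

    enum TrashRootState { EMPTY_REMOVABLE, NOT_A_DIRECTORY, NON_EMPTY }

    final class TrashRootCheck {
      // FileNotFoundException from listStatus (trash root absent) is handled
      // by the caller; this classifies the remaining three outcomes.
      static TrashRootState classify(FileStatus[] statuses, Path trashRoot) {
        if (statuses.length == 0) {
          return TrashRootState.EMPTY_REMOVABLE; // empty dir: safe to delete
        }
        if (statuses.length == 1
            && !statuses[0].isDirectory()
            && statuses[0].getPath().toUri().getPath()
                .equals(trashRoot.toUri().getPath())) {
          // listStatus on a plain file returns the file itself, so a file
          // named .Trash is not a trash root; warn and ignore it.
          return TrashRootState.NOT_A_DIRECTORY;
        }
        return TrashRootState.NON_EMPTY; // any child blocks disallowSnapshot
      }
    }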
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java
index 70ba886..936c65b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java
@@ -1678,6 +1678,20 @@ public class ViewDistributedFileSystem extends DistributedFileSystem {
   }
 
   @Override
+  public Path provisionSnapshotTrash(final Path path,
+      final FsPermission trashPermission) throws IOException {
+    if (this.vfs == null) {
+      return super.provisionSnapshotTrash(path, trashPermission);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(path, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "provisionSnapshotTrash");
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .provisionSnapshotTrash(mountPathInfo.getPathOnTarget(),
+          trashPermission);
+  }
+
+  @Override
   public void setXAttr(Path path, String name, byte[] value,
       EnumSet<XAttrSetFlag> flag) throws IOException {
     if (this.vfs == null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
index 3b3d563..716ec3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme;
 import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
@@ -67,8 +68,8 @@ import java.util.EnumSet;
 @InterfaceStability.Evolving
 public class HdfsAdmin {
 
-  private DistributedFileSystem dfs;
-  private static final FsPermission TRASH_PERMISSION = new FsPermission(
+  final private DistributedFileSystem dfs;
+  public static final FsPermission TRASH_PERMISSION = new FsPermission(
       FsAction.ALL, FsAction.ALL, FsAction.ALL, true);
 
   /**
@@ -80,6 +81,10 @@ public class HdfsAdmin {
    */
   public HdfsAdmin(URI uri, Configuration conf) throws IOException {
     FileSystem fs = FileSystem.get(uri, conf);
+    if ((fs instanceof ViewFileSystemOverloadScheme)) {
+      fs = ((ViewFileSystemOverloadScheme) fs)
+          .getRawFileSystem(new Path(FileSystem.getDefaultUri(conf)), conf);
+    }
     if (!(fs instanceof DistributedFileSystem)) {
       throw new IllegalArgumentException("'" + uri + "' is not an HDFS URI.");
     } else {
@@ -165,6 +170,20 @@ public class HdfsAdmin {
    */
   public void allowSnapshot(Path path) throws IOException {
     dfs.allowSnapshot(path);
+    if (dfs.isSnapshotTrashRootEnabled()) {
+      dfs.provisionSnapshotTrash(path, TRASH_PERMISSION);
+    }
+  }
+
+  /**
+   * Provision a trash directory for a given snapshottable directory.
+   * @param path the root of the snapshottable directory
+   * @return Path of the provisioned trash root
+   * @throws IOException if the trash directory cannot be created.
+   */
+  public Path provisionSnapshotTrash(Path path)
+      throws IOException {
+    return dfs.provisionSnapshotTrash(path, TRASH_PERMISSION);
   }
 
   /**
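
TRASH_PERMISSION is promoted to public above so that DFSAdmin and the tests
can reference it. For readers checking what it evaluates to, a small hedged
sketch (the class name is hypothetical; the constructor call is the same one
used in the patch):

    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class TrashPermissionDemo {
      public static void main(String[] args) {
        // ALL/ALL/ALL with the sticky bit set, i.e. rwxrwxrwt / octal 1777.
        FsPermission p = new FsPermission(
            FsAction.ALL, FsAction.ALL, FsAction.ALL, true);
        System.out.println(p);                                   // rwxrwxrwt
        System.out.println(Integer.toOctalString(p.toShort()));  // 1777
      }
    }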
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index b6164d8..baeec97 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Joiner;
 
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -460,6 +461,7 @@ public class DFSAdmin extends FsShell {
     "\t[-fetchImage <local directory>]\n" +
     "\t[-allowSnapshot <snapshotDir>]\n" +
     "\t[-disallowSnapshot <snapshotDir>]\n" +
+      "\t[-provisionSnapshotTrash <snapshotDir>]\n" +
     "\t[-shutdownDatanode <datanode_host:ipc_port> [upgrade]]\n" +
     "\t[-evictWriters <datanode_host:ipc_port>]\n" +
     "\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" +
@@ -765,9 +767,9 @@ public class DFSAdmin extends FsShell {
    */
   public void allowSnapshot(String[] argv) throws IOException {
     Path p = new Path(argv[1]);
-    final DistributedFileSystem dfs = AdminHelper.getDFS(p.toUri(), getConf());
+    final HdfsAdmin admin = new HdfsAdmin(p.toUri(), getConf());
     try {
-      dfs.allowSnapshot(p);
+      admin.allowSnapshot(p);
     } catch (SnapshotException e) {
       throw new RemoteException(e.getClass().getName(), e.getMessage());
     }
@@ -782,14 +784,33 @@ public class DFSAdmin extends FsShell {
    */
   public void disallowSnapshot(String[] argv) throws IOException {
     Path p = new Path(argv[1]);
-    final DistributedFileSystem dfs = AdminHelper.getDFS(p.toUri(), getConf());
+    final HdfsAdmin admin = new HdfsAdmin(p.toUri(), getConf());
     try {
-      dfs.disallowSnapshot(p);
+      admin.disallowSnapshot(p);
     } catch (SnapshotException e) {
       throw new RemoteException(e.getClass().getName(), e.getMessage());
     }
     System.out.println("Disallowing snapshot on " + argv[1] + " succeeded");
   }
+
+  /**
+   * Provision trash root in a snapshottable directory.
+   * Usage: hdfs dfsadmin -provisionSnapshotTrash snapshotDir
+   * @param argv List of command line parameters.
+   * @exception IOException
+   */
+  public void provisionSnapshotTrash(String[] argv) throws IOException {
+    Path p = new Path(argv[1]);
+    final HdfsAdmin admin = new HdfsAdmin(p.toUri(), getConf());
+    Path trashRoot;
+    try {
+      trashRoot = admin.provisionSnapshotTrash(p);
+    } catch (SnapshotException e) {
+      throw new RemoteException(e.getClass().getName(), e.getMessage());
+    }
+    System.out.println("Successfully provisioned snapshot trash at " +
+        trashRoot);
+  }
   
   /**
    * Command to ask the namenode to save the namespace.
@@ -1245,6 +1266,10 @@ public class DFSAdmin extends FsShell {
     String disallowSnapshot = "-disallowSnapshot <snapshotDir>:\n" +
         "\tDo not allow snapshots to be taken on a directory any more.\n";
 
+    String provisionSnapshotTrash = "-provisionSnapshotTrash <snapshotDir>:\n" +
+        "\tProvision trash root in a snapshottable directory with permission"
+        + "\t" + HdfsAdmin.TRASH_PERMISSION + ".\n";
+
     String shutdownDatanode = "-shutdownDatanode <datanode_host:ipc_port> [upgrade]\n"
         + "\tSubmit a shutdown request for the given datanode. If an optional\n"
         + "\t\"upgrade\" argument is specified, clients accessing the datanode\n"
@@ -1334,6 +1359,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(allowSnapshot);
     } else if ("disallowSnapshot".equalsIgnoreCase(cmd)) {
       System.out.println(disallowSnapshot);
+    } else if ("provisionSnapshotTrash".equalsIgnoreCase(cmd)) {
+      System.out.println(provisionSnapshotTrash);
     } else if ("shutdownDatanode".equalsIgnoreCase(cmd)) {
       System.out.println(shutdownDatanode);
     } else if ("evictWriters".equalsIgnoreCase(cmd)) {
@@ -1376,6 +1403,7 @@ public class DFSAdmin extends FsShell {
       System.out.println(fetchImage);
       System.out.println(allowSnapshot);
       System.out.println(disallowSnapshot);
+      System.out.println(provisionSnapshotTrash);
       System.out.println(shutdownDatanode);
       System.out.println(evictWriters);
       System.out.println(getDatanodeInfo);
@@ -2085,6 +2113,9 @@ public class DFSAdmin extends FsShell {
     } else if ("-disallowSnapshot".equalsIgnoreCase(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
           + " [-disallowSnapshot <snapshotDir>]");
+    } else if ("-provisionSnapshotTrash".equalsIgnoreCase(cmd)) {
+      System.err.println("Usage: hdfs dfsadmin"
+          + " [-provisionSnapshotTrash <snapshotDir>]");
     } else if ("-saveNamespace".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
           + " [-saveNamespace [-beforeShutdown]]");
@@ -2218,6 +2249,11 @@ public class DFSAdmin extends FsShell {
         printUsage(cmd);
         return exitCode;
       }
+    } else if ("-provisionSnapshotTrash".equalsIgnoreCase(cmd)) {
+      if (argv.length != 2) {
+        printUsage(cmd);
+        return exitCode;
+      }
     } else if ("-report".equals(cmd)) {
       if (argv.length > 6) {
         printUsage(cmd);
@@ -2354,6 +2390,8 @@ public class DFSAdmin extends FsShell {
         allowSnapshot(argv);
       } else if ("-disallowSnapshot".equalsIgnoreCase(cmd)) {
         disallowSnapshot(argv);
+      } else if ("-provisionSnapshotTrash".equalsIgnoreCase(cmd)) {
+        provisionSnapshotTrash(argv);
       } else if ("-saveNamespace".equals(cmd)) {
         exitCode = saveNamespace(argv);
       } else if ("-rollEdits".equals(cmd)) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index 37b2350..9bb9703 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeys.FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsAdmin.TRASH_PERMISSION;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -2155,8 +2156,9 @@ public class TestDistributedFileSystem {
     try {
       DistributedFileSystem dfs = cluster.getFileSystem();
       Path testDir = new Path("/ssgtr/test1/");
+      Path testDirTrashRoot = new Path(testDir, FileSystem.TRASH_PREFIX);
       Path file0path = new Path(testDir, "file-0");
-      dfs.create(file0path);
+      dfs.create(file0path).close();
 
       Path trBeforeAllowSnapshot = dfs.getTrashRoot(file0path);
       String trBeforeAllowSnapshotStr = trBeforeAllowSnapshot.toUri().getPath();
@@ -2166,6 +2168,15 @@ public class TestDistributedFileSystem {
 
       dfs.allowSnapshot(testDir);
 
+      // Provision trash root
+      // Note: DFS#allowSnapshot doesn't auto create trash root.
+      //  Only HdfsAdmin#allowSnapshot creates trash root when
+      //  dfs.namenode.snapshot.trashroot.enabled is set to true on NameNode.
+      dfs.provisionSnapshotTrash(testDir, TRASH_PERMISSION);
+      // Expect trash root to be created with permission 777 and sticky bit
+      FileStatus trashRootFileStatus = dfs.getFileStatus(testDirTrashRoot);
+      assertEquals(TRASH_PERMISSION, trashRootFileStatus.getPermission());
+
       Path trAfterAllowSnapshot = dfs.getTrashRoot(file0path);
       String trAfterAllowSnapshotStr = trAfterAllowSnapshot.toUri().getPath();
       // The trash root should now be in the snapshot root
@@ -2182,6 +2193,7 @@ public class TestDistributedFileSystem {
       assertTrue(trBeforeAllowSnapshotStr.startsWith(homeDirStr));
 
       // Cleanup
+      // DFS#disallowSnapshot would remove empty trash root without throwing.
       dfs.disallowSnapshot(testDir);
       dfs.delete(testDir, true);
       dfs.delete(test2Dir, true);
@@ -2469,7 +2481,8 @@ public class TestDistributedFileSystem {
       dfs.allowSnapshot(testDir);
       // Create trash root manually
       Path testDirTrashRoot = new Path(testDir, FileSystem.TRASH_PREFIX);
-      dfs.mkdirs(testDirTrashRoot);
+      Path dirInsideTrash = new Path(testDirTrashRoot, "user1");
+      dfs.mkdirs(dirInsideTrash);
       // Try disallowing snapshot, should throw
       LambdaTestUtils.intercept(IOException.class,
           () -> dfs.disallowSnapshot(testDir));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index f6d07fb..fa6a55b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -30,6 +30,9 @@ import com.google.common.collect.Lists;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.text.TextStringBuilder;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -84,11 +87,13 @@ import java.util.List;
 import java.util.Scanner;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdfs.client.HdfsAdmin.TRASH_PERMISSION;
 import static org.hamcrest.CoreMatchers.allOf;
 import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -121,6 +126,8 @@ public class TestDFSAdmin {
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
     conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
         GenericTestUtils.getRandomizedTempPath());
+    conf.setInt(DFSConfigKeys.FS_TRASH_INTERVAL_KEY, 60);
+    conf.setBoolean("dfs.namenode.snapshot.trashroot.enabled", true);
     restartCluster();
 
     admin = new DFSAdmin(conf);
@@ -919,6 +926,54 @@ public class TestDFSAdmin {
   }
 
   @Test
+  public void testAllowDisallowSnapshot() throws Exception {
+    final Path dirPath = new Path("/ssdir1");
+    final Path trashRoot = new Path(dirPath, ".Trash");
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+
+    dfs.mkdirs(dirPath);
+    assertEquals(0, ToolRunner.run(dfsAdmin,
+        new String[]{"-allowSnapshot", dirPath.toString()}));
+
+    // Verify .Trash creation after -allowSnapshot command
+    assertTrue(dfs.exists(trashRoot));
+    assertEquals(TRASH_PERMISSION,
+        dfs.getFileStatus(trashRoot).getPermission());
+
+    // Move a file to trash
+    final Path file1 = new Path(dirPath, "file1");
+    try (FSDataOutputStream s = dfs.create(file1)) {
+      s.write(0);
+    }
+    FsShell fsShell = new FsShell(dfs.getConf());
+    assertEquals(0, ToolRunner.run(fsShell,
+        new String[]{"-rm", file1.toString()}));
+
+    // User directory inside snapshottable directory trash should have 700
+    final String username =
+        UserGroupInformation.getLoginUser().getShortUserName();
+    final Path trashRootUserSubdir = new Path(trashRoot, username);
+    assertTrue(dfs.exists(trashRootUserSubdir));
+    final FsPermission trashUserdirPermission = new FsPermission(
+        FsAction.ALL, FsAction.NONE, FsAction.NONE, false);
+    assertEquals(trashUserdirPermission,
+        dfs.getFileStatus(trashRootUserSubdir).getPermission());
+
+    // disallowSnapshot should fail when .Trash is not empty
+    assertNotEquals(0, ToolRunner.run(dfsAdmin,
+        new String[]{"-disallowSnapshot", dirPath.toString()}));
+
+    dfs.delete(trashRootUserSubdir, true);
+    // disallowSnapshot should succeed now that we have an empty .Trash
+    assertEquals(0, ToolRunner.run(dfsAdmin,
+        new String[]{"-disallowSnapshot", dirPath.toString()}));
+
+    // Cleanup
+    dfs.delete(dirPath, true);
+  }
+
+  @Test
   public void testSetBalancerBandwidth() throws Exception {
     redirectStream();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
