[ 
https://issues.apache.org/jira/browse/HADOOP-18144?focusedWorklogId=737805&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-737805
 ]

ASF GitHub Bot logged work on HADOOP-18144:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Mar/22 21:34
            Start Date: 07/Mar/22 21:34
    Worklog Time Spent: 10m 
      Work Description: omalley commented on a change in pull request #4034:
URL: https://github.com/apache/hadoop/pull/4034#discussion_r821086272



##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
##########
@@ -1132,47 +1132,71 @@ public BlockStoragePolicySpi getStoragePolicy(Path src) throws IOException {
    * Get the trash root directory for current user when the path
    * specified is deleted.
    *
-   * If CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH is not set, return
-   * the default trash root from targetFS.
+   * If FORCE_INSIDE_MOUNT_POINT flag is not set, return the default trash root
+   * from targetFS.
    *
-   * When CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH is set to true,
-   * 1) If path p is in fallback FS or from the same mount point as the default
-   *    trash root for targetFS, return the default trash root for targetFS.
-   * 2) else, return a trash root in the mounted targetFS
-   *    (/mntpoint/.Trash/{user})
+   * When FORCE_INSIDE_MOUNT_POINT is set to true,
+   * 1) If path p in fallback FS, return trash root from fallback FS. Print a
+   *    warning if there is a mount point for the trash root.
+   * 2) If the trash root for path p is in the same mount point as path p,
+   *    when trashRoot.startWith(mountPoint.destPath) == true,
+   *    get the corresponding viewFS path for the trash root and return it.
+   * 3) else, return the trash root under the root of the mount point
+   *          (/mntpoint/.Trash/{user}).
    *
    * @param path the trash root of the path to be determined.
    * @return the trash root path.
    */
   @Override
   public Path getTrashRoot(Path path) {
-    boolean useMountPointLocalTrash =
-        config.getBoolean(CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH,
-            CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH_DEFAULT);
+    boolean trashForceInsideMountPoint =

Review comment:
       Push this down closer to where it is used.

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
##########
@@ -1132,47 +1132,71 @@ public BlockStoragePolicySpi getStoragePolicy(Path src) throws IOException {
    * Get the trash root directory for current user when the path
    * specified is deleted.
    *
-   * If CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH is not set, return
-   * the default trash root from targetFS.
+   * If FORCE_INSIDE_MOUNT_POINT flag is not set, return the default trash root
+   * from targetFS.
    *
-   * When CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH is set to true,
-   * 1) If path p is in fallback FS or from the same mount point as the default
-   *    trash root for targetFS, return the default trash root for targetFS.
-   * 2) else, return a trash root in the mounted targetFS
-   *    (/mntpoint/.Trash/{user})
+   * When FORCE_INSIDE_MOUNT_POINT is set to true,
+   * 1) If path p in fallback FS, return trash root from fallback FS. Print a
+   *    warning if there is a mount point for the trash root.
+   * 2) If the trash root for path p is in the same mount point as path p,
+   *    when trashRoot.startWith(mountPoint.destPath) == true,
+   *    get the corresponding viewFS path for the trash root and return it.
+   * 3) else, return the trash root under the root of the mount point
+   *          (/mntpoint/.Trash/{user}).
    *
    * @param path the trash root of the path to be determined.
    * @return the trash root path.
    */
   @Override
   public Path getTrashRoot(Path path) {
-    boolean useMountPointLocalTrash =
-        config.getBoolean(CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH,
-            CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH_DEFAULT);
+    boolean trashForceInsideMountPoint =
+        config.getBoolean(CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT,
+            CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT_DEFAULT);
 
     try {
       InodeTree.ResolveResult<FileSystem> res =
           fsState.resolve(getUriPath(path), true);
 
-      Path trashRoot = res.targetFileSystem.getTrashRoot(res.remainingPath);
-      if (!useMountPointLocalTrash) {
-        return trashRoot;
-      } else {
-        // Path p is either in a mount point or in the fallback FS
+      Path targetFSTrashRoot =
+          res.targetFileSystem.getTrashRoot(res.remainingPath);
+      String targetFSTrashRootPath = targetFSTrashRoot.toUri().getPath();
+      String mountTargetPath = res.targetFileSystem.getUri().getPath();
 
-        if (ROOT_PATH.equals(new Path(res.resolvedPath))
-            || trashRoot.toUri().getPath().startsWith(res.resolvedPath)) {
-          // Path p is in the fallback FS or targetFileSystem.trashRoot is in
-          // the same mount point as Path p
-          return trashRoot;
-        } else {
-          // targetFileSystem.trashRoot is in a different mount point from
-          // Path p. Return the trash root for the mount point.
-          Path mountPointRoot =
-              res.targetFileSystem.getFileStatus(new Path("/")).getPath();
-          return new Path(mountPointRoot,
-              TRASH_PREFIX + "/" + ugi.getShortUserName());
+      if (!trashForceInsideMountPoint) {
+        return targetFSTrashRoot;
+      }
+
+      // New trash root policy
+      if (ROOT_PATH.equals(new Path(res.resolvedPath))) {
+        // Path path is in fallback FS. Return targetFSTrashRootPath;
+
+        // Check and print a warning if a mountpoint exists for
+        // targetFSTrashRootPath. The trash root from fallback FS should be in
+        // the fallback FS, not in a different mount point.
+        InodeTree.ResolveResult<FileSystem> res2 =
+            fsState.resolve(targetFSTrashRootPath, true);
+        if (!ROOT_PATH.equals(new Path(res2.resolvedPath))) {
+          LOG.warn(String.format("A mount point %s exists for trash root %s "
+              + "returned by fallback FS for path %s. Rename between path %s "
+                  + "and trash root %s will fail, as the trash root is in a "
+                  + "mount point while path %s is in the fallback FS",
+              res2.resolvedPath, targetFSTrashRootPath, path, path,
+              targetFSTrashRootPath, path));
         }
+        return new Path(targetFSTrashRootPath);

Review comment:
       This needs to be made fully qualified using fs.makeQualified(...).
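
To illustrate what "fully qualified" means here: a bare path such as /user/alice/.Trash carries no scheme or authority, so a caller cannot tell which file system it belongs to. The sketch below uses only java.net.URI as a stand-in. QualifySketch and qualify are hypothetical names, and this is not how Hadoop's FileSystem.makeQualified is implemented; it only shows the effect one would expect on an absolute path.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, not Hadoop code: mimics the visible effect of
// fs.makeQualified(path) on an absolute path.
final class QualifySketch {
  private QualifySketch() {}

  /** Attach the file system's scheme and authority to an absolute path. */
  static URI qualify(URI fsUri, String absolutePath) {
    if (!absolutePath.startsWith("/")) {
      throw new IllegalArgumentException(
          "expected an absolute path: " + absolutePath);
    }
    try {
      // Five-argument form: scheme, authority, path, query, fragment.
      return new URI(fsUri.getScheme(), fsUri.getAuthority(),
          absolutePath, null, null);
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException(e);
    }
  }

  public static void main(String[] args) {
    URI viewFs = URI.create("viewfs://cluster/");
    // Prints viewfs://cluster/user/alice/.Trash
    System.out.println(qualify(viewFs, "/user/alice/.Trash"));
  }
}
```

A qualified result lets the caller see that the trash root lives in the viewfs namespace rather than guessing from a scheme-less path.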

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
##########
@@ -1192,59 +1219,66 @@ public Path getTrashRoot(Path path) {
       trashRoots.addAll(fs.getTrashRoots(allUsers));
     }
 
-    // Add trash dirs for each mount point
-    boolean useMountPointLocalTrash =
-        config.getBoolean(CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH,
-            CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH_DEFAULT);
-    if (useMountPointLocalTrash) {
+    boolean trashForceInsideMountPoint =
+        config.getBoolean(CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT,
+            CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT_DEFAULT);
+    // Return trashRoots if FORCE_INSIDE_MOUNT_POINT is disabled.
+    if (!trashForceInsideMountPoint) {
+      return trashRoots;
+    }
 
-      Set<Path> currentTrashPaths = new HashSet<>();
-      for (FileStatus file : trashRoots) {
-        currentTrashPaths.add(file.getPath());
-      }
+    // We use targetFS Path to check for duplicate paths.
+    Set<Path> uniqueTrashRoots = new HashSet<>();
+    for (FileStatus file : trashRoots) {
+      uniqueTrashRoots.add(file.getPath());
+    }
 
-      MountPoint[] mountPoints = getMountPoints();
-      try {
-        for (int i = 0; i < mountPoints.length; i++) {
-          Path trashRoot = makeQualified(
-              new Path(mountPoints[i].mountedOnPath + "/" + TRASH_PREFIX));
+    List<InodeTree.MountPoint<FileSystem>> mountPoints =
+        fsState.getMountPoints();
+    try {
+      for (InodeTree.MountPoint<FileSystem> mountPoint : mountPoints) {
 
-          // Continue if trashRoot does not exist for this filesystem
-          if (!exists(trashRoot)) {
-            continue;
-          }
+        Path trashRoot = new Path(mountPoint.src + "/" + TRASH_PREFIX);
 
-          InodeTree.ResolveResult<FileSystem> res =
-              fsState.resolve(getUriPath(trashRoot), true);
+        // Continue if trashRoot does not exist for this mount point
+        if (!exists(trashRoot)) {
+          continue;
+        }
 
-          if (!allUsers) {
-            Path userTrash =
+        if (!allUsers) {
+          Path userTrashRoot = new Path(trashRoot, ugi.getShortUserName());
+          if (exists(userTrashRoot)) {
+            Path targetFsTrashPath =
                 new Path("/" + TRASH_PREFIX + "/" + ugi.getShortUserName());
-            try {
-              FileStatus file = res.targetFileSystem.getFileStatus(userTrash);
-              if (!currentTrashPaths.contains(file.getPath())) {
-                trashRoots.add(file);
-                currentTrashPaths.add(file.getPath());
-              }
-            } catch (FileNotFoundException ignored) {
+            FileStatus targetFsTrash = mountPoint.target.getTargetFileSystem()
+                .getFileStatus(targetFsTrashPath);
+            if (!uniqueTrashRoots.contains(targetFsTrash.getPath())) {

Review comment:
       With a HashMap, you don't need to test it first. Just update the map.

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java
##########
@@ -191,8 +191,8 @@ public boolean moveToTrash(Path path) throws IOException {
         cause = e;
       }
     }
-    throw (IOException)
-      new IOException("Failed to move to trash: " + path).initCause(cause);
+    throw (IOException) new IOException(

Review comment:
       @ibuenros's and my original comment was about removing the line break. 
Changing the message is a little risky, but OK.
   
   Please change this to
   throw new IOException("Failed ..." + ..., cause);
   
   IOException has a constructor that takes a cause, so you don't need the 
cast or the separate initCause call.
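
A small stand-alone sketch of the two styles side by side, for comparison. TrashFailure and its method names are hypothetical and exist only for this illustration; the message text is taken from the removed lines in the hunk above.

```java
import java.io.IOException;

// Hypothetical stand-in for the tail of moveToTrash; only the way the
// exception is constructed matters here.
final class TrashFailure {
  private TrashFailure() {}

  /** Old style: create, attach the cause via initCause, then cast back. */
  static IOException withInitCause(String path, Throwable cause) {
    return (IOException)
        new IOException("Failed to move to trash: " + path).initCause(cause);
  }

  /** Suggested style: pass the cause directly to the constructor. */
  static IOException withConstructor(String path, Throwable cause) {
    return new IOException("Failed to move to trash: " + path, cause);
  }

  public static void main(String[] args) {
    Throwable cause = new IllegalStateException("rename failed");
    IOException e = withConstructor("/data/file", cause);
    // Both forms carry the same message and the same cause.
    System.out.println(e.getMessage() + " / " + e.getCause().getMessage());
  }
}
```

The cast in the old form is needed only because initCause returns Throwable; the two-argument constructor avoids it.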

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
##########
@@ -1192,59 +1219,66 @@ public Path getTrashRoot(Path path) {
       trashRoots.addAll(fs.getTrashRoots(allUsers));
     }
 
-    // Add trash dirs for each mount point
-    boolean useMountPointLocalTrash =
-        config.getBoolean(CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH,
-            CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH_DEFAULT);
-    if (useMountPointLocalTrash) {
+    boolean trashForceInsideMountPoint =
+        config.getBoolean(CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT,
+            CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT_DEFAULT);
+    // Return trashRoots if FORCE_INSIDE_MOUNT_POINT is disabled.
+    if (!trashForceInsideMountPoint) {
+      return trashRoots;
+    }
 
-      Set<Path> currentTrashPaths = new HashSet<>();
-      for (FileStatus file : trashRoots) {
-        currentTrashPaths.add(file.getPath());
-      }
+    // We use targetFS Path to check for duplicate paths.
+    Set<Path> uniqueTrashRoots = new HashSet<>();

Review comment:
       The return can be trashRoots.values().

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
##########
@@ -1132,47 +1132,71 @@ public BlockStoragePolicySpi getStoragePolicy(Path src) throws IOException {
    * Get the trash root directory for current user when the path
    * specified is deleted.
    *
-   * If CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH is not set, return
-   * the default trash root from targetFS.
+   * If FORCE_INSIDE_MOUNT_POINT flag is not set, return the default trash root
+   * from targetFS.
    *
-   * When CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH is set to true,
-   * 1) If path p is in fallback FS or from the same mount point as the default
-   *    trash root for targetFS, return the default trash root for targetFS.
-   * 2) else, return a trash root in the mounted targetFS
-   *    (/mntpoint/.Trash/{user})
+   * When FORCE_INSIDE_MOUNT_POINT is set to true,
+   * 1) If path p in fallback FS, return trash root from fallback FS. Print a
+   *    warning if there is a mount point for the trash root.
+   * 2) If the trash root for path p is in the same mount point as path p,
+   *    when trashRoot.startWith(mountPoint.destPath) == true,
+   *    get the corresponding viewFS path for the trash root and return it.
+   * 3) else, return the trash root under the root of the mount point
+   *          (/mntpoint/.Trash/{user}).
    *
    * @param path the trash root of the path to be determined.
    * @return the trash root path.
    */
   @Override
   public Path getTrashRoot(Path path) {
-    boolean useMountPointLocalTrash =
-        config.getBoolean(CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH,
-            CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH_DEFAULT);
+    boolean trashForceInsideMountPoint =
+        config.getBoolean(CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT,
+            CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT_DEFAULT);
 
     try {
       InodeTree.ResolveResult<FileSystem> res =
           fsState.resolve(getUriPath(path), true);
 
-      Path trashRoot = res.targetFileSystem.getTrashRoot(res.remainingPath);
-      if (!useMountPointLocalTrash) {
-        return trashRoot;
-      } else {
-        // Path p is either in a mount point or in the fallback FS
+      Path targetFSTrashRoot =
+          res.targetFileSystem.getTrashRoot(res.remainingPath);
+      String targetFSTrashRootPath = targetFSTrashRoot.toUri().getPath();
+      String mountTargetPath = res.targetFileSystem.getUri().getPath();
 
-        if (ROOT_PATH.equals(new Path(res.resolvedPath))
-            || trashRoot.toUri().getPath().startsWith(res.resolvedPath)) {
-          // Path p is in the fallback FS or targetFileSystem.trashRoot is in
-          // the same mount point as Path p
-          return trashRoot;
-        } else {
-          // targetFileSystem.trashRoot is in a different mount point from
-          // Path p. Return the trash root for the mount point.
-          Path mountPointRoot =
-              res.targetFileSystem.getFileStatus(new Path("/")).getPath();
-          return new Path(mountPointRoot,
-              TRASH_PREFIX + "/" + ugi.getShortUserName());
+      if (!trashForceInsideMountPoint) {
+        return targetFSTrashRoot;
+      }
+
+      // New trash root policy
+      if (ROOT_PATH.equals(new Path(res.resolvedPath))) {
+        // Path path is in fallback FS. Return targetFSTrashRootPath;
+
+        // Check and print a warning if a mountpoint exists for
+        // targetFSTrashRootPath. The trash root from fallback FS should be in
+        // the fallback FS, not in a different mount point.
+        InodeTree.ResolveResult<FileSystem> res2 =
+            fsState.resolve(targetFSTrashRootPath, true);
+        if (!ROOT_PATH.equals(new Path(res2.resolvedPath))) {
+          LOG.warn(String.format("A mount point %s exists for trash root %s "
+              + "returned by fallback FS for path %s. Rename between path %s "
+                  + "and trash root %s will fail, as the trash root is in a "
+                  + "mount point while path %s is in the fallback FS",
+              res2.resolvedPath, targetFSTrashRootPath, path, path,
+              targetFSTrashRootPath, path));
         }
+        return new Path(targetFSTrashRootPath);
+      } else if (targetFSTrashRootPath.startsWith(mountTargetPath)) {
+        // targetFSTrash is inside mountPoint.targetPath
+        String relativeTrashRoot =
+            targetFSTrashRootPath.substring(mountTargetPath.length());
+        // remove the starting '/' if it exists
+        if (relativeTrashRoot.startsWith("/")) {
+          relativeTrashRoot = relativeTrashRoot.substring(1);
+        }
+        return new Path(res.resolvedPath, relativeTrashRoot);

Review comment:
       This also needs to be made fully qualified.
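
For reference, the prefix-stripping step in the hunk above can be sketched with plain strings. This is only an illustration under assumptions: TrashRootMapping and toViewFsPath are hypothetical names, the example paths are made up, and joining with "/" stands in for what new Path(parent, child) does in the patch.

```java
// Hypothetical, string-only model of the "trash root is inside the mount
// target" branch; not Hadoop code.
final class TrashRootMapping {
  private TrashRootMapping() {}

  /**
   * Map a trash root reported by the target file system back under the
   * viewfs mount point that resolved to it.
   */
  static String toViewFsPath(String mountPoint, String mountTargetPath,
      String targetTrashRoot) {
    if (!targetTrashRoot.startsWith(mountTargetPath)) {
      throw new IllegalArgumentException(
          targetTrashRoot + " is not under " + mountTargetPath);
    }
    String relative = targetTrashRoot.substring(mountTargetPath.length());
    // Remove the leading '/' so the join below does not double it.
    if (relative.startsWith("/")) {
      relative = relative.substring(1);
    }
    return mountPoint.endsWith("/")
        ? mountPoint + relative
        : mountPoint + "/" + relative;
  }

  public static void main(String[] args) {
    // Assumed layout: viewfs /data is mounted on /warehouse in the target.
    // Prints /data/.Trash/alice
    System.out.println(
        toViewFsPath("/data", "/warehouse", "/warehouse/.Trash/alice"));
  }
}
```

The returned string still has no scheme or authority, which is the gap the comment above asks to close.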

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
##########
@@ -1192,59 +1219,66 @@ public Path getTrashRoot(Path path) {
       trashRoots.addAll(fs.getTrashRoots(allUsers));
     }
 
-    // Add trash dirs for each mount point
-    boolean useMountPointLocalTrash =
-        config.getBoolean(CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH,
-            CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH_DEFAULT);
-    if (useMountPointLocalTrash) {
+    boolean trashForceInsideMountPoint =
+        config.getBoolean(CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT,
+            CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT_DEFAULT);
+    // Return trashRoots if FORCE_INSIDE_MOUNT_POINT is disabled.
+    if (!trashForceInsideMountPoint) {
+      return trashRoots;
+    }
 
-      Set<Path> currentTrashPaths = new HashSet<>();
-      for (FileStatus file : trashRoots) {
-        currentTrashPaths.add(file.getPath());
-      }
+    // We use targetFS Path to check for duplicate paths.
+    Set<Path> uniqueTrashRoots = new HashSet<>();
+    for (FileStatus file : trashRoots) {
+      uniqueTrashRoots.add(file.getPath());
+    }
 
-      MountPoint[] mountPoints = getMountPoints();
-      try {
-        for (int i = 0; i < mountPoints.length; i++) {
-          Path trashRoot = makeQualified(
-              new Path(mountPoints[i].mountedOnPath + "/" + TRASH_PREFIX));
+    List<InodeTree.MountPoint<FileSystem>> mountPoints =
+        fsState.getMountPoints();
+    try {
+      for (InodeTree.MountPoint<FileSystem> mountPoint : mountPoints) {
 
-          // Continue if trashRoot does not exist for this filesystem
-          if (!exists(trashRoot)) {
-            continue;
-          }
+        Path trashRoot = new Path(mountPoint.src + "/" + TRASH_PREFIX);
 
-          InodeTree.ResolveResult<FileSystem> res =
-              fsState.resolve(getUriPath(trashRoot), true);
+        // Continue if trashRoot does not exist for this mount point
+        if (!exists(trashRoot)) {
+          continue;
+        }
 
-          if (!allUsers) {
-            Path userTrash =
+        if (!allUsers) {
+          Path userTrashRoot = new Path(trashRoot, ugi.getShortUserName());
+          if (exists(userTrashRoot)) {
+            Path targetFsTrashPath =

Review comment:
       Use the userTrashRoot, so that it is fully qualified in the viewfs.
   
   You want the FileStatus to be in the viewfs, not the underlying filesystem.

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
##########
@@ -1132,47 +1132,71 @@ public BlockStoragePolicySpi getStoragePolicy(Path src) throws IOException {
    * Get the trash root directory for current user when the path
    * specified is deleted.
    *
-   * If CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH is not set, return
-   * the default trash root from targetFS.
+   * If FORCE_INSIDE_MOUNT_POINT flag is not set, return the default trash root
+   * from targetFS.
    *
-   * When CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH is set to true,
-   * 1) If path p is in fallback FS or from the same mount point as the default
-   *    trash root for targetFS, return the default trash root for targetFS.
-   * 2) else, return a trash root in the mounted targetFS
-   *    (/mntpoint/.Trash/{user})
+   * When FORCE_INSIDE_MOUNT_POINT is set to true,
+   * 1) If path p in fallback FS, return trash root from fallback FS. Print a
+   *    warning if there is a mount point for the trash root.
+   * 2) If the trash root for path p is in the same mount point as path p,
+   *    when trashRoot.startWith(mountPoint.destPath) == true,
+   *    get the corresponding viewFS path for the trash root and return it.
+   * 3) else, return the trash root under the root of the mount point
+   *          (/mntpoint/.Trash/{user}).
    *
    * @param path the trash root of the path to be determined.
    * @return the trash root path.
    */
   @Override
   public Path getTrashRoot(Path path) {
-    boolean useMountPointLocalTrash =
-        config.getBoolean(CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH,
-            CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH_DEFAULT);
+    boolean trashForceInsideMountPoint =
+        config.getBoolean(CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT,
+            CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT_DEFAULT);
 
     try {
       InodeTree.ResolveResult<FileSystem> res =
           fsState.resolve(getUriPath(path), true);
 
-      Path trashRoot = res.targetFileSystem.getTrashRoot(res.remainingPath);
-      if (!useMountPointLocalTrash) {
-        return trashRoot;
-      } else {
-        // Path p is either in a mount point or in the fallback FS
+      Path targetFSTrashRoot =
+          res.targetFileSystem.getTrashRoot(res.remainingPath);
+      String targetFSTrashRootPath = targetFSTrashRoot.toUri().getPath();

Review comment:
       Move these after the check of trashForceInsideMountPoint.

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
##########
@@ -1192,59 +1219,66 @@ public Path getTrashRoot(Path path) {
       trashRoots.addAll(fs.getTrashRoots(allUsers));
     }
 
-    // Add trash dirs for each mount point
-    boolean useMountPointLocalTrash =
-        config.getBoolean(CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH,
-            CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH_DEFAULT);
-    if (useMountPointLocalTrash) {
+    boolean trashForceInsideMountPoint =
+        config.getBoolean(CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT,
+            CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT_DEFAULT);
+    // Return trashRoots if FORCE_INSIDE_MOUNT_POINT is disabled.
+    if (!trashForceInsideMountPoint) {
+      return trashRoots;
+    }
 
-      Set<Path> currentTrashPaths = new HashSet<>();
-      for (FileStatus file : trashRoots) {
-        currentTrashPaths.add(file.getPath());
-      }
+    // We use targetFS Path to check for duplicate paths.
+    Set<Path> uniqueTrashRoots = new HashSet<>();
+    for (FileStatus file : trashRoots) {
+      uniqueTrashRoots.add(file.getPath());
+    }
 
-      MountPoint[] mountPoints = getMountPoints();
-      try {
-        for (int i = 0; i < mountPoints.length; i++) {
-          Path trashRoot = makeQualified(
-              new Path(mountPoints[i].mountedOnPath + "/" + TRASH_PREFIX));
+    List<InodeTree.MountPoint<FileSystem>> mountPoints =
+        fsState.getMountPoints();
+    try {
+      for (InodeTree.MountPoint<FileSystem> mountPoint : mountPoints) {
 
-          // Continue if trashRoot does not exist for this filesystem
-          if (!exists(trashRoot)) {
-            continue;
-          }
+        Path trashRoot = new Path(mountPoint.src + "/" + TRASH_PREFIX);
 
-          InodeTree.ResolveResult<FileSystem> res =
-              fsState.resolve(getUriPath(trashRoot), true);
+        // Continue if trashRoot does not exist for this mount point
+        if (!exists(trashRoot)) {
+          continue;
+        }
 
-          if (!allUsers) {
-            Path userTrash =
+        if (!allUsers) {
+          Path userTrashRoot = new Path(trashRoot, ugi.getShortUserName());
+          if (exists(userTrashRoot)) {
+            Path targetFsTrashPath =
                 new Path("/" + TRASH_PREFIX + "/" + ugi.getShortUserName());
-            try {
-              FileStatus file = res.targetFileSystem.getFileStatus(userTrash);
-              if (!currentTrashPaths.contains(file.getPath())) {
-                trashRoots.add(file);
-                currentTrashPaths.add(file.getPath());
-              }
-            } catch (FileNotFoundException ignored) {
+            FileStatus targetFsTrash = mountPoint.target.getTargetFileSystem()
+                .getFileStatus(targetFsTrashPath);
+            if (!uniqueTrashRoots.contains(targetFsTrash.getPath())) {
+              FileStatus trash = getFileStatus(userTrashRoot);
+              trashRoots.add(trash);
+              uniqueTrashRoots.add(targetFsTrash.getPath());
             }
-          } else {
-            FileStatus[] targetFsTrashRoots =
-                res.targetFileSystem.listStatus(new Path("/" + TRASH_PREFIX));
-            for (FileStatus file : targetFsTrashRoots) {
-              // skip if we already include it in currentTrashPaths
-              if (currentTrashPaths.contains(file.getPath())) {
-                continue;
-              }
-
-              trashRoots.add(file);
-              currentTrashPaths.add(file.getPath());
+          }
+        } else {
+          FileStatus[] mountPointTrashRoots = listStatus(trashRoot);
+          for (FileStatus trash : mountPointTrashRoots) {
+            // Remove the mountPoint to get the targetFS path
+            Path targetFsTrashPath =

Review comment:
       Leave these in the viewfs namespace and just add them to the hash map.

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
##########
@@ -1192,59 +1219,66 @@ public Path getTrashRoot(Path path) {
       trashRoots.addAll(fs.getTrashRoots(allUsers));
     }
 
-    // Add trash dirs for each mount point
-    boolean useMountPointLocalTrash =
-        config.getBoolean(CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH,
-            CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH_DEFAULT);
-    if (useMountPointLocalTrash) {
+    boolean trashForceInsideMountPoint =
+        config.getBoolean(CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT,
+            CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT_DEFAULT);
+    // Return trashRoots if FORCE_INSIDE_MOUNT_POINT is disabled.
+    if (!trashForceInsideMountPoint) {
+      return trashRoots;
+    }
 
-      Set<Path> currentTrashPaths = new HashSet<>();
-      for (FileStatus file : trashRoots) {
-        currentTrashPaths.add(file.getPath());
-      }
+    // We use targetFS Path to check for duplicate paths.
+    Set<Path> uniqueTrashRoots = new HashSet<>();

Review comment:
       I think you should make trashRoots into a HashMap<Path, FileStatus>. 
Then you don't need to deal with uniqueTrashRoots.
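
A minimal sketch of the map-based approach, using stand-in types so it runs without Hadoop on the classpath. TrashRootCollector and Status are hypothetical; in the real method the key would be a Path and the value a FileStatus.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of collecting trash roots keyed by path; not Hadoop code.
final class TrashRootCollector {
  /** Stand-in for org.apache.hadoop.fs.FileStatus. */
  static final class Status {
    final String path;
    final String owner;

    Status(String path, String owner) {
      this.path = path;
      this.owner = owner;
    }
  }

  // Keyed by path: putting the same trash root twice leaves one entry,
  // so no separate "already seen" set and no contains() check are needed.
  private final Map<String, Status> trashRoots = new HashMap<>();

  void add(Status status) {
    trashRoots.put(status.path, status);
  }

  /** The de-duplicated statuses, as suggested for the return value. */
  Collection<Status> result() {
    return trashRoots.values();
  }

  public static void main(String[] args) {
    TrashRootCollector c = new TrashRootCollector();
    c.add(new Status("viewfs://cluster/data/.Trash/alice", "alice"));
    c.add(new Status("viewfs://cluster/data/.Trash/alice", "alice"));
    c.add(new Status("viewfs://cluster/logs/.Trash/alice", "alice"));
    System.out.println(c.result().size()); // prints 2
  }
}
```

Note that put replaces an existing entry for the same key, which is harmless when both entries describe the same trash root.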

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
##########
@@ -1132,47 +1132,71 @@ public BlockStoragePolicySpi getStoragePolicy(Path src) throws IOException {
    * Get the trash root directory for current user when the path
    * specified is deleted.
    *
-   * If CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH is not set, return
-   * the default trash root from targetFS.
+   * If FORCE_INSIDE_MOUNT_POINT flag is not set, return the default trash root
+   * from targetFS.
    *
-   * When CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH is set to true,
-   * 1) If path p is in fallback FS or from the same mount point as the default
-   *    trash root for targetFS, return the default trash root for targetFS.
-   * 2) else, return a trash root in the mounted targetFS
-   *    (/mntpoint/.Trash/{user})
+   * When FORCE_INSIDE_MOUNT_POINT is set to true,
+   * 1) If path p in fallback FS, return trash root from fallback FS. Print a
+   *    warning if there is a mount point for the trash root.
+   * 2) If the trash root for path p is in the same mount point as path p,
+   *    when trashRoot.startWith(mountPoint.destPath) == true,
+   *    get the corresponding viewFS path for the trash root and return it.
+   * 3) else, return the trash root under the root of the mount point
+   *          (/mntpoint/.Trash/{user}).
    *
    * @param path the trash root of the path to be determined.
    * @return the trash root path.
    */
   @Override
   public Path getTrashRoot(Path path) {
-    boolean useMountPointLocalTrash =
-        config.getBoolean(CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH,
-            CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH_DEFAULT);
+    boolean trashForceInsideMountPoint =
+        config.getBoolean(CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT,
+            CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT_DEFAULT);
 
     try {
       InodeTree.ResolveResult<FileSystem> res =
           fsState.resolve(getUriPath(path), true);
 
-      Path trashRoot = res.targetFileSystem.getTrashRoot(res.remainingPath);
-      if (!useMountPointLocalTrash) {
-        return trashRoot;
-      } else {
-        // Path p is either in a mount point or in the fallback FS
+      Path targetFSTrashRoot =
+          res.targetFileSystem.getTrashRoot(res.remainingPath);
+      String targetFSTrashRootPath = targetFSTrashRoot.toUri().getPath();
+      String mountTargetPath = res.targetFileSystem.getUri().getPath();
 
-        if (ROOT_PATH.equals(new Path(res.resolvedPath))
-            || trashRoot.toUri().getPath().startsWith(res.resolvedPath)) {
-          // Path p is in the fallback FS or targetFileSystem.trashRoot is in
-          // the same mount point as Path p
-          return trashRoot;
-        } else {
-          // targetFileSystem.trashRoot is in a different mount point from
-          // Path p. Return the trash root for the mount point.
-          Path mountPointRoot =
-              res.targetFileSystem.getFileStatus(new Path("/")).getPath();
-          return new Path(mountPointRoot,
-              TRASH_PREFIX + "/" + ugi.getShortUserName());
+      if (!trashForceInsideMountPoint) {
+        return targetFSTrashRoot;
+      }
+
+      // New trash root policy
+      if (ROOT_PATH.equals(new Path(res.resolvedPath))) {
+        // Path path is in fallback FS. Return targetFSTrashRootPath;
+
+        // Check and print a warning if a mountpoint exists for
+        // targetFSTrashRootPath. The trash root from fallback FS should be in
+        // the fallback FS, not in a different mount point.
+        InodeTree.ResolveResult<FileSystem> res2 =
+            fsState.resolve(targetFSTrashRootPath, true);
+        if (!ROOT_PATH.equals(new Path(res2.resolvedPath))) {
+          LOG.warn(String.format("A mount point %s exists for trash root %s "
+              + "returned by fallback FS for path %s. Rename between path %s "
+                  + "and trash root %s will fail, as the trash root is in a "
+                  + "mount point while path %s is in the fallback FS",
+              res2.resolvedPath, targetFSTrashRootPath, path, path,
+              targetFSTrashRootPath, path));
         }
+        return new Path(targetFSTrashRootPath);
+      } else if (targetFSTrashRootPath.startsWith(mountTargetPath)) {
+        // targetFSTrash is inside mountPoint.targetPath
+        String relativeTrashRoot =
+            targetFSTrashRootPath.substring(mountTargetPath.length());
+        // remove the starting '/' if it exists
+        if (relativeTrashRoot.startsWith("/")) {
+          relativeTrashRoot = relativeTrashRoot.substring(1);
+        }
+        return new Path(res.resolvedPath, relativeTrashRoot);
+      } else {
+        // Return the trash root for the mount point.
+        return new Path(res.resolvedPath,

Review comment:
       And this one.

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
##########
@@ -134,8 +134,9 @@
       HCFSMountTableConfigLoader.class;
 
   /**
-   * Enable ViewFileSystem to return a trashRoot which is local to mount point.
+   * Force ViewFileSystem to return a trashRoot that is inside a mount point.
    */
-  String CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH = "fs.viewfs.mount.point.local.trash";
-  boolean CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH_DEFAULT = false;
+  String CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT =
+      "fs.viewfs.trash.force-inside-mount-point";
+  boolean CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT_DEFAULT = false;

Review comment:
       I feel like this should default to true. I don't see any way that 
returning the old value (i.e., the trash path from the underlying filesystem) 
won't lead to failures.

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
##########
@@ -134,8 +134,9 @@
       HCFSMountTableConfigLoader.class;
 
   /**
-   * Enable ViewFileSystem to return a trashRoot which is local to mount point.
+   * Force ViewFileSystem to return a trashRoot that is inside a mount point.
    */
-  String CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH = 
"fs.viewfs.mount.point.local.trash";
-  boolean CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH_DEFAULT = false;
+  String CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT =
+      "fs.viewfs.trash.force-inside-mount-point";
+  boolean CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT_DEFAULT = false;

Review comment:
       I don't insist on this one, but this seems like a better default than 
the original behavior.

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
##########
@@ -1192,59 +1219,66 @@ public Path getTrashRoot(Path path) {
       trashRoots.addAll(fs.getTrashRoots(allUsers));
     }
 
-    // Add trash dirs for each mount point
-    boolean useMountPointLocalTrash =
-        config.getBoolean(CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH,
-            CONFIG_VIEWFS_MOUNT_POINT_LOCAL_TRASH_DEFAULT);
-    if (useMountPointLocalTrash) {
+    boolean trashForceInsideMountPoint =
+        config.getBoolean(CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT,
+            CONFIG_VIEWFS_TRASH_FORCE_INSIDE_MOUNT_POINT_DEFAULT);
+    // Return trashRoots if FORCE_INSIDE_MOUNT_POINT is disabled.
+    if (!trashForceInsideMountPoint) {
+      return trashRoots;
+    }
 
-      Set<Path> currentTrashPaths = new HashSet<>();
-      for (FileStatus file : trashRoots) {
-        currentTrashPaths.add(file.getPath());
-      }
+    // We use targetFS Path to check for duplicate paths.
+    Set<Path> uniqueTrashRoots = new HashSet<>();
+    for (FileStatus file : trashRoots) {
+      uniqueTrashRoots.add(file.getPath());
+    }
 
-      MountPoint[] mountPoints = getMountPoints();
-      try {
-        for (int i = 0; i < mountPoints.length; i++) {
-          Path trashRoot = makeQualified(
-              new Path(mountPoints[i].mountedOnPath + "/" + TRASH_PREFIX));
+    List<InodeTree.MountPoint<FileSystem>> mountPoints =
+        fsState.getMountPoints();
+    try {
+      for (InodeTree.MountPoint<FileSystem> mountPoint : mountPoints) {
 
-          // Continue if trashRoot does not exist for this filesystem
-          if (!exists(trashRoot)) {
-            continue;
-          }
+        Path trashRoot = new Path(mountPoint.src + "/" + TRASH_PREFIX);

Review comment:
       You need to make this fully qualified.






Issue Time Tracking
-------------------

    Worklog Id:     (was: 737805)
    Time Spent: 3h 20m  (was: 3h 10m)

> getTrashRoot/s in ViewFileSystem should return viewFS path, not targetFS path
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-18144
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18144
>             Project: Hadoop Common
>          Issue Type: Improvement
>          Components: common
>            Reporter: Xing Lin
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> It is probably incorrect to return a targetFS path from getTrashRoot() 
> in ViewFileSystem, because ViewFileSystem uses that path later in 
> other operations, such as rename. ViewFileSystem assumes that any path it 
> receives is a viewFS path, not a targetFS path. For example, rename() in 
> ViewFileSystem calls getUriPath() on the src/dst paths, which removes the 
> scheme/authority and then resolves the path-only component. Because this 
> resolves a targetFS path a second time, it sometimes leads to incorrect 
> path resolution. 
>  
> On the other hand, it is not always trivial or feasible to determine the 
> correct viewFS path for a given trash root in the targetFS. 
> Example:
> Assume we have a mount point /user/foo -> abfs:/containerA.
> User foo calls getTrashRoot("/a/b/c"), and "/a/b/c" does not match any mount 
> point. We fall back to the fallback HDFS, which by default returns 
> hdfs://localhost/user/foo/.Trash. In this case, it is incorrect to return the 
> trash root as viewfs:/user/foo, because it will be resolved to the abfs mount 
> point instead of the fallback HDFS.
>   
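
The example in the description can be reproduced with a small standalone model of mount-table resolution. This is a minimal sketch, not the ViewFileSystem implementation: `MountTableSketch`, `resolve`, and the fallback URI `hdfs://localhost` are illustrative names, and the sketch assumes longest-prefix matching on path components with a fallback for paths that match no mount point.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MountTableSketch {
  // Hypothetical mount table: viewFS path prefix -> target URI.
  private static final Map<String, String> MOUNTS = new LinkedHashMap<>();
  // Hypothetical fallback file system for paths that match no mount point.
  private static final String FALLBACK = "hdfs://localhost";

  static {
    MOUNTS.put("/user/foo", "abfs:/containerA");
  }

  /** Resolve a path-only viewFS path to a target path (longest prefix wins). */
  static String resolve(String viewPath) {
    String best = null;
    for (String src : MOUNTS.keySet()) {
      boolean matches = viewPath.equals(src) || viewPath.startsWith(src + "/");
      if (matches && (best == null || src.length() > best.length())) {
        best = src;
      }
    }
    if (best == null) {
      // No mount point matches: the path lives in the fallback FS.
      return FALLBACK + viewPath;
    }
    return MOUNTS.get(best) + viewPath.substring(best.length());
  }

  public static void main(String[] args) {
    // "/a/b/c" matches no mount point, so it resolves to the fallback FS.
    System.out.println(resolve("/a/b/c"));
    // The fallback FS reports hdfs://localhost/user/foo/.Trash as the trash
    // root. Dropping scheme/authority and resolving the path-only component
    // again lands in the abfs mount instead of the fallback FS.
    System.out.println(resolve("/user/foo/.Trash"));
  }
}
```

In this model the file "/a/b/c" and its trash root end up in different file systems, which is why a rename between them would fail.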


