[ https://issues.apache.org/jira/browse/HDFS-16983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728573#comment-17728573 ]

ASF GitHub Bot commented on HDFS-16983:
---------------------------------------

ayushtkn commented on code in PR #5561:
URL: https://github.com/apache/hadoop/pull/5561#discussion_r1213964431


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java:
##########
@@ -564,4 +566,113 @@ public void testConcatOnSameFile() throws Exception {
 
     assertEquals(1, dfs.getContentSummary(new Path(dir)).getFileCount());
   }
+
+  /**
+   * Verifies concat with wrong user when dfs.permissions.enabled is false.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testConcatPermissionEnabled() throws IOException {
+    Configuration conf2 = new Configuration();
+    conf2.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf2.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+    MiniDFSCluster cluster2 = new MiniDFSCluster.Builder(conf2).numDataNodes(REPL_FACTOR).build();
+    cluster2.waitClusterUp();
+    DistributedFileSystem dfs2 = cluster2.getFileSystem();
+
+    String testPathDir = "/dir2";
+    Path dir = new Path(testPathDir);
+    dfs2.mkdirs(dir);
+    Path trg = new Path(testPathDir, "trg");
+    Path src = new Path(testPathDir, "src");
+    DFSTestUtil.createFile(dfs2, trg, blockSize, REPL_FACTOR, 1);
+    DFSTestUtil.createFile(dfs2, src, blockSize, REPL_FACTOR, 1);
+
+    // Check permissions with the wrong user when dfs.permissions.enabled is true.
+    final UserGroupInformation user =
+        UserGroupInformation.createUserForTesting("theDoctor", new String[] {"tardis"});
+    DistributedFileSystem hdfs1 = (DistributedFileSystem) DFSTestUtil.getFileSystemAs(user, conf2);
+    try {
+      hdfs1.concat(trg, new Path[] {src});
+      fail("Permission exception expected");
+    } catch (IOException ie) {
+      LOG.info("Got expected exception for permissions:" + ie.getLocalizedMessage());
+    }
+
+    dfs2.close();
+    cluster2.shutdownDataNodes();
+    cluster2.shutdown();
+
+    conf2.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
+    cluster2 = new MiniDFSCluster.Builder(conf2).numDataNodes(REPL_FACTOR).build();
+    cluster2.waitClusterUp();
+    dfs2 = cluster2.getFileSystem();
+    dfs2.mkdirs(dir);
+    DFSTestUtil.createFile(dfs2, trg, blockSize, REPL_FACTOR, 1);
+    DFSTestUtil.createFile(dfs2, src, blockSize, REPL_FACTOR, 1);
+
+    // Check permissions with the wrong user when dfs.permissions.enabled is false.
+    DistributedFileSystem hdfs2 = (DistributedFileSystem) DFSTestUtil.getFileSystemAs(user, conf2);
+    try {
+      hdfs2.concat(trg, new Path[] {src});
+    } catch (IOException ie) {
+      fail("Got unexpected exception for permissions:" + ie.getLocalizedMessage());
+    }
+    dfs2.close();
+    cluster2.shutdownDataNodes();
+    cluster2.shutdown();
+  }
+
+  /**
+   * Test permissions of Concat operation.
+   */
+  @Test
+  public void testConcatPermissions() throws IOException {
+    String testPathDir = "/dir";
+    Path dir = new Path(testPathDir);
+    dfs.mkdirs(dir);
+    dfs.setPermission(dir, new FsPermission((short) 0777));
+
+    Path dst = new Path(testPathDir, "dst");
+    Path src = new Path(testPathDir, "src");
+    DFSTestUtil.createFile(dfs, dst, blockSize, REPL_FACTOR, 1);
+
+    // Create a user who is not the owner of the file and try concat operation.
+    final UserGroupInformation user =
+        UserGroupInformation.createUserForTesting("theDoctor", new String[] {"group"});
+    DistributedFileSystem dfs2 = (DistributedFileSystem) DFSTestUtil.getFileSystemAs(user, conf);
+
+    // Test 1: User is not the owner of the file and has src & dst permission.
+    DFSTestUtil.createFile(dfs, src, blockSize, REPL_FACTOR, 1);
+    dfs.setPermission(dst, new FsPermission((short) 0777));
+    dfs.setPermission(src, new FsPermission((short) 0777));
+    try {
+      dfs2.concat(dst, new Path[] {src});
+    } catch (AccessControlException ace) {
+      fail("Got unexpected exception:" + ace.getLocalizedMessage());
+    }

Review Comment:
   Change the test like this
   ```
    /**
      * Verifies concat with wrong user when dfs.permissions.enabled is false.
      *
      * @throws IOException
      */
     @Test
     public void testConcatPermissionEnabled() throws Exception {
       Configuration conf2 = new Configuration();
       conf2.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
       conf2.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
      MiniDFSCluster cluster2 = new MiniDFSCluster.Builder(conf2).numDataNodes(REPL_FACTOR).build();
       try {
         cluster2.waitClusterUp();
         DistributedFileSystem dfs2 = cluster2.getFileSystem();
   
         String testPathDir = "/dir2";
         Path dir = new Path(testPathDir);
         dfs2.mkdirs(dir);
         Path trg = new Path(testPathDir, "trg");
         Path src = new Path(testPathDir, "src");
         DFSTestUtil.createFile(dfs2, trg, blockSize, REPL_FACTOR, 1);
         DFSTestUtil.createFile(dfs2, src, blockSize, REPL_FACTOR, 1);
   
        // Check permissions with the wrong user when dfs.permissions.enabled is true.
         final UserGroupInformation user =
            UserGroupInformation.createUserForTesting("theDoctor", new String[] { "tardis" });
         DistributedFileSystem hdfs1 =
             (DistributedFileSystem) DFSTestUtil.getFileSystemAs(user, conf2);
         LambdaTestUtils.intercept(AccessControlException.class,
             "Permission denied: user=theDoctor, access=WRITE",
             () -> hdfs1.concat(trg, new Path[] {src}));
   
         conf2.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
        cluster2 = new MiniDFSCluster.Builder(conf2).numDataNodes(REPL_FACTOR).build();
         cluster2.waitClusterUp();
         dfs2 = cluster2.getFileSystem();
         dfs2.mkdirs(dir);
         DFSTestUtil.createFile(dfs2, trg, blockSize, REPL_FACTOR, 1);
         DFSTestUtil.createFile(dfs2, src, blockSize, REPL_FACTOR, 1);
   
        // Check permissions with the wrong user when dfs.permissions.enabled is false.
         DistributedFileSystem hdfs2 =
             (DistributedFileSystem) DFSTestUtil.getFileSystemAs(user, conf2);
         hdfs2.concat(trg, new Path[] { src });
       } finally {
         if (cluster2 != null) {
           cluster2.shutdown();
         }
       }
     }
   
     /**
      * Test permissions of Concat operation.
      */
     @Test
     public void testConcatPermissions() throws Exception {
       String testPathDir = "/dir";
       Path dir = new Path(testPathDir);
       dfs.mkdirs(dir);
       dfs.setPermission(dir, new FsPermission((short) 0777));
   
       Path dst = new Path(testPathDir, "dst");
       Path src = new Path(testPathDir, "src");
       DFSTestUtil.createFile(dfs, dst, blockSize, REPL_FACTOR, 1);
   
      // Create a user who is not the owner of the file and try concat operation.
       final UserGroupInformation user =
          UserGroupInformation.createUserForTesting("theDoctor", new String[] {"group"});
      DistributedFileSystem dfs2 = (DistributedFileSystem) DFSTestUtil.getFileSystemAs(user, conf);
   
      // Test 1: User is not the owner of the file and has src & dst permission.
       DFSTestUtil.createFile(dfs, src, blockSize, REPL_FACTOR, 1);
       dfs.setPermission(dst, new FsPermission((short) 0777));
       dfs.setPermission(src, new FsPermission((short) 0777));
      dfs2.concat(dst, new Path[] {src});
   
       // Test 2: User is not the owner of the file and has only dst permission.
       DFSTestUtil.createFile(dfs, src, blockSize, REPL_FACTOR, 1);
       dfs.setPermission(dst, new FsPermission((short) 0777));
       dfs.setPermission(src, new FsPermission((short) 0700));
       LambdaTestUtils.intercept(AccessControlException.class,
           "Permission denied: user=theDoctor, access=READ",
           () -> dfs2.concat(dst, new Path[] {src}));
   
       // Test 3: User is not the owner of the file and has only src permission.
       DFSTestUtil.createFile(dfs, src, blockSize, REPL_FACTOR, 1);
       dfs.setPermission(dst, new FsPermission((short) 0700));
       dfs.setPermission(src, new FsPermission((short) 0777));
       LambdaTestUtils.intercept(AccessControlException.class,
           "Permission denied: user=theDoctor, access=WRITE",
           () -> dfs2.concat(dst, new Path[] {src}));
     }
   ```
   
   +1, once changed





> Whether checking path access permissions should be decided by 
> dfs.permissions.enabled in concat operation
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: HDFS-16983
>                 URL: https://issues.apache.org/jira/browse/HDFS-16983
>             Project: Hadoop HDFS
>          Issue Type: Bug
>          Components: namenode
>    Affects Versions: 3.4.0
>            Reporter: caozhiqiang
>            Assignee: caozhiqiang
>            Priority: Major
>              Labels: pull-request-available
>
> The concat RPC calls FSDirConcatOp::verifySrcFiles() to check the source 
> files, and that method performs a permission check on the srcs. Whether this 
> check is performed should be decided by the dfs.permissions.enabled 
> configuration, but the 'pc' parameter is never null, so the check currently 
> runs even when permissions are disabled.
> We should therefore change 'if (pc != null)' to 'if (fsd.isPermissionEnabled())'.
> {code:java}
> // permission check for srcs
> if (pc != null) {
>   fsd.checkPathAccess(pc, iip, FsAction.READ); // read the file
>   fsd.checkParentAccess(pc, iip, FsAction.WRITE); // for delete
> } 
> {code}
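>
> A minimal sketch of the proposed change, assuming the surrounding 
> verifySrcFiles() code matches the snippet above:
> {code:java}
> // permission check for srcs, gated on dfs.permissions.enabled
> if (fsd.isPermissionEnabled()) {
>   fsd.checkPathAccess(pc, iip, FsAction.READ); // read the file
>   fsd.checkParentAccess(pc, iip, FsAction.WRITE); // for delete
> }
> {code}
> With this gate, a non-owner can concat files when permission checking is 
> disabled, which is what the new testConcatPermissionEnabled case above 
> verifies.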


