ayushtkn commented on code in PR #5561:
URL: https://github.com/apache/hadoop/pull/5561#discussion_r1213964431


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java:
##########
@@ -564,4 +566,113 @@ public void testConcatOnSameFile() throws Exception {
 
     assertEquals(1, dfs.getContentSummary(new Path(dir)).getFileCount());
   }
+
+  /**
+   * Verifies concat with wrong user when dfs.permissions.enabled is false.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testConcatPermissionEnabled() throws IOException {
+    Configuration conf2 = new Configuration();
+    conf2.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf2.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+    MiniDFSCluster cluster2 = new 
MiniDFSCluster.Builder(conf2).numDataNodes(REPL_FACTOR).build();
+    cluster2.waitClusterUp();
+    DistributedFileSystem dfs2 = cluster2.getFileSystem();
+
+    String testPathDir = "/dir2";
+    Path dir = new Path(testPathDir);
+    dfs2.mkdirs(dir);
+    Path trg = new Path(testPathDir, "trg");
+    Path src = new Path(testPathDir, "src");
+    DFSTestUtil.createFile(dfs2, trg, blockSize, REPL_FACTOR, 1);
+    DFSTestUtil.createFile(dfs2, src, blockSize, REPL_FACTOR, 1);
+
+    // Check permissions with the wrong user when dfs.permissions.enabled is 
true.
+    final UserGroupInformation user =
+        UserGroupInformation.createUserForTesting("theDoctor", new String[] 
{"tardis"});
+    DistributedFileSystem hdfs1 = (DistributedFileSystem) 
DFSTestUtil.getFileSystemAs(user, conf2);
+    try {
+      hdfs1.concat(trg, new Path[] {src});
+      fail("Permission exception expected");
+    } catch (IOException ie) {
+      LOG.info("Got expected exception for permissions:" + 
ie.getLocalizedMessage());
+    }
+
+    dfs2.close();
+    cluster2.shutdownDataNodes();
+    cluster2.shutdown();
+
+    conf2.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
+    cluster2 = new 
MiniDFSCluster.Builder(conf2).numDataNodes(REPL_FACTOR).build();
+    cluster2.waitClusterUp();
+    dfs2 = cluster2.getFileSystem();
+    dfs2.mkdirs(dir);
+    DFSTestUtil.createFile(dfs2, trg, blockSize, REPL_FACTOR, 1);
+    DFSTestUtil.createFile(dfs2, src, blockSize, REPL_FACTOR, 1);
+
+    // Check permissions with the wrong user when dfs.permissions.enabled is 
false.
+    DistributedFileSystem hdfs2 = (DistributedFileSystem) 
DFSTestUtil.getFileSystemAs(user, conf2);
+    try {
+      hdfs2.concat(trg, new Path[] {src});
+    } catch (IOException ie) {
+      fail("Got unexpected exception for permissions:" + 
ie.getLocalizedMessage());
+    }
+    dfs2.close();
+    cluster2.shutdownDataNodes();
+    cluster2.shutdown();
+  }
+
+  /**
+   * Test permissions of Concat operation.
+   */
+  @Test
+  public void testConcatPermissions() throws IOException {
+    String testPathDir = "/dir";
+    Path dir = new Path(testPathDir);
+    dfs.mkdirs(dir);
+    dfs.setPermission(dir, new FsPermission((short) 0777));
+
+    Path dst = new Path(testPathDir, "dst");
+    Path src = new Path(testPathDir, "src");
+    DFSTestUtil.createFile(dfs, dst, blockSize, REPL_FACTOR, 1);
+
+    // Create a user who is not the owner of the file and try concat operation.
+    final UserGroupInformation user =
+        UserGroupInformation.createUserForTesting("theDoctor", new String[] 
{"group"});
+    DistributedFileSystem dfs2 = (DistributedFileSystem) 
DFSTestUtil.getFileSystemAs(user, conf);
+
+    // Test 1: User is not the owner of the file and has src & dst permission.
+    DFSTestUtil.createFile(dfs, src, blockSize, REPL_FACTOR, 1);
+    dfs.setPermission(dst, new FsPermission((short) 0777));
+    dfs.setPermission(src, new FsPermission((short) 0777));
+    try {
+      dfs2.concat(dst, new Path[] {src});
+    } catch (AccessControlException ace) {
+      fail("Got unexpected exception:" + ace.getLocalizedMessage());
+    }

Review Comment:
   Change the test like this
   ```
    /**
      * Verifies concat with wrong user when dfs.permissions.enabled is false.
      *
      * @throws IOException
      */
     @Test
     public void testConcatPermissionEnabled() throws Exception {
       Configuration conf2 = new Configuration();
       conf2.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
       conf2.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
       MiniDFSCluster cluster2 = new 
MiniDFSCluster.Builder(conf2).numDataNodes(REPL_FACTOR).build();
       try {
         cluster2.waitClusterUp();
         DistributedFileSystem dfs2 = cluster2.getFileSystem();
   
         String testPathDir = "/dir2";
         Path dir = new Path(testPathDir);
         dfs2.mkdirs(dir);
         Path trg = new Path(testPathDir, "trg");
         Path src = new Path(testPathDir, "src");
         DFSTestUtil.createFile(dfs2, trg, blockSize, REPL_FACTOR, 1);
         DFSTestUtil.createFile(dfs2, src, blockSize, REPL_FACTOR, 1);
   
         // Check permissions with the wrong user when dfs.permissions.enabled 
is true.
         final UserGroupInformation user =
             UserGroupInformation.createUserForTesting("theDoctor", new 
String[] { "tardis" });
         DistributedFileSystem hdfs1 =
             (DistributedFileSystem) DFSTestUtil.getFileSystemAs(user, conf2);
         LambdaTestUtils.intercept(AccessControlException.class,
             "Permission denied: user=theDoctor, access=WRITE",
             () -> hdfs1.concat(trg, new Path[] {src}));
   
         conf2.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
         cluster2 = new 
MiniDFSCluster.Builder(conf2).numDataNodes(REPL_FACTOR).build();
         cluster2.waitClusterUp();
         dfs2 = cluster2.getFileSystem();
         dfs2.mkdirs(dir);
         DFSTestUtil.createFile(dfs2, trg, blockSize, REPL_FACTOR, 1);
         DFSTestUtil.createFile(dfs2, src, blockSize, REPL_FACTOR, 1);
   
         // Check permissions with the wrong user when dfs.permissions.enabled 
is false.
         DistributedFileSystem hdfs2 =
             (DistributedFileSystem) DFSTestUtil.getFileSystemAs(user, conf2);
         hdfs2.concat(trg, new Path[] { src });
       } finally {
         if (cluster2 != null) {
           cluster2.shutdown();
         }
       }
     }
   
     /**
      * Test permissions of Concat operation.
      */
     @Test
     public void testConcatPermissions() throws Exception {
       String testPathDir = "/dir";
       Path dir = new Path(testPathDir);
       dfs.mkdirs(dir);
       dfs.setPermission(dir, new FsPermission((short) 0777));
   
       Path dst = new Path(testPathDir, "dst");
       Path src = new Path(testPathDir, "src");
       DFSTestUtil.createFile(dfs, dst, blockSize, REPL_FACTOR, 1);
   
       // Create a user who is not the owner of the file and try concat 
operation.
       final UserGroupInformation user =
           UserGroupInformation.createUserForTesting("theDoctor", new String[] 
{"group"});
       DistributedFileSystem dfs2 = (DistributedFileSystem) 
DFSTestUtil.getFileSystemAs(user, conf);
   
       // Test 1: User is not the owner of the file and has src & dst 
permission.
       DFSTestUtil.createFile(dfs, src, blockSize, REPL_FACTOR, 1);
       dfs.setPermission(dst, new FsPermission((short) 0777));
       dfs.setPermission(src, new FsPermission((short) 0777));
         dfs2.concat(dst, new Path[] {src});
   
       // Test 2: User is not the owner of the file and has only dst permission.
       DFSTestUtil.createFile(dfs, src, blockSize, REPL_FACTOR, 1);
       dfs.setPermission(dst, new FsPermission((short) 0777));
       dfs.setPermission(src, new FsPermission((short) 0700));
       LambdaTestUtils.intercept(AccessControlException.class,
           "Permission denied: user=theDoctor, access=READ",
           () -> dfs2.concat(dst, new Path[] {src}));
   
       // Test 3: User is not the owner of the file and has only src permission.
       DFSTestUtil.createFile(dfs, src, blockSize, REPL_FACTOR, 1);
       dfs.setPermission(dst, new FsPermission((short) 0700));
       dfs.setPermission(src, new FsPermission((short) 0777));
       LambdaTestUtils.intercept(AccessControlException.class,
           "Permission denied: user=theDoctor, access=WRITE",
           () -> dfs2.concat(dst, new Path[] {src}));
     }
   ```
   
   +1, once changed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to