This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch HDDS-2939
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit c3261997f21b428f14e1f0907a607b4e0aa91c8b
Author: Sadanand Shenoy <[email protected]>
AuthorDate: Wed Apr 7 17:24:04 2021 +0530

    HDDS-4691. [FSO]Authorizer: OM can do recursive ACL check for subpaths 
(#2011)
---
 .../hadoop/fs/ozone/TestOzoneFileSystemV1.java     |   6 +
 .../ozone/om/TestRecursiveAclWithFSOBucket.java    | 338 +++++++++++++++++++++
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  53 ++++
 3 files changed, 397 insertions(+)

diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemV1.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemV1.java
index ed62990..aa289a0 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemV1.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemV1.java
@@ -336,4 +336,10 @@ public class TestOzoneFileSystemV1 extends 
TestOzoneFileSystem {
   @Ignore("TODO:HDDS-2939")
   public void testListStatusWithIntermediateDir() throws Exception {
   }
+
+  @Override
+  @Test
+  @Ignore("TODO:HDDS-5012")
+  public void testListStatusOnLargeDirectory() throws Exception {
+  }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestRecursiveAclWithFSOBucket.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestRecursiveAclWithFSOBucket.java
new file mode 100644
index 0000000..f4a077e
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestRecursiveAclWithFSOBucket.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType.OZONE;
+
+/**
+ * Test recursive acl checks for delete and rename for FSO Buckets.
+ */
+public class TestRecursiveAclWithFSOBucket {
+
+  @Rule public Timeout timeout = Timeout.seconds(120);
+
+  private MiniOzoneCluster cluster;
+
+  private final UserGroupInformation adminUser =
+      UserGroupInformation.createUserForTesting("om", new String[] {"ozone"});
+  private final UserGroupInformation user1 = UserGroupInformation
+      .createUserForTesting("user1", new String[] {"test1"});
+  private final UserGroupInformation user2 = UserGroupInformation
+      .createUserForTesting("user2", new String[] {"test2"});
+
+  @Before
+  public void init() throws Exception {
+    // loginUser is the user running this test.
+    // Implication: loginUser is automatically added to the OM admin list.
+    UserGroupInformation.setLoginUser(adminUser);
+    // ozone.acl.enabled = true
+    // start a cluster
+    startCluster();
+  }
+
+  @Test
+  public void testKeyDeleteAndRenameWithoutPermission() throws Exception {
+
+    List<String> keys = new ArrayList<>();
+    // Create volumes with user1
+
+    OzoneClient client = cluster.getClient();
+    ObjectStore objectStore = client.getObjectStore();
+
+    /* r = READ, w = WRITE, c = CREATE, d = DELETE
+       l = LIST, a = ALL, n = NONE, x = READ_ACL, y = WRITE_ACL */
+    String aclWorldAll = "world::a";
+    createVolumeWithOwnerAndAcl(objectStore, "volume1", "user1", aclWorldAll);
+
+    // Login as user1, create directories and keys
+    UserGroupInformation.setLoginUser(user1);
+    client = cluster.getClient();
+    objectStore = client.getObjectStore();
+
+    OzoneVolume volume = objectStore.getVolume("volume1");
+
+    BucketArgs omBucketArgs =
+        BucketArgs.newBuilder().setStorageType(StorageType.DISK).build();
+
+    // create bucket with user1
+    volume.createBucket("bucket1", omBucketArgs);
+    setBucketAcl(objectStore, volume.getName(), "bucket1", aclWorldAll);
+    OzoneBucket ozoneBucket = volume.getBucket("bucket1");
+
+    /**
+     *                       buck-1
+     *                        |
+     *                        a
+     *                        |
+     *          ------------------------------------
+     *         |           |              |        |
+     *         b1          b2             b3      file1
+     *       -----       ------           -----
+     *       |    |      |    |          |    |
+     *      c1   c2     d1   d2          e1   e2
+     *       |    |      |    |           |    |
+     *       f1   f2     f3  --------     f5   f6
+     *                      |        |
+     *                    d21        file2
+     *                     |
+     *                     f4
+     *
+     *     Test Case 1 :
+     *     Remove delete acl from file File2
+     *     Try deleting b2
+     *
+     *     Test case 2:
+     *     Remove delete acl fro dir c2
+     *     Try deleting b1
+     *
+     *     Test case 3
+     *     try deleting b3
+     */
+
+    String keyf1 = "a/b1/c1/f1";
+    String keyf2 = "a/b1/c2/f2";
+    String keyf3 = "a/b2/d1/f3";
+    String keyf4 = "a/b2/d2/d21/f4";
+    String keyf5 = "/a/b3/e1/f5";
+    String keyf6 = "/a/b3/e2/f6";
+
+    String file1 = "a/" + "file" + RandomStringUtils.randomNumeric(5);
+    String file2 = "a/b2/d2/" + "file" + RandomStringUtils.randomNumeric(5);
+
+    keys.add(keyf1);
+    keys.add(keyf2);
+    keys.add(keyf3);
+    keys.add(keyf4);
+    keys.add(keyf5);
+    keys.add(keyf6);
+    keys.add(file1);
+    keys.add(file2);
+
+    createKeys(objectStore, ozoneBucket, keys);
+
+    // Test case 1
+    // Remove acls from file2
+    // Delete/Rename on directory a/b2 should throw permission denied
+    // (since file2 is a child)
+    removeAclsFromKey(objectStore, ozoneBucket, file2);
+    OzoneObj ozoneObj;
+    List<OzoneAcl> aclList1;
+
+    UserGroupInformation.setLoginUser(user2);
+    client = cluster.getClient();
+    objectStore = client.getObjectStore();
+    volume = objectStore.getVolume("volume1");
+    ozoneBucket = volume.getBucket("bucket1");
+
+    // perform  delete
+    try {
+      ozoneBucket.deleteDirectory("a/b2", true);
+      Assert.fail("Should throw permission denied !");
+    } catch (OMException ome) {
+      // expect permission error
+      Assert.assertEquals("Permission check failed",
+          OMException.ResultCodes.PERMISSION_DENIED, ome.getResult());
+    }
+
+    // perform rename
+    try {
+      ozoneBucket.renameKey("a/b2", "a/b2_renamed");
+      Assert.fail("Should throw permission denied !");
+    } catch (OMException ome) {
+      // expect permission error
+      Assert.assertEquals("Permission check failed",
+          OMException.ResultCodes.PERMISSION_DENIED, ome.getResult());
+    }
+
+    // Test case 2
+    // Remove acl from directory c2, delete/rename a/b1 should throw
+    // permission denied since c2 is a subdirectory
+
+    UserGroupInformation.setLoginUser(user1);
+    removeAclsFromKey(objectStore, ozoneBucket, "a/b1/c2");
+
+    UserGroupInformation.setLoginUser(user2);
+    // perform  delete
+    try {
+      ozoneBucket.deleteDirectory("a/b1", true);
+      Assert.fail("Should throw permission denied !");
+    } catch (OMException ome) {
+      // expect permission error
+      Assert.assertEquals("Permission check failed",
+          OMException.ResultCodes.PERMISSION_DENIED, ome.getResult());
+    }
+
+    // perform rename
+    try {
+      ozoneBucket.renameKey("a/b1", "a/b1_renamed");
+      Assert.fail("Should throw permission denied !");
+    } catch (OMException ome) {
+      // expect permission error
+      Assert.assertEquals("Permission check failed",
+          OMException.ResultCodes.PERMISSION_DENIED, ome.getResult());
+    }
+
+    // Test case 3
+    // delete b3 and this shouldn't throw exception because acls have not
+    // been removed from subpaths.
+    ozoneBucket.deleteDirectory("a/b3", true);
+  }
+
+  private void removeAclsFromKey(ObjectStore objectStore,
+      OzoneBucket ozoneBucket, String key) throws IOException {
+    OzoneObj ozoneObj = OzoneObjInfo.Builder.newBuilder().setKeyName(key)
+        .setBucketName(ozoneBucket.getName())
+        .setVolumeName(ozoneBucket.getVolumeName())
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .setResType(OzoneObj.ResourceType.KEY).build();
+    List<OzoneAcl> aclList1 = objectStore.getAcl(ozoneObj);
+    for (OzoneAcl acl : aclList1) {
+      objectStore.removeAcl(ozoneObj, acl);
+    }
+  }
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   */
+  private void startCluster() throws Exception {
+
+    OzoneConfiguration conf = new OzoneConfiguration();
+    String clusterId = UUID.randomUUID().toString();
+    String scmId = UUID.randomUUID().toString();
+    String omId = UUID.randomUUID().toString();
+
+    // Use native impl here, default impl doesn't do actual checks
+    conf.set(OZONE_ACL_AUTHORIZER_CLASS, OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
+    // Note: OM doesn't support live config reloading
+    conf.setBoolean(OZONE_ACL_ENABLED, true);
+
+    TestOMRequestUtils.configureFSOptimizedPaths(conf, true,
+        OMConfigKeys.OZONE_OM_LAYOUT_VERSION_V1);
+
+    cluster =
+        MiniOzoneCluster.newBuilder(conf).setClusterId(clusterId)
+            .setScmId(scmId).setOmId(omId).build();
+    cluster.waitForClusterToBeReady();
+
+  }
+
+  @After
+  public void stopCluster() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private void createVolumeWithOwnerAndAcl(ObjectStore objectStore,
+      String volumeName, String ownerName, String aclString)
+      throws IOException {
+    ClientProtocol proxy = objectStore.getClientProxy();
+    objectStore.createVolume(volumeName);
+    proxy.setVolumeOwner(volumeName, ownerName);
+    setVolumeAcl(objectStore, volumeName, aclString);
+  }
+
+  /**
+   * Helper function to set volume ACL.
+   */
+  private void setVolumeAcl(ObjectStore objectStore, String volumeName,
+      String aclString) throws IOException {
+    OzoneObj obj = OzoneObjInfo.Builder.newBuilder().setVolumeName(volumeName)
+        .setResType(OzoneObj.ResourceType.VOLUME).setStoreType(OZONE).build();
+    Assert.assertTrue(objectStore.setAcl(obj, OzoneAcl.parseAcls(aclString)));
+  }
+
+  /**
+   * Helper function to set bucket ACL.
+   */
+  private void setBucketAcl(ObjectStore objectStore, String volumeName,
+      String bucket, String aclString) throws IOException {
+    OzoneObj obj = OzoneObjInfo.Builder.newBuilder().setVolumeName(volumeName)
+        .setBucketName(bucket).setResType(OzoneObj.ResourceType.BUCKET)
+        .setStoreType(OZONE).build();
+    Assert.assertTrue(objectStore.setAcl(obj, OzoneAcl.parseAcls(aclString)));
+  }
+
+  /**
+   * Helper function to set key ACL.
+   */
+  private void setKeyAcl(ObjectStore objectStore, String volumeName,
+      String bucket, String key, String aclString) throws IOException {
+    OzoneObj obj = OzoneObjInfo.Builder.newBuilder().setVolumeName(volumeName)
+        .setBucketName(bucket).setKeyName(key)
+        .setResType(OzoneObj.ResourceType.KEY).setStoreType(OZONE).build();
+    Assert.assertTrue(objectStore.setAcl(obj, OzoneAcl.parseAcls(aclString)));
+  }
+
+  private void createKeys(ObjectStore objectStore, OzoneBucket ozoneBucket,
+      List<String> keys) throws Exception {
+    int length = 10;
+    String aclWorldAll = "world::a";
+    byte[] input = new byte[length];
+    Arrays.fill(input, (byte) 96);
+    for (String key : keys) {
+      createKey(ozoneBucket, key, 10, input);
+      setKeyAcl(objectStore, ozoneBucket.getVolumeName(), 
ozoneBucket.getName(),
+          key, aclWorldAll);
+    }
+  }
+
+  private void createKey(OzoneBucket ozoneBucket, String key, int length,
+      byte[] input) throws Exception {
+    OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(key, length);
+    ozoneOutputStream.write(input);
+    ozoneOutputStream.write(input, 0, 10);
+    ozoneOutputStream.close();
+  }
+
+}
+
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 38a4307..d751154 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.Stack;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
@@ -1764,6 +1765,12 @@ public class KeyManagerImpl implements KeyManager {
       if (context.getAclRights() == IAccessAuthorizer.ACLType.WRITE) {
         keyInfo = metadataManager.getOpenKeyTable().get(objectKey);
       } else {
+        // Recursive check is done only for ACL_TYPE DELETE
+        // Rename and delete operations will send ACL_TYPE DELETE
+        if (context.isRecursiveAccessCheck()
+            && context.getAclRights() == IAccessAuthorizer.ACLType.DELETE) {
+          return checkChildrenAcls(ozObject, context);
+        }
         try {
           OzoneFileStatus fileStatus = getFileStatus(args);
           keyInfo = fileStatus.getKeyInfo();
@@ -1809,6 +1816,52 @@ public class KeyManagerImpl implements KeyManager {
   }
 
   /**
+   * check acls for all subpaths of a directory.
+   *
+   * @param ozObject
+   * @param context
+   * @return
+   * @throws IOException
+   */
+  private boolean checkChildrenAcls(OzoneObj ozObject, RequestContext context)
+      throws IOException {
+    OmKeyInfo keyInfo;
+    OzoneFileStatus ozoneFileStatus =
+        ozObject.getOzonePrefixPathViewer().getOzoneFileStatus();
+    keyInfo = ozoneFileStatus.getKeyInfo();
+    // Using stack to check acls for subpaths
+    Stack<OzoneFileStatus> directories = new Stack<>();
+    // check whether given file/dir  has access
+    boolean hasAccess = OzoneAclUtil.checkAclRight(keyInfo.getAcls(), context);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("user:{} has access rights for key:{} :{} ",
+          context.getClientUgi(), ozObject.getKeyName(), hasAccess);
+    }
+    if (ozoneFileStatus.isDirectory() && hasAccess) {
+      directories.add(ozoneFileStatus);
+    }
+    while (!directories.isEmpty() && hasAccess) {
+      ozoneFileStatus = directories.pop();
+      String keyPath = ozoneFileStatus.getTrimmedName();
+      Iterator<? extends OzoneFileStatus> children =
+          ozObject.getOzonePrefixPathViewer().getChildren(keyPath);
+      while (hasAccess && children.hasNext()) {
+        ozoneFileStatus = children.next();
+        keyInfo = ozoneFileStatus.getKeyInfo();
+        hasAccess = OzoneAclUtil.checkAclRight(keyInfo.getAcls(), context);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("user:{} has access rights for key:{} :{} ",
+              context.getClientUgi(), keyInfo.getKeyName(), hasAccess);
+        }
+        if (hasAccess && ozoneFileStatus.isDirectory()) {
+          directories.add(ozoneFileStatus);
+        }
+      }
+    }
+    return hasAccess;
+  }
+
+  /**
    * Helper method to validate ozone object.
    * @param obj
    * */

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to