This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 447408585d HDDS-8653. Let directory inherit parent default ACLs (#4738)
447408585d is described below

commit 447408585deb3dec864e4920986d4e6e579b0bc7
Author: Hongbing Wang <[email protected]>
AuthorDate: Wed Aug 16 16:19:15 2023 +0800

    HDDS-8653. Let directory inherit parent default ACLs (#4738)
---
 hadoop-hdds/docs/content/security/SecurityAcls.md  |   5 +-
 .../docs/content/security/SecurityAcls.zh.md       |   6 +-
 .../hadoop/ozone/om/helpers/OzoneAclUtil.java      |   9 ++
 .../hadoop/ozone/om/TestRecursiveAclWithFSO.java   |  34 ++---
 .../om/request/file/OMDirectoryCreateRequest.java  |  33 ++---
 .../file/OMDirectoryCreateRequestWithFSO.java      |  49 ++++---
 .../ozone/om/request/file/OMFileCreateRequest.java |   7 +-
 .../request/file/OMFileCreateRequestWithFSO.java   |  11 +-
 .../ozone/om/request/file/OMFileRequest.java       |  46 +++----
 .../ozone/om/request/key/OMKeyCreateRequest.java   |  16 +--
 .../om/request/key/OMKeyCreateRequestWithFSO.java  |  11 +-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  | 107 ++++++++++-----
 .../S3InitiateMultipartUploadRequest.java          |  11 +-
 .../S3InitiateMultipartUploadRequestWithFSO.java   |  13 +-
 .../request/file/TestOMDirectoryCreateRequest.java |  81 ++++++++++++
 .../file/TestOMDirectoryCreateRequestWithFSO.java  |  84 ++++++++++++
 .../om/request/file/TestOMFileCreateRequest.java   | 147 +++++++++++++++++++++
 .../file/TestOMFileCreateRequestWithFSO.java       |   6 +
 .../om/request/key/TestOMKeyCreateRequest.java     |  75 +++++++++++
 .../request/key/TestOMKeyCreateRequestWithFSO.java |   6 +
 .../TestS3InitiateMultipartUploadRequest.java      |  83 ++++++++++++
 ...estS3InitiateMultipartUploadRequestWithFSO.java | 111 ++++++++++++++++
 22 files changed, 810 insertions(+), 141 deletions(-)

diff --git a/hadoop-hdds/docs/content/security/SecurityAcls.md 
b/hadoop-hdds/docs/content/security/SecurityAcls.md
index f25cedd82a..9976cbbc4f 100644
--- a/hadoop-hdds/docs/content/security/SecurityAcls.md
+++ b/hadoop-hdds/docs/content/security/SecurityAcls.md
@@ -81,7 +81,10 @@ allows the user to overwrite an existing ozone key.
 Where an _scope_ can be:
 
 1. **ACCESS** – Access ACL is applied only to the specific object and not 
inheritable. It controls the access to the object itself.
-2. **DEFAULT** - Default ACL is applied to the specific object and will be 
inherited by object's descendants. Default ACLs cannot be set on keys (as there 
can be no objects under a key).
+2. **DEFAULT** - Default ACL is applied to the specific object and will be 
inherited by object's descendants. Default ACLs cannot be set on keys (as there 
can be no objects under a key). <br>
+_Note_: How ACLs are inherited from a parent's DEFAULT ACLs depends on the 
bucket layout:
+    - **Legacy with EnableFileSystem or FSO**: inherit the immediate parent's 
DEFAULT ACLs. If none, inherit the bucket DEFAULT ACLs. 
+    - **Legacy with DisableFileSystem or OBS**: inherit the bucket DEFAULT 
ACLs.
 
 ## Ozone Native ACL APIs
 
diff --git a/hadoop-hdds/docs/content/security/SecurityAcls.zh.md 
b/hadoop-hdds/docs/content/security/SecurityAcls.zh.md
index 17e1507f79..3d95fcf087 100644
--- a/hadoop-hdds/docs/content/security/SecurityAcls.zh.md
+++ b/hadoop-hdds/docs/content/security/SecurityAcls.zh.md
@@ -67,8 +67,10 @@ _权限_ 可选的值包括::
 _范围_ 可选的值包括::
 
 1. **ACCESS** – 这类 ACL 仅作用于对象本身,不能被继承。它控制对对象本身的访问。
-2. **DEFAULT** - 这类 ACL 不仅作用于对象本身,还会被对象的后代继承。不能在叶子对象上设置该类 ACL(因为叶子对象下不能再有其他对象)。
-
+2. **DEFAULT** - 这类 ACL 不仅作用于对象本身,还会被对象的后代继承。不能在叶子对象上设置该类 
ACL(因为叶子对象下不能再有其他对象)。 <br>
+_注意_:从父级默认 ACL 继承的 ACL, 将根据不同的桶布局遵循以下规则:
+   - **启用文件系统的 Legacy 或 FSO**:继承直接父目录的默认ACL。如果直接父目录没有默认ACL,则继承存储桶的默认ACL。
+   - **禁用文件系统的 Legacy 或 OBS**:继承桶的默认ACL。
 
 ## Ozone 原生 ACL API
 
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneAclUtil.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneAclUtil.java
index 48c8a6b562..134675cdce 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneAclUtil.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneAclUtil.java
@@ -180,6 +180,15 @@ public final class OzoneAclUtil {
     return false;
   }
 
+  /**
+   * Helper function to convert the scope of ACLs to DEFAULT.
+   * This method is called in ACL inheritance scenarios.
+   * @param acls ACLs to update; each entry's scope is set to DEFAULT in place
+   */
+  public static void toDefaultScope(List<OzoneAcl> acls) {
+    acls.forEach(a -> a.setAclScope(DEFAULT));
+  }
+
   /**
    * Convert a list of OzoneAclInfo(protoc) to list of OzoneAcl(java).
    * @param protoAcls
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestRecursiveAclWithFSO.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestRecursiveAclWithFSO.java
index 36f5eb9c04..0d3549b342 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestRecursiveAclWithFSO.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestRecursiveAclWithFSO.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -86,13 +87,10 @@ public class TestRecursiveAclWithFSO {
     /* r = READ, w = WRITE, c = CREATE, d = DELETE
        l = LIST, a = ALL, n = NONE, x = READ_ACL, y = WRITE_ACL */
     String aclWorldAll = "world::a";
-
     List<String> keys = new ArrayList<>();
     // Create volumes with user1
-
     try (OzoneClient client = cluster.newClient()) {
       ObjectStore objectStore = client.getObjectStore();
-
       createVolumeWithOwnerAndAcl(objectStore, "volume1", "user1", 
aclWorldAll);
     }
 
@@ -100,9 +98,7 @@ public class TestRecursiveAclWithFSO {
     UserGroupInformation.setLoginUser(user1);
     try (OzoneClient client = cluster.newClient()) {
       ObjectStore objectStore = client.getObjectStore();
-
       OzoneVolume volume = objectStore.getVolume("volume1");
-
       BucketArgs omBucketArgs =
           BucketArgs.newBuilder().setStorageType(StorageType.DISK).build();
 
@@ -134,7 +130,7 @@ public class TestRecursiveAclWithFSO {
        *     Try deleting b2
        *
        *     Test case 2:
-       *     Remove delete acl fro dir c2
+       *     Remove delete acl from dir c2
        *     Try deleting b1
        *
        *     Test case 3
@@ -147,7 +143,6 @@ public class TestRecursiveAclWithFSO {
       String keyf4 = "a/b2/d2/d21/f4";
       String keyf5 = "/a/b3/e1/f5";
       String keyf6 = "/a/b3/e2/f6";
-
       String file1 = "a/" + "file" + RandomStringUtils.randomNumeric(5);
       String file2 = "a/b2/d2/" + "file" + RandomStringUtils.randomNumeric(5);
 
@@ -159,7 +154,6 @@ public class TestRecursiveAclWithFSO {
       keys.add(keyf6);
       keys.add(file1);
       keys.add(file2);
-
       createKeys(objectStore, ozoneBucket, keys);
 
       // Test case 1
@@ -184,7 +178,6 @@ public class TestRecursiveAclWithFSO {
         assertEquals(OMException.ResultCodes.PERMISSION_DENIED,
             ome.getResult(), "Permission check failed");
       }
-
       // perform rename
       try {
         ozoneBucket.renameKey("a/b2", "a/b2_renamed");
@@ -198,9 +191,14 @@ public class TestRecursiveAclWithFSO {
       // Test case 2
       // Remove acl from directory c2, delete/rename a/b1 should throw
       // permission denied since c2 is a subdirectory
-
-      UserGroupInformation.setLoginUser(user1);
-      removeAclsFromKey(objectStore, ozoneBucket, "a/b1/c2");
+      user1.doAs((PrivilegedExceptionAction<Void>) () -> {
+        try (OzoneClient c = cluster.newClient()) {
+          ObjectStore o = c.getObjectStore();
+          OzoneBucket b = o.getVolume("volume1").getBucket("bucket1");
+          removeAclsFromKey(o, b, "a/b1/c2");
+        }
+        return null;
+      });
 
       UserGroupInformation.setLoginUser(user2);
       // perform  delete
@@ -224,9 +222,15 @@ public class TestRecursiveAclWithFSO {
       }
 
       // Test case 3
-      // delete b3 and this shouldn't throw exception because acls have not
-      // been removed from subpaths.
-      ozoneBucket.deleteDirectory("a/b3", true);
+      // delete b3 and this should throw exception because user2 has no acls
+      try {
+        ozoneBucket.deleteDirectory("a/b3", true);
+        fail("Should throw permission denied !");
+      } catch (OMException ome) {
+        // expect permission error
+        assertEquals(OMException.ResultCodes.PERMISSION_DENIED,
+            ome.getResult(), "Permission check failed");
+      }
     }
   }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
index 73f30c80aa..44f45bbdb7 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
@@ -30,9 +30,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
-import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -200,18 +198,17 @@ public class OMDirectoryCreateRequest extends 
OMKeyRequest {
           omDirectoryResult == NONE) {
         List<String> missingParents = omPathInfo.getMissingParents();
         long baseObjId = ozoneManager.getObjectIdFromTxId(trxnLogIndex);
-        List<OzoneAcl> inheritAcls = omPathInfo.getAcls();
+        OmBucketInfo omBucketInfo =
+            getBucketInfo(omMetadataManager, volumeName, bucketName);
 
         dirKeyInfo = createDirectoryKeyInfoWithACL(keyName, keyArgs, baseObjId,
-            OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()), trxnLogIndex,
+            omBucketInfo, omPathInfo, trxnLogIndex,
             ozoneManager.getDefaultReplicationConfig());
 
         missingParentInfos = getAllParentInfo(ozoneManager, keyArgs,
-            missingParents, inheritAcls, trxnLogIndex);
+            missingParents, omBucketInfo, omPathInfo, trxnLogIndex);
 
         numMissingParents = missingParentInfos.size();
-        OmBucketInfo omBucketInfo =
-            getBucketInfo(omMetadataManager, volumeName, bucketName);
         checkBucketQuotaInNamespace(omBucketInfo, numMissingParents + 1L);
         omBucketInfo.incrUsedNamespace(numMissingParents + 1L);
 
@@ -257,14 +254,16 @@ public class OMDirectoryCreateRequest extends 
OMKeyRequest {
    * @param ozoneManager
    * @param keyArgs
    * @param missingParents list of parent directories to be created
-   * @param inheritAcls ACLs to be assigned to each new parent dir
+   * @param bucketInfo
+   * @param omPathInfo
    * @param trxnLogIndex
    * @return
    * @throws IOException
    */
   public static List<OmKeyInfo> getAllParentInfo(OzoneManager ozoneManager,
-      KeyArgs keyArgs, List<String> missingParents, List<OzoneAcl> inheritAcls,
-      long trxnLogIndex) throws IOException {
+      KeyArgs keyArgs, List<String> missingParents, OmBucketInfo bucketInfo,
+      OMFileRequest.OMPathInfo omPathInfo, long trxnLogIndex)
+      throws IOException {
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
     List<OmKeyInfo> missingParentInfos = new ArrayList<>();
 
@@ -290,10 +289,10 @@ public class OMDirectoryCreateRequest extends 
OMKeyRequest {
       }
 
       LOG.debug("missing parent {} getting added to KeyTable", missingKey);
-      // what about keyArgs for parent directories? TODO
+
       OmKeyInfo parentKeyInfo =
           createDirectoryKeyInfoWithACL(missingKey, keyArgs, nextObjId,
-              inheritAcls, trxnLogIndex,
+              bucketInfo, omPathInfo, trxnLogIndex,
               ozoneManager.getDefaultReplicationConfig());
       objectCount++;
 
@@ -349,15 +348,19 @@ public class OMDirectoryCreateRequest extends 
OMKeyRequest {
    * @param keyName
    * @param keyArgs
    * @param objectId
+   * @param bucketInfo
+   * @param omPathInfo
    * @param transactionIndex
    * @param serverDefaultReplConfig
    * @return the OmKeyInfo structure
    */
   public static OmKeyInfo createDirectoryKeyInfoWithACL(String keyName,
-      KeyArgs keyArgs, long objectId, List<OzoneAcl> inheritAcls,
-      long transactionIndex, ReplicationConfig serverDefaultReplConfig) {
+      KeyArgs keyArgs, long objectId, OmBucketInfo bucketInfo,
+      OMFileRequest.OMPathInfo omPathInfo, long transactionIndex,
+      ReplicationConfig serverDefaultReplConfig) {
     return dirKeyInfoBuilderNoACL(keyName, keyArgs, objectId,
-        serverDefaultReplConfig).setAcls(inheritAcls)
+        serverDefaultReplConfig)
+        .setAcls(getAclsForDir(keyArgs, bucketInfo, omPathInfo))
         .setUpdateID(transactionIndex).build();
   }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java
index 081efd42e7..61ada892d6 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.ozone.om.request.file;
 
-import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.OMAction;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -28,7 +27,6 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
-import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -151,10 +149,12 @@ public class OMDirectoryCreateRequestWithFSO extends 
OMDirectoryCreateRequest {
       } else if (omDirectoryResult == DIRECTORY_EXISTS_IN_GIVENPATH ||
           omDirectoryResult == NONE) {
 
+        OmBucketInfo omBucketInfo =
+            getBucketInfo(omMetadataManager, volumeName, bucketName);
         // prepare all missing parents
         missingParentInfos =
-                OMDirectoryCreateRequestWithFSO.getAllMissingParentDirInfo(
-                        ozoneManager, keyArgs, omPathInfo, trxnLogIndex);
+            OMDirectoryCreateRequestWithFSO.getAllMissingParentDirInfo(
+                ozoneManager, keyArgs, omBucketInfo, omPathInfo, trxnLogIndex);
 
         final long volumeId = omMetadataManager.getVolumeId(volumeName);
         final long bucketId = omMetadataManager
@@ -162,8 +162,6 @@ public class OMDirectoryCreateRequestWithFSO extends 
OMDirectoryCreateRequest {
 
         // total number of keys created.
         numKeysCreated = missingParentInfos.size() + 1;
-        OmBucketInfo omBucketInfo =
-            getBucketInfo(omMetadataManager, volumeName, bucketName);
         checkBucketQuotaInNamespace(omBucketInfo, numKeysCreated);
         omBucketInfo.incrUsedNamespace(numKeysCreated);
 
@@ -172,7 +170,7 @@ public class OMDirectoryCreateRequestWithFSO extends 
OMDirectoryCreateRequest {
             omPathInfo.getLeafNodeName(),
             keyArgs, omPathInfo.getLeafNodeObjectId(),
             omPathInfo.getLastKnownParentId(), trxnLogIndex,
-            OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()));
+            omBucketInfo, omPathInfo);
         OMFileRequest.addDirectoryTableCacheEntries(omMetadataManager,
             volumeId, bucketId, trxnLogIndex,
             missingParentInfos, dirInfo);
@@ -254,9 +252,9 @@ public class OMDirectoryCreateRequestWithFSO extends 
OMDirectoryCreateRequest {
    * @throws IOException DB failure
    */
   public static List<OmDirectoryInfo> getAllMissingParentDirInfo(
-          OzoneManager ozoneManager, KeyArgs keyArgs,
-          OMFileRequest.OMPathInfoWithFSO pathInfo, long trxnLogIndex)
-          throws IOException {
+      OzoneManager ozoneManager, KeyArgs keyArgs, OmBucketInfo bucketInfo,
+      OMFileRequest.OMPathInfoWithFSO pathInfo, long trxnLogIndex)
+      throws IOException {
     List<OmDirectoryInfo> missingParentInfos = new ArrayList<>();
 
     // The base id is left shifted by 8 bits for creating space to
@@ -273,7 +271,6 @@ public class OMDirectoryCreateRequestWithFSO extends 
OMDirectoryCreateRequest {
 
     long lastKnownParentId = pathInfo.getLastKnownParentId();
     List<String> missingParents = pathInfo.getMissingParents();
-    List<OzoneAcl> inheritAcls = pathInfo.getAcls();
     for (String missingKey : missingParents) {
       long nextObjId = baseObjId + objectCount;
       if (nextObjId > maxObjId) {
@@ -286,7 +283,8 @@ public class OMDirectoryCreateRequestWithFSO extends 
OMDirectoryCreateRequest {
       LOG.debug("missing parent {} getting added to DirectoryTable",
               missingKey);
       OmDirectoryInfo dirInfo = createDirectoryInfoWithACL(missingKey,
-              keyArgs, nextObjId, lastKnownParentId, trxnLogIndex, 
inheritAcls);
+          keyArgs, nextObjId, lastKnownParentId, trxnLogIndex,
+          bucketInfo, pathInfo);
       objectCount++;
 
       missingParentInfos.add(dirInfo);
@@ -301,28 +299,27 @@ public class OMDirectoryCreateRequestWithFSO extends 
OMDirectoryCreateRequest {
 
   /**
    * Fill in a DirectoryInfo for a new directory entry in OM database.
-   * without initializing ACLs from the KeyArgs - used for intermediate
-   * directories which get created internally/recursively during file
-   * and directory create.
    * @param dirName
    * @param keyArgs
    * @param objectId
    * @param parentObjectId
-   * @param inheritAcls
+   * @param bucketInfo
+   * @param omPathInfo
    * @return the OmDirectoryInfo structure
    */
   private static OmDirectoryInfo createDirectoryInfoWithACL(
-          String dirName, KeyArgs keyArgs, long objectId,
-          long parentObjectId, long transactionIndex,
-          List<OzoneAcl> inheritAcls) {
+      String dirName, KeyArgs keyArgs, long objectId,
+      long parentObjectId, long transactionIndex,
+      OmBucketInfo bucketInfo, OMFileRequest.OMPathInfo omPathInfo) {
 
     return OmDirectoryInfo.newBuilder()
-            .setName(dirName)
-            .setCreationTime(keyArgs.getModificationTime())
-            .setModificationTime(keyArgs.getModificationTime())
-            .setObjectID(objectId)
-            .setUpdateID(transactionIndex)
-            .setParentObjectID(parentObjectId)
-            .setAcls(inheritAcls).build();
+        .setName(dirName)
+        .setCreationTime(keyArgs.getModificationTime())
+        .setModificationTime(keyArgs.getModificationTime())
+        .setObjectID(objectId)
+        .setUpdateID(transactionIndex)
+        .setParentObjectID(parentObjectId)
+        .setAcls(getAclsForDir(keyArgs, bucketInfo, omPathInfo))
+        .build();
   }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
index d004dc22b7..f0bc1f5639 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
@@ -29,7 +29,6 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.ozone.OmUtils;
-import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OzoneConfigUtil;
@@ -233,7 +232,6 @@ public class OMFileCreateRequest extends OMKeyRequest {
               bucketName, keyName, Paths.get(keyName));
       OMFileRequest.OMDirectoryResult omDirectoryResult =
           pathInfo.getDirectoryResult();
-      List<OzoneAcl> inheritAcls = pathInfo.getAcls();
 
       // Check if a file or directory exists with same key name.
       checkDirectoryResult(keyName, isOverWrite, omDirectoryResult);
@@ -253,7 +251,7 @@ public class OMFileCreateRequest extends OMKeyRequest {
 
       omKeyInfo = prepareKeyInfo(omMetadataManager, keyArgs, dbKeyInfo,
           keyArgs.getDataSize(), locations, getFileEncryptionInfo(keyArgs),
-          ozoneManager.getPrefixManager(), omBucketInfo, trxnLogIndex,
+          ozoneManager.getPrefixManager(), omBucketInfo, pathInfo, 
trxnLogIndex,
           ozoneManager.getObjectIdFromTxId(trxnLogIndex),
           ozoneManager.isRatisEnabled(), repConfig);
 
@@ -264,7 +262,8 @@ public class OMFileCreateRequest extends OMKeyRequest {
 
       missingParentInfos = OMDirectoryCreateRequest
           .getAllParentInfo(ozoneManager, keyArgs,
-              pathInfo.getMissingParents(), inheritAcls, trxnLogIndex);
+              pathInfo.getMissingParents(), omBucketInfo,
+              pathInfo, trxnLogIndex);
 
       // Append new blocks
       List<OmKeyLocationInfo> newLocationList = keyArgs.getKeyLocationsList()
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java
index 279d564b94..792be15f27 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java
@@ -156,17 +156,18 @@ public class OMFileCreateRequestWithFSO extends 
OMFileCreateRequest {
         checkAllParentsExist(keyArgs, pathInfoFSO);
       }
 
+      // do open key
+      OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(
+          omMetadataManager.getBucketKey(volumeName, bucketName));
       // add all missing parents to dir table
+
       missingParentInfos =
-              OMDirectoryCreateRequestWithFSO.getAllMissingParentDirInfo(
-                      ozoneManager, keyArgs, pathInfoFSO, trxnLogIndex);
+          OMDirectoryCreateRequestWithFSO.getAllMissingParentDirInfo(
+              ozoneManager, keyArgs, bucketInfo, pathInfoFSO, trxnLogIndex);
 
       // total number of keys created.
       numKeysCreated = missingParentInfos.size();
 
-      // do open key
-      OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(
-          omMetadataManager.getBucketKey(volumeName, bucketName));
       final ReplicationConfig repConfig = OzoneConfigUtil
           .resolveReplicationConfigPreference(keyArgs.getType(),
               keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
index 17565fec5d..2e547b51eb 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
@@ -96,7 +96,9 @@ public final class OMFileRequest {
     String dirNameFromDetails = omMetadataManager.getOzoneDirKey(volumeName,
         bucketName, keyName);
     List<String> missing = new ArrayList<>();
-    List<OzoneAcl> inheritAcls = new ArrayList<>();
+    // Collect all of the parent's ACLs, both ACCESS and DEFAULT scope.
+    // Which ones are inherited is decided when the dir/file is created.
+    List<OzoneAcl> acls = new ArrayList<>();
     OMDirectoryResult result = OMDirectoryResult.NONE;
 
     while (keyPath != null) {
@@ -126,11 +128,10 @@ public final class OMFileRequest {
           result = OMDirectoryResult.DIRECTORY_EXISTS;
         } else {
           result = OMDirectoryResult.DIRECTORY_EXISTS_IN_GIVENPATH;
-          inheritAcls = omMetadataManager.getKeyTable(
+          acls = omMetadataManager.getKeyTable(
               getBucketLayout(omMetadataManager, volumeName, bucketName))
               .get(dbDirKeyName).getAcls();
-          LOG.trace("Acls inherited from parent " + dbDirKeyName + " are : "
-              + inheritAcls);
+          LOG.trace("Acls from parent {} are : {}", dbDirKeyName, acls);
         }
       } else {
         if (!dbDirKeyName.equals(dirNameFromDetails)) {
@@ -140,26 +141,25 @@ public final class OMFileRequest {
 
       if (result != OMDirectoryResult.NONE) {
 
-        LOG.trace("verifyFiles in Path : " + "/" + volumeName
-            + "/" + bucketName + "/" + keyName + ":" + result);
-        return new OMPathInfo(missing, result, inheritAcls);
+        LOG.trace("verifyFiles in Path : /{}/{}/{} : {}",
+            volumeName, bucketName, keyName, result);
+        return new OMPathInfo(missing, result, acls);
       }
       keyPath = keyPath.getParent();
     }
 
-    if (inheritAcls.isEmpty()) {
+    if (acls.isEmpty()) {
       String bucketKey = omMetadataManager.getBucketKey(volumeName,
           bucketName);
-      inheritAcls = omMetadataManager.getBucketTable().get(bucketKey)
+      acls = omMetadataManager.getBucketTable().get(bucketKey)
           .getAcls();
-      LOG.trace("Acls inherited from bucket " + bucketName + " are : "
-          + inheritAcls);
+      LOG.trace("Acls from bucket {} are : {}", bucketName, acls);
     }
 
-    LOG.trace("verifyFiles in Path : " + volumeName + "/" + bucketName + "/"
-        + keyName + ":" + result);
+    LOG.trace("verifyFiles in Path : /{}/{}/{} : {}",
+        volumeName, bucketName, keyName, result);
     // Found no files/ directories in the given path.
-    return new OMPathInfo(missing, OMDirectoryResult.NONE, inheritAcls);
+    return new OMPathInfo(missing, OMDirectoryResult.NONE, acls);
   }
 
   /**
@@ -195,8 +195,9 @@ public final class OMFileRequest {
     final long bucketId = omMetadataManager.getBucketId(volumeName,
             bucketName);
 
-    // by default, inherit bucket ACLs
-    List<OzoneAcl> inheritAcls = omBucketInfo.getAcls();
+    // Start from all of the bucket's ACLs, both ACCESS and DEFAULT scope.
+    // Which ones are inherited is decided when the dir/file is created.
+    List<OzoneAcl> acls = omBucketInfo.getAcls();
 
     long lastKnownParentId = omBucketInfo.getObjectID();
     String dbDirName = ""; // absolute path for trace logs
@@ -230,7 +231,7 @@ public final class OMFileRequest {
         if (elements.hasNext()) {
           result = OMDirectoryResult.DIRECTORY_EXISTS_IN_GIVENPATH;
           lastKnownParentId = omDirInfo.getObjectID();
-          inheritAcls = omDirInfo.getAcls();
+          acls = omDirInfo.getAcls();
           continue;
         } else {
           // Checked all the sub-dirs till the leaf node.
@@ -261,22 +262,21 @@ public final class OMFileRequest {
       }
     }
 
-    LOG.trace("verifyFiles/Directories in Path : " + "/" + volumeName
-            + "/" + bucketName + "/" + keyName + ":" + result);
+    LOG.trace("verifyFiles/Directories in Path : /{}/{}/{} : {}",
+        volumeName, bucketName, keyName, result);
 
     if (result == OMDirectoryResult.FILE_EXISTS_IN_GIVENPATH || result ==
             OMDirectoryResult.FILE_EXISTS) {
       return new OMPathInfoWithFSO(leafNodeName, lastKnownParentId, missing,
-              result, inheritAcls, fullKeyPath.toString());
+              result, acls, fullKeyPath.toString());
     }
 
     String dbDirKeyName = omMetadataManager.getOzoneDirKey(volumeName,
             bucketName, dbDirName);
-    LOG.trace("Acls inherited from parent " + dbDirKeyName + " are : "
-            + inheritAcls);
+    LOG.trace("Acls from parent {} are : {}", dbDirKeyName, acls);
 
     return new OMPathInfoWithFSO(leafNodeName, lastKnownParentId, missing,
-            result, inheritAcls);
+            result, acls);
   }
 
   /**
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
index c947891c7e..b40db65696 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
@@ -28,7 +28,6 @@ import java.util.stream.Collectors;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.ozone.OmUtils;
-import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OzoneConfigUtil;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -235,19 +234,19 @@ public class OMKeyCreateRequest extends OMKeyRequest {
           getBucketInfo(omMetadataManager, volumeName, bucketName);
 
       // If FILE_EXISTS we just override like how we used to do for Key Create.
-      List< OzoneAcl > inheritAcls;
       if (LOG.isDebugEnabled()) {
         LOG.debug("BucketName: {}, BucketLayout: {}",
             bucketInfo.getBucketName(), bucketInfo.getBucketLayout());
       }
+
+      OMFileRequest.OMPathInfo pathInfo = null;
+
       if (bucketInfo.getBucketLayout()
           .shouldNormalizePaths(ozoneManager.getEnableFileSystemPaths())) {
-        OMFileRequest.OMPathInfo pathInfo =
-            OMFileRequest.verifyFilesInPath(omMetadataManager, volumeName,
-                bucketName, keyName, Paths.get(keyName));
+        pathInfo = OMFileRequest.verifyFilesInPath(omMetadataManager,
+            volumeName, bucketName, keyName, Paths.get(keyName));
         OMFileRequest.OMDirectoryResult omDirectoryResult =
             pathInfo.getDirectoryResult();
-        inheritAcls = pathInfo.getAcls();
 
         // Check if a file or directory exists with same key name.
         if (omDirectoryResult == DIRECTORY_EXISTS) {
@@ -262,7 +261,8 @@ public class OMKeyCreateRequest extends OMKeyRequest {
 
         missingParentInfos = OMDirectoryCreateRequest
             .getAllParentInfo(ozoneManager, keyArgs,
-                pathInfo.getMissingParents(), inheritAcls, trxnLogIndex);
+                pathInfo.getMissingParents(), bucketInfo,
+                pathInfo, trxnLogIndex);
 
         numMissingParents = missingParentInfos.size();
       }
@@ -275,7 +275,7 @@ public class OMKeyCreateRequest extends OMKeyRequest {
 
       omKeyInfo = prepareKeyInfo(omMetadataManager, keyArgs, dbKeyInfo,
           keyArgs.getDataSize(), locations, getFileEncryptionInfo(keyArgs),
-          ozoneManager.getPrefixManager(), bucketInfo, trxnLogIndex,
+          ozoneManager.getPrefixManager(), bucketInfo, pathInfo, trxnLogIndex,
           ozoneManager.getObjectIdFromTxId(trxnLogIndex),
           ozoneManager.isRatisEnabled(), replicationConfig);
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
index 89f42bf85c..de6ed6b545 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
@@ -142,17 +142,18 @@ public class OMKeyCreateRequestWithFSO extends 
OMKeyCreateRequest {
             + " as there is already file in the given path", NOT_A_FILE);
       }
 
+      // do open key
+      OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(
+              omMetadataManager.getBucketKey(volumeName, bucketName));
+
       // add all missing parents to dir table
       missingParentInfos =
-              OMDirectoryCreateRequestWithFSO.getAllMissingParentDirInfo(
-                      ozoneManager, keyArgs, pathInfoFSO, trxnLogIndex);
+          OMDirectoryCreateRequestWithFSO.getAllMissingParentDirInfo(
+              ozoneManager, keyArgs, bucketInfo, pathInfoFSO, trxnLogIndex);
 
       // total number of keys created.
       numKeysCreated = missingParentInfos.size();
 
-      // do open key
-      OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(
-              omMetadataManager.getBucketKey(volumeName, bucketName));
       final ReplicationConfig repConfig = OzoneConfigUtil
           .resolveReplicationConfigPreference(keyArgs.getType(),
               keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 13bdbbca3a..749b0f3fe6 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -276,9 +276,10 @@ public abstract class OMKeyRequest extends OMClientRequest 
{
   }
 
   protected List< OzoneAcl > getAclsForKey(KeyArgs keyArgs,
-      OmBucketInfo bucketInfo, PrefixManager prefixManager) {
-    List<OzoneAcl> acls = new ArrayList<>();
+      OmBucketInfo bucketInfo, OMFileRequest.OMPathInfo omPathInfo,
+      PrefixManager prefixManager) {
 
+    List<OzoneAcl> acls = new ArrayList<>();
     if (keyArgs.getAclsList() != null) {
       acls.addAll(OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()));
     }
@@ -302,8 +303,16 @@ public abstract class OMKeyRequest extends OMClientRequest 
{
       }
     }
 
+    // Inherit DEFAULT acls from parent-dir only if DEFAULT acls for
+    // prefix are not set
+    if (omPathInfo != null) {
+      if (OzoneAclUtil.inheritDefaultAcls(acls, omPathInfo.getAcls())) {
+        return acls;
+      }
+    }
+
     // Inherit DEFAULT acls from bucket only if DEFAULT acls for
-    // prefix are not set.
+    // parent-dir are not set.
     if (bucketInfo != null) {
       if (OzoneAclUtil.inheritDefaultAcls(acls, bucketInfo.getAcls())) {
         return acls;
@@ -313,6 +322,39 @@ public abstract class OMKeyRequest extends OMClientRequest 
{
     return acls;
   }
 
+  /**
+   * Builds the acl list for a new directory: parent acls first, own acls last.
+   * @param keyArgs request args; its acls list is appended unchanged at the end
+   * @param bucketInfo bucket consulted only if the list is still empty; nullable
+   * @param omPathInfo path info whose acls are consulted first; may be null
+   * @return inherited acls (passed through toDefaultScope), then keyArgs acls.
+   */
+  protected static List<OzoneAcl> getAclsForDir(KeyArgs keyArgs,
+      OmBucketInfo bucketInfo, OMFileRequest.OMPathInfo omPathInfo) {
+    // Inherited acls go through toDefaultScope before own acls are appended
+    List<OzoneAcl> acls = new ArrayList<>();
+
+    // First choice: DEFAULT acls of the parent directory, if path info exists
+    if (omPathInfo != null) {
+      if (OzoneAclUtil.inheritDefaultAcls(acls, omPathInfo.getAcls())) {
+        OzoneAclUtil.toDefaultScope(acls);
+      }
+    }
+
+    // Fall back to the bucket's DEFAULT acls only when nothing was inherited
+    // from the parent directory (the list is still empty).
+    if (acls.isEmpty() && bucketInfo != null) {
+      if (OzoneAclUtil.inheritDefaultAcls(acls, bucketInfo.getAcls())) {
+        OzoneAclUtil.toDefaultScope(acls);
+      }
+    }
+
+    // Finally append the acls supplied with the request itself.
+    acls.addAll(OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()));
+
+    return acls;
+  }
+
   /**
    * Check Acls for the ozone bucket.
    * @param ozoneManager
@@ -629,12 +671,13 @@ public abstract class OMKeyRequest extends 
OMClientRequest {
           @Nullable FileEncryptionInfo encInfo,
           @Nonnull PrefixManager prefixManager,
           @Nullable OmBucketInfo omBucketInfo,
+          OMFileRequest.OMPathInfo omPathInfo,
           long transactionLogIndex, long objectID, boolean isRatisEnabled,
           ReplicationConfig replicationConfig)
           throws IOException {
 
     return prepareFileInfo(omMetadataManager, keyArgs, dbKeyInfo, size,
-            locations, encInfo, prefixManager, omBucketInfo, null,
+            locations, encInfo, prefixManager, omBucketInfo, omPathInfo,
             transactionLogIndex, objectID, isRatisEnabled, replicationConfig);
   }
 
@@ -651,7 +694,7 @@ public abstract class OMKeyRequest extends OMClientRequest {
           @Nullable FileEncryptionInfo encInfo,
           @Nonnull PrefixManager prefixManager,
           @Nullable OmBucketInfo omBucketInfo,
-          OMFileRequest.OMPathInfoWithFSO omPathInfo,
+          OMFileRequest.OMPathInfo omPathInfo,
           long transactionLogIndex, long objectID,
           boolean isRatisEnabled, ReplicationConfig replicationConfig)
           throws IOException {
@@ -700,30 +743,32 @@ public abstract class OMKeyRequest extends 
OMClientRequest {
       @Nullable FileEncryptionInfo encInfo,
       @Nonnull PrefixManager prefixManager,
       @Nullable OmBucketInfo omBucketInfo,
-      OMFileRequest.OMPathInfoWithFSO omPathInfo,
-      long transactionLogIndex, long objectID
-  ) {
+      OMFileRequest.OMPathInfo omPathInfo,
+      long transactionLogIndex, long objectID) {
     OmKeyInfo.Builder builder = new OmKeyInfo.Builder();
     builder.setVolumeName(keyArgs.getVolumeName())
-        .setBucketName(keyArgs.getBucketName())
-        .setKeyName(keyArgs.getKeyName())
-        .setOmKeyLocationInfos(Collections.singletonList(
-            new OmKeyLocationInfoGroup(0, locations)))
-        .setCreationTime(keyArgs.getModificationTime())
-        .setModificationTime(keyArgs.getModificationTime())
-        .setDataSize(size)
-        .setReplicationConfig(replicationConfig)
-        .setFileEncryptionInfo(encInfo)
-        .setAcls(getAclsForKey(keyArgs, omBucketInfo, prefixManager))
-        .addAllMetadata(KeyValueUtil.getFromProtobuf(
-            keyArgs.getMetadataList()))
-        .setUpdateID(transactionLogIndex)
-        .setFile(true);
-    if (omPathInfo != null) {
+            .setBucketName(keyArgs.getBucketName())
+            .setKeyName(keyArgs.getKeyName())
+            .setOmKeyLocationInfos(Collections.singletonList(
+                    new OmKeyLocationInfoGroup(0, locations)))
+            .setCreationTime(keyArgs.getModificationTime())
+            .setModificationTime(keyArgs.getModificationTime())
+            .setDataSize(size)
+            .setReplicationConfig(replicationConfig)
+            .setFileEncryptionInfo(encInfo)
+            .setAcls(getAclsForKey(
+                keyArgs, omBucketInfo, omPathInfo, prefixManager))
+            .addAllMetadata(KeyValueUtil.getFromProtobuf(
+                    keyArgs.getMetadataList()))
+            .setUpdateID(transactionLogIndex)
+            .setFile(true);
+    if (omPathInfo instanceof OMFileRequest.OMPathInfoWithFSO) {
       // FileTable metadata format
-      objectID = omPathInfo.getLeafNodeObjectId();
-      builder.setParentObjectID(omPathInfo.getLastKnownParentId());
-      builder.setFileName(omPathInfo.getLeafNodeName());
+      OMFileRequest.OMPathInfoWithFSO omPathInfoFSO
+          = (OMFileRequest.OMPathInfoWithFSO) omPathInfo;
+      objectID = omPathInfoFSO.getLeafNodeObjectId();
+      builder.setParentObjectID(omPathInfoFSO.getLastKnownParentId());
+      builder.setFileName(omPathInfoFSO.getLeafNodeName());
     }
     builder.setObjectID(objectID);
     return builder.build();
@@ -742,7 +787,7 @@ public abstract class OMKeyRequest extends OMClientRequest {
           @Nonnull List<OmKeyLocationInfo> locations,
           FileEncryptionInfo encInfo,  @Nonnull PrefixManager prefixManager,
           @Nullable OmBucketInfo omBucketInfo,
-          OMFileRequest.OMPathInfoWithFSO omPathInfo,
+          OMFileRequest.OMPathInfo omPathInfo,
           @Nonnull long transactionLogIndex, long objectID)
           throws IOException {
 
@@ -755,15 +800,17 @@ public abstract class OMKeyRequest extends 
OMClientRequest {
     String uploadID = args.getMultipartUploadID();
     Preconditions.checkNotNull(uploadID);
     String multipartKey = "";
-    if (omPathInfo != null) {
+    if (omPathInfo instanceof OMFileRequest.OMPathInfoWithFSO) {
+      OMFileRequest.OMPathInfoWithFSO omPathInfoFSO
+          = (OMFileRequest.OMPathInfoWithFSO) omPathInfo;
       final long volumeId = omMetadataManager.getVolumeId(
               args.getVolumeName());
       final long bucketId = omMetadataManager.getBucketId(
               args.getVolumeName(), args.getBucketName());
       // FileTable metadata format
       multipartKey = omMetadataManager.getMultipartKey(volumeId, bucketId,
-              omPathInfo.getLastKnownParentId(),
-              omPathInfo.getLeafNodeName(), uploadID);
+          omPathInfoFSO.getLastKnownParentId(),
+          omPathInfoFSO.getLeafNodeName(), uploadID);
     } else {
       multipartKey = omMetadataManager
               .getMultipartKey(args.getVolumeName(), args.getBucketName(),
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
index c753fb4c62..90d35e86c3 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.request.validation.RequestFeatureValidator;
@@ -57,6 +58,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Map;
@@ -177,6 +179,13 @@ public class S3InitiateMultipartUploadRequest extends 
OMKeyRequest {
       // initiate MPU.
       final OmBucketInfo bucketInfo = omMetadataManager.getBucketTable()
           .get(omMetadataManager.getBucketKey(volumeName, bucketName));
+
+      OMFileRequest.OMPathInfo pathInfo = null;
+      if (bucketInfo != null && bucketInfo.getBucketLayout()
+          .shouldNormalizePaths(ozoneManager.getEnableFileSystemPaths())) {
+        pathInfo = OMFileRequest.verifyFilesInPath(omMetadataManager,
+            volumeName, bucketName, keyName, Paths.get(keyName));
+      }
       final ReplicationConfig replicationConfig = OzoneConfigUtil
           .resolveReplicationConfigPreference(keyArgs.getType(),
               keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
@@ -202,7 +211,7 @@ public class S3InitiateMultipartUploadRequest extends 
OMKeyRequest {
           .setReplicationConfig(replicationConfig)
           .setOmKeyLocationInfos(Collections.singletonList(
               new OmKeyLocationInfoGroup(0, new ArrayList<>())))
-          .setAcls(getAclsForKey(keyArgs, bucketInfo,
+          .setAcls(getAclsForKey(keyArgs, bucketInfo, pathInfo,
               ozoneManager.getPrefixManager()))
           .setObjectID(objectID)
           .setUpdateID(transactionLogIndex)
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
index bcfaa8e6a0..2a0b891e16 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.file.OMDirectoryCreateRequestWithFSO;
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
@@ -118,10 +117,13 @@ public class S3InitiateMultipartUploadRequestWithFSO
       // check if the directory already existed in OM
       checkDirectoryResult(keyName, pathInfoFSO.getDirectoryResult());
 
+      final OmBucketInfo bucketInfo = getBucketInfo(omMetadataManager,
+          volumeName, bucketName);
+
       // add all missing parents to dir table
       missingParentInfos = OMDirectoryCreateRequestWithFSO
-          .getAllMissingParentDirInfo(ozoneManager, keyArgs, pathInfoFSO,
-              transactionLogIndex);
+          .getAllMissingParentDirInfo(ozoneManager, keyArgs, bucketInfo,
+              pathInfoFSO, transactionLogIndex);
 
       // We are adding uploadId to key, because if multiple users try to
       // perform multipart upload on the same key, each will try to upload, who
@@ -156,8 +158,6 @@ public class S3InitiateMultipartUploadRequestWithFSO
       // care of in the final complete multipart upload. AWS S3 behavior is
       // also like this, even when key exists in a bucket, user can still
       // initiate MPU.
-      final OmBucketInfo bucketInfo = getBucketInfo(omMetadataManager,
-          volumeName, bucketName);
       final ReplicationConfig replicationConfig = OzoneConfigUtil
           .resolveReplicationConfigPreference(keyArgs.getType(),
               keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
@@ -183,7 +183,8 @@ public class S3InitiateMultipartUploadRequestWithFSO
           .setReplicationConfig(replicationConfig)
           .setOmKeyLocationInfos(Collections.singletonList(
               new OmKeyLocationInfoGroup(0, new ArrayList<>())))
-          .setAcls(OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()))
+          .setAcls(getAclsForKey(keyArgs, bucketInfo, pathInfoFSO,
+              ozoneManager.getPrefixManager()))
           .setObjectID(pathInfoFSO.getLeafNodeObjectId())
           .setUpdateID(transactionLogIndex)
           .setFileEncryptionInfo(keyArgs.hasFileEncryptionInfo() ?
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequest.java
index f3debcc484..4faf60decb 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequest.java
@@ -18,13 +18,20 @@
 
 package org.apache.hadoop.ozone.om.request.file;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.ResolvedBucket;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
@@ -56,6 +63,7 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.VOLUME_NOT_FOUND;
@@ -420,6 +428,79 @@ public class TestOMDirectoryCreateRequest {
     Assert.assertEquals(4L, omMetrics.getNumKeys());
   }
 
+  @Test
+  public void testCreateDirectoryInheritParentDefaultAcls() throws Exception {
+    String volumeName = "vol1";
+    String bucketName = "bucket1";
+    String keyName = genRandomKeyName();
+
+    List<OzoneAcl> acls = new ArrayList<>();
+    acls.add(OzoneAcl.parseAcl("user:newUser:rw[DEFAULT]"));
+    acls.add(OzoneAcl.parseAcl("user:noInherit:rw"));
+    acls.add(OzoneAcl.parseAcl("group:newGroup:rwl[DEFAULT]"));
+
+    // Create the bucket with two DEFAULT acls and one acl without that scope.
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, omMetadataManager,
+        OmBucketInfo.newBuilder().setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setBucketLayout(getBucketLayout())
+            .setAcls(acls));
+
+    // Sanity check: the stored bucket carries exactly the acls set above.
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    List<OzoneAcl> bucketAcls = omMetadataManager.getBucketTable()
+        .get(bucketKey).getAcls();
+    Assert.assertEquals(acls, bucketAcls);
+
+    // Create the directory path via preExecute + validateAndUpdateCache.
+    OMRequest omRequest = createDirectoryRequest(volumeName, bucketName,
+        keyName);
+    OMDirectoryCreateRequest omDirectoryCreateRequest =
+        new OMDirectoryCreateRequest(omRequest, getBucketLayout());
+
+    OMRequest modifiedOmRequest =
+        omDirectoryCreateRequest.preExecute(ozoneManager);
+
+    omDirectoryCreateRequest =
+        new OMDirectoryCreateRequest(modifiedOmRequest, getBucketLayout());
+
+    OMClientResponse omClientResponse =
+        omDirectoryCreateRequest.validateAndUpdateCache(ozoneManager, 100L,
+            ozoneManagerDoubleBufferHelper);
+    // JUnit order is (expected, actual) so a failure message reads correctly.
+    Assert.assertSame(OzoneManagerProtocolProtos.Status.OK,
+        omClientResponse.getOMResponse().getStatus());
+
+    // Every directory on the path must carry the inherited DEFAULT acls.
+    verifyDirectoriesInheritAcls(volumeName, bucketName, keyName, bucketAcls);
+
+  }
+
+  private void verifyDirectoriesInheritAcls(String volumeName,
+      String bucketName, String keyName, List<OzoneAcl> bucketAcls)
+      throws IOException {
+    List<String> nodes = Arrays.asList(keyName.split(OZONE_URI_DELIMITER));
+    // The top-level directory is expected to carry the bucket's DEFAULT acls.
+    List<OzoneAcl> expectedInheritAcls = bucketAcls.stream()
+        .filter(acl -> acl.getAclScope() == OzoneAcl.AclScope.DEFAULT)
+        .collect(Collectors.toList());
+    String prefix = "";
+
+    for (int indx = 0; indx < nodes.size(); indx++) {
+      String dirName = prefix + nodes.get(indx);
+      OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable(getBucketLayout())
+          .get(omMetadataManager
+              .getOzoneDirKey(volumeName, bucketName, dirName));
+      // Report a missing directory as an assertion failure, not as an NPE.
+      Assert.assertNotNull("Missing directory key: " + dirName, omKeyInfo);
+      List<OzoneAcl> omKeyAcls = omKeyInfo.getAcls();
+      Assert.assertEquals("Failed to inherit parent acls!,",
+          expectedInheritAcls, omKeyAcls);
+      // Descend one level: this directory becomes the parent of the next one.
+      prefix = dirName + OZONE_URI_DELIMITER;
+      expectedInheritAcls = omKeyAcls;
+    }
+  }
 
   /**
    * Create OMRequest which encapsulates CreateDirectory request.
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequestWithFSO.java
index 0a8b4b44b5..9b79344741 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequestWithFSO.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.AuditMessage;
@@ -59,6 +60,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.VOLUME_NOT_FOUND;
 import static org.mockito.ArgumentMatchers.any;
@@ -656,6 +658,88 @@ public class TestOMDirectoryCreateRequestWithFSO {
     Assert.assertEquals(dirs.size(), omMetrics.getNumKeys());
   }
 
+  @Test
+  public void testCreateDirectoryInheritParentDefaultAcls() throws Exception {
+    String volumeName = "vol1";
+    String bucketName = "bucket1";
+    List<String> dirs = new ArrayList<>();
+    String keyName = createDirKey(dirs, 3);
+
+    List<OzoneAcl> acls = new ArrayList<>();
+    acls.add(OzoneAcl.parseAcl("user:newUser:rw[DEFAULT]"));
+    acls.add(OzoneAcl.parseAcl("user:noInherit:rw"));
+    acls.add(OzoneAcl.parseAcl("group:newGroup:rwl[DEFAULT]"));
+
+    // Create the bucket with two DEFAULT acls and one acl without that scope.
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, omMetadataManager,
+        OmBucketInfo.newBuilder().setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setBucketLayout(getBucketLayout())
+            .setAcls(acls));
+
+    // Sanity check: the stored bucket carries exactly the acls set above.
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    List<OzoneAcl> bucketAcls = omMetadataManager.getBucketTable()
+        .get(bucketKey).getAcls();
+    Assert.assertEquals(acls, bucketAcls);
+
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+        bucketName);
+
+    // Create the directory path via preExecute + validateAndUpdateCache.
+    OMRequest omRequest = createDirectoryRequest(volumeName, bucketName,
+        keyName);
+    OMDirectoryCreateRequestWithFSO omDirCreateReqFSO =
+        new OMDirectoryCreateRequestWithFSO(omRequest,
+            BucketLayout.FILE_SYSTEM_OPTIMIZED);
+    OMRequest modifiedOmReq = omDirCreateReqFSO.preExecute(ozoneManager);
+
+    omDirCreateReqFSO = new OMDirectoryCreateRequestWithFSO(modifiedOmReq,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+    OMClientResponse omClientResponse =
+        omDirCreateReqFSO.validateAndUpdateCache(ozoneManager, 100L,
+            ozoneManagerDoubleBufferHelper);
+    Assert.assertSame(OzoneManagerProtocolProtos.Status.OK,
+        omClientResponse.getOMResponse().getStatus());
+
+    // Each dir must carry its parent's DEFAULT acls; JUnit wants expected 1st.
+    verifyDirectoriesInheritAcls(dirs, volumeId, bucketId, bucketAcls);
+
+  }
+
+  private void verifyDirectoriesInheritAcls(List<String> dirs,
+      long volumeId, long bucketId, List<OzoneAcl> bucketAcls)
+      throws IOException {
+    // The walk starts at the bucket, which acts as parent of the first dir.
+    long parentID = bucketId;
+    List<OzoneAcl> expectedInheritAcls = bucketAcls.stream()
+        .filter(acl -> acl.getAclScope() == OzoneAcl.AclScope.DEFAULT)
+        .collect(Collectors.toList());
+    System.out.println("expectedInheritAcls: " + expectedInheritAcls);
+
+    // each dir must equal its parent's DEFAULT acls and keep DEFAULT scope:
+    // [user:newUser:rw[DEFAULT], group:newGroup:rwl[DEFAULT]]
+    for (int indx = 0; indx < dirs.size(); indx++) {
+      String dirName = dirs.get(indx);
+      String dbKey = "";
+      // parentID is the bucket id on the first pass, then the previous dir id
+      dbKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
+          parentID, dirName);
+      OmDirectoryInfo omDirInfo =
+          omMetadataManager.getDirectoryTable().get(dbKey);
+      List<OzoneAcl> omDirAcls = omDirInfo.getAcls();
+      System.out.println(
+          "  subdir acls : " + omDirInfo + " ==> " + omDirAcls);
+
+      Assert.assertEquals("Failed to inherit parent DEFAULT acls!",
+          expectedInheritAcls, omDirAcls);
+
+      parentID = omDirInfo.getObjectID();
+      expectedInheritAcls = omDirAcls;
+    }
+  }
 
   @NotNull
   private String createDirKey(List<String> dirs, int depth) {
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequest.java
index 367e5d5cce..f79b9f9b47 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequest.java
@@ -18,11 +18,16 @@
 
 package org.apache.hadoop.ozone.om.request.file;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Assert;
@@ -337,6 +342,148 @@ public class TestOMFileCreateRequest extends 
TestOMKeyRequest {
     testNonRecursivePath(key, false, false, true);
   }
 
+  @Test
+  public void testCreateFileInheritParentDefaultAcls()
+      throws Exception {
+    volumeName = UUID.randomUUID().toString();
+    bucketName = UUID.randomUUID().toString();
+    String prefix = "a/b/c/";
+    List<String> dirs = new ArrayList<>();
+    dirs.add("a");
+    dirs.add("b");
+    dirs.add("c");
+    String keyName = prefix + UUID.randomUUID();
+    List<OzoneAcl> bucketAclResults = new ArrayList<>();
+
+    OmKeyInfo omKeyInfo = createFileWithInheritAcls(keyName, bucketAclResults);
+
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName, bucketName);
+
+    verifyInheritAcls(dirs, omKeyInfo, volumeId, bucketId, bucketAclResults);
+  }
+
+  protected OmKeyInfo createFileWithInheritAcls(String keyName,
+      List<OzoneAcl> bucketAclResults) throws Exception {
+    List<OzoneAcl> acls = new ArrayList<>();
+    acls.add(OzoneAcl.parseAcl("user:newUser:rw[DEFAULT]"));
+    acls.add(OzoneAcl.parseAcl("user:noInherit:rw"));
+    acls.add(OzoneAcl.parseAcl("group:newGroup:rwl[DEFAULT]"));
+
+    // Create the bucket with two DEFAULT acls and one acl without that scope.
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, omMetadataManager,
+        OmBucketInfo.newBuilder().setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setBucketLayout(getBucketLayout())
+            .setAcls(acls));
+
+    // Hand the stored bucket acls back to the caller and sanity-check them.
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    bucketAclResults.addAll(omMetadataManager.getBucketTable()
+        .get(bucketKey).getAcls());
+    Assert.assertEquals(acls, bucketAclResults);
+
+    // Recursive create file with acls inherited from bucket DEFAULT acls
+    OMRequest omRequest = createFileRequest(volumeName, bucketName,
+        keyName, HddsProtos.ReplicationFactor.ONE,
+        HddsProtos.ReplicationType.RATIS, false, true);
+
+    OMFileCreateRequest omFileCreateRequest = getOMFileCreateRequest(omRequest);
+    OMRequest modifiedOmRequest = omFileCreateRequest.preExecute(ozoneManager);
+
+    omFileCreateRequest = getOMFileCreateRequest(modifiedOmRequest);
+    OMClientResponse omFileCreateResponse =
+        omFileCreateRequest.validateAndUpdateCache(ozoneManager, 100L,
+            ozoneManagerDoubleBufferHelper);
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omFileCreateResponse.getOMResponse().getStatus());
+
+    // The open-key entry for this client id is what the caller inspects.
+    long id = modifiedOmRequest.getCreateFileRequest().getClientID();
+    return verifyPathInOpenKeyTable(keyName, id, true);
+  }
+
+  /**
+   * Layouts whose files and dirs inherit the parent dir DEFAULT acls:
+   *  (1) FSO
+   *  (2) Legacy when EnableFileSystemPaths
+   *
+   * Layouts whose keys inherit the bucket DEFAULT acls:
+   *  (1) OBS
+   *  (2) Legacy when DisableFileSystemPaths
+   *
+   * Note: acls inherited by a directory keep DEFAULT scope, while acls
+   * inherited by the leaf file are expected in ACCESS scope.
+   */
+  protected void verifyInheritAcls(List<String> dirs, OmKeyInfo omKeyInfo,
+      long volumeId, long bucketId, List<OzoneAcl> bucketAcls)
+      throws IOException {
+
+    if (getBucketLayout().shouldNormalizePaths(
+        ozoneManager.getEnableFileSystemPaths())) {
+
+      // bucketID is the parent
+      long parentID = bucketId;
+      List<OzoneAcl> expectedInheritAcls = bucketAcls.stream()
+          .filter(acl -> acl.getAclScope() == OzoneAcl.AclScope.DEFAULT)
+          .collect(Collectors.toList());
+      System.out.println("expectedInheritAcls: " + expectedInheritAcls);
+
+      // dir should inherit parent DEFAULT acls and itself has DEFAULT scope
+      // [user:newUser:rw[DEFAULT], group:newGroup:rwl[DEFAULT]]
+      for (int indx = 0; indx < dirs.size(); indx++) {
+        String dirName = dirs.get(indx);
+        String dbKey = "";
+        // for index=0, parentID is bucketID
+        dbKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
+            parentID, dirName);
+        OmDirectoryInfo omDirInfo =
+            omMetadataManager.getDirectoryTable().get(dbKey);
+        List<OzoneAcl> omDirAcls = omDirInfo.getAcls();
+
+        System.out.println(
+            "  subdir acls : " + omDirInfo + " ==> " + omDirAcls);
+        Assert.assertEquals("Failed to inherit parent DEFAULT acls!",
+            expectedInheritAcls, omDirAcls);
+
+        parentID = omDirInfo.getObjectID();
+        expectedInheritAcls = omDirAcls;
+
+        // file should inherit parent DEFAULT acls and itself has ACCESS scope
+        // [user:newUser:rw[ACCESS], group:newGroup:rwl[ACCESS]]
+        if (indx == dirs.size() - 1) {
+          // verify file acls: read them from the key itself, not from the dir
+          Assert.assertEquals(omDirInfo.getObjectID(),
+              omKeyInfo.getParentObjectID());
+          List<OzoneAcl> fileAcls = omKeyInfo.getAcls();
+          System.out.println("  file acls : " + omKeyInfo + " ==> " + fileAcls);
+          Assert.assertEquals("Failed to inherit parent DEFAULT acls!",
+              expectedInheritAcls.stream()
+                  .map(acl -> acl.setAclScope(OzoneAcl.AclScope.ACCESS))
+                  .collect(Collectors.toList()), fileAcls);
+        }
+      }
+    } else {
+      List<OzoneAcl> keyAcls = omKeyInfo.getAcls();
+
+      List<OzoneAcl> parentDefaultAcl = bucketAcls.stream()
+          .filter(acl -> acl.getAclScope() == OzoneAcl.AclScope.DEFAULT)
+          .collect(Collectors.toList());
+
+      OzoneAcl parentAccessAcl = bucketAcls.stream()
+          .filter(acl -> acl.getAclScope() == OzoneAcl.AclScope.ACCESS)
+          .findAny().orElse(null);
+
+      // Should inherit parent DEFAULT acls
+      // [user:newUser:rw[ACCESS], group:newGroup:rwl[ACCESS]]
+      Assert.assertEquals("Failed to inherit bucket DEFAULT acls!",
+          parentDefaultAcl.stream()
+              .map(acl -> acl.setAclScope(OzoneAcl.AclScope.ACCESS))
+              .collect(Collectors.toList()), keyAcls);
+      // Should not inherit parent ACCESS acls
+      Assert.assertFalse(keyAcls.contains(parentAccessAcl));
+    }
+  }
+
   @Test
   public void testPreExecuteWithInvalidKeyPrefix() throws Exception {
     String[] invalidKeyNames = {
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequestWithFSO.java
index 2bd4db2581..4ce9f48d60 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequestWithFSO.java
@@ -157,6 +157,12 @@ public class TestOMFileCreateRequestWithFSO extends TestOMFileCreateRequest {
     testNonRecursivePath(key, false, false, true);
   }
 
+  @Test // body only delegates to the superclass test of the same name
+  public void testCreateFileInheritParentDefaultAcls()
+      throws Exception {
+    super.testCreateFileInheritParentDefaultAcls();
+  }
+
   @Test
   public void testValidateAndUpdateCacheWithSnapshotReservedWord()
       throws Exception {
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
index 23337f534b..29cb6c86c6 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
@@ -21,14 +21,17 @@ package org.apache.hadoop.ozone.om.request.key;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -658,6 +661,78 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
             + OM_SNAPSHOT_INDICATOR + OM_KEY_PREFIX));
   }
 
+  @Test // a newly created key must inherit the bucket DEFAULT acls as ACCESS
+  public void testKeyCreateInheritParentDefaultAcls()
+      throws Exception {
+    when(ozoneManager.getOzoneLockProvider()).thenReturn(
+        new OzoneLockProvider(keyPathLockEnabled, enableFileSystemPaths));
+
+    List<OzoneAcl> acls = new ArrayList<>();
+    acls.add(OzoneAcl.parseAcl("user:newUser:rw[DEFAULT]"));
+    acls.add(OzoneAcl.parseAcl("user:noInherit:rw"));
+    acls.add(OzoneAcl.parseAcl("group:newGroup:rwl[DEFAULT]"));
+
+    // create the bucket with two [DEFAULT] acls and one without that suffix
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, omMetadataManager,
+        OmBucketInfo.newBuilder().setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setBucketLayout(getBucketLayout())
+            .setAcls(acls));
+
+    // Verify the bucket table returns exactly the acls that were set.
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    List<OzoneAcl> bucketAcls = omMetadataManager.getBucketTable()
+        .get(bucketKey).getAcls();
+    Assert.assertEquals(acls, bucketAcls);
+
+    // create a key in that bucket; its acls are checked at the end
+    OMRequest modifiedOmRequest =
+        doPreExecute(createKeyRequest(false, 0));
+
+    OMKeyCreateRequest omKeyCreateRequest =
+        getOMKeyCreateRequest(modifiedOmRequest);
+
+    long id = modifiedOmRequest.getCreateKeyRequest().getClientID();
+    String openKey = getOpenKey(id);
+
+    OMClientResponse omKeyCreateResponse =
+        omKeyCreateRequest.validateAndUpdateCache(ozoneManager, 100L,
+            ozoneManagerDoubleBufferHelper);
+    checkResponse(modifiedOmRequest, omKeyCreateResponse, id, false,
+        omKeyCreateRequest.getBucketLayout());
+
+    OmKeyInfo omKeyInfo =
+        omMetadataManager.getOpenKeyTable(getBucketLayout()).get(openKey);
+
+    verifyKeyInheritAcls(omKeyInfo.getAcls(), bucketAcls);
+
+  }
+
+  /**
+   * Asserts that keyAcls equals the bucket DEFAULT acls re-scoped to
+   * ACCESS and does not contain a bucket acl of ACCESS scope.
+   */
+  private void verifyKeyInheritAcls(List<OzoneAcl> keyAcls,
+      List<OzoneAcl> bucketAcls) {
+
+    List<OzoneAcl> parentDefaultAcl = bucketAcls.stream()
+        .filter(acl -> acl.getAclScope() == OzoneAcl.AclScope.DEFAULT)
+        .collect(Collectors.toList());
+
+    OzoneAcl parentAccessAcl = bucketAcls.stream()
+        .filter(acl -> acl.getAclScope() == OzoneAcl.AclScope.ACCESS)
+        .findAny().orElse(null);
+
+    // Expected key acls: every bucket DEFAULT acl with its scope set to ACCESS
+    Assert.assertEquals("Failed to inherit parent DEFAULT acls!",
+        parentDefaultAcl.stream()
+            .map(acl -> acl.setAclScope(OzoneAcl.AclScope.ACCESS))
+            .collect(Collectors.toList()), keyAcls);
+
+    // One bucket ACCESS acl (null if none exists) must be absent from the key
+    Assert.assertFalse(keyAcls.contains(parentAccessAcl));
+  }
+
   protected void addToKeyTable(String keyName) throws Exception {
     OMRequestTestUtils.addKeyToTable(false, volumeName, bucketName,
         keyName.substring(1), 0L, RATIS, THREE, omMetadataManager);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java
index fc766bcbc2..8f92c1ab22 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java
@@ -85,6 +85,12 @@ public class TestOMKeyCreateRequestWithFSO extends TestOMKeyCreateRequest {
     }
   }
 
+  @Test // body only delegates to the superclass test of the same name
+  public void testKeyCreateInheritParentDefaultAcls()
+      throws Exception {
+    super.testKeyCreateInheritParentDefaultAcls();
+  }
+
   @Override
   protected OzoneConfiguration getOzoneConfiguration() {
     OzoneConfiguration config = super.getOzoneConfiguration();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java
index d473bf9621..499da5370c 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java
@@ -19,8 +19,14 @@
 
 package org.apache.hadoop.ozone.om.request.s3.multipart;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -161,4 +167,81 @@ public class TestS3InitiateMultipartUploadRequest
     return omMetadataManager.getMultipartKey(volumeName,
         bucketName, keyName, multipartUploadID);
   }
+
+  @Test // the MPU open key must inherit the bucket DEFAULT acls as ACCESS
+  public void testMultipartUploadInheritParentDefaultAcls()
+      throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    List<OzoneAcl> acls = new ArrayList<>();
+    acls.add(OzoneAcl.parseAcl("user:newUser:rw[DEFAULT]"));
+    acls.add(OzoneAcl.parseAcl("user:noInherit:rw"));
+    acls.add(OzoneAcl.parseAcl("group:newGroup:rwl[DEFAULT]"));
+
+    // create the bucket with two [DEFAULT] acls and one without that suffix
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, omMetadataManager,
+        OmBucketInfo.newBuilder().setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setBucketLayout(getBucketLayout())
+            .setAcls(acls));
+
+    // Verify the bucket table returns exactly the acls that were set.
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    List<OzoneAcl> bucketAcls = omMetadataManager.getBucketTable()
+        .get(bucketKey).getAcls();
+    Assert.assertEquals(acls, bucketAcls);
+
+    // initiate a multipart upload for the key in that bucket
+    OMRequest modifiedRequest = doPreExecuteInitiateMPU(volumeName,
+        bucketName, keyName);
+
+    S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
+        getS3InitiateMultipartUploadReq(modifiedRequest);
+
+    OMClientResponse omClientResponse =
+        s3InitiateMultipartUploadRequest.validateAndUpdateCache(ozoneManager,
+            100L, ozoneManagerDoubleBufferHelper);
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omClientResponse.getOMResponse().getStatus());
+
+    String multipartKey = getMultipartKey(volumeName, bucketName, keyName,
+        modifiedRequest.getInitiateMultiPartUploadRequest()
+            .getKeyArgs().getMultipartUploadID());
+
+    OmKeyInfo omKeyInfo = omMetadataManager
+        .getOpenKeyTable(s3InitiateMultipartUploadRequest.getBucketLayout())
+        .get(multipartKey);
+
+    verifyKeyInheritAcls(omKeyInfo.getAcls(), bucketAcls);
+
+  }
+
+  /**
+   * Asserts that keyAcls equals the bucket DEFAULT acls re-scoped to
+   * ACCESS and does not contain a bucket acl of ACCESS scope.
+   */
+  private void verifyKeyInheritAcls(List<OzoneAcl> keyAcls,
+      List<OzoneAcl> bucketAcls) {
+
+    List<OzoneAcl> parentDefaultAcl = bucketAcls.stream()
+        .filter(acl -> acl.getAclScope() == OzoneAcl.AclScope.DEFAULT)
+        .collect(Collectors.toList());
+
+    OzoneAcl parentAccessAcl = bucketAcls.stream()
+        .filter(acl -> acl.getAclScope() == OzoneAcl.AclScope.ACCESS)
+        .findAny().orElse(null);
+
+    // Should inherit parent DEFAULT Acls, re-scoped to ACCESS, e.g.
+    // [user:newUser:rw[ACCESS], group:newGroup:rwl[ACCESS]]
+    Assert.assertEquals("Failed to inherit parent DEFAULT acls!",
+        parentDefaultAcl.stream()
+            .map(acl -> acl.setAclScope(OzoneAcl.AclScope.ACCESS))
+            .collect(Collectors.toList()), keyAcls);
+
+    // One bucket ACCESS acl (null if none exists) must be absent from the key
+    Assert.assertFalse(keyAcls.contains(parentAccessAcl));
+  }
+
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequestWithFSO.java
index 6fb26486ad..cd96054bf1 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequestWithFSO.java
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.ozone.om.request.s3.multipart;
 
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
@@ -34,6 +36,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 /**
  * Tests S3 Initiate Multipart Upload request.
@@ -146,6 +149,114 @@ public class TestS3InitiateMultipartUploadRequestWithFSO
         BucketLayout.FILE_SYSTEM_OPTIMIZED);
   }
 
+  @Test // dirs and the MPU open key must inherit the bucket DEFAULT acls
+  public void testMultipartUploadInheritParentDefaultAcls()
+      throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String prefix = "a/b/c/";
+    List<String> dirs = new ArrayList<>();
+    dirs.add("a");
+    dirs.add("b");
+    dirs.add("c");
+    String fileName = UUID.randomUUID().toString();
+    String keyName = prefix + fileName;
+
+    List<OzoneAcl> acls = new ArrayList<>();
+    acls.add(OzoneAcl.parseAcl("user:newUser:rw[DEFAULT]"));
+    acls.add(OzoneAcl.parseAcl("user:noInherit:rw"));
+    acls.add(OzoneAcl.parseAcl("group:newGroup:rwl[DEFAULT]"));
+
+    // create the bucket with two [DEFAULT] acls and one without that suffix
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, omMetadataManager,
+        OmBucketInfo.newBuilder().setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setBucketLayout(getBucketLayout())
+            .setAcls(acls));
+
+    // Verify the bucket table returns exactly the acls that were set.
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    List<OzoneAcl> bucketAcls = omMetadataManager.getBucketTable()
+        .get(bucketKey).getAcls();
+    Assert.assertEquals(acls, bucketAcls);
+
+    // fetch the ids used by the checks below, then initiate the upload
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName,
+        bucketName);
+    OMRequest modifiedRequest = doPreExecuteInitiateMPUWithFSO(volumeName,
+        bucketName, keyName);
+
+    S3InitiateMultipartUploadRequest s3InitiateMultipartUploadReqFSO =
+        getS3InitiateMultipartUploadReq(modifiedRequest);
+
+    OMClientResponse omClientResponse =
+        s3InitiateMultipartUploadReqFSO.validateAndUpdateCache(
+            ozoneManager, 100L,
+            ozoneManagerDoubleBufferHelper);
+
+    // locate the open multipart key; parentID comes from verifyDirectoriesInDB
+    long parentID = verifyDirectoriesInDB(dirs, volumeId, bucketId);
+    String multipartOpenFileKey = omMetadataManager.getMultipartKey(volumeId,
+        bucketId, parentID, fileName,
+        modifiedRequest.getInitiateMultiPartUploadRequest()
+            .getKeyArgs().getMultipartUploadID());
+    OmKeyInfo omKeyInfo = omMetadataManager
+        .getOpenKeyTable(s3InitiateMultipartUploadReqFSO.getBucketLayout())
+        .get(multipartOpenFileKey);
+
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omClientResponse.getOMResponse().getStatus());
+
+    verifyKeyInheritAcls(dirs, omKeyInfo, volumeId, bucketId, bucketAcls);
+
+  }
+
+  private void verifyKeyInheritAcls(List<String> dirs, OmKeyInfo fileInfo,
+      long volumeId, long bucketId, List<OzoneAcl> bucketAcls)
+      throws IOException {
+    // Walks dirs from the bucket down, checking acls of each dir and the file
+    long parentID = bucketId;
+    List<OzoneAcl> expectedInheritAcls = bucketAcls.stream()
+        .filter(acl -> acl.getAclScope() == OzoneAcl.AclScope.DEFAULT)
+        .collect(Collectors.toList());
+    System.out.println("expectedInheritAcls: " + expectedInheritAcls);
+
+    // each dir must carry exactly its parent's DEFAULT acls, still DEFAULT,
+    // e.g. [user:newUser:rw[DEFAULT], group:newGroup:rwl[DEFAULT]]
+    for (int indx = 0; indx < dirs.size(); indx++) {
+      String dirName = dirs.get(indx);
+      String dbKey = "";
+      // the key is built from the parent id (the bucket id when indx == 0)
+      dbKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
+          parentID, dirName);
+      OmDirectoryInfo omDirInfo =
+          omMetadataManager.getDirectoryTable().get(dbKey);
+      List<OzoneAcl> omDirAcls = omDirInfo.getAcls();
+
+      System.out.println("  subdir acls : " + omDirInfo + " ==> " + omDirAcls);
+      Assert.assertEquals("Failed to inherit parent DEFAULT acls!",
+          expectedInheritAcls, omDirAcls);
+
+      parentID = omDirInfo.getObjectID();
+      expectedInheritAcls = omDirAcls;
+
+      // at the last dir, the file must carry that dir's acls re-scoped to
+      // ACCESS, e.g. [user:newUser:rw[ACCESS], group:newGroup:rwl[ACCESS]]
+      if (indx == dirs.size() - 1) {
+        // the file's parent must be this last directory
+        Assert.assertEquals(fileInfo.getParentObjectID(),
+            omDirInfo.getObjectID());
+        List<OzoneAcl> fileAcls = fileInfo.getAcls();
+        System.out.println("  file acls : " + fileInfo + " ==> " + fileAcls);
+        Assert.assertEquals("Failed to inherit parent DEFAULT acls!",
+            expectedInheritAcls.stream()
+                .map(acl -> acl.setAclScope(OzoneAcl.AclScope.ACCESS))
+                .collect(Collectors.toList()), fileAcls);
+      }
+    }
+  }
+
   @Override
   public BucketLayout getBucketLayout() {
     return BucketLayout.FILE_SYSTEM_OPTIMIZED;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to