This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-13323-sts
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-13323-sts by this push:
     new 2062a06942e HDDS-13942. [STS] Part 4 - Create utility to convert IAM 
policy to groupings of OzoneObj and Acls (#9315)
2062a06942e is described below

commit 2062a06942e14e1093b0b78b5de022e266b88d35
Author: fmorg-git <[email protected]>
AuthorDate: Mon Dec 8 23:51:41 2025 -0800

    HDDS-13942. [STS] Part 4 - Create utility to convert IAM policy to 
groupings of OzoneObj and Acls (#9315)
---
 .../security/acl/iam/IamSessionPolicyResolver.java |   72 +-
 .../acl/iam/TestIamSessionPolicyResolver.java      | 1177 ++++++++++++++++++--
 2 files changed, 1144 insertions(+), 105 deletions(-)

diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/iam/IamSessionPolicyResolver.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/iam/IamSessionPolicyResolver.java
index 7e10d566b59..b90bb43c193 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/iam/IamSessionPolicyResolver.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/iam/IamSessionPolicyResolver.java
@@ -126,7 +126,9 @@ public static Set<AssumeRoleRequest.OzoneGrant> 
resolve(String policyJson, Strin
 
     validateInputParameters(policyJson, volumeName, authorizerType);
 
-    final Set<AssumeRoleRequest.OzoneGrant> result = new LinkedHashSet<>();
+    // Accumulate ACLs across ALL statements using a single map to allow
+    // cross-statement deduplication and ALL-permission collapsing.
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMap = new LinkedHashMap<>();
 
     // Parse JSON into set of statements
     final Set<JsonNode> statements = 
parseJsonAndRetrieveStatements(policyJson);
@@ -151,13 +153,11 @@ public static Set<AssumeRoleRequest.OzoneGrant> 
resolve(String policyJson, Strin
       final Set<ResourceSpec> resourceSpecs = 
validateAndCategorizeResources(authorizerType, resources);
 
       // For each action, map to Ozone objects (paths) and acls based on 
resource specs and prefixes
-      final Set<AssumeRoleRequest.OzoneGrant> stmtResults = 
createPathsAndPermissions(
-          volumeName, authorizerType, mappedS3Actions, resourceSpecs, 
prefixes);
-
-      result.addAll(stmtResults);
+      createPathsAndPermissions(volumeName, authorizerType, mappedS3Actions, 
resourceSpecs, prefixes, objToAclsMap);
     }
 
-    return result;
+    // Group accumulated objects by their ACL sets to create final result
+    return groupObjectsByAcls(objToAclsMap);
   }
 
   /**
@@ -418,24 +418,19 @@ static Set<ResourceSpec> 
validateAndCategorizeResources(AuthorizerType authorize
    * entries pairing sets of IOzoneObjs with the requisite permissions granted 
(if any).
    */
   @VisibleForTesting
-  static Set<AssumeRoleRequest.OzoneGrant> createPathsAndPermissions(String 
volumeName, AuthorizerType authorizerType,
-      Set<S3Action> mappedS3Actions, Set<ResourceSpec> resourceSpecs, 
Set<String> prefixes) {
-    // Create map to collect IOzoneObj to ACLType mappings
-    final Map<IOzoneObj, Set<ACLType>> objToAclsMap = new LinkedHashMap<>();
-
+  static void createPathsAndPermissions(String volumeName, AuthorizerType 
authorizerType, Set<S3Action> mappedS3Actions,
+      Set<ResourceSpec> resourceSpecs, Set<String> prefixes, Map<IOzoneObj, 
Set<ACLType>> objToAclsMap) {
     // Process each resource spec with the given actions
     for (ResourceSpec resourceSpec : resourceSpecs) {
       processResourceSpecWithActions(volumeName, authorizerType, 
mappedS3Actions, resourceSpec, prefixes, objToAclsMap);
     }
-
-    // Group objects by their ACL sets to create proper entries
-    return groupObjectsByAcls(objToAclsMap);
   }
 
   /**
    * Groups objects by their ACL sets.
    */
-  private static Set<AssumeRoleRequest.OzoneGrant> 
groupObjectsByAcls(Map<IOzoneObj, Set<ACLType>> objToAclsMap) {
+  @VisibleForTesting
+  static Set<AssumeRoleRequest.OzoneGrant> groupObjectsByAcls(Map<IOzoneObj, 
Set<ACLType>> objToAclsMap) {
     final Map<Set<ACLType>, Set<IOzoneObj>> groupMap = new LinkedHashMap<>();
 
     // Group objects by their ACL sets only (across resource types)
@@ -526,24 +521,34 @@ private static void processBucketResource(String 
volumeName, Set<S3Action> mappe
       // bucket name of "*".  To align with AWS, make sure that in this
       // specific case we also grant the volume-level permissions for 
volume-scoped
       // actions (currently s3:ListAllMyBuckets).
-      if (action.kind == ActionKind.BUCKET || action == S3Action.ALL_S3 ||
-          action.kind == ActionKind.VOLUME && "*".equals(resourceSpec.bucket)) 
{ // this handles s3:ListAllMyBuckets
+      if (action.kind == ActionKind.BUCKET ||
+          (action.kind == ActionKind.VOLUME && 
"*".equals(resourceSpec.bucket))) { // this handles s3:ListAllMyBuckets
         addAclsForObj(objToAclsMap, volumeObj(volumeName), action.volumePerms);
         addAclsForObj(objToAclsMap, bucketObj(volumeName, 
resourceSpec.bucket), action.bucketPerms);
+      } else if (action == S3Action.ALL_S3) {
+        // For s3:*, ALL should only apply at the bucket level; grant READ at 
volume for navigation
+        // However, resource "arn:aws:s3:::*" can apply to volume as well (as 
explained above)
+        // If the bucket is "*", include the volumePerms, otherwise just 
include READ for navigation.
+        if ("*".equals(resourceSpec.bucket)) {
+          addAclsForObj(objToAclsMap, volumeObj(volumeName), 
action.volumePerms);
+        } else {
+          addAclsForObj(objToAclsMap, volumeObj(volumeName), EnumSet.of(READ));
+        }
+        addAclsForObj(objToAclsMap, bucketObj(volumeName, 
resourceSpec.bucket), action.bucketPerms);
       }
 
-      if (action == S3Action.LIST_BUCKET) {
+      if (action == S3Action.LIST_BUCKET || action == S3Action.ALL_S3) {
         // If condition prefixes are present, these would constrain the object 
permissions if the action
-        // is s3:ListBucket
+        // is s3:ListBucket or s3:* (which includes s3:ListBucket)
         if (prefixes != null && !prefixes.isEmpty()) {
           for (String prefix : prefixes) {
             createObjectResourcesFromConditionPrefix(
-                volumeName, authorizerType, resourceSpec, prefix, 
objToAclsMap, action.objectPerms);
+                volumeName, authorizerType, resourceSpec, prefix, 
objToAclsMap, EnumSet.of(READ));
           }
         } else {
           // No condition prefixes, but we need READ access to all objects, so 
use "*" as the prefix
           createObjectResourcesFromConditionPrefix(
-              volumeName, authorizerType, resourceSpec, "*", objToAclsMap, 
action.objectPerms);
+              volumeName, authorizerType, resourceSpec, "*", objToAclsMap, 
EnumSet.of(READ));
         }
       }
     }
@@ -556,11 +561,12 @@ private static void processBucketResource(String 
volumeName, Set<S3Action> mappe
   private static void processObjectExactResource(String volumeName, 
Set<S3Action> mappedS3Actions,
       ResourceSpec resourceSpec, Map<IOzoneObj, Set<ACLType>> objToAclsMap) {
     for (S3Action action : mappedS3Actions) {
-      addAclsForObj(objToAclsMap, volumeObj(volumeName), action.volumePerms);
       if (action.kind == ActionKind.OBJECT) {
+        addAclsForObj(objToAclsMap, volumeObj(volumeName), action.volumePerms);
         addAclsForObj(objToAclsMap, bucketObj(volumeName, 
resourceSpec.bucket), action.bucketPerms);
         addAclsForObj(objToAclsMap, keyObj(volumeName, resourceSpec.bucket, 
resourceSpec.key), action.objectPerms);
       } else if (action == S3Action.ALL_S3) {
+        addAclsForObj(objToAclsMap, volumeObj(volumeName), EnumSet.of(READ));
         // For s3:*, ALL should only apply at the object level; grant READ at 
bucket level for navigation
         addAclsForObj(objToAclsMap, bucketObj(volumeName, 
resourceSpec.bucket), EnumSet.of(READ));
         addAclsForObj(objToAclsMap, keyObj(volumeName, resourceSpec.bucket, 
resourceSpec.key), action.objectPerms);
@@ -577,10 +583,11 @@ private static void processObjectPrefixResource(String 
volumeName, AuthorizerTyp
       Set<S3Action> mappedS3Actions, ResourceSpec resourceSpec, Map<IOzoneObj, 
Set<ACLType>> objToAclsMap) {
     for (S3Action action : mappedS3Actions) {
       // Object actions apply to prefix/key resources
-      addAclsForObj(objToAclsMap, volumeObj(volumeName), action.volumePerms);
       if (action.kind == ActionKind.OBJECT) {
+        addAclsForObj(objToAclsMap, volumeObj(volumeName), action.volumePerms);
         addAclsForObj(objToAclsMap, bucketObj(volumeName, 
resourceSpec.bucket), action.bucketPerms);
       } else if (action == S3Action.ALL_S3) {
+        addAclsForObj(objToAclsMap, volumeObj(volumeName), EnumSet.of(READ));
         // For s3:*, ALL should only apply at the object/prefix level; grant 
READ at bucket level for navigation
         addAclsForObj(objToAclsMap, bucketObj(volumeName, 
resourceSpec.bucket), EnumSet.of(READ));
       }
@@ -633,11 +640,26 @@ private static void 
createObjectResourcesFromConditionPrefix(String volumeName,
 
   /**
    * Helper method to add ACLs for an IOzoneObj, merging with existing ACLs if 
present.
+   * If ALL permission is present, no other permissions are added.
    */
   private static void addAclsForObj(Map<IOzoneObj, Set<ACLType>> objToAclsMap, 
IOzoneObj obj, Set<ACLType> acls) {
     if (acls != null && !acls.isEmpty()) {
       final OzoneObj ozoneObj = (OzoneObj) obj;
-      objToAclsMap.computeIfAbsent(ozoneObj, k -> 
EnumSet.noneOf(ACLType.class)).addAll(acls);
+      final Set<ACLType> existingAcls = objToAclsMap.computeIfAbsent(ozoneObj, 
k -> EnumSet.noneOf(ACLType.class));
+
+      // If ALL is already present, don't add other permissions
+      if (existingAcls.contains(ACLType.ALL)) {
+        return;
+      }
+
+      // If we're about to add ALL, remove all other permissions first
+      if (acls.contains(ACLType.ALL)) {
+        existingAcls.clear();
+        existingAcls.add(ACLType.ALL);
+      } else {
+        // Only add permissions if ALL is not already present
+        existingAcls.addAll(acls);
+      }
     }
   }
 
@@ -808,7 +830,7 @@ enum S3Action {
         EnumSet.of(ACLType.WRITE)),
 
     // Wildcard all
-    ALL_S3("s3:*", ActionKind.ALL, EnumSet.of(READ), EnumSet.of(ACLType.ALL), 
EnumSet.of(ACLType.ALL));
+    ALL_S3("s3:*", ActionKind.ALL, EnumSet.of(READ, LIST), 
EnumSet.of(ACLType.ALL), EnumSet.of(ACLType.ALL));
 
     private final String name;
     private final ActionKind kind;
diff --git 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/acl/iam/TestIamSessionPolicyResolver.java
 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/acl/iam/TestIamSessionPolicyResolver.java
index 9fe5965874c..b885c5130ec 100644
--- 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/acl/iam/TestIamSessionPolicyResolver.java
+++ 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/acl/iam/TestIamSessionPolicyResolver.java
@@ -17,31 +17,38 @@
 
 package org.apache.hadoop.ozone.security.acl.iam;
 
+import static java.util.Collections.emptySet;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_SUPPORTED_OPERATION;
+import static 
org.apache.hadoop.ozone.security.acl.AssumeRoleRequest.OzoneGrant;
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
+import static 
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
 import static 
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.CREATE;
+import static 
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.DELETE;
 import static 
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.LIST;
 import static 
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.READ;
 import static 
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.READ_ACL;
+import static 
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.WRITE;
 import static 
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.WRITE_ACL;
 import static 
org.apache.hadoop.ozone.security.acl.iam.IamSessionPolicyResolver.AuthorizerType.NATIVE;
 import static 
org.apache.hadoop.ozone.security.acl.iam.IamSessionPolicyResolver.AuthorizerType.RANGER;
 import static 
org.apache.hadoop.ozone.security.acl.iam.IamSessionPolicyResolver.S3ResourceType;
 import static 
org.apache.hadoop.ozone.security.acl.iam.IamSessionPolicyResolver.buildCaseInsensitiveS3ActionMap;
 import static 
org.apache.hadoop.ozone.security.acl.iam.IamSessionPolicyResolver.createPathsAndPermissions;
+import static 
org.apache.hadoop.ozone.security.acl.iam.IamSessionPolicyResolver.groupObjectsByAcls;
 import static 
org.apache.hadoop.ozone.security.acl.iam.IamSessionPolicyResolver.mapPolicyActionsToS3Actions;
+import static 
org.apache.hadoop.ozone.security.acl.iam.IamSessionPolicyResolver.resolve;
 import static 
org.apache.hadoop.ozone.security.acl.iam.IamSessionPolicyResolver.validateAndCategorizeResources;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.security.acl.AssumeRoleRequest;
 import org.apache.hadoop.ozone.security.acl.IOzoneObj;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
@@ -243,8 +250,8 @@ public void testJsonAtMaxLengthSucceeds() throws 
OMException {
     assertThat(json.length()).isEqualTo(2048);
 
     // Must not throw an exception
-    IamSessionPolicyResolver.resolve(json, VOLUME, NATIVE);
-    IamSessionPolicyResolver.resolve(json, VOLUME, RANGER);
+    resolve(json, VOLUME, NATIVE);
+    resolve(json, VOLUME, RANGER);
   }
 
   @Test
@@ -259,8 +266,8 @@ public void testConditionKeyMustBeCaseInsensitive() throws 
OMException {
         "}";
 
     // Must not throw exception
-    IamSessionPolicyResolver.resolve(json, VOLUME, NATIVE);
-    IamSessionPolicyResolver.resolve(json, VOLUME, RANGER);
+    resolve(json, VOLUME, NATIVE);
+    resolve(json, VOLUME, RANGER);
   }
 
   @Test
@@ -286,7 +293,6 @@ public void 
testBuildCaseInsensitiveS3ActionMapMatchesConstant() {
   @Test
   public void testBuildCaseInsensitiveS3ActionMap() {
     final Map<String, Set<S3Action>> caseInsensitiveS3ActionMap = 
buildCaseInsensitiveS3ActionMap();
-    
     // Verify that individual S3 actions are present
     assertThat(caseInsensitiveS3ActionMap).containsKeys(
         "s3:listbucket", "s3:getobject", "s3:putobject", "s3:deleteobject", 
"s3:createbucket", "s3:listallmybuckets");
@@ -341,7 +347,7 @@ public void 
testMapPolicyActionsToS3ActionsWithNullReturnsEmpty() {
 
   @Test
   public void testMapPolicyActionsToS3ActionsWithEmptyListReturnsEmpty() {
-    final Set<S3Action> result = 
mapPolicyActionsToS3Actions(Collections.emptySet());
+    final Set<S3Action> result = mapPolicyActionsToS3Actions(emptySet());
     assertThat(result).isEmpty();
   }
 
@@ -681,11 +687,9 @@ public void 
testValidateAndCategorizeResourcesWithArnWithNoBucketThrows() {
   @Test
   public void testValidateAndCategorizeResourcesWithNoResourcesThrows() {
     expectOMExceptionWithCode(
-        () -> validateAndCategorizeResources(NATIVE, Collections.emptySet()),
-        "No Resource(s) found in policy", INVALID_REQUEST);
+        () -> validateAndCategorizeResources(NATIVE, emptySet()), "No 
Resource(s) found in policy", INVALID_REQUEST);
     expectOMExceptionWithCode(
-        () -> validateAndCategorizeResources(RANGER, Collections.emptySet()),
-        "No Resource(s) found in policy", INVALID_REQUEST);
+        () -> validateAndCategorizeResources(RANGER, emptySet()), "No 
Resource(s) found in policy", INVALID_REQUEST);
   }
 
   @Test
@@ -697,16 +701,17 @@ public void 
testCreatePathsAndPermissionsWithResourceAny() {
         new IamSessionPolicyResolver.ResourceSpec(S3ResourceType.ANY, "*", 
null, null));
 
     expectIllegalArgumentException(
-        () -> createPathsAndPermissions(VOLUME, NATIVE, actions, 
resourceSpecs, Collections.emptySet()),
+        () -> createPathsAndPermissions(VOLUME, NATIVE, actions, 
resourceSpecs, emptySet(), new LinkedHashMap<>()),
         "ResourceSpec type ANY not supported for OzoneNativeAuthorizer");
 
-    final Set<AssumeRoleRequest.OzoneGrant> resultRanger = 
createPathsAndPermissions(
-        VOLUME, RANGER, actions, resourceSpecs, Collections.emptySet());
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapRanger = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, RANGER, actions, resourceSpecs, 
emptySet(), objToAclsMapRanger);
+    final Set<OzoneGrant> resultRanger = 
groupObjectsByAcls(objToAclsMapRanger);
     final Set<IOzoneObj> readAndListObjects = objSet(volume(), bucket("*")); 
// volume, bucket level have READ, LIST
     final Set<IOzoneObj> readObject = objSet(key("*", "*")); // key level has 
READ
     assertThat(resultRanger).containsExactlyInAnyOrder(
-        new AssumeRoleRequest.OzoneGrant(readAndListObjects, acls(READ, LIST)),
-        new AssumeRoleRequest.OzoneGrant(readObject, acls(READ)));
+        new OzoneGrant(readAndListObjects, acls(READ, LIST)),
+        new OzoneGrant(readObject, acls(READ)));
   }
 
   @Test
@@ -716,19 +721,19 @@ public void 
testCreatePathsAndPermissionsWithBucketResourceThatIsListBucket() {
         new IamSessionPolicyResolver.ResourceSpec(S3ResourceType.BUCKET, 
"bucket1", null, null));
     final Set<IOzoneObj> readAndListObject = objSet(bucket("bucket1"));
 
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapNative = new 
LinkedHashMap<>();
     final Set<IOzoneObj> nativeReadObjects = objSet(volume(), 
prefix("bucket1", ""));
-    final Set<AssumeRoleRequest.OzoneGrant> resultNative = 
createPathsAndPermissions(
-        VOLUME, NATIVE, actions, resourceSpecs, Collections.emptySet());
+    createPathsAndPermissions(VOLUME, NATIVE, actions, resourceSpecs, 
emptySet(), objToAclsMapNative);
+    final Set<OzoneGrant> resultNative = 
groupObjectsByAcls(objToAclsMapNative);
     assertThat(resultNative).containsExactlyInAnyOrder(
-        new AssumeRoleRequest.OzoneGrant(readAndListObject, acls(READ, LIST)),
-        new AssumeRoleRequest.OzoneGrant(nativeReadObjects, acls(READ)));
+        new OzoneGrant(readAndListObject, acls(READ, LIST)), new 
OzoneGrant(nativeReadObjects, acls(READ)));
 
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapRanger = new 
LinkedHashMap<>();
     final Set<IOzoneObj> rangerReadObjects = objSet(volume(), key("bucket1", 
"*"));
-    final Set<AssumeRoleRequest.OzoneGrant> resultRanger = 
createPathsAndPermissions(
-        VOLUME, RANGER, actions, resourceSpecs, Collections.emptySet());
+    createPathsAndPermissions(VOLUME, RANGER, actions, resourceSpecs, 
emptySet(), objToAclsMapRanger);
+    final Set<OzoneGrant> resultRanger = 
groupObjectsByAcls(objToAclsMapRanger);
     assertThat(resultRanger).containsExactlyInAnyOrder(
-        new AssumeRoleRequest.OzoneGrant(readAndListObject, acls(READ, LIST)),
-        new AssumeRoleRequest.OzoneGrant(rangerReadObjects, acls(READ)));
+        new OzoneGrant(readAndListObject, acls(READ, LIST)), new 
OzoneGrant(rangerReadObjects, acls(READ)));
   }
 
   @Test
@@ -739,17 +744,17 @@ public void 
testCreatePathsAndPermissionsWithBucketResourceThatIsNotListBucket()
     final Set<IOzoneObj> createObject = objSet(bucket("bucket1"));
     final Set<IOzoneObj> readObject = objSet(volume());
 
-    final Set<AssumeRoleRequest.OzoneGrant> resultNative = 
createPathsAndPermissions(
-        VOLUME, NATIVE, actions, resourceSpecs, Collections.emptySet());
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapNative = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, NATIVE, actions, resourceSpecs, 
emptySet(), objToAclsMapNative);
+    final Set<OzoneGrant> resultNative = 
groupObjectsByAcls(objToAclsMapNative);
     assertThat(resultNative).containsExactlyInAnyOrder(
-        new AssumeRoleRequest.OzoneGrant(createObject, acls(CREATE)),
-        new AssumeRoleRequest.OzoneGrant(readObject, acls(READ)));
+        new OzoneGrant(createObject, acls(CREATE)), new OzoneGrant(readObject, 
acls(READ)));
 
-    final Set<AssumeRoleRequest.OzoneGrant> resultRanger = 
createPathsAndPermissions(
-        VOLUME, RANGER, actions, resourceSpecs, Collections.emptySet());
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapRanger = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, RANGER, actions, resourceSpecs, 
emptySet(), objToAclsMapRanger);
+    final Set<OzoneGrant> resultRanger = 
groupObjectsByAcls(objToAclsMapRanger);
     assertThat(resultRanger).containsExactlyInAnyOrder(
-        new AssumeRoleRequest.OzoneGrant(createObject, acls(CREATE)),
-        new AssumeRoleRequest.OzoneGrant(readObject, acls(READ)));
+        new OzoneGrant(createObject, acls(CREATE)), new OzoneGrant(readObject, 
acls(READ)));
   }
 
   @Test
@@ -761,14 +766,14 @@ public void 
testCreatePathsAndPermissionsWithBucketWildcardResource() {
     final Set<IOzoneObj> readVolume = objSet(volume());
 
     expectIllegalArgumentException(
-        () -> createPathsAndPermissions(VOLUME, NATIVE, actions, 
resourceSpecs, Collections.emptySet()),
+        () -> createPathsAndPermissions(VOLUME, NATIVE, actions, 
resourceSpecs, emptySet(), new LinkedHashMap<>()),
         "ResourceSpec type BUCKET_WILDCARD not supported for 
OzoneNativeAuthorizer");
 
-    final Set<AssumeRoleRequest.OzoneGrant> resultRanger = 
createPathsAndPermissions(
-        VOLUME, RANGER, actions, resourceSpecs, Collections.emptySet());
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapRanger = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, RANGER, actions, resourceSpecs, 
emptySet(), objToAclsMapRanger);
+    final Set<OzoneGrant> resultRanger = 
groupObjectsByAcls(objToAclsMapRanger);
     assertThat(resultRanger).containsExactlyInAnyOrder(
-        new AssumeRoleRequest.OzoneGrant(writeAclObject, acls(WRITE_ACL)),
-        new AssumeRoleRequest.OzoneGrant(readVolume, acls(READ)));
+        new OzoneGrant(writeAclObject, acls(WRITE_ACL)), new 
OzoneGrant(readVolume, acls(READ)));
   }
 
   @Test
@@ -783,19 +788,19 @@ public void 
testCreatePathsAndPermissionsWithBucketsWildcardResourceAll() {
         new 
IamSessionPolicyResolver.ResourceSpec(S3ResourceType.BUCKET_WILDCARD, "*", 
null, null));
 
     expectIllegalArgumentException(
-        () -> createPathsAndPermissions(VOLUME, NATIVE, actions, 
resourceSpecs, Collections.emptySet()),
+        () -> createPathsAndPermissions(VOLUME, NATIVE, actions, 
resourceSpecs, emptySet(), new LinkedHashMap<>()),
         "ResourceSpec type BUCKET_WILDCARD not supported for 
OzoneNativeAuthorizer");
 
-    final Set<AssumeRoleRequest.OzoneGrant> resultRanger = 
createPathsAndPermissions(
-        VOLUME, RANGER, actions, resourceSpecs, Collections.emptySet());
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapRanger = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, RANGER, actions, resourceSpecs, 
emptySet(), objToAclsMapRanger);
 
     // Both the volume and the wildcard bucket should end up with READ + LIST 
permissions.
     // We also need READ access on the keys
+    final Set<OzoneGrant> resultRanger = 
groupObjectsByAcls(objToAclsMapRanger);
     final Set<IOzoneObj> readAndListObjects = objSet(volume(), bucket("*"));
     final Set<IOzoneObj> readObjects = objSet(key("*", "*"));
     assertThat(resultRanger).containsExactlyInAnyOrder(
-        new AssumeRoleRequest.OzoneGrant(readAndListObjects, acls(READ, LIST)),
-        new AssumeRoleRequest.OzoneGrant(readObjects, acls(READ)));
+        new OzoneGrant(readAndListObjects, acls(READ, LIST)), new 
OzoneGrant(readObjects, acls(READ)));
   }
 
   @Test
@@ -805,13 +810,15 @@ public void 
testCreatePathsAndPermissionsWithObjectExactResource() {
         new IamSessionPolicyResolver.ResourceSpec(S3ResourceType.OBJECT_EXACT, 
"bucket1", null, "key.txt"));
     final Set<IOzoneObj> readObjects = objSet(key("bucket1", "key.txt"), 
bucket("bucket1"), volume());
 
-    final Set<AssumeRoleRequest.OzoneGrant> resultNative = 
createPathsAndPermissions(
-        VOLUME, NATIVE, actions, resourceSpecs, Collections.emptySet());
-    assertThat(resultNative).containsExactly(new 
AssumeRoleRequest.OzoneGrant(readObjects, acls(READ)));
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapNative = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, NATIVE, actions, resourceSpecs, 
emptySet(), objToAclsMapNative);
+    final Set<OzoneGrant> resultNative = 
groupObjectsByAcls(objToAclsMapNative);
+    assertThat(resultNative).containsExactly(new OzoneGrant(readObjects, 
acls(READ)));
 
-    final Set<AssumeRoleRequest.OzoneGrant> resultRanger = 
createPathsAndPermissions(
-        VOLUME, RANGER, actions, resourceSpecs, Collections.emptySet());
-    assertThat(resultRanger).containsExactly(new 
AssumeRoleRequest.OzoneGrant(readObjects, acls(READ)));
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapRanger = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, RANGER, actions, resourceSpecs, 
emptySet(), objToAclsMapRanger);
+    final Set<OzoneGrant> resultRanger = 
groupObjectsByAcls(objToAclsMapRanger);
+    assertThat(resultRanger).containsExactly(new OzoneGrant(readObjects, 
acls(READ)));
   }
 
   @Test
@@ -821,12 +828,13 @@ public void 
testCreatePathsAndPermissionsWithObjectPrefixResource() {
     final Set<IamSessionPolicyResolver.ResourceSpec> resourceSpecs = 
Collections.singleton(
         new 
IamSessionPolicyResolver.ResourceSpec(S3ResourceType.OBJECT_PREFIX, "bucket1", 
"prefix/", null));
     final Set<IOzoneObj> nativeReadObjects = objSet(prefix("bucket1", 
"prefix/"), bucket("bucket1"), volume());
-    final Set<AssumeRoleRequest.OzoneGrant> resultNative = 
createPathsAndPermissions(
-        VOLUME, NATIVE, actions, resourceSpecs, Collections.emptySet());
-    assertThat(resultNative).containsExactly(new 
AssumeRoleRequest.OzoneGrant(nativeReadObjects, acls(READ)));
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapNative = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, NATIVE, actions, resourceSpecs, 
emptySet(), objToAclsMapNative);
+    final Set<OzoneGrant> resultNative = 
groupObjectsByAcls(objToAclsMapNative);
+    assertThat(resultNative).containsExactly(new OzoneGrant(nativeReadObjects, 
acls(READ)));
 
     expectIllegalArgumentException(
-        () -> createPathsAndPermissions(VOLUME, RANGER, actions, 
resourceSpecs, Collections.emptySet()),
+        () -> createPathsAndPermissions(VOLUME, RANGER, actions, 
resourceSpecs, emptySet(), new LinkedHashMap<>()),
         "ResourceSpec type OBJECT_PREFIX not supported for 
RangerOzoneAuthorizer");
   }
 
@@ -837,13 +845,14 @@ public void 
testCreatePathsAndPermissionsWithObjectPrefixWildcardResource() {
         new 
IamSessionPolicyResolver.ResourceSpec(S3ResourceType.OBJECT_PREFIX_WILDCARD, 
"bucket1", "prefix/*", null));
 
     expectIllegalArgumentException(
-        () -> createPathsAndPermissions(VOLUME, NATIVE, actions, 
resourceSpecs, Collections.emptySet()),
+        () -> createPathsAndPermissions(VOLUME, NATIVE, actions, 
resourceSpecs, emptySet(), new LinkedHashMap<>()),
         "ResourceSpec type OBJECT_PREFIX_WILDCARD not supported for 
OzoneNativeAuthorizer");
 
     final Set<IOzoneObj> rangerReadObjects = objSet(key("bucket1", 
"prefix/*"), bucket("bucket1"), volume());
-    final Set<AssumeRoleRequest.OzoneGrant> resultRanger = 
createPathsAndPermissions(
-        VOLUME, RANGER, actions, resourceSpecs, Collections.emptySet());
-    assertThat(resultRanger).containsExactly(new 
AssumeRoleRequest.OzoneGrant(rangerReadObjects, acls(READ)));
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapRanger = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, RANGER, actions, resourceSpecs, 
emptySet(), objToAclsMapRanger);
+    final Set<OzoneGrant> resultRanger = 
groupObjectsByAcls(objToAclsMapRanger);
+    assertThat(resultRanger).containsExactly(new OzoneGrant(rangerReadObjects, 
acls(READ)));
   }
 
   @Test
@@ -853,17 +862,19 @@ public void 
testCreatePathsAndPermissionsWithConditionPrefixesForObjectActionMus
 
     final Set<IamSessionPolicyResolver.ResourceSpec> nativeResourceSpecs = 
Collections.singleton(
         new 
IamSessionPolicyResolver.ResourceSpec(S3ResourceType.OBJECT_PREFIX, "bucket1", 
"", null));
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapNative = new 
LinkedHashMap<>();
     final Set<IOzoneObj> nativeReadObjects = objSet(prefix("bucket1", ""), 
bucket("bucket1"), volume());
-    final Set<AssumeRoleRequest.OzoneGrant> resultNative = 
createPathsAndPermissions(
-        VOLUME, NATIVE, actions, nativeResourceSpecs, prefixes);
-    assertThat(resultNative).containsExactly(new 
AssumeRoleRequest.OzoneGrant(nativeReadObjects, acls(READ)));
+    createPathsAndPermissions(VOLUME, NATIVE, actions, nativeResourceSpecs, 
prefixes, objToAclsMapNative);
+    final Set<OzoneGrant> resultNative = 
groupObjectsByAcls(objToAclsMapNative);
+    assertThat(resultNative).containsExactly(new OzoneGrant(nativeReadObjects, 
acls(READ)));
 
     final Set<IamSessionPolicyResolver.ResourceSpec> rangerResourceSpecs = 
Collections.singleton(
         new 
IamSessionPolicyResolver.ResourceSpec(S3ResourceType.OBJECT_PREFIX_WILDCARD, 
"bucket1", "*", null));
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapRanger = new 
LinkedHashMap<>();
     final Set<IOzoneObj> rangerReadObjects = objSet(key("bucket1", "*"), 
bucket("bucket1"), volume());
-    final Set<AssumeRoleRequest.OzoneGrant> resultRanger = 
createPathsAndPermissions(
-        VOLUME, RANGER, actions, rangerResourceSpecs, prefixes);
-    assertThat(resultRanger).containsExactly(new 
AssumeRoleRequest.OzoneGrant(rangerReadObjects, acls(READ)));
+    createPathsAndPermissions(VOLUME, RANGER, actions, rangerResourceSpecs, 
prefixes, objToAclsMapRanger);
+    final Set<OzoneGrant> resultRanger = 
groupObjectsByAcls(objToAclsMapRanger);
+    assertThat(resultRanger).containsExactly(new OzoneGrant(rangerReadObjects, 
acls(READ)));
   }
 
   @Test
@@ -876,22 +887,22 @@ public void 
testCreatePathsAndPermissionsWithConditionPrefixesForBucketActionWhe
     final Set<IOzoneObj> nativeReadObjects = objSet(
         prefix("bucket1", "folder1/"), prefix("bucket1", "folder2/"), 
volume());
     final Set<IOzoneObj> nativeReadAndListObject = objSet(bucket("bucket1"));
-    final Set<AssumeRoleRequest.OzoneGrant> resultNative = 
createPathsAndPermissions(
-        VOLUME, NATIVE, actions, nativeResourceSpecs, prefixes);
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapNative = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, NATIVE, actions, nativeResourceSpecs, 
prefixes, objToAclsMapNative);
+    final Set<OzoneGrant> resultNative = 
groupObjectsByAcls(objToAclsMapNative);
     assertThat(resultNative).containsExactlyInAnyOrder(
-        new AssumeRoleRequest.OzoneGrant(nativeReadObjects, acls(READ)),
-        new AssumeRoleRequest.OzoneGrant(nativeReadAndListObject, acls(READ, 
LIST)));
+        new OzoneGrant(nativeReadObjects, acls(READ)), new 
OzoneGrant(nativeReadAndListObject, acls(READ, LIST)));
 
     final Set<IamSessionPolicyResolver.ResourceSpec> rangerResourceSpecs = 
Collections.singleton(
         new IamSessionPolicyResolver.ResourceSpec(S3ResourceType.BUCKET, 
"bucket1", null, null));
     final Set<IOzoneObj> rangerReadObjects = objSet(
         key("bucket1", "folder1/"), key("bucket1", "folder2/"), volume());
     final Set<IOzoneObj> rangerReadAndListObject = objSet(bucket("bucket1"));
-    final Set<AssumeRoleRequest.OzoneGrant> resultRanger = 
createPathsAndPermissions(
-        VOLUME, RANGER, actions, rangerResourceSpecs, prefixes);
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapRanger = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, RANGER, actions, rangerResourceSpecs, 
prefixes, objToAclsMapRanger);
+    final Set<OzoneGrant> resultRanger = 
groupObjectsByAcls(objToAclsMapRanger);
     assertThat(resultRanger).containsExactlyInAnyOrder(
-        new AssumeRoleRequest.OzoneGrant(rangerReadObjects, acls(READ)),
-        new AssumeRoleRequest.OzoneGrant(rangerReadAndListObject, acls(READ, 
LIST)));
+        new OzoneGrant(rangerReadObjects, acls(READ)), new 
OzoneGrant(rangerReadAndListObject, acls(READ, LIST)));
   }
 
   @Test
@@ -903,22 +914,1014 @@ public void 
testCreatePathsAndPermissionsWithConditionPrefixesForBucketActionWhe
 
     final Set<IamSessionPolicyResolver.ResourceSpec> nativeResourceSpecs = 
Collections.singleton(
         new IamSessionPolicyResolver.ResourceSpec(S3ResourceType.BUCKET, 
"bucket1", null, null));
-    final Set<AssumeRoleRequest.OzoneGrant> resultNative = 
createPathsAndPermissions(
-        VOLUME, NATIVE, actions, nativeResourceSpecs, prefixes);
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapNative = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, NATIVE, actions, nativeResourceSpecs, 
prefixes, objToAclsMapNative);
+    final Set<OzoneGrant> resultNative = 
groupObjectsByAcls(objToAclsMapNative);
     assertThat(resultNative).containsExactlyInAnyOrder(
-        new AssumeRoleRequest.OzoneGrant(readObject, acls(READ)),
-        new AssumeRoleRequest.OzoneGrant(readAndReadAclObject, acls(READ, 
READ_ACL)));
+        new OzoneGrant(readObject, acls(READ)), new 
OzoneGrant(readAndReadAclObject, acls(READ, READ_ACL)));
 
     final Set<IamSessionPolicyResolver.ResourceSpec> rangerResourceSpecs = 
Collections.singleton(
         new IamSessionPolicyResolver.ResourceSpec(S3ResourceType.BUCKET, 
"bucket1", null, null));
-    final Set<AssumeRoleRequest.OzoneGrant> resultRanger = 
createPathsAndPermissions(
-        VOLUME, RANGER, actions, rangerResourceSpecs, prefixes);
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapRanger = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, RANGER, actions, rangerResourceSpecs, 
prefixes, objToAclsMapRanger);
+    final Set<OzoneGrant> resultRanger = 
groupObjectsByAcls(objToAclsMapRanger);
+    assertThat(resultRanger).containsExactlyInAnyOrder(
+        new OzoneGrant(readObject, acls(READ)), new 
OzoneGrant(readAndReadAclObject, acls(READ, READ_ACL)));
+  }
+
+  @Test
+  public void testCreatePathsAndPermissionsWithNoMappedActions() {
+    final Set<S3Action> actions = emptySet();
+
+    final Set<IamSessionPolicyResolver.ResourceSpec> nativeResourceSpecs = 
Collections.singleton(
+        new 
IamSessionPolicyResolver.ResourceSpec(S3ResourceType.OBJECT_PREFIX, "bucket1", 
null, null));
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapNative = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, NATIVE, actions, nativeResourceSpecs, 
emptySet(), objToAclsMapNative);
+    final Set<OzoneGrant> resultNative = 
groupObjectsByAcls(objToAclsMapNative);
+    assertThat(resultNative).isEmpty();
+
+    final Set<IamSessionPolicyResolver.ResourceSpec> rangerResourceSpecs = 
Collections.singleton(
+        new 
IamSessionPolicyResolver.ResourceSpec(S3ResourceType.OBJECT_PREFIX_WILDCARD, 
"bucket1", null, null));
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapRanger = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, RANGER, actions, rangerResourceSpecs, 
emptySet(), objToAclsMapRanger);
+    final Set<OzoneGrant> resultRanger = 
groupObjectsByAcls(objToAclsMapRanger);
+    assertThat(resultRanger).isEmpty();
+  }
+
+  @Test
+  public void testCreatePathsAndPermissionsWithNoMappedResources() {
+    final Set<S3Action> actions = Collections.singleton(S3Action.GET_OBJECT);
+    final Set<IamSessionPolicyResolver.ResourceSpec> resourceSpecs = 
emptySet();
+
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapNative = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, NATIVE, actions, resourceSpecs, 
emptySet(), objToAclsMapNative);
+    final Set<OzoneGrant> resultNative = 
groupObjectsByAcls(objToAclsMapNative);
+    assertThat(resultNative).isEmpty();
+
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapRanger = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, RANGER, actions, resourceSpecs, 
emptySet(), objToAclsMapRanger);
+    final Set<OzoneGrant> resultRanger = 
groupObjectsByAcls(objToAclsMapRanger);
+    assertThat(resultRanger).isEmpty();
+  }
+
+  @Test
+  public void 
testCreatePathsAndPermissionsDeduplicatesAcrossSameResourceTypes() {
+    final Set<S3Action> actions = Stream.of(
+        S3Action.GET_OBJECT, S3Action.GET_OBJECT_TAGGING, 
S3Action.DELETE_OBJECT, S3Action.DELETE_OBJECT_TAGGING)
+        .collect(Collectors.toSet());
+    final Set<IamSessionPolicyResolver.ResourceSpec> resourceSpecs = 
Collections.singleton(
+        new IamSessionPolicyResolver.ResourceSpec(S3ResourceType.OBJECT_EXACT, 
"bucket1", null, "key.txt"));
+    final Set<IOzoneObj> readAndDeleteObject = objSet(key("bucket1", 
"key.txt"));
+    final Set<IOzoneObj> readObjects = objSet(bucket("bucket1"), volume());
+
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapNative = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, NATIVE, actions, resourceSpecs, 
emptySet(), objToAclsMapNative);
+    final Set<OzoneGrant> resultNative = 
groupObjectsByAcls(objToAclsMapNative);
+    assertThat(resultNative).containsExactlyInAnyOrder(
+        new OzoneGrant(readAndDeleteObject, acls(READ, DELETE)), new 
OzoneGrant(readObjects, acls(READ)));
+
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapRanger = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, RANGER, actions, resourceSpecs, 
emptySet(), objToAclsMapRanger);
+    final Set<OzoneGrant> resultRanger = 
groupObjectsByAcls(objToAclsMapRanger);
     assertThat(resultRanger).containsExactlyInAnyOrder(
-        new AssumeRoleRequest.OzoneGrant(readObject, acls(READ)),
-        new AssumeRoleRequest.OzoneGrant(readAndReadAclObject, acls(READ, 
READ_ACL)));
+        new OzoneGrant(readAndDeleteObject, acls(READ, DELETE)), new 
OzoneGrant(readObjects, acls(READ)));
+  }
+
+  @Test
+  public void 
testCreatePathsAndPermissionsWithAllS3ActionsOverridesAnyOtherAction() {
+    final Set<S3Action> actions = Stream.of(
+        S3Action.ALL_S3, S3Action.GET_OBJECT, S3Action.DELETE_OBJECT, 
S3Action.LIST_BUCKET)
+        .collect(Collectors.toSet());
+    final Set<IamSessionPolicyResolver.ResourceSpec> resourceSpecs = Stream.of(
+        new IamSessionPolicyResolver.ResourceSpec(S3ResourceType.OBJECT_EXACT, 
"bucket1", null, "key.txt"),
+        new IamSessionPolicyResolver.ResourceSpec(S3ResourceType.BUCKET, 
"bucket2", null, null))
+        .collect(Collectors.toSet());
+    final Set<IOzoneObj> allObjects = objSet(key("bucket1", "key.txt"), 
bucket("bucket2"));
+
+    final Set<IOzoneObj> nativeReadObjects = objSet(volume(), 
bucket("bucket1"), prefix("bucket2", ""));
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapNative = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, NATIVE, actions, resourceSpecs, 
emptySet(), objToAclsMapNative);
+    final Set<OzoneGrant> resultNative = 
groupObjectsByAcls(objToAclsMapNative);
+    assertThat(resultNative).containsExactlyInAnyOrder(
+        new OzoneGrant(allObjects, acls(ALL)), new 
OzoneGrant(nativeReadObjects, acls(READ)));
+
+    final Set<IOzoneObj> rangerReadObjects = objSet(volume(), 
bucket("bucket1"), key("bucket2", "*"));
+    final Map<IOzoneObj, Set<ACLType>> objToAclsMapRanger = new 
LinkedHashMap<>();
+    createPathsAndPermissions(VOLUME, RANGER, actions, resourceSpecs, 
emptySet(), objToAclsMapRanger);
+    final Set<OzoneGrant> resultRanger = 
groupObjectsByAcls(objToAclsMapRanger);
+    assertThat(resultRanger).containsExactlyInAnyOrder(
+        new OzoneGrant(allObjects, acls(ALL)), new 
OzoneGrant(rangerReadObjects, acls(READ)));
+  }
+
+  @Test
+  public void 
testDeduplicatesAcrossMultipleStatementsWhenSameStatementsArePresent() throws 
OMException {
+    final String json = "{\n" +
+        "  \"Version\": \"2012-10-17\",\n" +
+        "  \"Statement\": [\n" +
+        "    {\n" +
+        "      \"Effect\": \"Allow\",\n" +
+        "      \"Action\": [\n" +
+        "        \"s3:GetBucketAcl\",\n" +
+        "        \"s3:PutBucketAcl\",\n" +
+        "        \"s3:ListBucket\"\n" +
+        "      ],\n" +
+        "      \"Resource\": \"arn:aws:s3:::my-bucket\"\n" +
+        "    },\n" +
+        "    {\n" +
+        "      \"Effect\": \"Allow\",\n" +
+        "      \"Action\": [\n" +
+        "        \"s3:GetBucketAcl\",\n" +
+        "        \"s3:PutBucketAcl\",\n" +
+        "        \"s3:ListBucket\"\n" +
+        "      ],\n" +
+        "      \"Resource\": \"arn:aws:s3:::my-bucket\"\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedNative = new LinkedHashSet<>();
+    // Expected for native: bucket READ, LIST, READ_ACL, WRITE_ACL; volume and 
prefix "" READ
+    final Set<IOzoneObj> bucketSet = objSet(bucket("my-bucket"));
+    final Set<ACLType> bucketAcls = acls(READ, LIST, READ_ACL, WRITE_ACL);
+    expectedResolvedNative.add(new OzoneGrant(bucketSet, bucketAcls));
+    expectedResolvedNative.add(new OzoneGrant(objSet(volume(), 
prefix("my-bucket", "")), acls(READ)));
+    assertThat(resolvedFromNativeAuthorizer).isEqualTo(expectedResolvedNative);
+
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: bucket READ, LIST, READ_ACL, WRITE_ACL; volume and 
key "*"  READ
+    expectedResolvedRanger.add(new OzoneGrant(bucketSet, bucketAcls));
+    expectedResolvedRanger.add(new OzoneGrant(objSet(volume(), 
key("my-bucket", "*")), acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void 
testDeduplicatesAcrossMultipleStatementsForSameActionsButDifferentResource() 
throws OMException {
+    final String json = "{\n" +
+        "  \"Version\": \"2012-10-17\",\n" +
+        "  \"Statement\": [\n" +
+        "    {\n" +
+        "      \"Effect\": \"Allow\",\n" +
+        "      \"Action\": [\n" +
+        "        \"s3:GetBucketAcl\",\n" +
+        "        \"s3:PutBucketAcl\",\n" +
+        "        \"s3:ListBucket\"\n" +
+        "      ],\n" +
+        "      \"Resource\": \"arn:aws:s3:::my-bucket\"\n" +
+        "    },\n" +
+        "    {\n" +
+        "      \"Effect\": \"Allow\",\n" +
+        "      \"Action\": [\n" +
+        "        \"s3:GetBucketAcl\",\n" +
+        "        \"s3:PutBucketAcl\",\n" +
+        "        \"s3:ListBucket\"\n" +
+        "      ],\n" +
+        "      \"Resource\": \"arn:aws:s3:::my-bucket2\"\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedNative = new LinkedHashSet<>();
+    // Expected for native: bucket READ, LIST, READ_ACL, WRITE_ACL; volume and 
prefix "" READ
+    final Set<IOzoneObj> bucketSet = objSet(bucket("my-bucket"), 
bucket("my-bucket2"));
+    final Set<ACLType> bucketAcls = acls(READ, LIST, READ_ACL, WRITE_ACL);
+    expectedResolvedNative.add(new OzoneGrant(bucketSet, bucketAcls));
+    expectedResolvedNative.add(new OzoneGrant(
+        objSet(volume(), prefix("my-bucket2", ""), prefix("my-bucket", "")), 
acls(READ)));
+    assertThat(resolvedFromNativeAuthorizer).isEqualTo(expectedResolvedNative);
+
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: bucket READ, LIST, READ_ACL, WRITE_ACL; volume and 
key "*" READ
+    expectedResolvedRanger.add(new OzoneGrant(bucketSet, bucketAcls));
+    expectedResolvedRanger.add(new OzoneGrant(
+        objSet(volume(), key("my-bucket2", "*"), key("my-bucket", "*")), 
acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void 
testDeduplicatesAcrossMultipleStatementsForDifferentActionsButSameResource() 
throws OMException {
+    final String json = "{\n" +
+        "  \"Version\": \"2012-10-17\",\n" +
+        "  \"Statement\": [\n" +
+        "    {\n" +
+        "      \"Effect\": \"Allow\",\n" +
+        "      \"Action\": [\n" +
+        "        \"s3:GetBucketAcl\",\n" +
+        "        \"s3:PutBucketAcl\",\n" +
+        "        \"s3:ListBucket\"\n" +
+        "      ],\n" +
+        "      \"Resource\": \"arn:aws:s3:::my-bucket\"\n" +
+        "    },\n" +
+        "    {\n" +
+        "      \"Effect\": \"Allow\",\n" +
+        "      \"Action\": [\n" +
+        "        \"s3:GetBucketAcl\",\n" +
+        "        \"s3:CreateBucket\"\n" +
+        "      ],\n" +
+        "      \"Resource\": \"arn:aws:s3:::my-bucket\"\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedNative = new LinkedHashSet<>();
+    // Expected for native: bucket READ, LIST, READ_ACL, WRITE_ACL, CREATE; 
volume, prefix "" READ
+    final Set<IOzoneObj> bucketSet = objSet(bucket("my-bucket"));
+    final Set<ACLType> bucketAcls = acls(READ, LIST, READ_ACL, WRITE_ACL, 
CREATE);
+    expectedResolvedNative.add(new OzoneGrant(bucketSet, bucketAcls));
+    expectedResolvedNative.add(new OzoneGrant(objSet(volume(), 
prefix("my-bucket", "")), acls(READ)));
+    assertThat(resolvedFromNativeAuthorizer).isEqualTo(expectedResolvedNative);
+
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: bucket READ, LIST, READ_ACL, WRITE_ACL, CREATE; 
volume, key "*" READ
+    expectedResolvedRanger.add(new OzoneGrant(bucketSet, bucketAcls));
+    expectedResolvedRanger.add(new OzoneGrant(objSet(volume(), 
key("my-bucket", "*")), acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testDeduplicatesAcrossMultipleStatementsWhenAllActionPresent() 
throws OMException {
+    final String json = "{\n" +
+        "  \"Version\": \"2012-10-17\",\n" +
+        "  \"Statement\": [\n" +
+        "    {\n" +
+        "      \"Effect\": \"Allow\",\n" +
+        "      \"Action\": [\n" +
+        "        \"s3:GetBucketAcl\",\n" +
+        "        \"s3:PutBucketAcl\",\n" +
+        "        \"s3:ListBucket\"\n" +
+        "      ],\n" +
+        "      \"Resource\": \"arn:aws:s3:::my-bucket\"\n" +
+        "    },\n" +
+        "    {\n" +
+        "      \"Effect\": \"Allow\",\n" +
+        "      \"Action\": \"s3:*\",\n" +
+        "      \"Resource\": \"arn:aws:s3:::my-bucket\"\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedNative = new LinkedHashSet<>();
+    // Expected for native: bucket ALL (instead of individual actions); volume 
and prefix "" READ
+    final Set<IOzoneObj> bucketSet = objSet(bucket("my-bucket"));
+    final Set<ACLType> bucketAcls = acls(ALL);
+    expectedResolvedNative.add(new OzoneGrant(bucketSet, bucketAcls));
+    expectedResolvedNative.add(new OzoneGrant(objSet(volume(), 
prefix("my-bucket", "")), acls(READ)));
+    assertThat(resolvedFromNativeAuthorizer).isEqualTo(expectedResolvedNative);
+
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: bucket ALL (instead of individual actions); volume 
and key "*" READ
+    expectedResolvedRanger.add(new OzoneGrant(bucketSet, bucketAcls));
+    expectedResolvedRanger.add(new OzoneGrant(objSet(volume(), 
key("my-bucket", "*")), acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testAllowGetPutOnKey() throws OMException {
+    final String json = "{\n" +
+        "  \"Version\": \"2012-10-17\",\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": [\"s3:GetObject\", \"s3:PutObject\"],\n" +
+        "    \"Resource\": \"arn:aws:s3:::my-bucket/folder/file.txt\"\n" +
+        "  }]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedFromBothAuthorizers = new 
LinkedHashSet<>();
+    // Expected: READ, CREATE, WRITE on key; bucket READ; volume READ
+    final Set<IOzoneObj> keySet = objSet(key("my-bucket", "folder/file.txt"));
+    final Set<ACLType> keyAcls = acls(READ, CREATE, WRITE);
+    expectedResolvedFromBothAuthorizers.add(new OzoneGrant(objSet(volume(), 
bucket("my-bucket")), acls(READ)));
+    expectedResolvedFromBothAuthorizers.add(new OzoneGrant(keySet, keyAcls));
+
+    
assertThat(resolvedFromNativeAuthorizer).isEqualTo(expectedResolvedFromBothAuthorizers);
+    
assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedFromBothAuthorizers);
+  }
+
+  @Test
+  public void testAllActionsForKey() throws OMException {
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": \"s3:*\",\n" +
+        "    \"Resource\": \"arn:aws:s3:::my-bucket/*\"\n" +
+        "  }]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedNative = new LinkedHashSet<>();
+    // Expected for native: all key ACLs on prefix "" under bucket; bucket 
READ, volume READ
+    final Set<IOzoneObj> keyPrefixSet = objSet(prefix("my-bucket", ""));
+    final Set<ACLType> allKeyAcls = acls(ALL);
+    expectedResolvedNative.add(new OzoneGrant(keyPrefixSet, allKeyAcls));
+    expectedResolvedNative.add(new OzoneGrant(objSet(volume(), 
bucket("my-bucket")), acls(READ)));
+    assertThat(resolvedFromNativeAuthorizer).isEqualTo(expectedResolvedNative);
+
+    // Expected for Ranger: all key acls for resource type KEY with key name 
"*"
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    final Set<IOzoneObj> rangerKeySet = objSet(key("my-bucket", "*"));
+    expectedResolvedRanger.add(new OzoneGrant(rangerKeySet, allKeyAcls));
+    expectedResolvedRanger.add(new OzoneGrant(objSet(volume(), 
bucket("my-bucket")), acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testAllActionsForBucket() throws OMException {
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": \"s3:*\",\n" +
+        "    \"Resource\": \"arn:aws:s3:::my-bucket\"\n" +
+        "  }]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedNative = new LinkedHashSet<>();
+    // Expected for native: all Bucket ACLs for bucket; volume, prefix "" READ
+    final Set<IOzoneObj> bucketSet = objSet(bucket("my-bucket"));
+    final Set<ACLType> allBucketAcls = acls(ALL);
+    expectedResolvedNative.add(new OzoneGrant(objSet(volume(), 
prefix("my-bucket", "")), acls(READ)));
+    expectedResolvedNative.add(new OzoneGrant(bucketSet, allBucketAcls));
+    assertThat(resolvedFromNativeAuthorizer).isEqualTo(expectedResolvedNative);
+
+    // Expected for Ranger: all Bucket ACLs for bucket; volume, key "*" READ
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    expectedResolvedRanger.add(new OzoneGrant(objSet(volume(), 
key("my-bucket", "*")), acls(READ)));
+    expectedResolvedRanger.add(new OzoneGrant(bucketSet, allBucketAcls));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testMultipleResourcesInSeparateStatements() throws OMException {
+    final String json = "{\n" +
+        "  \"Version\": \"2012-10-17\",\n" +
+        "  \"Statement\": [\n" +
+        "    {\n" +
+        "      \"Effect\": \"Allow\",\n" +
+        "      \"Action\": [\n" +
+        "        \"s3:GetBucketAcl\",\n" +
+        "        \"s3:PutBucketAcl\",\n" +
+        "        \"s3:ListBucket\"\n" +
+        "      ],\n" +
+        "      \"Resource\": \"arn:aws:s3:::my-bucket\"\n" +
+        "    },\n" +
+        "    {\n" +
+        "      \"Effect\": \"Allow\",\n" +
+        "      \"Action\": \"s3:*\",\n" +
+        "      \"Resource\": \"arn:aws:s3:::my-bucket/*\"\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedNative = new LinkedHashSet<>();
+    // Expected for native: bucket READ, LIST, READ_ACL, WRITE_ACL; volume READ
+    final Set<IOzoneObj> bucketSet = objSet(bucket("my-bucket"));
+    final Set<ACLType> bucketAcls = acls(READ, LIST, READ_ACL, WRITE_ACL);
+    expectedResolvedNative.add(new OzoneGrant(bucketSet, bucketAcls));
+    expectedResolvedNative.add(new OzoneGrant(objSet(volume()), acls(READ)));
+    // Expected for native: all key ACLs on prefix "" under bucket
+    final Set<IOzoneObj> keyPrefixSet = objSet(prefix("my-bucket", ""));
+    final Set<ACLType> keyAllAcls = acls(ALL);
+    expectedResolvedNative.add(new OzoneGrant(keyPrefixSet, keyAllAcls));
+    assertThat(resolvedFromNativeAuthorizer).isEqualTo(expectedResolvedNative);
+
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: bucket READ, LIST, READ_ACL, WRITE_ACL; volume READ
+    expectedResolvedRanger.add(new OzoneGrant(bucketSet, bucketAcls));
+    expectedResolvedRanger.add(new OzoneGrant(objSet(volume()), acls(READ)));
+    // Expected for Ranger: all key acls for resource type KEY with key name 
"*"
+    final Set<IOzoneObj> rangerKeySet = objSet(key("my-bucket", "*"));
+    expectedResolvedRanger.add(new OzoneGrant(rangerKeySet, keyAllAcls));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testMultipleResourcesInOneStatement() throws OMException {
+    final String json = "{\n" +
+        "  \"Version\": \"2012-10-17\",\n" +
+        "  \"Statement\": [\n" +
+        "    {\n" +
+        "      \"Effect\": \"Allow\",\n" +
+        "      \"Action\": [\n" +
+        "        \"s3:*\"\n" +
+        "      ],\n" +
+        "      \"Resource\": [\n" +
+        "        \"arn:aws:s3:::my-bucket\",\n" +
+        "        \"arn:aws:s3:::my-bucket/*\"\n" +
+        "      ]\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedNative = new LinkedHashSet<>();
+    // Expected for native: all for bucket and key acls; volume READ
+    final Set<IOzoneObj> resourceSetNative = objSet(bucket("my-bucket"), 
prefix("my-bucket", ""));
+    expectedResolvedNative.add(new OzoneGrant(resourceSetNative, acls(ALL)));
+    expectedResolvedNative.add(new OzoneGrant(objSet(volume()), acls(READ)));
+    assertThat(resolvedFromNativeAuthorizer).isEqualTo(expectedResolvedNative);
+
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: all for bucket and key acls; volume READ
+    final Set<IOzoneObj> resourceSetRanger = objSet(bucket("my-bucket"), 
key("my-bucket", "*"));
+    expectedResolvedRanger.add(new OzoneGrant(resourceSetRanger, acls(ALL)));
+    expectedResolvedRanger.add(new OzoneGrant(objSet(volume()), acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void 
testMultipleResourcesWithDifferentBucketsAndDeepPathsInOneStatement() throws 
OMException {
+    final String json = "{\n" +
+        "  \"Version\": \"2012-10-17\",\n" +
+        "  \"Statement\": [\n" +
+        "    {\n" +
+        "      \"Effect\": \"Allow\",\n" +
+        "      \"Action\": [\n" +
+        "        \"s3:*\"\n" +
+        "      ],\n" +
+        "      \"Resource\": [\n" +
+        "        \"arn:aws:s3:::my-bucket/team/folder1/security/*\",\n" +
+        "        \"arn:aws:s3:::my-bucket2/team/folder2/misc/*\"\n" +
+        "      ]\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedNative = new LinkedHashSet<>();
+    // Expected for native: all key ACLs on prefix "team/folder1/security/" 
under
+    // my-bucket and all key ACLs on prefix "team/folder2/misc/" under 
my-bucket2; bucket READ; volume READ
+    final Set<IOzoneObj> keyPrefixSet = objSet(
+        prefix("my-bucket", "team/folder1/security/"), prefix("my-bucket2", 
"team/folder2/misc/"));
+    final Set<ACLType> keyAllAcls = acls(ALL);
+    expectedResolvedNative.add(new OzoneGrant(keyPrefixSet, keyAllAcls));
+    expectedResolvedNative.add(new OzoneGrant(objSet(volume(), 
bucket("my-bucket"), bucket("my-bucket2")), acls(READ)));
+    assertThat(resolvedFromNativeAuthorizer).isEqualTo(expectedResolvedNative);
+
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: all key acls for resource type KEY with key name
+    // "team/folder1/security/*" under my-bucket and "team/folder2/misc/*" 
under my-bucket2; bucket READ; volume READ
+    final Set<IOzoneObj> rangerKeySet = objSet(
+        key("my-bucket", "team/folder1/security/*"), key("my-bucket2", 
"team/folder2/misc/*"));
+    expectedResolvedRanger.add(new OzoneGrant(rangerKeySet, keyAllAcls));
+    expectedResolvedRanger.add(new OzoneGrant(objSet(volume(), 
bucket("my-bucket"), bucket("my-bucket2")), acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testUnsupportedActionIgnoredWhenItIsTheOnlyAction() throws 
OMException {
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": \"s3:ReplicateObject\",\n" +         // unsupported 
action
+        "    \"Resource\": \"*\"\n" +
+        "  }]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    assertThat(resolvedFromNativeAuthorizer).isEmpty();
+    assertThat(resolvedFromRangerAuthorizer).isEmpty();
+  }
+
+  @Test
+  public void testUnsupportedResourceArnThrows() {
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": \"s3:ListBucket\",\n" +
+        "    \"Resource\": 
\"arn:aws:dynamodb:us-east-2:123456789012:table/example-table\"\n" +
+        "  }]\n" +
+        "}";
+
+    expectResolveThrowsForBothAuthorizers(
+        json, "Unsupported Resource Arn - " +
+        "arn:aws:dynamodb:us-east-2:123456789012:table/example-table", 
NOT_SUPPORTED_OPERATION);
+  }
+
+  @Test
+  public void testListBucketWithWildcard() throws OMException {
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": \"s3:ListBucket\",\n" +
+        "    \"Resource\": \"arn:aws:s3:::proj-*\"\n" +
+        "  }]\n" +
+        "}";
+
+    // Wildcards on bucket are not supported for Native authorizer
+    expectBucketWildcardUnsupportedExceptionForNativeAuthorizer(json);
+
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: bucket READ and LIST on wildcard pattern; volume 
and key "*" READ
+    final Set<IOzoneObj> bucketSet = objSet(bucket("proj-*"));
+    final Set<ACLType> bucketAcls = acls(READ, LIST);
+    expectedResolvedRanger.add(new OzoneGrant(bucketSet, bucketAcls));
+    expectedResolvedRanger.add(new OzoneGrant(objSet(volume(), key("proj-*", 
"*")), acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testListBucketOperationsWithNoPrefixes() throws OMException {
+    final String json = "{\n" +
+        "  \"Statement\": [\n" +
+        "    {\n" +
+        "      \"Effect\": \"Allow\",\n" +
+        "      \"Action\": [\n" +
+        "        \"s3:ListBucket\",\n" +
+        "        \"s3:ListBucketMultipartUploads\"\n" +
+        "      ],\n" +
+        "      \"Resource\": \"arn:aws:s3:::proj\"\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedNative = new LinkedHashSet<>();
+    // Expected for native: bucket READ and LIST; volume, prefix "" READ
+    final Set<IOzoneObj> bucketSet = objSet(bucket("proj"));
+    final Set<ACLType> bucketAcls = acls(READ, LIST);
+    final Set<IOzoneObj> nativeReadObjects = objSet(volume(), prefix("proj", 
""));
+    expectedResolvedNative.add(new OzoneGrant(bucketSet, bucketAcls));
+    expectedResolvedNative.add(new OzoneGrant(nativeReadObjects, acls(READ)));
+    assertThat(resolvedFromNativeAuthorizer).isEqualTo(expectedResolvedNative);
+
+    // Expected for Ranger: bucket READ and LIST; volume, key "*" READ
+    final Set<IOzoneObj> rangerReadObjects = objSet(volume(), key("proj", 
"*"));
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    expectedResolvedRanger.add(new OzoneGrant(bucketSet, bucketAcls));
+    expectedResolvedRanger.add(new OzoneGrant(rangerReadObjects, acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testIgnoresUnsupportedActionsWhenSupportedActionsAreIncluded() 
throws OMException {
+    final String json = "{\n" +
+        "  \"Version\": \"2012-10-17\",\n" +
+        "  \"Statement\": [\n" +
+        "    {\n" +
+        "      \"Sid\": \"AllowListingOfDataLakeFolder\",\n" +
+        "      \"Effect\": \"Allow\",\n" +
+        "      \"Action\": [\n" +
+        "        \"s3:GetAccelerateConfiguration\",\n" +    // unsupported 
action
+        "        \"s3:GetBucketAcl\",\n" +
+        "        \"s3:GetObject\",\n" +                     // object-level 
action not applied for bucket
+        "        \"s3:GetObjectAcl\",\n" +                  // unsupported 
action
+        "        \"s3:ListBucket\",\n" +
+        "        \"s3:ListBucketMultipartUploads\"\n" +
+        "      ],\n" +
+        "      \"Resource\": \"arn:aws:s3:::bucket1\",\n" +
+        "      \"Condition\": {\n" +
+        "        \"StringEquals\": {\n" +
+        "          \"s3:prefix\": [ \"team/folder\", \"team/folder/*\" ]\n" +
+        "        }\n" +
+        "      }\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedNative = new LinkedHashSet<>();
+
+    // Expected for native: READ, LIST, READ_ACL bucket acls; volume and 
prefixes "team/folder", "team/folder/" READ
+    final Set<IOzoneObj> bucketSet = objSet(bucket("bucket1"));
+    final Set<ACLType> bucketAcls = acls(READ, LIST, READ_ACL);
+    expectedResolvedNative.add(new OzoneGrant(bucketSet, bucketAcls));
+    expectedResolvedNative.add(new OzoneGrant(
+        objSet(volume(), prefix("bucket1", "team/folder"), prefix("bucket1", 
"team/folder/")), acls(READ)));
+    assertThat(resolvedFromNativeAuthorizer).isEqualTo(expectedResolvedNative);
+
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: READ, LIST, READ_ACL bucket acls; volume and keys 
"team/folder" and "team/folder/*" READ
+    expectedResolvedRanger.add(new OzoneGrant(bucketSet, bucketAcls));
+    expectedResolvedRanger.add(new OzoneGrant(
+        objSet(volume(), key("bucket1", "team/folder"), key("bucket1", 
"team/folder/*")), acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testMultiplePrefixesWithWildcards() throws OMException {
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": \"s3:GetObject\",\n" +
+        "    \"Resource\": \"arn:aws:s3:::logs/*\",\n" +
+        "    \"Condition\": { \"StringEquals\": { \"s3:prefix\": [\"a/*\", 
\"b/*\"] } }\n" +
+        "  }]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedNative = new LinkedHashSet<>();
+    // Expected for native: READ acl on prefix "" (condition prefixes are 
ignored); bucket READ; volume READ;
+    final Set<IOzoneObj> readObjectsNative = objSet(prefix("logs", ""), 
bucket("logs"), volume());
+    expectedResolvedNative.add(new OzoneGrant(readObjectsNative, acls(READ)));
+    assertThat(resolvedFromNativeAuthorizer).isEqualTo(expectedResolvedNative);
+
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: READ acl on key "*" (condition prefixes are 
ignored)
+    final Set<IOzoneObj> keySet = objSet(key("logs", "*"), bucket("logs"), 
volume());
+    expectedResolvedRanger.add(new OzoneGrant(keySet, acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testObjectResourceWithWildcardInMiddle() throws OMException {
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": \"s3:GetObject\",\n" +
+        "    \"Resource\": \"arn:aws:s3:::logs/file*.log\"\n" +
+        "  }]\n" +
+        "}";
+
+    // Wildcards in middle of object resource are not supported for Native 
authorizer
+    expectResolveThrows(
+        json, NATIVE, "Wildcard prefix patterns are not supported for Ozone 
native " +
+        "authorizer if wildcard is not at the end", NOT_SUPPORTED_OPERATION);
+
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: READ acl on key "file*.log", bucket READ, volume 
READ
+    final Set<IOzoneObj> readObjectsRanger = objSet(key("logs", "file*.log"), 
bucket("logs"), volume());
+    expectedResolvedRanger.add(new OzoneGrant(readObjectsRanger, acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testObjectResourceWithPrefixWildcard() throws OMException {
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": \"s3:GetObject\",\n" +
+        "    \"Resource\": \"arn:aws:s3:::myBucket/file*\"\n" +
+        "  }]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedNative = new LinkedHashSet<>();
+    // Expected for native: READ acl on prefix "file" under bucket, bucket 
READ, volume READ
+    final Set<IOzoneObj> readObjectsNative = objSet(prefix("myBucket", 
"file"), bucket("myBucket"), volume());
+    expectedResolvedNative.add(new OzoneGrant(readObjectsNative, acls(READ)));
+    assertThat(resolvedFromNativeAuthorizer).isEqualTo(expectedResolvedNative);
+
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: READ acl on key "file*", bucket READ, volume READ
+    final Set<IOzoneObj> readObjectsRanger = objSet(key("myBucket", "file*"), 
bucket("myBucket"), volume());
+    expectedResolvedRanger.add(new OzoneGrant(readObjectsRanger, acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testBucketActionOnAllResources() throws OMException {
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "      \"Action\": [\n" +
+        "        \"s3:ListAllMyBuckets\",\n" +
+        "        \"s3:ListBucket\"\n" +
+        "      ],\n" +
+        "    \"Resource\": \"*\"\n" +
+        "  }]\n" +
+        "}";
+
+    // Wildcards on bucket are not supported for Native authorizer
+    expectBucketWildcardUnsupportedExceptionForNativeAuthorizer(json);
+
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: READ and LIST on volume and bucket (wildcard), 
READ on key "*"
+    final Set<IOzoneObj> resourceSet = objSet(volume(), bucket("*"));
+    expectedResolvedRanger.add(new OzoneGrant(resourceSet, acls(READ, LIST)));
+    expectedResolvedRanger.add(new OzoneGrant(objSet(key("*", "*")), 
acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testObjectActionOnAllResources() throws OMException {
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": \"s3:PutObject\",\n" +
+        "    \"Resource\": \"*\"\n" +
+        "  }]\n" +
+        "}";
+
+    // Wildcards on bucket are not supported for Native authorizer
+    expectBucketWildcardUnsupportedExceptionForNativeAuthorizer(json);
+
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: CREATE and WRITE key acls on wildcard pattern, 
bucket READ, volume READ
+    final Set<IOzoneObj> keySet = objSet(key("*", "*"));
+    final Set<ACLType> keyAcls = acls(CREATE, WRITE);
+    expectedResolvedRanger.add(new OzoneGrant(keySet, keyAcls));
+    expectedResolvedRanger.add(new OzoneGrant(objSet(volume(), bucket("*")), 
acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testAllActionsOnAllResources() throws OMException {
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": \"s3:*\",\n" +
+        "    \"Resource\": \"*\"\n" +
+        "  }]\n" +
+        "}";
+
+    // Wildcards on bucket are not supported for Native authorizer
+    expectBucketWildcardUnsupportedExceptionForNativeAuthorizer(json);
+
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: READ, LIST acl on volume, ALL acl bucket 
(wildcard) and key (wildcard)
+    final Set<IOzoneObj> resourceSet = objSet(bucket("*"), key("*", "*"));
+    expectedResolvedRanger.add(new OzoneGrant(resourceSet, acls(ALL)));
+    expectedResolvedRanger.add(new OzoneGrant(objSet(volume()), acls(READ, 
LIST)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testAllActionsOnAllBucketResources() throws OMException {
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": \"s3:*\",\n" +
+        "    \"Resource\": \"arn:aws:s3:::*\"\n" +
+        "  }]\n" +
+        "}";
+
+    // Wildcards on bucket are not supported for Native authorizer
+    expectBucketWildcardUnsupportedExceptionForNativeAuthorizer(json);
+
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: ALL bucket acls on wildcard pattern, volume READ, 
key "*" READ
+    final Set<IOzoneObj> bucketSet = objSet(bucket("*"));
+    final Set<ACLType> bucketAcls = acls(ALL);
+    expectedResolvedRanger.add(new OzoneGrant(bucketSet, bucketAcls));
+    expectedResolvedRanger.add(new OzoneGrant(objSet(key("*", "*")), 
acls(READ)));
+    expectedResolvedRanger.add(new OzoneGrant(objSet(volume()), acls(READ, 
LIST)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testAllActionsOnAllObjectResources() throws OMException {
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": \"s3:*\",\n" +
+        "    \"Resource\": \"arn:aws:s3:::*/*\"\n" +
+        "  }]\n" +
+        "}";
+
+    // Wildcards on bucket are not supported for Native authorizer
+    expectBucketWildcardUnsupportedExceptionForNativeAuthorizer(json);
+
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: ALL key acls on wildcard pattern; bucket READ; 
volume READ
+    final Set<IOzoneObj> keySet = objSet(key("*", "*"));
+    final Set<ACLType> keyAcls = acls(ALL);
+    expectedResolvedRanger.add(new OzoneGrant(keySet, keyAcls));
+    expectedResolvedRanger.add(new OzoneGrant(objSet(volume(), bucket("*")), 
acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testWildcardActionGroupGetStar() throws OMException {
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": \"s3:Get*\",\n" +
+        "      \"Resource\": [\n" +
+        "        \"arn:aws:s3:::my-bucket\",\n" +
+        "        \"arn:aws:s3:::my-bucket/*\"\n" +
+        "      ]\n" +
+        "  }]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedNative = new LinkedHashSet<>();
+    // Expected for native: bucket READ, READ_ACL acls
+    final Set<IOzoneObj> bucketSet = objSet(bucket("my-bucket"));
+    final Set<ACLType> bucketAcls = acls(READ, READ_ACL);
+    expectedResolvedNative.add(new OzoneGrant(bucketSet, bucketAcls));
+    // Expected for native: READ acl on prefix "" under bucket; volume READ
+    final Set<IOzoneObj> readObjectsNative = objSet(prefix("my-bucket", ""), 
volume());
+    expectedResolvedNative.add(new OzoneGrant(readObjectsNative, acls(READ)));
+    assertThat(resolvedFromNativeAuthorizer).isEqualTo(expectedResolvedNative);
+
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: bucket READ, READ_ACL acls
+    expectedResolvedRanger.add(new OzoneGrant(bucketSet, bucketAcls));
+    // Expected for Ranger: READ key acl for resource type KEY with key name 
"*"; volume READ
+    final Set<IOzoneObj> readObjectsRanger = objSet(key("my-bucket", "*"), 
volume());
+    expectedResolvedRanger.add(new OzoneGrant(readObjectsRanger, acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testWildcardActionGroupListStar() throws OMException {
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": \"s3:List*\",\n" +
+        "      \"Resource\": [\n" +
+        "        \"arn:aws:s3:::my-bucket\",\n" +
+        "        \"arn:aws:s3:::my-bucket/*\"\n" +  // 
ListMultipartUploadParts has READ effect on file/object resources
+        "      ]\n" +
+        "  }]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedNative = new LinkedHashSet<>();
+    // Expected for native: READ, LIST bucket acls
+    final Set<IOzoneObj> bucketSet = objSet(bucket("my-bucket"));
+    final Set<ACLType> bucketAcls = acls(READ, LIST);
+    expectedResolvedNative.add(new OzoneGrant(bucketSet, bucketAcls));
+    // Expected for native: READ acl on prefix "" under bucket; volume READ
+    final Set<IOzoneObj> readObjectsNative = objSet(prefix("my-bucket", ""), 
volume());
+    expectedResolvedNative.add(new OzoneGrant(readObjectsNative, acls(READ)));
+    assertThat(resolvedFromNativeAuthorizer).isEqualTo(expectedResolvedNative);
+
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: READ, LIST bucket acls
+    expectedResolvedRanger.add(new OzoneGrant(bucketSet, bucketAcls));
+    // Expected for Ranger: READ key acl for resource type KEY with key name 
"*"; volume READ
+    final Set<IOzoneObj> readObjectsRanger = objSet(key("my-bucket", "*"), 
volume());
+    expectedResolvedRanger.add(new OzoneGrant(readObjectsRanger, acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testWildcardActionGroupPutStar() throws OMException {
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": \"s3:Put*\",\n" +
+        "      \"Resource\": [\n" +
+        "        \"arn:aws:s3:::my-bucket\",\n" +
+        "        \"arn:aws:s3:::my-bucket/*\"\n" +
+        "      ]\n" +
+        "  }]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedNative = new LinkedHashSet<>();
+    // Expected for native: bucket READ, WRITE_ACL acl
+    final Set<IOzoneObj> bucketSet = objSet(bucket("my-bucket"));
+    final Set<ACLType> bucketAcl = acls(READ, WRITE_ACL);
+    expectedResolvedNative.add(new OzoneGrant(bucketSet, bucketAcl));
+    // Expected for native: CREATE, WRITE acls on prefix "" under bucket
+    final Set<IOzoneObj> keyPrefixSet = objSet(prefix("my-bucket", ""));
+    final Set<ACLType> keyAcls = acls(CREATE, WRITE);
+    expectedResolvedNative.add(new OzoneGrant(keyPrefixSet, keyAcls));
+    // Expected for native: volume READ
+    expectedResolvedNative.add(new OzoneGrant(objSet(volume()), acls(READ)));
+    assertThat(resolvedFromNativeAuthorizer).isEqualTo(expectedResolvedNative);
+
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: bucket READ, WRITE_ACL acl
+    expectedResolvedRanger.add(new OzoneGrant(bucketSet, bucketAcl));
+    // Expected for Ranger: CREATE, WRITE key acls for resource type KEY with 
key name "*"
+    final Set<IOzoneObj> rangerKeySet = objSet(key("my-bucket", "*"));
+    expectedResolvedRanger.add(new OzoneGrant(rangerKeySet, keyAcls));
+    // Expected for Ranger: volume READ
+    expectedResolvedRanger.add(new OzoneGrant(objSet(volume()), acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
   }
 
-  // TODO sts - add more createPathsAndPermissions tests in the next PR
+  @Test
+  public void testWildcardActionGroupDeleteStar() throws OMException {
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": \"s3:Delete*\",\n" +
+        "      \"Resource\": [\n" +
+        "        \"arn:aws:s3:::my-bucket\",\n" +
+        "        \"arn:aws:s3:::my-bucket/*\"\n" +
+        "      ]\n" +
+        "  }]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    final Set<OzoneGrant> expectedResolvedNative = new LinkedHashSet<>();
+    // Expected for native: DELETE on prefix "" under bucket; bucket READ, 
DELETE; volume READ
+    final Set<IOzoneObj> resourceSetNative = objSet(prefix("my-bucket", ""));
+    expectedResolvedNative.add(new OzoneGrant(resourceSetNative, 
acls(DELETE)));
+    expectedResolvedNative.add(new OzoneGrant(objSet(bucket("my-bucket")), 
acls(READ, DELETE)));
+    expectedResolvedNative.add(new OzoneGrant(objSet(volume()), acls(READ)));
+    assertThat(resolvedFromNativeAuthorizer).isEqualTo(expectedResolvedNative);
+
+    final Set<OzoneGrant> expectedResolvedRanger = new LinkedHashSet<>();
+    // Expected for Ranger: DELETE on resource type KEY with key name "*"; 
bucket READ, DELETE; volume READ
+    final Set<IOzoneObj> resourceSetRanger = objSet(key("my-bucket", "*"));
+    expectedResolvedRanger.add(new OzoneGrant(resourceSetRanger, 
acls(DELETE)));
+    expectedResolvedRanger.add(new OzoneGrant(objSet(bucket("my-bucket")), 
acls(READ, DELETE)));
+    expectedResolvedRanger.add(new OzoneGrant(objSet(volume()), acls(READ)));
+    assertThat(resolvedFromRangerAuthorizer).isEqualTo(expectedResolvedRanger);
+  }
+
+  @Test
+  public void testMismatchedActionAndResourceReturnsEmpty() throws OMException 
{
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": \"s3:GetObject\",\n" +             // object-level 
action
+        "    \"Resource\": \"arn:aws:s3:::my-bucket\"\n" +  // bucket-level 
resource
+        "  }]\n" +
+        "}";
+
+    final Set<OzoneGrant> resolvedFromNativeAuthorizer = resolve(json, VOLUME, 
NATIVE);
+    final Set<OzoneGrant> resolvedFromRangerAuthorizer = resolve(json, VOLUME, 
RANGER);
+
+    // Ensure what we got is what we expected
+    assertThat(resolvedFromNativeAuthorizer).isEmpty();
+    assertThat(resolvedFromRangerAuthorizer).isEmpty();
+  }
+
+  @Test
+  public void testInvalidResourceArnThrows() {
+    final String json = "{\n" +
+        "  \"Statement\": [{\n" +
+        "    \"Effect\": \"Allow\",\n" +
+        "    \"Action\": \"s3:ListBucket\",\n" +
+        "    \"Resource\": \"arn:aws:s3:::\"\n" +
+        "  }]\n" +
+        "}";
+
+    expectResolveThrowsForBothAuthorizers(
+        json, "Invalid Resource Arn - arn:aws:s3:::", INVALID_REQUEST);
+  }
 
   private static void expectIllegalArgumentException(Runnable runnable, String 
expectedMessage) {
     try {
@@ -1003,7 +2006,7 @@ private static Set<String> strSet(String... strs) {
   private static void expectResolveThrows(String json, 
IamSessionPolicyResolver.AuthorizerType authorizerType,
       String expectedMessage, OMException.ResultCodes expectedCode) {
     try {
-      IamSessionPolicyResolver.resolve(json, VOLUME, authorizerType);
+      resolve(json, VOLUME, authorizerType);
       throw new AssertionError("Expected exception not thrown");
     } catch (OMException ex) {
       assertThat(ex.getMessage()).isEqualTo(expectedMessage);
@@ -1017,6 +2020,20 @@ private static void 
expectResolveThrowsForBothAuthorizers(String json, String ex
     expectResolveThrows(json, RANGER, expectedMessage, expectedCode);
   }
 
+  /**
+   * Ensure resources containing wildcards in buckets throw an Exception
+   * when the OzoneNativeAuthorizer is used.
+   */
+  private static void 
expectBucketWildcardUnsupportedExceptionForNativeAuthorizer(String json) {
+    try {
+      resolve(json, VOLUME, NATIVE);
+      throw new AssertionError("Expected exception not thrown");
+    } catch (OMException ex) {
+      assertThat(ex.getMessage()).isEqualTo("Wildcard bucket patterns are not 
supported for Ozone native authorizer");
+      assertThat(ex.getResult()).isEqualTo(NOT_SUPPORTED_OPERATION);
+    }
+  }
+
   private static String createJsonStringLargerThan2048Characters() {
     final StringBuilder jsonBuilder = new StringBuilder();
     jsonBuilder.append("{\n");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to