This is an automated email from the ASF dual-hosted git repository.

aengineer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit b4a1afd60e3a3c7319a1ffa97d5ace3a95ed26f6
Author: Anu Engineer <aengin...@apache.org>
AuthorDate: Tue Oct 15 15:26:38 2019 -0700

    HDDS 2181. Ozone Manager should send correct ACL type in ACL requests to Authorizer
    Contributed by Vivek Ratnavel Subramanian.
---
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  2 +-
 .../ozone/security/acl/IAccessAuthorizer.java      |  2 +-
 .../org/apache/hadoop/ozone/om/TestOmAcls.java     |  6 ++--
 .../security/acl/TestOzoneNativeAuthorizer.java    | 23 +++++++++++--
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 12 +++++--
 .../org/apache/hadoop/ozone/om/OzoneManager.java   | 10 ++++++
 .../om/request/bucket/OMBucketCreateRequest.java   |  2 +-
 .../om/request/bucket/OMBucketDeleteRequest.java   |  2 +-
 .../om/request/file/OMDirectoryCreateRequest.java  |  5 ++-
 .../ozone/om/request/file/OMFileCreateRequest.java |  5 ++-
 .../om/request/key/OMAllocateBlockRequest.java     |  4 ++-
 .../ozone/om/request/key/OMKeyCommitRequest.java   | 14 ++++++--
 .../ozone/om/request/key/OMKeyCreateRequest.java   |  5 ++-
 .../ozone/om/request/key/OMKeyDeleteRequest.java   |  5 ++-
 .../ozone/om/request/key/OMKeyRenameRequest.java   | 10 ++++--
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  | 39 +++++++++++++++++++---
 .../ozone/security/acl/OzoneNativeAuthorizer.java  |  3 ++
 .../ozone/om/request/key/TestOMKeyRequest.java     |  1 +
 18 files changed, 123 insertions(+), 27 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index c9dc8ec..ee9e19a 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -340,7 +340,7 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
       if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) {
         String leaderOmId = omResponse.getLeaderOMNodeId();
 
-        // Failover to the OM node returned by OMReponse leaderOMNodeId if
+        // Failover to the OM node returned by OMResponse leaderOMNodeId if
         // current proxy is not pointing to that node.
         omFailoverProxyProvider.performFailoverIfRequired(leaderOmId);
       }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/IAccessAuthorizer.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/IAccessAuthorizer.java
index d8a2660..939f2c1 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/IAccessAuthorizer.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/IAccessAuthorizer.java
@@ -64,7 +64,7 @@ public interface IAccessAuthorizer {
 
     public static ACLType getAclTypeFromOrdinal(int ordinal) {
       if (ordinal > length - 1 && ordinal > -1) {
-        throw new IllegalArgumentException("Ordinal greater than array lentgh" +
+        throw new IllegalArgumentException("Ordinal greater than array length" +
             ". ordinal:" + ordinal);
       }
       return vals[ordinal];
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
index c75e365..ebf5964 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
@@ -118,7 +118,7 @@ public class TestOmAcls {
         () -> volume.createBucket(bucketName));
 
     assertTrue(logCapturer.getOutput()
-        .contains("doesn't have CREATE permission to access volume"));
+        .contains("doesn't have CREATE permission to access bucket"));
   }
 
   @Test
@@ -133,8 +133,8 @@ public class TestOmAcls {
 
     OzoneTestUtils.expectOmException(ResultCodes.PERMISSION_DENIED,
         () -> TestDataUtil.createKey(bucket, "testKey", "testcontent"));
-    assertTrue(logCapturer.getOutput().contains("doesn't have WRITE " +
-        "permission to access bucket"));
+    assertTrue(logCapturer.getOutput().contains("doesn't have CREATE " +
+        "permission to access key"));
   }
 
   /**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java
index 43ce679..c7bda9b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java
@@ -69,6 +69,8 @@ import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentity
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.USER;
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.WORLD;
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.CREATE;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.WRITE;
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.NONE;
 import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.BUCKET;
 import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.KEY;
@@ -107,6 +109,7 @@ public class TestOzoneNativeAuthorizer {
   private static OzoneObj buckObj;
   private static OzoneObj keyObj;
   private static OzoneObj prefixObj;
+  private static long keySessionId;
 
   @Parameterized.Parameters
   public static Collection<Object[]> data() {
@@ -192,6 +195,7 @@ public class TestOzoneNativeAuthorizer {
           keySession.getKeyInfo().getLatestVersionLocations()
               .getLocationList());
       keyManager.commitKey(keyArgs, keySession.getId());
+      keySessionId = keySession.getId();
     }
 
     keyObj = new OzoneObjInfo.Builder()
@@ -358,10 +362,21 @@ public class TestOzoneNativeAuthorizer {
       List<ACLType> aclsToBeAdded =
           Arrays.stream(ACLType.values()).collect(Collectors.toList());
       aclsToBeValidated.remove(NONE);
+      // Do not validate "WRITE" since write acl type requires object to be
+      // present in OpenKeyTable.
+      aclsToBeValidated.remove(WRITE);
       aclsToBeValidated.remove(a1);
 
       aclsToBeAdded.remove(NONE);
       aclsToBeAdded.remove(ALL);
+      // AclType "CREATE" is skipped from access check on objects
+      // since the object will not exist during access check.
+      aclsToBeAdded.remove(CREATE);
+      // AclType "WRITE" is removed from being tested here,
+      // because object must always be present in OpenKeyTable for write
+      // acl requests. But, here the objects are already committed
+      // and will move to keyTable.
+      aclsToBeAdded.remove(WRITE);
 
       // Fetch acls again.
       for (ACLType a2 : aclsToBeAdded) {
@@ -370,7 +385,7 @@ public class TestOzoneNativeAuthorizer {
           acls = aclImplementor.getAcl(obj);
           List right = acls.stream().map(a -> a.getAclList()).collect(
               Collectors.toList());
-          assertFalse("Do not expected client to have " + a2 + " acl. " +
+          assertFalse("Did not expect client to have " + a2 + " acl. " +
                   "Current acls found:" + right + ". Type:" + accessType + ","
                   + " name:" + (accessType == USER ? user : group),
               nativeAuthorizer.checkAccess(obj,
@@ -410,7 +425,7 @@ public class TestOzoneNativeAuthorizer {
                   builder.setAclRights(a2).build()));
           aclsToBeValidated.remove(a2);
           for (ACLType a3 : aclsToBeValidated) {
-            if (!a3.equals(a1) && !a3.equals(a2)) {
+            if (!a3.equals(a1) && !a3.equals(a2) && !a3.equals(CREATE)) {
               assertFalse("User shouldn't have right " + a3 + ". " +
                       "Current acl rights for user:" + a1 + "," + a2,
                   nativeAuthorizer.checkAccess(obj,
@@ -420,7 +435,6 @@ public class TestOzoneNativeAuthorizer {
         }
       }
     }
-
   }
 
   private String getAclName(ACLIdentityType identityType) {
@@ -462,6 +476,9 @@ public class TestOzoneNativeAuthorizer {
       builder) throws OMException {
     List<ACLType> allAcls = new ArrayList<>(Arrays.asList(ACLType.values()));
     allAcls.remove(NONE);
+    // Removing CREATE, WRITE since they need special handling.
+    allAcls.remove(CREATE);
+    allAcls.remove(WRITE);
     for (ACLType a : allAcls) {
       assertFalse("User shouldn't have right " + a + ".", 
           nativeAuthorizer.checkAccess(obj, builder.setAclRights(a).build()));
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 20b7fdf..9a9c607 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -89,6 +89,7 @@ import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
 import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.RequestContext;
 import org.apache.hadoop.security.SecurityUtil;
@@ -1656,8 +1657,15 @@ public class KeyManagerImpl implements KeyManager {
       validateBucket(volume, bucket);
       OmKeyInfo keyInfo = null;
       try {
-        OzoneFileStatus fileStatus = getFileStatus(args);
-        keyInfo = fileStatus.getKeyInfo();
+        // For Acl Type "WRITE", the key can only be found in
+        // OpenKeyTable since appends to existing keys are not supported.
+        if (context.getAclRights() == IAccessAuthorizer.ACLType.WRITE) {
+          keyInfo = metadataManager.getOpenKeyTable().get(objectKey);
+        } else {
+          OzoneFileStatus fileStatus = getFileStatus(args);
+          keyInfo = fileStatus.getKeyInfo();
+        }
+
         if (keyInfo == null) {
           // the key does not exist, but it is a parent "dir" of some key
           // let access be determined based on volume/bucket/prefix ACL
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 0cd087e..294b74e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -301,6 +301,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private final boolean grpcBlockTokenEnabled;
   private final boolean useRatisForReplication;
 
+  private boolean isNativeAuthorizerEnabled;
+
   private OzoneManager(OzoneConfiguration conf) throws IOException,
       AuthenticationException {
     super(OzoneVersionInfo.OZONE_VERSION_INFO);
@@ -473,6 +475,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       if (accessAuthorizer instanceof OzoneNativeAuthorizer) {
         OzoneNativeAuthorizer authorizer =
             (OzoneNativeAuthorizer) accessAuthorizer;
+        isNativeAuthorizerEnabled = true;
         authorizer.setVolumeManager(volumeManager);
         authorizer.setBucketManager(bucketManager);
         authorizer.setKeyManager(keyManager);
@@ -3292,4 +3295,11 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     return ozAdmins;
   }
 
+  /**
+   * Returns true if OzoneNativeAuthorizer is enabled and false if otherwise.
+   * @return if native authorizer is enabled.
+   */
+  public boolean isNativeAuthorizerEnabled() {
+    return isNativeAuthorizerEnabled;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java
index 2b2448d..33a6310 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java
@@ -143,7 +143,7 @@ public class OMBucketCreateRequest extends OMClientRequest {
     try {
       // check Acl
       if (ozoneManager.getAclsEnabled()) {
-        checkAcls(ozoneManager, OzoneObj.ResourceType.VOLUME,
+        checkAcls(ozoneManager, OzoneObj.ResourceType.BUCKET,
             OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.CREATE,
             volumeName, bucketName, null);
       }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketDeleteRequest.java
index 9469f88..5444a33 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketDeleteRequest.java
@@ -95,7 +95,7 @@ public class OMBucketDeleteRequest extends OMClientRequest {
       // check Acl
       if (ozoneManager.getAclsEnabled()) {
         checkAcls(ozoneManager, OzoneObj.ResourceType.BUCKET,
-            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
+            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.DELETE,
             volumeName, bucketName, null);
       }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
index 4b591db..aaac874 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -127,7 +129,8 @@ public class OMDirectoryCreateRequest extends OMKeyRequest {
     OMClientResponse omClientResponse = null;
     try {
       // check Acl
-      checkBucketAcls(ozoneManager, volumeName, bucketName, keyName);
+      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
+          IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
 
       // Check if this is the root of the filesystem.
       if (keyName.length() == 0) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
index 20b5174..52af0a3 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
@@ -31,6 +31,8 @@ import javax.annotation.Nonnull;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -177,7 +179,8 @@ public class OMFileCreateRequest extends OMKeyRequest {
     OMClientResponse omClientResponse = null;
     try {
       // check Acl
-      checkBucketAcls(ozoneManager, volumeName, bucketName, keyName);
+      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
+          IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
 
       // acquire lock
       acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
index e800927..7bc8738 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -169,7 +170,8 @@ public class OMAllocateBlockRequest extends OMKeyRequest {
     OmKeyInfo omKeyInfo = null;
     try {
       // check Acl
-      checkBucketAcls(ozoneManager, volumeName, bucketName, keyName);
+      checkKeyAclsInOpenKeyTable(ozoneManager, volumeName, bucketName, keyName,
+          IAccessAuthorizer.ACLType.WRITE, allocateBlockRequest.getClientID());
 
       OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
       validateBucketAndVolume(omMetadataManager, volumeName,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index 196d61c..f45679f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -26,6 +26,7 @@ import java.util.stream.Collectors;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,11 +112,14 @@ public class OMKeyCommitRequest extends OMKeyRequest {
     IOException exception = null;
     OmKeyInfo omKeyInfo = null;
     OMClientResponse omClientResponse = null;
+    boolean bucketLockAcquired = false;
 
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
     try {
       // check Acl
-      checkBucketAcls(ozoneManager, volumeName, bucketName, keyName);
+      checkKeyAclsInOpenKeyTable(ozoneManager, volumeName, bucketName,
+          keyName, IAccessAuthorizer.ACLType.WRITE,
+          commitKeyRequest.getClientID());
 
       List<OmKeyLocationInfo> locationInfoList = commitKeyArgs
           .getKeyLocationsList().stream()
@@ -127,8 +131,8 @@ public class OMKeyCommitRequest extends OMKeyRequest {
       String dbOpenKey = omMetadataManager.getOpenKey(volumeName, bucketName,
           keyName, commitKeyRequest.getClientID());
 
-      omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
-          bucketName);
+      bucketLockAcquired = omMetadataManager.getLock().acquireLock(BUCKET_LOCK,
+          volumeName, bucketName);
 
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
       omKeyInfo = omMetadataManager.getOpenKeyTable().get(dbOpenKey);
@@ -165,6 +169,10 @@ public class OMKeyCommitRequest extends OMKeyRequest {
         omClientResponse.setFlushFuture(
             ozoneManagerDoubleBufferHelper.add(omClientResponse,
                 transactionLogIndex));
+        if(bucketLockAcquired) {
+          omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+              bucketName);
+        }
       }
       omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
           bucketName);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
index baa13ad..9681b20 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
@@ -26,6 +26,8 @@ import java.util.stream.Collectors;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -162,7 +164,8 @@ public class OMKeyCreateRequest extends OMKeyRequest {
     OMClientResponse omClientResponse = null;
     try {
       // check Acl
-      checkBucketAcls(ozoneManager, volumeName, bucketName, keyName);
+      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
+          IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
 
       acquireLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
           volumeName, bucketName);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
index ee4b9b2..28dfaa5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
@@ -23,6 +23,8 @@ import java.util.Map;
 
 import com.google.common.base.Optional;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,7 +111,8 @@ public class OMKeyDeleteRequest extends OMKeyRequest {
     OMClientResponse omClientResponse = null;
     try {
       // check Acl
-      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName);
+      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
+          IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
 
       String objectKey = omMetadataManager.getOzoneKey(
           volumeName, bucketName, keyName);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java
index 526473c..6f7ff60 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -117,8 +119,12 @@ public class OMKeyRenameRequest extends OMKeyRequest {
         throw new OMException("Key name is empty",
             OMException.ResultCodes.INVALID_KEY_NAME);
       }
-      // check Acl
-      checkKeyAcls(ozoneManager, volumeName, bucketName, fromKeyName);
+      // check Acls to see if user has access to perform delete operation on
+      // old key and create operation on new key
+      checkKeyAcls(ozoneManager, volumeName, bucketName, fromKeyName,
+          IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+      checkKeyAcls(ozoneManager, volumeName, bucketName, toKeyName,
+          IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
 
       acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
           volumeName, bucketName);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 8e1e760..eb93018 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -507,10 +507,11 @@ public abstract class OMKeyRequest extends OMClientRequest {
    * @throws IOException
    */
   protected void checkBucketAcls(OzoneManager ozoneManager, String volume,
-      String bucket, String key) throws IOException {
+      String bucket, String key, IAccessAuthorizer.ACLType aclType)
+      throws IOException {
     if (ozoneManager.getAclsEnabled()) {
       checkAcls(ozoneManager, OzoneObj.ResourceType.BUCKET,
-          OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
+          OzoneObj.StoreType.OZONE, aclType,
           volume, bucket, key);
     }
   }
@@ -522,15 +523,43 @@ public abstract class OMKeyRequest extends OMClientRequest {
    * @param volume
    * @param bucket
    * @param key
+   * @param aclType
+   * @param resourceType
    * @throws IOException
    */
   protected void checkKeyAcls(OzoneManager ozoneManager, String volume,
-      String bucket, String key) throws IOException {
+      String bucket, String key, IAccessAuthorizer.ACLType aclType,
+      OzoneObj.ResourceType resourceType)
+      throws IOException {
     if (ozoneManager.getAclsEnabled()) {
-      checkAcls(ozoneManager, OzoneObj.ResourceType.KEY,
-          OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
+      checkAcls(ozoneManager, resourceType, OzoneObj.StoreType.OZONE, aclType,
           volume, bucket, key);
     }
   }
 
+  /**
+   * Check ACLs for Ozone Key in OpenKey table
+   * if ozone native authorizer is enabled.
+   * @param ozoneManager
+   * @param volume
+   * @param bucket
+   * @param key
+   * @param aclType
+   * @param clientId
+   * @throws IOException
+   */
+  protected void checkKeyAclsInOpenKeyTable(OzoneManager ozoneManager,
+      String volume, String bucket, String key,
+      IAccessAuthorizer.ACLType aclType, long clientId) throws IOException {
+    String keyNameForAclCheck = key;
+    // Native authorizer requires client id as part of key name to check
+    // write ACL on key. Add client id to key name if ozone native
+    // authorizer is configured.
+    if (ozoneManager.isNativeAuthorizerEnabled()) {
+      keyNameForAclCheck = key + "/" + clientId;
+    }
+
+    checkKeyAcls(ozoneManager, volume, bucket, keyNameForAclCheck,
+          aclType, OzoneObj.ResourceType.KEY);
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
index 0b7c51a..74d6578 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
@@ -69,6 +69,9 @@ public class OzoneNativeAuthorizer implements IAccessAuthorizer {
     Objects.requireNonNull(ozObject);
     Objects.requireNonNull(context);
     OzoneObjInfo objInfo;
+    RequestContext parentContext;
+    boolean isACLTypeCreate = (context.getAclRights() == ACLType.CREATE);
+    boolean isACLTypeDelete = (context.getAclRights() == ACLType.DELETE);
 
     if (ozObject instanceof OzoneObjInfo) {
       objInfo = (OzoneObjInfo) ozObject;
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
index 92d6cdb..f634ff3 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
@@ -100,6 +100,7 @@ public class TestOMKeyRequest {
     omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
     when(ozoneManager.getMetrics()).thenReturn(omMetrics);
     when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
+    when(ozoneManager.getConfiguration()).thenReturn(ozoneConfiguration);
     auditLogger = Mockito.mock(AuditLogger.class);
     when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
     Mockito.doNothing().when(auditLogger).logWrite(any(AuditMessage.class));


---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-commits-h...@hadoop.apache.org

Reply via email to