This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-2181
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit ac66e6348c8d911b3b0731bd84e8b868b600fbe2
Author: Vivek Ratnavel Subramanian <vivekratnave...@gmail.com>
AuthorDate: Thu Oct 3 15:37:54 2019 -0700

    Fix acceptance tests and native authorizer
---
 .../main/java/org/apache/hadoop/ozone/OmUtils.java | 14 +++++++
 .../ozone/om/request/key/OMKeyCommitRequest.java   | 25 ++++++++---
 .../ozone/security/acl/OzoneNativeAuthorizer.java  | 48 ++++++++++++++++++----
 3 files changed, 73 insertions(+), 14 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 7cd38ad..b5ce46b 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -52,6 +52,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 
 import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
 import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_BIND_HOST_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY;
@@ -530,4 +532,16 @@ public final class OmUtils {
 
     return repeatedOmKeyInfo;
   }
+
+  /**
+   * Returns true if OzoneNativeAuthorizer is configured in the configuration.
+   * @param configuration ozone configuration
+   * @return true if OzoneNativeAuthorizer is configured in the configuration;
+   * else false.
+   */
+  public static boolean isNativeAuthorizerEnabled(Configuration configuration) {
+    String authorizer = configuration.get(OZONE_ACL_AUTHORIZER_CLASS);
+    return authorizer != null &&
+        authorizer.equals(OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index b349fa9..bf4148d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -25,6 +25,9 @@ import java.util.stream.Collectors;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.slf4j.Logger;
@@ -112,11 +115,20 @@ public class OMKeyCommitRequest extends OMKeyRequest {
     IOException exception = null;
     OmKeyInfo omKeyInfo = null;
     OMClientResponse omClientResponse = null;
+    boolean bucketLockAcquired = false;
 
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
     try {
       // check Acl
-      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
+      // Native authorizer requires client id as part of keyname to check
+      // write ACL on key. Add client id to key name if ozone native
+      // authorizer is configured.
+      Configuration config = new OzoneConfiguration();
+      String keyNameForAclCheck = keyName;
+      if (OmUtils.isNativeAuthorizerEnabled(config)) {
+        keyNameForAclCheck = keyName + "/" + commitKeyRequest.getClientID();
+      }
+      checkKeyAcls(ozoneManager, volumeName, bucketName, keyNameForAclCheck,
           IAccessAuthorizer.ACLType.WRITE);
 
       List<OmKeyLocationInfo> locationInfoList = commitKeyArgs
@@ -129,8 +141,8 @@ public class OMKeyCommitRequest extends OMKeyRequest {
       String dbOpenKey = omMetadataManager.getOpenKey(volumeName, bucketName,
           keyName, commitKeyRequest.getClientID());
 
-      omMetadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName,
-          bucketName);
+      bucketLockAcquired = omMetadataManager.getLock().acquireLock(BUCKET_LOCK,
+          volumeName, bucketName);
 
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
       omKeyInfo = omMetadataManager.getOpenKeyTable().get(dbOpenKey);
@@ -168,8 +180,11 @@ public class OMKeyCommitRequest extends OMKeyRequest {
             ozoneManagerDoubleBufferHelper.add(omClientResponse,
                 transactionLogIndex));
       }
-      omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
-          bucketName);
+
+      if(bucketLockAcquired) {
+        omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+            bucketName);
+      }
     }
 
     // Performing audit logging outside of the lock.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
index 5acd37e..1731421 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
@@ -69,6 +69,9 @@ public class OzoneNativeAuthorizer implements IAccessAuthorizer {
     Objects.requireNonNull(ozObject);
     Objects.requireNonNull(context);
     OzoneObjInfo objInfo;
+    RequestContext parentContext;
+    boolean isACLTypeCreate = (context.getAclRights() == ACLType.CREATE);
+    boolean isACLTypeDelete = (context.getAclRights() == ACLType.DELETE);
 
     if (ozObject instanceof OzoneObjInfo) {
       objInfo = (OzoneObjInfo) ozObject;
@@ -77,25 +80,52 @@ public class OzoneNativeAuthorizer implements IAccessAuthorizer {
           "configured to work with OzoneObjInfo type only.", INVALID_REQUEST);
     }
 
+    // For CREATE and DELETE acl requests, the parents need to be checked
+    // for WRITE acl. If Key create request is received, then we need to
+    // check if user has WRITE acl set on Bucket and Volume. In all other cases
+    // the parents also need to be checked for the same acl type.
+    if (isACLTypeCreate || isACLTypeDelete) {
+      parentContext = RequestContext.newBuilder()
+        .setClientUgi(context.getClientUgi())
+        .setIp(context.getIp())
+        .setAclType(context.getAclType())
+        .setAclRights(ACLType.WRITE)
+        .build();
+    } else {
+      parentContext = context;
+    }
+
     switch (objInfo.getResourceType()) {
     case VOLUME:
       LOG.trace("Checking access for volume:" + objInfo);
       return volumeManager.checkAccess(objInfo, context);
     case BUCKET:
       LOG.trace("Checking access for bucket:" + objInfo);
-      return (bucketManager.checkAccess(objInfo, context)
-          && volumeManager.checkAccess(objInfo, context));
+      // Skip bucket access check for CREATE acl since
+      // bucket will not exist at the time of creation
+      boolean bucketAccess = isACLTypeCreate
+          || bucketManager.checkAccess(objInfo, context);
+      return (bucketAccess
+          && volumeManager.checkAccess(objInfo, parentContext));
     case KEY:
       LOG.trace("Checking access for Key:" + objInfo);
-      return (keyManager.checkAccess(objInfo, context)
-          && prefixManager.checkAccess(objInfo, context)
-          && bucketManager.checkAccess(objInfo, context)
-          && volumeManager.checkAccess(objInfo, context));
+      // Skip key access check for CREATE acl since
+      // key will not exist at the time of creation
+      boolean keyAccess = isACLTypeCreate
+          || keyManager.checkAccess(objInfo, context);
+      return (keyAccess
+          && prefixManager.checkAccess(objInfo, parentContext)
+          && bucketManager.checkAccess(objInfo, parentContext)
+          && volumeManager.checkAccess(objInfo, parentContext));
     case PREFIX:
       LOG.trace("Checking access for Prefix:" + objInfo);
-      return (prefixManager.checkAccess(objInfo, context)
-          && bucketManager.checkAccess(objInfo, context)
-          && volumeManager.checkAccess(objInfo, context));
+      // Skip prefix access check for CREATE acl since
+      // prefix will not exist at the time of creation
+      boolean prefixAccess = isACLTypeCreate
+          || prefixManager.checkAccess(objInfo, context);
+      return (prefixAccess
+          && bucketManager.checkAccess(objInfo, parentContext)
+          && volumeManager.checkAccess(objInfo, parentContext));
     default:
       throw new OMException("Unexpected object type:" +
           objInfo.getResourceType(), INVALID_REQUEST);


---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-commits-h...@hadoop.apache.org

Reply via email to