This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 1f1655028eede24197705a594b6ef19e6737db35
Author: Da Zhou <da.z...@microsoft.com>
AuthorDate: Thu Feb 7 21:58:21 2019 +0000

    HADOOP-15954. ABFS: Enable owner and group conversion for MSI and login 
user using OAuth.
    
    Contributed by Da Zhou and Junhua Gu.
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  10 +
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  37 +--
 .../fs/azurebfs/AzureBlobFileSystemStore.java      | 179 ++++++------
 .../fs/azurebfs/constants/ConfigurationKeys.java   |  23 +-
 .../fs/azurebfs/oauth2/IdentityTransformer.java    | 278 +++++++++++++++++++
 .../src/site/markdown/testing_azure.md             |  55 ++++
 .../fs/azurebfs/ITestAbfsIdentityTransformer.java  | 301 +++++++++++++++++++++
 7 files changed, 773 insertions(+), 110 deletions(-)

diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index b9bc7f2..67055c5 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -219,6 +219,16 @@ public class AbfsConfiguration{
 
   /**
    * Returns the account-specific value if it exists, then looks for an
+   * account-agnostic value, and finally tries the default value.
+   * @param key Account-agnostic configuration key
+   * @return value if one exists, else the default value
+   */
+  public String getString(String key, String defaultValue) {
+    return rawConfig.get(accountConf(key), rawConfig.get(key, defaultValue));
+  }
+
+  /**
+   * Returns the account-specific value if it exists, then looks for an
    * account-agnostic value, and finally tries the default value.
    * @param key Account-agnostic configuration key
    * @param defaultValue Value returned if none is configured
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 4c24ac8..e321e9e 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -83,9 +83,6 @@ public class AzureBlobFileSystem extends FileSystem {
   public static final Logger LOG = 
LoggerFactory.getLogger(AzureBlobFileSystem.class);
   private URI uri;
   private Path workingDir;
-  private UserGroupInformation userGroupInformation;
-  private String user;
-  private String primaryUserGroup;
   private AzureBlobFileSystemStore abfsStore;
   private boolean isClosed;
 
@@ -103,9 +100,7 @@ public class AzureBlobFileSystem extends FileSystem {
     LOG.debug("Initializing AzureBlobFileSystem for {}", uri);
 
     this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
-    this.userGroupInformation = UserGroupInformation.getCurrentUser();
-    this.user = userGroupInformation.getUserName();
-    this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), 
configuration, userGroupInformation);
+    this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), 
configuration);
     final AbfsConfiguration abfsConfiguration = 
abfsStore.getAbfsConfiguration();
 
     this.setWorkingDirectory(this.getHomeDirectory());
@@ -120,18 +115,6 @@ public class AzureBlobFileSystem extends FileSystem {
       }
     }
 
-    if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) {
-      try {
-        this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
-      } catch (IOException ex) {
-        LOG.error("Failed to get primary group for {}, using user name as 
primary group name", user);
-        this.primaryUserGroup = this.user;
-      }
-    } else {
-      //Provide a default group name
-      this.primaryUserGroup = this.user;
-    }
-
     if (UserGroupInformation.isSecurityEnabled()) {
       this.delegationTokenEnabled = 
abfsConfiguration.isDelegationTokenManagerEnabled();
 
@@ -153,8 +136,8 @@ public class AzureBlobFileSystem extends FileSystem {
     final StringBuilder sb = new StringBuilder(
         "AzureBlobFileSystem{");
     sb.append("uri=").append(uri);
-    sb.append(", user='").append(user).append('\'');
-    sb.append(", primaryUserGroup='").append(primaryUserGroup).append('\'');
+    sb.append(", user='").append(abfsStore.getUser()).append('\'');
+    sb.append(", 
primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
     sb.append('}');
     return sb.toString();
   }
@@ -503,7 +486,7 @@ public class AzureBlobFileSystem extends FileSystem {
   public Path getHomeDirectory() {
     return makeQualified(new Path(
             FileSystemConfigurations.USER_HOME_DIRECTORY_PREFIX
-                + "/" + this.userGroupInformation.getShortUserName()));
+                + "/" + abfsStore.getUser()));
   }
 
   /**
@@ -554,12 +537,20 @@ public class AzureBlobFileSystem extends FileSystem {
     super.finalize();
   }
 
+  /**
+   * Get the username of the FS.
+   * @return the short name of the user who instantiated the FS
+   */
   public String getOwnerUser() {
-    return user;
+    return abfsStore.getUser();
   }
 
+  /**
+   * Get the group name of the owner of the FS.
+   * @return primary group name
+   */
   public String getOwnerUserPrimaryGroup() {
-    return primaryUserGroup;
+    return abfsStore.getPrimaryGroup();
   }
 
   private boolean deleteRoot() throws IOException {
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 5c28bd4..c2739e9 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -67,6 +67,7 @@ import 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
 import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
@@ -88,9 +89,7 @@ import org.apache.http.client.utils.URIBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SUPER_USER;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT;
-
 /**
  * Provides the bridging logic between Hadoop's abstract filesystem and Azure 
Storage.
  */
@@ -101,7 +100,6 @@ public class AzureBlobFileSystemStore {
 
   private AbfsClient client;
   private URI uri;
-  private final UserGroupInformation userGroupInformation;
   private String userName;
   private String primaryUserGroup;
   private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss 
'GMT'";
@@ -113,11 +111,12 @@ public class AzureBlobFileSystemStore {
   private boolean isNamespaceEnabledSet;
   private boolean isNamespaceEnabled;
   private final AuthType authType;
+  private final UserGroupInformation userGroupInformation;
+  private final IdentityTransformer identityTransformer;
 
-  public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, 
Configuration configuration, UserGroupInformation userGroupInformation)
-          throws AzureBlobFileSystemException, IOException {
+  public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, 
Configuration configuration)
+          throws IOException {
     this.uri = uri;
-
     String[] authorityParts = authorityParts(uri);
     final String fileSystemName = authorityParts[0];
     final String accountName = authorityParts[1];
@@ -127,10 +126,8 @@ public class AzureBlobFileSystemStore {
     } catch (IllegalAccessException exception) {
       throw new FileSystemOperationUnhandledException(exception);
     }
-
-    this.userGroupInformation = userGroupInformation;
+    this.userGroupInformation = UserGroupInformation.getCurrentUser();
     this.userName = userGroupInformation.getShortUserName();
-
     if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) {
       try {
         this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
@@ -145,12 +142,25 @@ public class AzureBlobFileSystemStore {
 
     this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
         
abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
-
     this.authType = abfsConfiguration.getAuthType(accountName);
     boolean usingOauth = (authType == AuthType.OAuth);
     boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? 
true : isSecureScheme;
     initializeClient(uri, fileSystemName, accountName, useHttps);
+    this.identityTransformer = new 
IdentityTransformer(abfsConfiguration.getRawConfiguration());
+  }
 
+  /**
+   * @return local user name.
+   * */
+  public String getUser() {
+    return this.userName;
+  }
+
+  /**
+  * @return primary group that user belongs to.
+  * */
+  public String getPrimaryGroup() {
+    return this.primaryUserGroup;
   }
 
   private String[] authorityParts(URI uri) throws 
InvalidUriAuthorityException, InvalidUriException {
@@ -452,60 +462,54 @@ public class AzureBlobFileSystemStore {
             path,
             isNamespaceEnabled);
 
+    final AbfsRestOperation op;
     if (path.isRoot()) {
-      final AbfsRestOperation op = isNamespaceEnabled
-          ? client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + 
AbfsHttpConstants.ROOT_PATH)
-          : client.getFilesystemProperties();
-
-      final long blockSize = abfsConfiguration.getAzureBlockSize();
-      final String owner = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
-      final String group = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
-      final String permissions = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
-      final String eTag = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
-      final String lastModified = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
-      final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
-
-      return new VersionedFileStatus(
-              isSuperUserOrEmpty(owner) ? userName : owner,
-              isSuperUserOrEmpty(group) ? primaryUserGroup : group,
-              permissions == null ? new AbfsPermission(FsAction.ALL, 
FsAction.ALL, FsAction.ALL)
-                      : AbfsPermission.valueOf(permissions),
-              hasAcl,
-              0,
-              true,
-              1,
-              blockSize,
-              parseLastModifiedTime(lastModified),
-              path,
-              eTag);
+      op = isNamespaceEnabled
+              ? client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + 
AbfsHttpConstants.ROOT_PATH)
+              : client.getFilesystemProperties();
     } else {
-      AbfsRestOperation op = 
client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + 
getRelativePath(path), isNamespaceEnabled);
-
-      final long blockSize = abfsConfiguration.getAzureBlockSize();
-      final AbfsHttpOperation result = op.getResult();
-      final String eTag = 
result.getResponseHeader(HttpHeaderConfigurations.ETAG);
-      final String lastModified = 
result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
-      final String contentLength = 
result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH);
-      final String resourceType = 
result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
-      final String owner = 
result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
-      final String group = 
result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
-      final String permissions = 
result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS));
-      final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
-
-      return new VersionedFileStatus(
-              isSuperUserOrEmpty(owner) ? userName : owner,
-              isSuperUserOrEmpty(group) ? primaryUserGroup : group,
-              permissions == null ? new AbfsPermission(FsAction.ALL, 
FsAction.ALL, FsAction.ALL)
-                      : AbfsPermission.valueOf(permissions),
-              hasAcl,
-              parseContentLength(contentLength),
-              parseIsDirectory(resourceType),
-              1,
-              blockSize,
-              parseLastModifiedTime(lastModified),
-              path,
-              eTag);
+      op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + 
getRelativePath(path), isNamespaceEnabled);
     }
+
+    final long blockSize = abfsConfiguration.getAzureBlockSize();
+    final AbfsHttpOperation result = op.getResult();
+
+    final String eTag = 
result.getResponseHeader(HttpHeaderConfigurations.ETAG);
+    final String lastModified = 
result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
+    final String permissions = 
result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS));
+    final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
+    final long contentLength;
+    final boolean resourceIsDir;
+
+    if (path.isRoot()) {
+      contentLength = 0;
+      resourceIsDir = true;
+    } else {
+      contentLength = 
parseContentLength(result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+      resourceIsDir = 
parseIsDirectory(result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE));
+    }
+
+    final String transformedOwner = 
identityTransformer.transformIdentityForGetRequest(
+              result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER),
+              userName);
+
+    final String transformedGroup = 
identityTransformer.transformIdentityForGetRequest(
+              result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP),
+              primaryUserGroup);
+
+    return new VersionedFileStatus(
+            transformedOwner,
+            transformedGroup,
+            permissions == null ? new AbfsPermission(FsAction.ALL, 
FsAction.ALL, FsAction.ALL)
+                    : AbfsPermission.valueOf(permissions),
+            hasAcl,
+            contentLength,
+            resourceIsDir,
+            1,
+            blockSize,
+            parseLastModifiedTime(lastModified),
+            path,
+            eTag);
   }
 
   public FileStatus[] listStatus(final Path path) throws IOException {
@@ -532,8 +536,8 @@ public class AzureBlobFileSystemStore {
       long blockSize = abfsConfiguration.getAzureBlockSize();
 
       for (ListResultEntrySchema entry : retrievedSchema.paths()) {
-        final String owner = isSuperUserOrEmpty(entry.owner()) ? userName : 
entry.owner();
-        final String group = isSuperUserOrEmpty(entry.group()) ? 
primaryUserGroup : entry.group();
+        final String owner = 
identityTransformer.transformIdentityForGetRequest(entry.owner(), userName);
+        final String group = 
identityTransformer.transformIdentityForGetRequest(entry.group(), 
primaryUserGroup);
         final FsPermission fsPermission = entry.permissions() == null
                 ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
                 : AbfsPermission.valueOf(entry.permissions());
@@ -566,7 +570,7 @@ public class AzureBlobFileSystemStore {
 
     } while (continuation != null && !continuation.isEmpty());
 
-    return fileStatuses.toArray(new FileStatus[0]);
+    return fileStatuses.toArray(new FileStatus[fileStatuses.size()]);
   }
 
   public void setOwner(final Path path, final String owner, final String 
group) throws
@@ -576,20 +580,17 @@ public class AzureBlobFileSystemStore {
           "This operation is only valid for storage accounts with the 
hierarchical namespace enabled.");
     }
 
-    String effectiveOwner = owner;
-    String effectiveGroup = group;
-    if (authType == AuthType.SharedKey && owner.equals(userName)) {
-      effectiveOwner = SUPER_USER;
-      effectiveGroup = SUPER_USER;
-    }
-
     LOG.debug(
             "setOwner filesystem: {} path: {} owner: {} group: {}",
             client.getFileSystem(),
             path.toString(),
-            effectiveOwner,
-            effectiveGroup);
-    client.setOwner(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true), effectiveOwner, effectiveGroup);
+            owner,
+            group);
+
+    final String transformedOwner = 
identityTransformer.transformUserOrGroupForSetRequest(owner);
+    final String transformedGroup = 
identityTransformer.transformUserOrGroupForSetRequest(group);
+
+    client.setOwner(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true), transformedOwner, transformedGroup);
   }
 
   public void setPermission(final Path path, final FsPermission permission) 
throws
@@ -620,7 +621,9 @@ public class AzureBlobFileSystemStore {
             client.getFileSystem(),
             path.toString(),
             AclEntry.aclSpecToString(aclSpec));
-    final Map<String, String> modifyAclEntries = 
AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
+
+    final List<AclEntry> transformedAclEntries = 
identityTransformer.transformAclEntriesForSetRequest(aclSpec);
+    final Map<String, String> modifyAclEntries = 
AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(transformedAclEntries));
     boolean useUpn = AbfsAclHelper.isUpnFormatAclEntries(modifyAclEntries);
 
     final AbfsRestOperation op = 
client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true), useUpn);
@@ -645,7 +648,9 @@ public class AzureBlobFileSystemStore {
             client.getFileSystem(),
             path.toString(),
             AclEntry.aclSpecToString(aclSpec));
-    final Map<String, String> removeAclEntries = 
AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
+
+    final List<AclEntry> transformedAclEntries = 
identityTransformer.transformAclEntriesForSetRequest(aclSpec);
+    final Map<String, String> removeAclEntries = 
AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(transformedAclEntries));
     boolean isUpnFormat = 
AbfsAclHelper.isUpnFormatAclEntries(removeAclEntries);
 
     final AbfsRestOperation op = 
client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true), isUpnFormat);
@@ -722,7 +727,9 @@ public class AzureBlobFileSystemStore {
             client.getFileSystem(),
             path.toString(),
             AclEntry.aclSpecToString(aclSpec));
-    final Map<String, String> aclEntries = 
AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
+
+    final List<AclEntry> transformedAclEntries = 
identityTransformer.transformAclEntriesForSetRequest(aclSpec);
+    final Map<String, String> aclEntries = 
AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(transformedAclEntries));
     final boolean isUpnFormat = 
AbfsAclHelper.isUpnFormatAclEntries(aclEntries);
 
     final AbfsRestOperation op = 
client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true), isUpnFormat);
@@ -749,8 +756,13 @@ public class AzureBlobFileSystemStore {
     AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH 
+ getRelativePath(path, true));
     AbfsHttpOperation result = op.getResult();
 
-    final String owner = 
result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
-    final String group = 
result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
+    final String transformedOwner = 
identityTransformer.transformIdentityForGetRequest(
+            result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER),
+            userName);
+    final String transformedGroup = 
identityTransformer.transformIdentityForGetRequest(
+            result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP),
+            primaryUserGroup);
+
     final String permissions = 
result.getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
     final String aclSpecString = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL);
 
@@ -759,8 +771,8 @@ public class AzureBlobFileSystemStore {
             : AbfsPermission.valueOf(permissions);
 
     final AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
-    aclStatusBuilder.owner(isSuperUserOrEmpty(owner)? userName : owner);
-    aclStatusBuilder.group(isSuperUserOrEmpty(group) ? primaryUserGroup : 
group);
+    aclStatusBuilder.owner(transformedOwner);
+    aclStatusBuilder.group(transformedGroup);
 
     aclStatusBuilder.setPermission(fsPermission);
     aclStatusBuilder.stickyBit(fsPermission.getStickyBit());
@@ -944,11 +956,6 @@ public class AzureBlobFileSystemStore {
     return false;
   }
 
-  private boolean isSuperUserOrEmpty(final String name) {
-      return name == null || name.equals(SUPER_USER);
-  }
-
-
   private static class VersionedFileStatus extends FileStatus {
     private final String version;
 
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index d079d94..8cd86bf 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -55,7 +55,28 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = 
"fs.azure.user.agent.prefix";
   public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = 
"fs.azure.ssl.channel.mode";
   public static final String FS_AZURE_USE_UPN = "fs.azure.use.upn";
-
+  /** User principal names (UPNs) have the format “{alias}@{domain}”. If true,
+   *  only {alias} is included when a UPN would otherwise appear in the output
+   *  of APIs like getFileStatus, getOwner, getAclStatus, etc. Default is 
false. **/
+  public static final String FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME = 
"fs.azure.identity.transformer.enable.short.name";
+  /** If the domain name is specified and 
“fs.azure.identity.transformer.enable.short.name”
+   *  is true, then the {alias} part of a UPN can be specified as input to 
APIs like setOwner and
+   *  setAcl and it will be transformed to a UPN by appending @ and the domain 
specified by
+   *  this configuration property. **/
+  public static final String FS_AZURE_FILE_OWNER_DOMAINNAME = 
"fs.azure.identity.transformer.domain.name";
+  /** An Azure Active Directory object ID (oid) used as the replacement for 
names contained in the
+   * list specified by 
“fs.azure.identity.transformer.service.principal.substitution.list”.
+   * Note that instead of setting an oid, you can also set $superuser. **/
+  public static final String FS_AZURE_OVERRIDE_OWNER_SP = 
"fs.azure.identity.transformer.service.principal.id";
+  /** A comma separated list of names to be replaced with the service 
principal ID specified by
+   * “fs.azure.identity.transformer.service.principal.id”. This substitution 
occurs
+   * when setOwner, setAcl, modifyAclEntries, or removeAclEntries are invoked 
with identities
+   * contained in the substitution list. Note that in a non-secure 
cluster, the asterisk symbol "*"
+   * can be used to match all users/groups. **/
+  public static final String FS_AZURE_OVERRIDE_OWNER_SP_LIST = 
"fs.azure.identity.transformer.service.principal.substitution.list";
+  /** By default this is set as false, so “$superuser” is replaced with the 
current user when it appears as the owner
+   * or owning group of a file or directory. To disable it, set it as true. **/
+  public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = 
"fs.azure.identity.transformer.skip.superuser.replacement";
   public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = 
"fs.azure.account.keyprovider";
   public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = 
"fs.azure.shellkeyprovider.script";
 
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java
new file mode 100644
index 0000000..62b54d1
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.AT;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SUPER_USER;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SKIP_SUPER_USER_REPLACEMENT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_DOMAINNAME;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP_LIST;
+
+/**
+ * Perform transformation for Azure Active Directory identities used in owner, 
group and acls.
+ */
+public class IdentityTransformer {
+  private static final Logger LOG = 
LoggerFactory.getLogger(IdentityTransformer.class);
+
+  private boolean isSecure;
+  private String servicePrincipalId;
+  private String serviceWhiteList;
+  private String domainName;
+  private boolean enableShortName;
+  private boolean skipUserIdentityReplacement;
+  private boolean skipSuperUserReplacement;
+  private boolean domainIsSet;
+  private static final String UUID_PATTERN = 
"^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$";
+
+  public IdentityTransformer(Configuration configuration) throws IOException {
+    Preconditions.checkNotNull(configuration, "configuration");
+    this.isSecure = UserGroupInformation.getCurrentUser().isSecurityEnabled();
+    this.servicePrincipalId = configuration.get(FS_AZURE_OVERRIDE_OWNER_SP, 
"");
+    this.serviceWhiteList = configuration.get(FS_AZURE_OVERRIDE_OWNER_SP_LIST, 
"");
+    this.domainName = configuration.get(FS_AZURE_FILE_OWNER_DOMAINNAME, "");
+    this.enableShortName = 
configuration.getBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, false);
+
+    // - "servicePrincipalId" and "serviceWhiteList" are required for
+    //    transformation between localUserOrGroup and principalId,$superuser
+    // - "enableShortName" is required for transformation between shortName 
and fullyQualifiedName.
+    this.skipUserIdentityReplacement = servicePrincipalId.isEmpty() && 
serviceWhiteList.isEmpty() && !enableShortName;
+    this.skipSuperUserReplacement = 
configuration.getBoolean(FS_AZURE_SKIP_SUPER_USER_REPLACEMENT, false);
+
+    if (enableShortName){
+      // need to check the domain setting only when short name is enabled.
+      // if shortName is not enabled, transformer won't transform a shortName 
to
+      // a fully qualified name.
+      this.domainIsSet = !domainName.isEmpty();
+    }
+  }
+
+  /**
+   * Perform identity transformation for the Get request results in 
AzureBlobFileSystemStore:
+   * getFileStatus(), listStatus(), getAclStatus().
+   * Input originalUserOrGroup can be one of the following:
+   * 1. $superuser:
+   *     by default it will be transformed to local user/group, this can be 
disabled by setting
+   *     "fs.azure.identity.transformer.skip.superuser.replacement" to true.
+   *
+   * 2. User principal id:
+   *     can be transformed to localUserOrGroup, if this principal id matches 
the principal id set in
+   *     "fs.azure.identity.transformer.service.principal.id" and 
localUserOrGroup is stated in
+   *     "fs.azure.identity.transformer.service.principal.substitution.list"
+   *
+   * 3. User principal name (UPN):
+   *     can be transformed to a short name(localUserOrGroup) if 
"fs.azure.identity.transformer.enable.short.name"
+   *     is enabled.
+   *
+   * @param originalUserOrGroup the original user or group in the get request 
results: FileStatus, AclStatus.
+   * @param localUserOrGroup the local user or group, should be parsed from 
UserGroupInformation.
+   * @return owner or group after transformation.
+   * */
+  public String transformIdentityForGetRequest(String originalUserOrGroup, 
String localUserOrGroup) {
+    if (originalUserOrGroup == null) {
+      originalUserOrGroup = localUserOrGroup;
+      // localUserOrGroup might be a full name, so continue the transformation.
+    }
+    // case 1: it is $superuser and replace $superuser config is enabled
+    if (!skipSuperUserReplacement && SUPER_USER.equals(originalUserOrGroup)) {
+      return localUserOrGroup;
+    }
+
+    if (skipUserIdentityReplacement) {
+      return originalUserOrGroup;
+    }
+
+    // case 2: original owner is principalId set in config, and localUser
+    //         is a daemon service specified in substitution list,
+    //         To avoid ownership check failure in job task, replace it
+    //         with the local daemon user/group
+    if (originalUserOrGroup.equals(servicePrincipalId) && 
isInSubstitutionList(localUserOrGroup)) {
+      return localUserOrGroup;
+    }
+
+    // case 3: If original owner is a fully qualified name, and
+    //         short name is enabled, replace with shortName.
+    if (shouldUseShortUserName(originalUserOrGroup)) {
+      return getShortName(originalUserOrGroup);
+    }
+
+    return originalUserOrGroup;
+  }
+
+  /**
+   * Perform Identity transformation when setting owner on a path.
+   * There are four possible inputs:
+   * 1.short name; 2.$superuser; 3.Fully qualified name; 4. principal id.
+   *
+   * short name could be transformed to:
+   *    - A service principal id or $superuser, if short name belongs to a daemon 
service
+   *      stated in substitution list AND 
"fs.azure.identity.transformer.service.principal.id"
+   *      is set with $superuser or a principal id.
+   *    - Fully qualified name, if "fs.azure.identity.transformer.domain.name" 
is set in configuration.
+   *
+   * $superuser, fully qualified name and principalId should not be 
transformed.
+   *
+   * @param userOrGroup the user or group to be set as owner.
+   * @return user or group after transformation.
+   * */
+  public String transformUserOrGroupForSetRequest(String userOrGroup) {
+    if (userOrGroup == null || userOrGroup.isEmpty() || 
skipUserIdentityReplacement) {
+      return userOrGroup;
+    }
+
+    // case 1: when the owner to be set is stated in substitution list.
+    if (isInSubstitutionList(userOrGroup)) {
+      return servicePrincipalId;
+    }
+
+    // case 2: when the owner is a short name of the user principal name(UPN).
+    if (shouldUseFullyQualifiedUserName(userOrGroup)) {
+      return getFullyQualifiedName(userOrGroup);
+    }
+
+    return userOrGroup;
+  }
+
+  /**
+   * Perform Identity transformation when calling setAcl(),removeAclEntries() 
and modifyAclEntries()
+   * If the AclEntry type is a user or group, and its name is one of the 
following:
+   * 1.short name; 2.$superuser; 3.Fully qualified name; 4. principal id.
+   * Short name could be transformed to:
+   *    - A service principal id or $superuser, if short name belongs a daemon 
service
+   *      stated in substitution list AND 
"fs.azure.identity.transformer.service.principal.id"
+   *      is set with $superuser or a principal id.
+   *    - A fully qualified name, if the AclEntry type is User AND if 
"fs.azure.identity.transformer.domain.name"
+   *    is set in configuration. This is to make the behavior consistent with 
HDI.
+   *
+   * $superuser, fully qualified name and principal id should not be 
transformed.
+   *
+   * @param aclEntries list of AclEntry
+   * @return list of AclEntry after the identity transformation.
+   * */
+  public List<AclEntry> transformAclEntriesForSetRequest(final List<AclEntry> 
aclEntries) {
+    if (skipUserIdentityReplacement) {
+      return aclEntries;
+    }
+
+    for (int i = 0; i < aclEntries.size(); i++) {
+      AclEntry aclEntry = aclEntries.get(i);
+      String name = aclEntry.getName();
+      String transformedName = name;
+      if (name == null || name.isEmpty() || 
aclEntry.getType().equals(AclEntryType.OTHER) || 
aclEntry.getType().equals(AclEntryType.MASK)) {
+        continue;
+      }
+
+      // case 1: when the user or group name to be set is stated in 
substitution list.
+      if (isInSubstitutionList(name)) {
+        transformedName = servicePrincipalId;
+      } else if (aclEntry.getType().equals(AclEntryType.USER) // case 2: when 
the owner is a short name
+              && shouldUseFullyQualifiedUserName(name)) {     //         of 
the user principal name (UPN).
+        // Notice: for group type ACL entry, if name is shortName.
+        //         It won't be converted to Full Name. This is
+        //         to make the behavior consistent with HDI.
+        transformedName = getFullyQualifiedName(name);
+      }
+
+      // Avoid unnecessary new AclEntry allocation
+      if (transformedName.equals(name)) {
+        continue;
+      }
+
+      AclEntry.Builder aclEntryBuilder = new AclEntry.Builder();
+      aclEntryBuilder.setType(aclEntry.getType());
+      aclEntryBuilder.setName(transformedName);
+      aclEntryBuilder.setScope(aclEntry.getScope());
+      aclEntryBuilder.setPermission(aclEntry.getPermission());
+
+      // Replace the original AclEntry
+      aclEntries.set(i, aclEntryBuilder.build());
+    }
+    return aclEntries;
+  }
+
+  /**
+   * Tells whether a name is a short name, i.e. one without a domain part.
+   * Anything following an "@" is assumed to be a domain name, so a name
+   * containing "@" is not considered short.
+   * @param owner the name to inspect, may be null
+   * @return true if owner is non-null and contains no "@", false otherwise
+   */
+  private boolean isShortUserName(String owner) {
+    if (owner == null) {
+      return false;
+    }
+    return !owner.contains(AT);
+  }
+
+  /**
+   * Decides whether a name should be shortened: short names must be enabled
+   * and the name must not already be a short name.
+   */
+  private boolean shouldUseShortUserName(String owner) {
+    if (!enableShortName) {
+      return false;
+    }
+    return !isShortUserName(owner);
+  }
+
+  private String getShortName(String userName) {
+    if (userName == null)    {
+      return  null;
+    }
+
+    if (isShortUserName(userName)) {
+      return userName;
+    }
+
+    String userNameBeforeAt = userName.substring(0, userName.indexOf(AT));
+    if (isSecure) {
+      //In secure clusters we apply auth to local rules to lowercase all short 
localUser names (notice /L at the end),
+      //E.G. : RULE:[1:$1@$0](.*@FOO.ONMICROSOFT.COM)s/@.*/// Ideally we 
should use the HadoopKerberosName class to get
+      // new HadoopKerberosName(arg).getShortName. However,
+      //1. ADLS can report the Realm in lower case while returning file owner 
names( ie. : some.u...@realm.onmicrosoft.com)
+      //2. The RULE specification does not allow specifying character classes 
to do case insensitive matches
+      //Due to this, we end up using a forced lowercase version of the 
manually shortened name
+      return userNameBeforeAt.toLowerCase(Locale.ENGLISH);
+    }
+    return userNameBeforeAt;
+  }
+
+  private String getFullyQualifiedName(String name){
+    if (domainIsSet && (name != null) && !name.contains(AT)){
+      return name + AT + domainName;
+    }
+    return name;
+  }
+
+  /**
+   * Decides whether a name should be expanded to a fully qualified name:
+   * a domain must be configured, short names must be enabled, and the name
+   * must be a short name that is neither $superuser nor a UUID.
+   */
+  private boolean shouldUseFullyQualifiedUserName(String owner) {
+    if (!domainIsSet || !enableShortName) {
+      return false;
+    }
+    if (SUPER_USER.equals(owner) || isUuid(owner)) {
+      return false;
+    }
+    return isShortUserName(owner);
+  }
+
+  /**
+   * Tells whether a name is covered by the substitution list, either
+   * because the list contains the wildcard or the name itself.
+   */
+  private boolean isInSubstitutionList(String localUserName) {
+    if (serviceWhiteList.contains(STAR)) {
+      return true;
+    }
+    return serviceWhiteList.contains(localUserName);
+  }
+
+  /**
+   * Tells whether the input is non-null and matches the UUID pattern.
+   */
+  private boolean isUuid(String input) {
+    return input != null && input.matches(UUID_PATTERN);
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md 
b/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
index c2afe74..40a372d 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
@@ -733,6 +733,61 @@ hierarchical namespace enabled, and set the following 
configuration settings:
      <description>AAD client id.</description>
    </property>
   -->
+
+  <!--
+    <property>
+        <name>fs.azure.identity.transformer.enable.short.name</name>
+        <value>true/false</value>
+        <description>
+          User principal names (UPNs) have the format “{alias}@{domain}”.
+          If true, only {alias} is included when a UPN would otherwise appear 
in the output
+          of APIs like getFileStatus, getOwner, getAclStatus, etc. The default is false.
+        </description>
+    </property>
+
+    <property>
+        <name>fs.azure.identity.transformer.domain.name</name>
+        <value>domain name of the user's upn</value>
+        <description>
+          If the domain name is specified and 
“fs.azure.identity.transformer.enable.short.name”
+          is true, then the {alias} part of a UPN can be specified as input to 
APIs like setOwner,
+          setAcl, modifyAclEntries, or removeAclEntries, and it will be 
transformed to a UPN by appending @ and the domain specified by
+          this configuration property.
+        </description>
+    </property>
+
+    <property>
+        <name>fs.azure.identity.transformer.service.principal.id</name>
+        <value>service principal object id</value>
+        <description>
+          An Azure Active Directory object ID (oid) used as the replacement 
for names contained
+          in the list specified by 
“fs.azure.identity.transformer.service.principal.substitution.list”.
+          Notice that instead of setting oid, you can also set $superuser here.
+        </description>
+    </property>
+
+    <property>
+        <name>fs.azure.identity.transformer.skip.superuser.replacement</name>
+        <value>true/false</value>
+        <description>
+          If false, “$superuser” is replaced with the current user when it 
appears as the owner
+          or owning group of a file or directory. The default is false.
+        </description>
+    </property>
+
+    <property>
+        
<name>fs.azure.identity.transformer.service.principal.substitution.list</name>
+        <value>mapred,hdfs,yarn,hive,tez</value>
+        <description>
+           A comma separated list of names to be replaced with the service 
principal ID specified by
+           “fs.azure.identity.transformer.service.principal.id”.  This 
substitution occurs
+           when setOwner, setAcl, modifyAclEntries, or removeAclEntries are 
invoked with identities
+           contained in the substitution list. Note that in a non-secure cluster, the asterisk symbol *
+           can be used to match all users and groups.
+        </description>
+    </property>
+   -->
+
 ```
 
 If running tests against an endpoint that uses the URL format
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java
new file mode 100644
index 0000000..41f680e
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SUPER_USER;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_DOMAINNAME;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP_LIST;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SKIP_SUPER_USER_REPLACEMENT;
+import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
+import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
+import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
+import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
+import static org.apache.hadoop.fs.permission.AclEntryType.MASK;
+import static org.apache.hadoop.fs.permission.AclEntryType.OTHER;
+import static org.apache.hadoop.fs.permission.AclEntryType.USER;
+import static org.apache.hadoop.fs.permission.FsAction.ALL;
+
+/**
+ * Test IdentityTransformer: checks how user, group and ACL entry names are
+ * rewritten for set requests and get requests under different identity
+ * configuration settings.
+ */
+public class ITestAbfsIdentityTransformer extends AbstractAbfsScaleTest {
+  private final UserGroupInformation userGroupInfo;
+  private final String localUser;
+  private final String localGroup;
+  private static final String DAEMON = "daemon";
+  private static final String ASTERISK = "*";
+  private static final String SHORT_NAME = "abc";
+  private static final String DOMAIN = "domain.com";
+  // Built from its parts so that it always equals the name obtained by
+  // appending "@" and the domain to the short name.
+  private static final String FULLY_QUALIFIED_NAME = SHORT_NAME + "@" + DOMAIN;
+  private static final String SERVICE_PRINCIPAL_ID =
+      UUID.randomUUID().toString();
+
+  public ITestAbfsIdentityTransformer() throws Exception {
+    super();
+    userGroupInfo = UserGroupInformation.getCurrentUser();
+    localUser = userGroupInfo.getShortUserName();
+    localGroup = userGroupInfo.getPrimaryGroupName();
+  }
+
+  /** A daemon name is replaced on set requests only if it is in the substitution list. */
+  @Test
+  public void testDaemonServiceSettingIdentity() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+    // Default config
+    IdentityTransformer identityTransformer =
+        getTransformerWithDefaultIdentityConfig(config);
+    assertEquals("Identity should not change for default config",
+        DAEMON,
+        identityTransformer.transformUserOrGroupForSetRequest(DAEMON));
+
+    // Add service principal id
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID);
+
+    // case 1: substitution list doesn't contain daemon
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "a,b,c,d");
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("Identity should not change when substitution list doesn't contain daemon",
+        DAEMON,
+        identityTransformer.transformUserOrGroupForSetRequest(DAEMON));
+
+    // case 2: substitution list contains daemon name
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, DAEMON + ",a,b,c,d");
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("Identity should be replaced to servicePrincipalId",
+        SERVICE_PRINCIPAL_ID,
+        identityTransformer.transformUserOrGroupForSetRequest(DAEMON));
+
+    // case 3: substitution list is *
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, ASTERISK);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("Identity should be replaced to servicePrincipalId",
+        SERVICE_PRINCIPAL_ID,
+        identityTransformer.transformUserOrGroupForSetRequest(DAEMON));
+  }
+
+  /** A short name is qualified on set requests once short names and a domain are configured. */
+  @Test
+  public void testFullyQualifiedNameSettingIdentity() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    // Default config
+    IdentityTransformer identityTransformer =
+        getTransformerWithDefaultIdentityConfig(config);
+    assertEquals("short name should not be converted to full name by default",
+        SHORT_NAME,
+        identityTransformer.transformUserOrGroupForSetRequest(SHORT_NAME));
+
+    resetIdentityConfig(config);
+
+    // Add config to get fully qualified username
+    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
+    config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("short name should be converted to full name",
+        FULLY_QUALIFIED_NAME,
+        identityTransformer.transformUserOrGroupForSetRequest(SHORT_NAME));
+  }
+
+  /** A principal id passed to a set request is never rewritten. */
+  @Test
+  public void testNoOpForSettingOidAsIdentity() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+
+    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
+    config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN);
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, UUID.randomUUID().toString());
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "a,b,c,d");
+
+    IdentityTransformer identityTransformer =
+        getTransformerWithCustomizedIdentityConfig(config);
+    final String principalId = UUID.randomUUID().toString();
+    assertEquals("Identity should not be changed when owner is already a principal id ",
+        principalId,
+        identityTransformer.transformUserOrGroupForSetRequest(principalId));
+  }
+
+  /** $superuser passed to a set request is never rewritten. */
+  @Test
+  public void testNoOpWhenSettingSuperUserAsdentity() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+
+    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
+    config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN);
+    // Default config
+    IdentityTransformer identityTransformer =
+        getTransformerWithDefaultIdentityConfig(config);
+    assertEquals("Identity should not be changed because it is not in substitution list",
+        SUPER_USER,
+        identityTransformer.transformUserOrGroupForSetRequest(SUPER_USER));
+  }
+
+  /** $superuser in a get response is replaced by the local user unless that is disabled. */
+  @Test
+  public void testIdentityReplacementForSuperUserGetRequest()
+      throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+
+    // with default config, identityTransformer should do $superuser replacement
+    IdentityTransformer identityTransformer =
+        getTransformerWithDefaultIdentityConfig(config);
+    assertEquals("$superuser should be replaced with local user by default",
+        localUser,
+        identityTransformer.transformIdentityForGetRequest(SUPER_USER, localUser));
+
+    // Disable $superuser replacement
+    config.setBoolean(FS_AZURE_SKIP_SUPER_USER_REPLACEMENT, true);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("$superuser should not be replaced",
+        SUPER_USER,
+        identityTransformer.transformIdentityForGetRequest(SUPER_USER, localUser));
+  }
+
+  /**
+   * The configured service principal id in a get response is replaced by the
+   * local user only when the local user is covered by the substitution list.
+   */
+  @Test
+  public void testIdentityReplacementForDaemonServiceGetRequest()
+      throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+
+    // Default config
+    IdentityTransformer identityTransformer =
+        getTransformerWithDefaultIdentityConfig(config);
+    assertEquals("By default servicePrincipalId should not be converted for GetFileStatus(), listFileStatus(), getAcl()",
+        SERVICE_PRINCIPAL_ID,
+        identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+
+    resetIdentityConfig(config);
+    // 1. substitution list doesn't contain currentUser
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "a,b,c,d");
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("servicePrincipalId should not be replaced if local daemon user is not in substitution list",
+        SERVICE_PRINCIPAL_ID,
+        identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+
+    resetIdentityConfig(config);
+    // 2. substitution list contains currentUser(daemon name) but the service
+    //    principal id in config doesn't match
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, localUser + ",a,b,c,d");
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, UUID.randomUUID().toString());
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("servicePrincipalId should not be replaced if it is not equal to the SPN set in config",
+        SERVICE_PRINCIPAL_ID,
+        identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+
+    resetIdentityConfig(config);
+    // 3. substitution list contains currentUser(daemon name) and the service
+    //    principal id in config matches
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, localUser + ",a,b,c,d");
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("servicePrincipalId should be transformed to local use",
+        localUser,
+        identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+
+    resetIdentityConfig(config);
+    // 4. substitution is "*" but the service principal id in config doesn't
+    //    match the input
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, ASTERISK);
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, UUID.randomUUID().toString());
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("servicePrincipalId should not be replaced if it is not equal to the SPN set in config",
+        SERVICE_PRINCIPAL_ID,
+        identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+
+    resetIdentityConfig(config);
+    // 5. substitution is "*" and the service principal id in config match the
+    //    input
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, ASTERISK);
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("servicePrincipalId should be transformed to local user",
+        localUser,
+        identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+  }
+
+  /** A fully qualified name in a get response is shortened only when short names are enabled. */
+  @Test
+  public void testIdentityReplacementForKinitUserGetRequest()
+      throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+
+    // Default config
+    IdentityTransformer identityTransformer =
+        getTransformerWithDefaultIdentityConfig(config);
+    assertEquals("full name should not be transformed if shortname is not enabled",
+        FULLY_QUALIFIED_NAME,
+        identityTransformer.transformIdentityForGetRequest(FULLY_QUALIFIED_NAME, localUser));
+
+    // add config to get short name
+    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("should convert the full name to shortname ",
+        SHORT_NAME,
+        identityTransformer.transformIdentityForGetRequest(FULLY_QUALIFIED_NAME, localUser));
+  }
+
+  /** ACL entry names are rewritten per entry type on set requests. */
+  @Test
+  public void transformAclEntriesForSetRequest() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+
+    // Notice: for a group type ACL entry, if name is shortName, it won't be
+    // converted to Full Name. This is to make the behavior consistent
+    // with HDI.
+    List<AclEntry> aclEntriesToBeTransformed = Lists.newArrayList(
+        aclEntry(ACCESS, USER, DAEMON, ALL),
+        aclEntry(ACCESS, USER, FULLY_QUALIFIED_NAME, ALL),
+        aclEntry(DEFAULT, USER, SUPER_USER, ALL),
+        aclEntry(DEFAULT, USER, SERVICE_PRINCIPAL_ID, ALL),
+        aclEntry(DEFAULT, USER, SHORT_NAME, ALL),
+        aclEntry(DEFAULT, GROUP, DAEMON, ALL),
+        aclEntry(DEFAULT, GROUP, SHORT_NAME, ALL),
+        aclEntry(DEFAULT, OTHER, ALL),
+        aclEntry(DEFAULT, MASK, ALL)
+    );
+
+    // Default config should not change the identities. The transformer edits
+    // the given list in place and returns it, so compare against a snapshot
+    // taken beforehand; comparing the result with its own input would always
+    // succeed.
+    List<AclEntry> untouchedAclEntries =
+        Lists.newArrayList(aclEntriesToBeTransformed);
+    IdentityTransformer identityTransformer =
+        getTransformerWithDefaultIdentityConfig(config);
+    checkAclEntriesList(
+        identityTransformer.transformAclEntriesForSetRequest(aclEntriesToBeTransformed),
+        untouchedAclEntries);
+
+    resetIdentityConfig(config);
+    // With config
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, DAEMON + ",a,b,c,d");
+    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
+    config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN);
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+
+    // expected acl entries
+    List<AclEntry> expectedAclEntries = Lists.newArrayList(
+        aclEntry(ACCESS, USER, SERVICE_PRINCIPAL_ID, ALL),
+        aclEntry(ACCESS, USER, FULLY_QUALIFIED_NAME, ALL),
+        aclEntry(DEFAULT, USER, SUPER_USER, ALL),
+        aclEntry(DEFAULT, USER, SERVICE_PRINCIPAL_ID, ALL),
+        aclEntry(DEFAULT, USER, FULLY_QUALIFIED_NAME, ALL),
+        aclEntry(DEFAULT, GROUP, SERVICE_PRINCIPAL_ID, ALL),
+        aclEntry(DEFAULT, GROUP, SHORT_NAME, ALL),
+        aclEntry(DEFAULT, OTHER, ALL),
+        aclEntry(DEFAULT, MASK, ALL)
+    );
+
+    checkAclEntriesList(
+        identityTransformer.transformAclEntriesForSetRequest(aclEntriesToBeTransformed),
+        expectedAclEntries);
+  }
+
+  /** Removes every identity transformer setting from the configuration. */
+  private void resetIdentityConfig(Configuration config) {
+    config.unset(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME);
+    config.unset(FS_AZURE_FILE_OWNER_DOMAINNAME);
+    config.unset(FS_AZURE_OVERRIDE_OWNER_SP);
+    config.unset(FS_AZURE_OVERRIDE_OWNER_SP_LIST);
+    config.unset(FS_AZURE_SKIP_SUPER_USER_REPLACEMENT);
+  }
+
+  /** Builds a transformer after clearing all identity settings from the configuration. */
+  private IdentityTransformer getTransformerWithDefaultIdentityConfig(
+      Configuration config) throws IOException {
+    resetIdentityConfig(config);
+    return new IdentityTransformer(config);
+  }
+
+  /** Builds a transformer from the configuration exactly as given. */
+  private IdentityTransformer getTransformerWithCustomizedIdentityConfig(
+      Configuration config) throws IOException {
+    return new IdentityTransformer(config);
+  }
+
+  /**
+   * Asserts that two ACL entry lists have the same size and the same entry
+   * names, position by position. Only names are compared.
+   * @param aclEntries the entries produced by the transformer
+   * @param expected the entries they are expected to match
+   */
+  private void checkAclEntriesList(List<AclEntry> aclEntries,
+      List<AclEntry> expected) {
+    assertEquals("list size not equals", expected.size(), aclEntries.size());
+    for (int i = 0; i < aclEntries.size(); i++) {
+      assertEquals("Identity doesn't match",
+          expected.get(i).getName(), aclEntries.get(i).getName());
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org

Reply via email to