xunliu commented on code in PR #4515:
URL: https://github.com/apache/gravitino/pull/4515#discussion_r1720993986


##########
api/src/main/java/org/apache/gravitino/authorization/RoleChange.java:
##########
@@ -152,4 +165,88 @@ public String toString() {
       return "REMOVESECURABLEOBJECT " + securableObject;
     }
   }
+
+  /**
+   * A UpdateSecurableObject to update securable object's privilege from role. 
<br>

Review Comment:
   DONE



##########
api/src/main/java/org/apache/gravitino/authorization/RoleChange.java:
##########
@@ -152,4 +165,88 @@ public String toString() {
       return "REMOVESECURABLEOBJECT " + securableObject;
     }
   }
+
+  /**
+   * A UpdateSecurableObject to update securable object's privilege from role. 
<br>
+   * The securable object's metadata entity must same as new securable 
object's metadata entity.
+   * <br>
+   * The securable object's privilege must be different as new securable 
object's privilege. <br>
+   */
+  final class UpdateSecurableObject implements RoleChange {
+    private final SecurableObject securableObject;
+    private final SecurableObject newSecurableObject;
+
+    private UpdateSecurableObject(
+        SecurableObject securableObject, SecurableObject newSecurableObject) {
+      if (!securableObject.fullName().equals(newSecurableObject.fullName())) {
+        throw new IllegalArgumentException(
+            "The securable object's metadata entity must be same as new 
securable object's metadata entity.");
+      }
+      if 
(securableObject.privileges().containsAll(newSecurableObject.privileges())) {
+        throw new IllegalArgumentException(
+            "The securable object's privilege must be different as new 
securable object's privilege.");
+      }
+
+      this.securableObject = securableObject;
+      this.newSecurableObject = newSecurableObject;
+    }
+
+    /**
+     * Returns the securable object to be updated.
+     *
+     * @return return a securable object.
+     */
+    public SecurableObject getSecurableObject() {
+      return this.securableObject;
+    }
+
+    /**
+     * Returns the new securable object.
+     *
+     * @return return a securable object.
+     */
+    public SecurableObject getNewSecurableObject() {
+      return this.newSecurableObject;
+    }
+
+    /**
+     * Compares this UpdateSecurableObject instance with another object for 
equality. The comparison
+     * is based on the old securable object and new securable object.
+     *
+     * @param o The object to compare with this instance.
+     * @return true if the given object represents the same add securable 
object; false otherwise.

Review Comment:
   DONE



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHiveAuthorizationPlugin.java:
##########
@@ -0,0 +1,854 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.RoleChange;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.authorization.ranger.defines.VXGroup;
+import org.apache.gravitino.authorization.ranger.defines.VXGroupList;
+import org.apache.gravitino.authorization.ranger.defines.VXUser;
+import org.apache.gravitino.authorization.ranger.defines.VXUserList;
+import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.ranger.RangerServiceException;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RangerHiveAuthorizationPlugin is a plugin for Apache Ranger to manage the 
Hive authorization of
+ * the Apache Gravitino.
+ */
+public class RangerHiveAuthorizationPlugin extends RangerAuthorizationPlugin {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RangerHiveAuthorizationPlugin.class);
+
+  public RangerHiveAuthorizationPlugin(String catalogProvider, Map<String, 
String> config) {
+    super();
+    this.catalogProvider = catalogProvider;
+    String rangerUrl = 
config.get(AuthorizationPropertiesMeta.RANGER_ADMIN_URL);
+    String authType = config.get(AuthorizationPropertiesMeta.RANGER_AUTH_TYPE);
+    String username = config.get(AuthorizationPropertiesMeta.RANGER_USERNAME);
+    // Apache Ranger Password should be minimum 8 characters with min one 
alphabet and one numeric.
+    String password = config.get(AuthorizationPropertiesMeta.RANGER_PASSWORD);
+    rangerServiceName = 
config.get(AuthorizationPropertiesMeta.RANGER_SERVICE_NAME);
+    check(rangerUrl != null, "Ranger admin URL is required");
+    check(authType != null, "Ranger auth type is required");
+    check(username != null, "Ranger username is required");
+    check(password != null, "Ranger password is required");
+    check(rangerServiceName != null, "Ranger service name is required");
+
+    rangerClient = new RangerClientExt(rangerUrl, authType, username, 
password);
+  }
+
+  /**
+   * Ranger hive's privilege have `select`, `update`, `create`, `drop`, 
`alter`, `index`, `lock`,
+   * `read`, `write`, `repladmin`, `serviceadmin`, `refresh` and `all`.
+   *
+   * <p>Reference: 
ranger/agents-common/src/main/resources/service-defs/ranger-servicedef-hive.json
+   */
+  @Override
+  protected void initMapPrivileges() {
+    mapPrivileges =
+        ImmutableMap.<Privilege.Name, Set<String>>builder()
+            .put(
+                Privilege.Name.CREATE_SCHEMA,
+                ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .put(
+                Privilege.Name.CREATE_TABLE, 
ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_CREATE))
+            .put(
+                Privilege.Name.MODIFY_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_UPDATE,
+                    RangerDefines.ACCESS_TYPE_HIVE_DROP,
+                    RangerDefines.ACCESS_TYPE_HIVE_ALTER,
+                    RangerDefines.ACCESS_TYPE_HIVE_WRITE))
+            .put(
+                Privilege.Name.SELECT_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_READ, 
RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .build();
+  }
+
+  /**
+   * Because Ranger does not have Role concept, Each metadata object will have 
a unique Ranger
+   * policy. we can use one or more Ranger policy to simulate the role. <br>
+   * 1. Create a policy for each metadata object. <br>
+   * 2. Save role name in the Policy properties. <br>
+   * 3. Set `MANAGED_BY_GRAVITINO` label in the policy. <br>
+   * 4. For easy manage, each privilege will create a RangerPolicyItemAccess 
in the policy. <br>
+   * 5. The policy will only have one user, the user is the {OWNER} of the 
policy. <br>
+   * 6. The policy will not have group. <br>
+   */
+  @Override
+  public Boolean onRoleCreated(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.addSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleAcquired(Role role) throws RuntimeException {
+    Boolean findAll;
+    try {
+      findAll =
+          role.securableObjects().stream()
+                  .filter(
+                      securableObject -> {
+                        RangerPolicy policy = 
findManagedPolicy(securableObject);
+                        return policy != null;
+                      })
+                  .count()
+              == role.securableObjects().size();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return findAll;
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple securable objects, so we 
didn't directly
+   * remove the policy. <br>
+   * Reference: https://github.com/apache/gravitino/issues/4509
+   */
+  @Override
+  public Boolean onRoleDeleted(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.removeSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleUpdated(Role role, RoleChange... changes) throws 
RuntimeException {
+    for (RoleChange change : changes) {
+      boolean execResult;
+      if (change instanceof RoleChange.AddSecurableObject) {
+        execResult = doAddSecurableObject((RoleChange.AddSecurableObject) 
change);
+      } else if (change instanceof RoleChange.RemoveSecurableObject) {
+        execResult =
+            doRemoveSecurableObject(role.name(), 
(RoleChange.RemoveSecurableObject) change);
+      } else if (change instanceof RoleChange.UpdateSecurableObject) {
+        execResult =
+            doUpdateSecurableObject(role.name(), 
(RoleChange.UpdateSecurableObject) change);
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported role change type: "
+                + (change == null ? "null" : 
change.getClass().getSimpleName()));
+      }
+      if (!execResult) {
+        return Boolean.FALSE;
+      }
+    }
+
+    return Boolean.TRUE;
+  }
+
+  @VisibleForTesting
+  public List<Privilege> getAllPrivileges() {
+    return 
mapPrivileges.keySet().stream().map(Privileges::allow).collect(Collectors.toList());
+  }
+
+  @Override
+  public Boolean onOwnerSet(MetadataObject metadataObject, Owner preOwner, 
Owner newOwner)
+      throws RuntimeException {
+    List<Privilege> allPrivileges = getAllPrivileges();
+
+    SecurableObject securableObjects =
+        SecurableObjects.parse(metadataObject.fullName(), 
metadataObject.type(), allPrivileges);
+
+    AuditInfo auditInfo =
+        AuditInfo.builder()
+            .withCreator(PrincipalUtils.getCurrentUserName())
+            .withCreateTime(Instant.now())
+            .build();
+
+    RoleEntity role =
+        RoleEntity.builder()
+            .withId(0L)
+            .withName(OWNER_ROLE_NAME)
+            .withAuditInfo(auditInfo)
+            .withSecurableObjects(Lists.newArrayList(securableObjects))
+            .build();
+
+    Boolean execResult = onRoleCreated(role);
+    if (!execResult) {
+      return Boolean.FALSE;
+    }
+    if (preOwner != null) {
+      if (preOwner.type() == Owner.Type.USER) {
+        UserEntity userEntity =
+            UserEntity.builder()
+                .withId(0L)
+                .withName(preOwner.name())
+                .withRoleNames(Collections.emptyList())
+                .withRoleIds(Collections.emptyList())
+                .withAuditInfo(auditInfo)
+                .build();
+        execResult = onRevokedRolesFromUser(Lists.newArrayList(role), 
userEntity);
+        if (!execResult) {
+          return Boolean.FALSE;
+        }
+      } else {
+        GroupEntity groupEntity =
+            GroupEntity.builder()
+                .withId(0L)
+                .withName(preOwner.name())
+                .withRoleNames(Collections.emptyList())
+                .withRoleIds(Collections.emptyList())
+                .withAuditInfo(auditInfo)
+                .build();
+        execResult = onRevokedRolesFromGroup(Lists.newArrayList(role), 
groupEntity);
+        if (!execResult) {
+          return Boolean.FALSE;
+        }
+      }
+    }
+    if (newOwner != null) {
+      if (newOwner.type() == Owner.Type.USER) {
+        UserEntity userEntity =
+            UserEntity.builder()
+                .withId(0L)
+                .withName(newOwner.name())
+                .withRoleNames(Collections.emptyList())
+                .withRoleIds(Collections.emptyList())
+                .withAuditInfo(auditInfo)
+                .build();
+        execResult = onGrantedRolesToUser(Lists.newArrayList(role), 
userEntity);
+        if (!execResult) {
+          return Boolean.FALSE;
+        }
+      } else {
+        GroupEntity groupEntity =
+            GroupEntity.builder()
+                .withId(0L)
+                .withName(newOwner.name())
+                .withRoleNames(Collections.emptyList())
+                .withRoleIds(Collections.emptyList())
+                .withAuditInfo(auditInfo)
+                .build();
+        execResult = onGrantedRolesToGroup(Lists.newArrayList(role), 
groupEntity);
+        if (!execResult) {
+          return Boolean.FALSE;
+        }
+      }
+    }
+    return Boolean.TRUE;
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple Gravitino securable 
objects, <br>
+   * So we need to find the corresponding policy item mapping to set the user.
+   */
+  @Override
+  public Boolean onGrantedRolesToUser(List<Role> roles, User user) throws 
RuntimeException {
+    AtomicReference<Boolean> execResult = new AtomicReference<>(Boolean.TRUE);
+    for (Role role : roles) {
+      role.securableObjects()
+          .forEach(
+              securableObject -> {
+                RangerPolicy policy = findManagedPolicy(securableObject);
+                if (policy == null) {
+                  LOG.warn("The policy is not exist for the securable 
object({})", securableObject);

Review Comment:
   DONE



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHiveAuthorizationPlugin.java:
##########
@@ -0,0 +1,854 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.RoleChange;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.authorization.ranger.defines.VXGroup;
+import org.apache.gravitino.authorization.ranger.defines.VXGroupList;
+import org.apache.gravitino.authorization.ranger.defines.VXUser;
+import org.apache.gravitino.authorization.ranger.defines.VXUserList;
+import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.ranger.RangerServiceException;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RangerHiveAuthorizationPlugin is a plugin for Apache Ranger to manage the 
Hive authorization of
+ * the Apache Gravitino.
+ */
+public class RangerHiveAuthorizationPlugin extends RangerAuthorizationPlugin {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RangerHiveAuthorizationPlugin.class);
+
+  public RangerHiveAuthorizationPlugin(String catalogProvider, Map<String, 
String> config) {
+    super();
+    this.catalogProvider = catalogProvider;
+    String rangerUrl = 
config.get(AuthorizationPropertiesMeta.RANGER_ADMIN_URL);
+    String authType = config.get(AuthorizationPropertiesMeta.RANGER_AUTH_TYPE);
+    String username = config.get(AuthorizationPropertiesMeta.RANGER_USERNAME);
+    // Apache Ranger Password should be minimum 8 characters with min one 
alphabet and one numeric.
+    String password = config.get(AuthorizationPropertiesMeta.RANGER_PASSWORD);
+    rangerServiceName = 
config.get(AuthorizationPropertiesMeta.RANGER_SERVICE_NAME);
+    check(rangerUrl != null, "Ranger admin URL is required");
+    check(authType != null, "Ranger auth type is required");
+    check(username != null, "Ranger username is required");
+    check(password != null, "Ranger password is required");
+    check(rangerServiceName != null, "Ranger service name is required");
+
+    rangerClient = new RangerClientExt(rangerUrl, authType, username, 
password);
+  }
+
+  /**
+   * Ranger hive's privilege have `select`, `update`, `create`, `drop`, 
`alter`, `index`, `lock`,
+   * `read`, `write`, `repladmin`, `serviceadmin`, `refresh` and `all`.
+   *
+   * <p>Reference: 
ranger/agents-common/src/main/resources/service-defs/ranger-servicedef-hive.json
+   */
+  @Override
+  protected void initMapPrivileges() {
+    mapPrivileges =
+        ImmutableMap.<Privilege.Name, Set<String>>builder()
+            .put(
+                Privilege.Name.CREATE_SCHEMA,
+                ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .put(
+                Privilege.Name.CREATE_TABLE, 
ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_CREATE))
+            .put(
+                Privilege.Name.MODIFY_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_UPDATE,
+                    RangerDefines.ACCESS_TYPE_HIVE_DROP,
+                    RangerDefines.ACCESS_TYPE_HIVE_ALTER,
+                    RangerDefines.ACCESS_TYPE_HIVE_WRITE))
+            .put(
+                Privilege.Name.SELECT_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_READ, 
RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .build();
+  }
+
+  /**
+   * Because Ranger does not have Role concept, Each metadata object will have 
a unique Ranger
+   * policy. we can use one or more Ranger policy to simulate the role. <br>
+   * 1. Create a policy for each metadata object. <br>
+   * 2. Save role name in the Policy properties. <br>
+   * 3. Set `MANAGED_BY_GRAVITINO` label in the policy. <br>
+   * 4. For easy manage, each privilege will create a RangerPolicyItemAccess 
in the policy. <br>
+   * 5. The policy will only have one user, the user is the {OWNER} of the 
policy. <br>
+   * 6. The policy will not have group. <br>
+   */
+  @Override
+  public Boolean onRoleCreated(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.addSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleAcquired(Role role) throws RuntimeException {
+    Boolean findAll;
+    try {
+      findAll =
+          role.securableObjects().stream()
+                  .filter(
+                      securableObject -> {
+                        RangerPolicy policy = 
findManagedPolicy(securableObject);
+                        return policy != null;
+                      })
+                  .count()
+              == role.securableObjects().size();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return findAll;
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple securable objects, so we 
didn't directly
+   * remove the policy. <br>
+   * Reference: https://github.com/apache/gravitino/issues/4509
+   */
+  @Override
+  public Boolean onRoleDeleted(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.removeSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleUpdated(Role role, RoleChange... changes) throws 
RuntimeException {
+    for (RoleChange change : changes) {
+      boolean execResult;
+      if (change instanceof RoleChange.AddSecurableObject) {
+        execResult = doAddSecurableObject((RoleChange.AddSecurableObject) 
change);
+      } else if (change instanceof RoleChange.RemoveSecurableObject) {
+        execResult =
+            doRemoveSecurableObject(role.name(), 
(RoleChange.RemoveSecurableObject) change);
+      } else if (change instanceof RoleChange.UpdateSecurableObject) {
+        execResult =
+            doUpdateSecurableObject(role.name(), 
(RoleChange.UpdateSecurableObject) change);
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported role change type: "
+                + (change == null ? "null" : 
change.getClass().getSimpleName()));
+      }
+      if (!execResult) {
+        return Boolean.FALSE;
+      }
+    }
+
+    return Boolean.TRUE;
+  }
+
+  @VisibleForTesting
+  public List<Privilege> getAllPrivileges() {
+    return 
mapPrivileges.keySet().stream().map(Privileges::allow).collect(Collectors.toList());
+  }
+
+  @Override
+  public Boolean onOwnerSet(MetadataObject metadataObject, Owner preOwner, 
Owner newOwner)
+      throws RuntimeException {
+    List<Privilege> allPrivileges = getAllPrivileges();
+
+    SecurableObject securableObjects =
+        SecurableObjects.parse(metadataObject.fullName(), 
metadataObject.type(), allPrivileges);
+
+    AuditInfo auditInfo =
+        AuditInfo.builder()
+            .withCreator(PrincipalUtils.getCurrentUserName())
+            .withCreateTime(Instant.now())
+            .build();
+
+    RoleEntity role =
+        RoleEntity.builder()
+            .withId(0L)
+            .withName(OWNER_ROLE_NAME)
+            .withAuditInfo(auditInfo)
+            .withSecurableObjects(Lists.newArrayList(securableObjects))
+            .build();
+
+    Boolean execResult = onRoleCreated(role);
+    if (!execResult) {
+      return Boolean.FALSE;
+    }
+    if (preOwner != null) {
+      if (preOwner.type() == Owner.Type.USER) {
+        UserEntity userEntity =
+            UserEntity.builder()
+                .withId(0L)
+                .withName(preOwner.name())
+                .withRoleNames(Collections.emptyList())
+                .withRoleIds(Collections.emptyList())
+                .withAuditInfo(auditInfo)
+                .build();
+        execResult = onRevokedRolesFromUser(Lists.newArrayList(role), 
userEntity);
+        if (!execResult) {
+          return Boolean.FALSE;
+        }
+      } else {
+        GroupEntity groupEntity =
+            GroupEntity.builder()
+                .withId(0L)
+                .withName(preOwner.name())
+                .withRoleNames(Collections.emptyList())
+                .withRoleIds(Collections.emptyList())
+                .withAuditInfo(auditInfo)
+                .build();
+        execResult = onRevokedRolesFromGroup(Lists.newArrayList(role), 
groupEntity);
+        if (!execResult) {
+          return Boolean.FALSE;
+        }
+      }
+    }
+    if (newOwner != null) {
+      if (newOwner.type() == Owner.Type.USER) {
+        UserEntity userEntity =
+            UserEntity.builder()
+                .withId(0L)
+                .withName(newOwner.name())
+                .withRoleNames(Collections.emptyList())
+                .withRoleIds(Collections.emptyList())
+                .withAuditInfo(auditInfo)
+                .build();
+        execResult = onGrantedRolesToUser(Lists.newArrayList(role), 
userEntity);
+        if (!execResult) {
+          return Boolean.FALSE;
+        }
+      } else {
+        GroupEntity groupEntity =
+            GroupEntity.builder()
+                .withId(0L)
+                .withName(newOwner.name())
+                .withRoleNames(Collections.emptyList())
+                .withRoleIds(Collections.emptyList())
+                .withAuditInfo(auditInfo)
+                .build();
+        execResult = onGrantedRolesToGroup(Lists.newArrayList(role), 
groupEntity);
+        if (!execResult) {
+          return Boolean.FALSE;
+        }
+      }
+    }
+    return Boolean.TRUE;
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple Gravitino securable 
objects, <br>
+   * So we need to find the corresponding policy item mapping to set the user.
+   */
+  @Override
+  public Boolean onGrantedRolesToUser(List<Role> roles, User user) throws 
RuntimeException {
+    AtomicReference<Boolean> execResult = new AtomicReference<>(Boolean.TRUE);
+    for (Role role : roles) {
+      role.securableObjects()
+          .forEach(
+              securableObject -> {
+                RangerPolicy policy = findManagedPolicy(securableObject);
+                if (policy == null) {
+                  LOG.warn("The policy is not exist for the securable 
object({})", securableObject);
+                  execResult.set(Boolean.FALSE);
+                  return;
+                }
+
+                securableObject
+                    .privileges()
+                    .forEach(
+                        privilege -> {
+                          // Convert Gravitino privilege to Ranger privilege
+                          mapPrivileges
+                              .getOrDefault(privilege.name(), 
Collections.emptySet())
+                              .forEach(
+                                  // Use the Ranger privilege name to search
+                                  rangerPrivilegeName -> {
+                                    policy
+                                        /**
+                                         * TODO: Maybe we need to add the user 
to the
+                                         * Deny/DataMask/RowFilter policy item 
in the future.
+                                         */
+                                        .getPolicyItems()
+                                        .forEach(
+                                            policyItem -> {
+                                              if 
(policyItem.getAccesses().stream()
+                                                  .anyMatch(
+                                                      policyItemAccess ->
+                                                          policyItemAccess
+                                                              .getType()
+                                                              
.equals(rangerPrivilegeName))) {
+                                                // If the user is not exist in 
the policy item, then
+                                                // add it.
+                                                if 
(!policyItem.getUsers().contains(user.name())) {
+                                                  
policyItem.getUsers().add(user.name());
+                                                }
+                                              }
+                                            });
+                                    try {
+                                      
rangerClient.updatePolicy(policy.getId(), policy);
+                                    } catch (RangerServiceException e) {
+                                      throw new RuntimeException(e);
+                                    }
+                                  });
+                        });
+              });
+      if (!execResult.get()) {
+        return Boolean.FALSE;
+      }
+    }
+
+    return Boolean.TRUE;
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple Gravitino securable 
objects, <br>
+   * So we need to find the corresponding policy item mapping to remove the 
user.
+   */
+  @Override
+  public Boolean onRevokedRolesFromUser(List<Role> roles, User user) throws 
RuntimeException {
+    AtomicReference<Boolean> result = new AtomicReference<>(Boolean.TRUE);
+    for (Role role : roles) {
+      role.securableObjects()
+          .forEach(
+              securableObject -> {
+                RangerPolicy policy = findManagedPolicy(securableObject);
+                if (policy == null) {
+                  LOG.warn("The policy is not exist for the securable 
object({})", securableObject);
+                  result.set(Boolean.FALSE);
+                  return;
+                }
+
+                securableObject
+                    .privileges()
+                    .forEach(
+                        privilege -> {
+                          // Convert Gravitino privilege to Ranger privilege
+                          mapPrivileges
+                              .getOrDefault(privilege.name(), 
Collections.emptySet())
+                              .forEach(
+                                  // Use the Ranger privilege name to search
+                                  rangerPrivilegeName -> {
+                                    policy
+                                        // TODO: Maybe we need to add the user 
to the
+                                        // Deny/DataMask/RowFilter policy item 
in the future.
+                                        .getPolicyItems()
+                                        .forEach(
+                                            policyItem -> {
+                                              if 
(policyItem.getAccesses().stream()
+                                                  .anyMatch(
+                                                      policyItemAccess ->
+                                                          policyItemAccess
+                                                              .getType()
+                                                              
.equals(rangerPrivilegeName))) {
+                                                // If the user is exist in the 
policy item, then
+                                                // remove it.
+                                                
policyItem.getUsers().removeIf(user.name()::equals);
+                                              }
+                                            });
+                                    try {
+                                      
rangerClient.updatePolicy(policy.getId(), policy);
+                                    } catch (RangerServiceException e) {
+                                      throw new RuntimeException(e);
+                                    }
+                                  });
+                        });
+              });
+      if (!result.get()) {

Review Comment:
   I have already refactored this code.



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationPlugin.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.errorprone.annotations.FormatMethod;
+import com.google.errorprone.annotations.FormatString;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
+import org.apache.gravitino.exceptions.AuthorizationHookException;
+import org.apache.ranger.RangerServiceException;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.apache.ranger.plugin.util.SearchFilter;
+
+/** Ranger authorization operations hooks interfaces. */
+public abstract class RangerAuthorizationPlugin implements AuthorizationPlugin 
{
+  protected String catalogProvider;
+  protected RangerClientExt rangerClient;
+  protected String rangerServiceName;
+  /** Mapping Gravitino privilege name to the underlying authorization system 
privileges. */
+  protected Map<Privilege.Name, Set<String>> mapPrivileges = null;
+
+  /**
+   * Because UserGroupAuthorizationPlugin::onOwnerSet() interface function 
didn't have param Role,
+   * So we need predefine a role name
+   */
+  public static final String OWNER_ROLE_NAME = "OWNER";
+
+  public static final String MANAGED_BY_GRAVITINO = "MANAGED_BY_GRAVITINO";
+  public static final String POLICY_ITEM_OWNER_USER = "{OWNER}";
+
+  public RangerAuthorizationPlugin() {
+    initMapPrivileges();
+  }
+
+  /**
+   * Different underlying permission system may have different privilege 
names, this function is
+   * used to initialize the privilege mapping.
+   */
+  protected abstract void initMapPrivileges();
+
+  /**
+   * Translate the privilege name to the corresponding privilege name in the 
underlying permission
+   */
+  public Set<String> translatePrivilege(Privilege.Name name) {
+    return mapPrivileges.get(name);
+  }
+
+  /** Whether this privilege is underlying permission system supported */
+  protected boolean checkPrivilege(Privilege.Name name) {
+    return mapPrivileges.containsKey(name);
+  }
+
+  @FormatMethod
+  protected static void check(boolean condition, @FormatString String message, 
Object... args) {
+    if (!condition) {
+      throw new AuthorizationHookException(message, args);
+    }
+  }
+
+  @VisibleForTesting
+  public String formatPolicyName(String roleName, String 
securableObjectFullName) {
+    return roleName + "-" + securableObjectFullName;

Review Comment:
   This is a good idea; I have modified this function.



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHiveAuthorizationPlugin.java:
##########
@@ -0,0 +1,854 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.RoleChange;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.authorization.ranger.defines.VXGroup;
+import org.apache.gravitino.authorization.ranger.defines.VXGroupList;
+import org.apache.gravitino.authorization.ranger.defines.VXUser;
+import org.apache.gravitino.authorization.ranger.defines.VXUserList;
+import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.ranger.RangerServiceException;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RangerHiveAuthorizationPlugin is a plugin for Apache Ranger to manage the 
Hive authorization of
+ * the Apache Gravitino.
+ */
+public class RangerHiveAuthorizationPlugin extends RangerAuthorizationPlugin {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RangerHiveAuthorizationPlugin.class);
+
+  public RangerHiveAuthorizationPlugin(String catalogProvider, Map<String, 
String> config) {
+    super();
+    this.catalogProvider = catalogProvider;
+    String rangerUrl = 
config.get(AuthorizationPropertiesMeta.RANGER_ADMIN_URL);
+    String authType = config.get(AuthorizationPropertiesMeta.RANGER_AUTH_TYPE);
+    String username = config.get(AuthorizationPropertiesMeta.RANGER_USERNAME);
+    // Apache Ranger Password should be minimum 8 characters with min one 
alphabet and one numeric.
+    String password = config.get(AuthorizationPropertiesMeta.RANGER_PASSWORD);
+    rangerServiceName = 
config.get(AuthorizationPropertiesMeta.RANGER_SERVICE_NAME);
+    check(rangerUrl != null, "Ranger admin URL is required");
+    check(authType != null, "Ranger auth type is required");
+    check(username != null, "Ranger username is required");
+    check(password != null, "Ranger password is required");
+    check(rangerServiceName != null, "Ranger service name is required");
+
+    rangerClient = new RangerClientExt(rangerUrl, authType, username, 
password);
+  }
+
+  /**
+   * Ranger hive's privilege have `select`, `update`, `create`, `drop`, 
`alter`, `index`, `lock`,
+   * `read`, `write`, `repladmin`, `serviceadmin`, `refresh` and `all`.
+   *
+   * <p>Reference: 
ranger/agents-common/src/main/resources/service-defs/ranger-servicedef-hive.json
+   */
+  @Override
+  protected void initMapPrivileges() {
+    mapPrivileges =
+        ImmutableMap.<Privilege.Name, Set<String>>builder()
+            .put(
+                Privilege.Name.CREATE_SCHEMA,
+                ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .put(
+                Privilege.Name.CREATE_TABLE, 
ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_CREATE))
+            .put(
+                Privilege.Name.MODIFY_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_UPDATE,
+                    RangerDefines.ACCESS_TYPE_HIVE_DROP,
+                    RangerDefines.ACCESS_TYPE_HIVE_ALTER,
+                    RangerDefines.ACCESS_TYPE_HIVE_WRITE))
+            .put(
+                Privilege.Name.SELECT_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_READ, 
RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .build();
+  }
+
+  /**
+   * Because Ranger does not have Role concept, Each metadata object will have 
a unique Ranger
+   * policy. we can use one or more Ranger policy to simulate the role. <br>
+   * 1. Create a policy for each metadata object. <br>
+   * 2. Save role name in the Policy properties. <br>
+   * 3. Set `MANAGED_BY_GRAVITINO` label in the policy. <br>
+   * 4. For easy manage, each privilege will create a RangerPolicyItemAccess 
in the policy. <br>
+   * 5. The policy will only have one user, the user is the {OWNER} of the 
policy. <br>
+   * 6. The policy will not have group. <br>
+   */
+  @Override
+  public Boolean onRoleCreated(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.addSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleAcquired(Role role) throws RuntimeException {
+    Boolean findAll;
+    try {
+      findAll =
+          role.securableObjects().stream()
+                  .filter(
+                      securableObject -> {
+                        RangerPolicy policy = 
findManagedPolicy(securableObject);
+                        return policy != null;
+                      })
+                  .count()
+              == role.securableObjects().size();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return findAll;
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple securable objects, so we 
didn't directly
+   * remove the policy. <br>
+   * Reference: https://github.com/apache/gravitino/issues/4509
+   */
+  @Override
+  public Boolean onRoleDeleted(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.removeSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleUpdated(Role role, RoleChange... changes) throws 
RuntimeException {
+    for (RoleChange change : changes) {
+      boolean execResult;
+      if (change instanceof RoleChange.AddSecurableObject) {
+        execResult = doAddSecurableObject((RoleChange.AddSecurableObject) 
change);
+      } else if (change instanceof RoleChange.RemoveSecurableObject) {
+        execResult =
+            doRemoveSecurableObject(role.name(), 
(RoleChange.RemoveSecurableObject) change);
+      } else if (change instanceof RoleChange.UpdateSecurableObject) {
+        execResult =
+            doUpdateSecurableObject(role.name(), 
(RoleChange.UpdateSecurableObject) change);
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported role change type: "
+                + (change == null ? "null" : 
change.getClass().getSimpleName()));
+      }
+      if (!execResult) {
+        return Boolean.FALSE;
+      }
+    }
+
+    return Boolean.TRUE;
+  }
+
+  @VisibleForTesting
+  public List<Privilege> getAllPrivileges() {
+    return 
mapPrivileges.keySet().stream().map(Privileges::allow).collect(Collectors.toList());
+  }
+
+  @Override
+  public Boolean onOwnerSet(MetadataObject metadataObject, Owner preOwner, 
Owner newOwner)

Review Comment:
   No, this depends on the underlying permission system.



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHiveAuthorizationPlugin.java:
##########
@@ -0,0 +1,840 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.RoleChange;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.authorization.ranger.defines.VXGroup;
+import org.apache.gravitino.authorization.ranger.defines.VXGroupList;
+import org.apache.gravitino.authorization.ranger.defines.VXUser;
+import org.apache.gravitino.authorization.ranger.defines.VXUserList;
+import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.ranger.RangerServiceException;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RangerHiveAuthorizationPlugin is a plugin for Apache Ranger to manage the 
Hive authorization of
+ * the Apache Gravitino.
+ */
+public class RangerHiveAuthorizationPlugin extends RangerAuthorizationPlugin {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RangerHiveAuthorizationPlugin.class);
+
+  public RangerHiveAuthorizationPlugin(String catalogProvider, Map<String, 
String> config) {
+    super();
+    this.catalogProvider = catalogProvider;
+    String rangerUrl = 
config.get(AuthorizationPropertiesMeta.RANGER_ADMIN_URL);
+    String authType = config.get(AuthorizationPropertiesMeta.RANGER_AUTH_TYPE);
+    String username = config.get(AuthorizationPropertiesMeta.RANGER_USERNAME);
+    // Apache Ranger Password should be minimum 8 characters with min one 
alphabet and one numeric.
+    String password = config.get(AuthorizationPropertiesMeta.RANGER_PASSWORD);
+    rangerServiceName = 
config.get(AuthorizationPropertiesMeta.RANGER_SERVICE_NAME);
+    check(rangerUrl != null, "Ranger admin URL is required");
+    check(authType != null, "Ranger auth type is required");
+    check(username != null, "Ranger username is required");
+    check(password != null, "Ranger password is required");
+    check(rangerServiceName != null, "Ranger service name is required");
+
+    rangerClient = new RangerClientExt(rangerUrl, authType, username, 
password);
+  }
+
+  /**
+   * Ranger hive's privilege have `select`, `update`, `create`, `drop`, 
`alter`, `index`, `lock`,
+   * `read`, `write`, `repladmin`, `serviceadmin`, `refresh` and `all`.
+   *
+   * <p>Reference: 
ranger/agents-common/src/main/resources/service-defs/ranger-servicedef-hive.json
+   */
+  @Override
+  protected void initMapPrivileges() {
+    mapPrivileges =
+        ImmutableMap.<Privilege.Name, Set<String>>builder()
+            .put(Privilege.Name.CREATE_SCHEMA, ImmutableSet.of("create"))
+            .put(Privilege.Name.CREATE_TABLE, ImmutableSet.of("create"))
+            .put(Privilege.Name.MODIFY_TABLE, ImmutableSet.of("update", 
"drop", "alter", "write"))
+            .put(Privilege.Name.SELECT_TABLE, ImmutableSet.of("read", 
"select"))
+            .build();
+  }
+
+  /**
+   * Because Ranger does not have Role concept, Each metadata object will have 
a unique Ranger

Review Comment:
   Thank you for the suggestion; I have refactored this PR.



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHiveAuthorizationPlugin.java:
##########
@@ -0,0 +1,854 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.RoleChange;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.authorization.ranger.defines.VXGroup;
+import org.apache.gravitino.authorization.ranger.defines.VXGroupList;
+import org.apache.gravitino.authorization.ranger.defines.VXUser;
+import org.apache.gravitino.authorization.ranger.defines.VXUserList;
+import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.ranger.RangerServiceException;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RangerHiveAuthorizationPlugin is a plugin for Apache Ranger to manage the 
Hive authorization of
+ * the Apache Gravitino.
+ */
+public class RangerHiveAuthorizationPlugin extends RangerAuthorizationPlugin {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RangerHiveAuthorizationPlugin.class);
+
+  public RangerHiveAuthorizationPlugin(String catalogProvider, Map<String, 
String> config) {
+    super();
+    this.catalogProvider = catalogProvider;
+    String rangerUrl = 
config.get(AuthorizationPropertiesMeta.RANGER_ADMIN_URL);
+    String authType = config.get(AuthorizationPropertiesMeta.RANGER_AUTH_TYPE);
+    String username = config.get(AuthorizationPropertiesMeta.RANGER_USERNAME);
+    // Apache Ranger Password should be minimum 8 characters with min one 
alphabet and one numeric.
+    String password = config.get(AuthorizationPropertiesMeta.RANGER_PASSWORD);
+    rangerServiceName = 
config.get(AuthorizationPropertiesMeta.RANGER_SERVICE_NAME);
+    check(rangerUrl != null, "Ranger admin URL is required");
+    check(authType != null, "Ranger auth type is required");
+    check(username != null, "Ranger username is required");
+    check(password != null, "Ranger password is required");
+    check(rangerServiceName != null, "Ranger service name is required");
+
+    rangerClient = new RangerClientExt(rangerUrl, authType, username, 
password);
+  }
+
+  /**
+   * Ranger hive's privilege have `select`, `update`, `create`, `drop`, 
`alter`, `index`, `lock`,
+   * `read`, `write`, `repladmin`, `serviceadmin`, `refresh` and `all`.
+   *
+   * <p>Reference: 
ranger/agents-common/src/main/resources/service-defs/ranger-servicedef-hive.json
+   */
+  @Override
+  protected void initMapPrivileges() {
+    mapPrivileges =
+        ImmutableMap.<Privilege.Name, Set<String>>builder()
+            .put(
+                Privilege.Name.CREATE_SCHEMA,
+                ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .put(
+                Privilege.Name.CREATE_TABLE, 
ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_CREATE))
+            .put(
+                Privilege.Name.MODIFY_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_UPDATE,
+                    RangerDefines.ACCESS_TYPE_HIVE_DROP,
+                    RangerDefines.ACCESS_TYPE_HIVE_ALTER,
+                    RangerDefines.ACCESS_TYPE_HIVE_WRITE))
+            .put(
+                Privilege.Name.SELECT_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_READ, 
RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .build();
+  }
+
+  /**
+   * Because Ranger does not have Role concept, Each metadata object will have 
a unique Ranger
+   * policy. we can use one or more Ranger policy to simulate the role. <br>
+   * 1. Create a policy for each metadata object. <br>
+   * 2. Save role name in the Policy properties. <br>
+   * 3. Set `MANAGED_BY_GRAVITINO` label in the policy. <br>
+   * 4. For easy manage, each privilege will create a RangerPolicyItemAccess 
in the policy. <br>
+   * 5. The policy will only have one user, the user is the {OWNER} of the 
policy. <br>
+   * 6. The policy will not have group. <br>
+   */
+  @Override
+  public Boolean onRoleCreated(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.addSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleAcquired(Role role) throws RuntimeException {
+    Boolean findAll;
+    try {
+      findAll =
+          role.securableObjects().stream()
+                  .filter(
+                      securableObject -> {
+                        RangerPolicy policy = 
findManagedPolicy(securableObject);
+                        return policy != null;
+                      })
+                  .count()
+              == role.securableObjects().size();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return findAll;
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple securable objects, so we 
didn't directly
+   * remove the policy. <br>
+   * Reference: https://github.com/apache/gravitino/issues/4509
+   */
+  @Override
+  public Boolean onRoleDeleted(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.removeSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleUpdated(Role role, RoleChange... changes) throws 
RuntimeException {
+    for (RoleChange change : changes) {
+      boolean execResult;
+      if (change instanceof RoleChange.AddSecurableObject) {
+        execResult = doAddSecurableObject((RoleChange.AddSecurableObject) 
change);
+      } else if (change instanceof RoleChange.RemoveSecurableObject) {
+        execResult =
+            doRemoveSecurableObject(role.name(), 
(RoleChange.RemoveSecurableObject) change);
+      } else if (change instanceof RoleChange.UpdateSecurableObject) {
+        execResult =
+            doUpdateSecurableObject(role.name(), 
(RoleChange.UpdateSecurableObject) change);
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported role change type: "
+                + (change == null ? "null" : 
change.getClass().getSimpleName()));
+      }
+      if (!execResult) {
+        return Boolean.FALSE;
+      }
+    }
+
+    return Boolean.TRUE;
+  }
+
+  @VisibleForTesting
+  public List<Privilege> getAllPrivileges() {
+    return 
mapPrivileges.keySet().stream().map(Privileges::allow).collect(Collectors.toList());
+  }
+
+  @Override
+  public Boolean onOwnerSet(MetadataObject metadataObject, Owner preOwner, 
Owner newOwner)
+      throws RuntimeException {
+    List<Privilege> allPrivileges = getAllPrivileges();
+
+    SecurableObject securableObjects =
+        SecurableObjects.parse(metadataObject.fullName(), 
metadataObject.type(), allPrivileges);
+
+    AuditInfo auditInfo =
+        AuditInfo.builder()
+            .withCreator(PrincipalUtils.getCurrentUserName())
+            .withCreateTime(Instant.now())
+            .build();
+
+    RoleEntity role =
+        RoleEntity.builder()
+            .withId(0L)
+            .withName(OWNER_ROLE_NAME)
+            .withAuditInfo(auditInfo)
+            .withSecurableObjects(Lists.newArrayList(securableObjects))
+            .build();
+
+    Boolean execResult = onRoleCreated(role);
+    if (!execResult) {
+      return Boolean.FALSE;
+    }
+    if (preOwner != null) {
+      if (preOwner.type() == Owner.Type.USER) {
+        UserEntity userEntity =
+            UserEntity.builder()
+                .withId(0L)
+                .withName(preOwner.name())
+                .withRoleNames(Collections.emptyList())
+                .withRoleIds(Collections.emptyList())
+                .withAuditInfo(auditInfo)
+                .build();
+        execResult = onRevokedRolesFromUser(Lists.newArrayList(role), 
userEntity);
+        if (!execResult) {
+          return Boolean.FALSE;

Review Comment:
   I have already refactored this code.



##########
authorizations/authorization-ranger/build.gradle.kts:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+description = "authorization-ranger"
+
+plugins {
+  `maven-publish`
+  id("java")
+  id("idea")
+}
+
+dependencies {
+  implementation(project(":api"))

Review Comment:
   DONE



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHiveAuthorizationPlugin.java:
##########
@@ -0,0 +1,854 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.RoleChange;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.authorization.ranger.defines.VXGroup;
+import org.apache.gravitino.authorization.ranger.defines.VXGroupList;
+import org.apache.gravitino.authorization.ranger.defines.VXUser;
+import org.apache.gravitino.authorization.ranger.defines.VXUserList;
+import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.ranger.RangerServiceException;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RangerHiveAuthorizationPlugin is a plugin for Apache Ranger to manage the 
Hive authorization of
+ * the Apache Gravitino.
+ */
+public class RangerHiveAuthorizationPlugin extends RangerAuthorizationPlugin {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RangerHiveAuthorizationPlugin.class);
+
+  public RangerHiveAuthorizationPlugin(String catalogProvider, Map<String, 
String> config) {
+    super();
+    this.catalogProvider = catalogProvider;
+    String rangerUrl = 
config.get(AuthorizationPropertiesMeta.RANGER_ADMIN_URL);
+    String authType = config.get(AuthorizationPropertiesMeta.RANGER_AUTH_TYPE);
+    String username = config.get(AuthorizationPropertiesMeta.RANGER_USERNAME);
+    // Apache Ranger Password should be minimum 8 characters with min one 
alphabet and one numeric.
+    String password = config.get(AuthorizationPropertiesMeta.RANGER_PASSWORD);
+    rangerServiceName = 
config.get(AuthorizationPropertiesMeta.RANGER_SERVICE_NAME);
+    check(rangerUrl != null, "Ranger admin URL is required");
+    check(authType != null, "Ranger auth type is required");
+    check(username != null, "Ranger username is required");
+    check(password != null, "Ranger password is required");
+    check(rangerServiceName != null, "Ranger service name is required");
+
+    rangerClient = new RangerClientExt(rangerUrl, authType, username, 
password);
+  }
+
+  /**
+   * Ranger hive's privilege have `select`, `update`, `create`, `drop`, 
`alter`, `index`, `lock`,
+   * `read`, `write`, `repladmin`, `serviceadmin`, `refresh` and `all`.
+   *
+   * <p>Reference: 
ranger/agents-common/src/main/resources/service-defs/ranger-servicedef-hive.json
+   */
+  @Override
+  protected void initMapPrivileges() {
+    mapPrivileges =
+        ImmutableMap.<Privilege.Name, Set<String>>builder()
+            .put(
+                Privilege.Name.CREATE_SCHEMA,
+                ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .put(
+                Privilege.Name.CREATE_TABLE, 
ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_CREATE))
+            .put(
+                Privilege.Name.MODIFY_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_UPDATE,
+                    RangerDefines.ACCESS_TYPE_HIVE_DROP,
+                    RangerDefines.ACCESS_TYPE_HIVE_ALTER,
+                    RangerDefines.ACCESS_TYPE_HIVE_WRITE))
+            .put(
+                Privilege.Name.SELECT_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_READ, 
RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .build();
+  }
+
+  /**
+   * Because Ranger does not have Role concept, Each metadata object will have 
a unique Ranger
+   * policy. we can use one or more Ranger policy to simulate the role. <br>
+   * 1. Create a policy for each metadata object. <br>
+   * 2. Save role name in the Policy properties. <br>
+   * 3. Set `MANAGED_BY_GRAVITINO` label in the policy. <br>
+   * 4. For easy manage, each privilege will create a RangerPolicyItemAccess 
in the policy. <br>
+   * 5. The policy will only have one user, the user is the {OWNER} of the 
policy. <br>
+   * 6. The policy will not have group. <br>
+   */
+  @Override
+  public Boolean onRoleCreated(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.addSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleAcquired(Role role) throws RuntimeException {
+    Boolean findAll;
+    try {
+      findAll =
+          role.securableObjects().stream()
+                  .filter(
+                      securableObject -> {
+                        RangerPolicy policy = 
findManagedPolicy(securableObject);
+                        return policy != null;
+                      })
+                  .count()
+              == role.securableObjects().size();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return findAll;
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple securable objects, so we 
didn't directly
+   * remove the policy. <br>
+   * Reference: https://github.com/apache/gravitino/issues/4509
+   */
+  @Override
+  public Boolean onRoleDeleted(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.removeSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleUpdated(Role role, RoleChange... changes) throws 
RuntimeException {
+    for (RoleChange change : changes) {
+      boolean execResult;
+      if (change instanceof RoleChange.AddSecurableObject) {
+        execResult = doAddSecurableObject((RoleChange.AddSecurableObject) 
change);
+      } else if (change instanceof RoleChange.RemoveSecurableObject) {
+        execResult =
+            doRemoveSecurableObject(role.name(), 
(RoleChange.RemoveSecurableObject) change);
+      } else if (change instanceof RoleChange.UpdateSecurableObject) {
+        execResult =
+            doUpdateSecurableObject(role.name(), 
(RoleChange.UpdateSecurableObject) change);
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported role change type: "
+                + (change == null ? "null" : 
change.getClass().getSimpleName()));
+      }

Review Comment:
   I have refactored this code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to