yuqi1129 commented on code in PR #4515:
URL: https://github.com/apache/gravitino/pull/4515#discussion_r1718402523


##########
api/src/main/java/org/apache/gravitino/authorization/RoleChange.java:
##########
@@ -152,4 +165,88 @@ public String toString() {
       return "REMOVESECURABLEOBJECT " + securableObject;
     }
   }
+
+  /**
+   * A UpdateSecurableObject to update securable object's privilege from role. <br>
+   * The securable object's metadata entity must same as new securable object's metadata entity.
+   * <br>
+   * The securable object's privilege must be different as new securable object's privilege. <br>
+   */
+  final class UpdateSecurableObject implements RoleChange {
+    private final SecurableObject securableObject;
+    private final SecurableObject newSecurableObject;
+
+    private UpdateSecurableObject(
+        SecurableObject securableObject, SecurableObject newSecurableObject) {
+      if (!securableObject.fullName().equals(newSecurableObject.fullName())) {
+        throw new IllegalArgumentException(
+            "The securable object's metadata entity must be same as new securable object's metadata entity.");
+      }
+      if (securableObject.privileges().containsAll(newSecurableObject.privileges())) {
+        throw new IllegalArgumentException(
+            "The securable object's privilege must be different as new securable object's privilege.");
+      }
+
+      this.securableObject = securableObject;
+      this.newSecurableObject = newSecurableObject;
+    }
+
+    /**
+     * Returns the securable object to be updated.
+     *
+     * @return return a securable object.
+     */
+    public SecurableObject getSecurableObject() {
+      return this.securableObject;
+    }
+
+    /**
+     * Returns the new securable object.
+     *
+     * @return return a securable object.
+     */
+    public SecurableObject getNewSecurableObject() {
+      return this.newSecurableObject;
+    }
+
+    /**
+     * Compares this UpdateSecurableObject instance with another object for equality. The comparison
+     * is based on the old securable object and new securable object.
+     *
+     * @param o The object to compare with this instance.
+     * @return true if the given object represents the same add securable object; false otherwise.

Review Comment:
   Wording nit: this should read "the same as the added securable..."



##########
api/src/main/java/org/apache/gravitino/authorization/RoleChange.java:
##########
@@ -152,4 +165,88 @@ public String toString() {
       return "REMOVESECURABLEOBJECT " + securableObject;
     }
   }
+
+  /**
+   * A UpdateSecurableObject to update securable object's privilege from role. <br>
+   * The securable object's metadata entity must same as new securable object's metadata entity.
+   * <br>
+   * The securable object's privilege must be different as new securable object's privilege. <br>
+   */
+  final class UpdateSecurableObject implements RoleChange {
+    private final SecurableObject securableObject;
+    private final SecurableObject newSecurableObject;
+
+    private UpdateSecurableObject(
+        SecurableObject securableObject, SecurableObject newSecurableObject) {
+      if (!securableObject.fullName().equals(newSecurableObject.fullName())) {
+        throw new IllegalArgumentException(
+            "The securable object's metadata entity must be same as new securable object's metadata entity.");
+      }
+      if (securableObject.privileges().containsAll(newSecurableObject.privileges())) {
+        throw new IllegalArgumentException(
+            "The securable object's privilege must be different as new securable object's privilege.");

Review Comment:
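   According to your code, the update is rejected whenever the new privileges are
   a subset of the old ones, so the error message should say that, e.g. 'The updated
   securable object's privileges are just a part of those of the old one'.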
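
To illustrate the point above: `containsAll` is a subset test, not an inequality test. The following is a minimal sketch, assuming privileges are modeled as plain strings; the class and method names are hypothetical and not part of the Gravitino API.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

// Hypothetical stand-in: privileges are plain strings here, not Gravitino's Privilege type.
class PrivilegeUpdateCheck {
  /** Mirrors the check in the PR: true when every new privilege already exists in the old list. */
  static boolean rejectedByContainsAll(List<String> oldPrivileges, List<String> newPrivileges) {
    return oldPrivileges.containsAll(newPrivileges);
  }

  /** A literal "must be different" check that ignores order and duplicates. */
  static boolean samePrivileges(List<String> oldPrivileges, List<String> newPrivileges) {
    return new HashSet<>(oldPrivileges).equals(new HashSet<>(newPrivileges));
  }

  public static void main(String[] args) {
    List<String> oldPrivileges = Arrays.asList("SELECT_TABLE", "MODIFY_TABLE");
    List<String> narrowed = Arrays.asList("SELECT_TABLE");
    // The narrowed set differs from the old one, yet the subset check still rejects it.
    System.out.println("rejected by containsAll: " + rejectedByContainsAll(oldPrivileges, narrowed));
    System.out.println("same privileges: " + samePrivileges(oldPrivileges, narrowed));
  }
}
```

If narrowing a privilege set is meant to be a valid update, an equality check would be needed instead; if it is meant to be rejected, the message should describe the subset condition.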



##########
authorizations/authorization-ranger/build.gradle.kts:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+description = "authorization-ranger"
+
+plugins {
+  `maven-publish`
+  id("java")
+  id("idea")
+}
+
+dependencies {
+  implementation(project(":api"))

Review Comment:
   It would be better to exclude all transitive dependencies from project `api`.



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHiveAuthorizationPlugin.java:
##########
@@ -0,0 +1,854 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.RoleChange;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.authorization.ranger.defines.VXGroup;
+import org.apache.gravitino.authorization.ranger.defines.VXGroupList;
+import org.apache.gravitino.authorization.ranger.defines.VXUser;
+import org.apache.gravitino.authorization.ranger.defines.VXUserList;
+import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.ranger.RangerServiceException;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RangerHiveAuthorizationPlugin is a plugin for Apache Ranger to manage the Hive authorization of
+ * the Apache Gravitino.
+ */
+public class RangerHiveAuthorizationPlugin extends RangerAuthorizationPlugin {
+  private static final Logger LOG = LoggerFactory.getLogger(RangerHiveAuthorizationPlugin.class);
+
+  public RangerHiveAuthorizationPlugin(String catalogProvider, Map<String, String> config) {
+    super();
+    this.catalogProvider = catalogProvider;
+    String rangerUrl = config.get(AuthorizationPropertiesMeta.RANGER_ADMIN_URL);
+    String authType = config.get(AuthorizationPropertiesMeta.RANGER_AUTH_TYPE);
+    String username = config.get(AuthorizationPropertiesMeta.RANGER_USERNAME);
+    // Apache Ranger Password should be minimum 8 characters with min one alphabet and one numeric.
+    String password = config.get(AuthorizationPropertiesMeta.RANGER_PASSWORD);
+    rangerServiceName = config.get(AuthorizationPropertiesMeta.RANGER_SERVICE_NAME);
+    check(rangerUrl != null, "Ranger admin URL is required");

Review Comment:
   If we validate these values, I think the checks should be more precise.
   Take `rangerUrl` for example: I believe the value should NOT be blank, not just non-null.
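
A stricter check along these lines could look like the sketch below. It uses only the JDK; `RequiredConfig` and `requireNotBlank` are hypothetical names, not the `check` helper used in the PR.

```java
// Hypothetical helper, not the `check` method from the PR.
class RequiredConfig {
  /** Returns the value unchanged, or throws if it is null, empty, or whitespace only. */
  static String requireNotBlank(String value, String name) {
    if (value == null || value.trim().isEmpty()) {
      throw new IllegalArgumentException(name + " is required and must not be blank");
    }
    return value;
  }

  public static void main(String[] args) {
    System.out.println(requireNotBlank("http://localhost:6080", "Ranger admin URL"));
    try {
      requireNotBlank("   ", "Ranger admin URL");
    } catch (IllegalArgumentException e) {
      // A whitespace-only value passes a plain null check but is rejected here.
      System.out.println(e.getMessage());
    }
  }
}
```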



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHiveAuthorizationPlugin.java:
##########
@@ -0,0 +1,854 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.RoleChange;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.authorization.ranger.defines.VXGroup;
+import org.apache.gravitino.authorization.ranger.defines.VXGroupList;
+import org.apache.gravitino.authorization.ranger.defines.VXUser;
+import org.apache.gravitino.authorization.ranger.defines.VXUserList;
+import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.ranger.RangerServiceException;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RangerHiveAuthorizationPlugin is a plugin for Apache Ranger to manage the 
Hive authorization of
+ * the Apache Gravitino.
+ */
+public class RangerHiveAuthorizationPlugin extends RangerAuthorizationPlugin {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RangerHiveAuthorizationPlugin.class);
+
+  public RangerHiveAuthorizationPlugin(String catalogProvider, Map<String, 
String> config) {
+    super();
+    this.catalogProvider = catalogProvider;
+    String rangerUrl = 
config.get(AuthorizationPropertiesMeta.RANGER_ADMIN_URL);
+    String authType = config.get(AuthorizationPropertiesMeta.RANGER_AUTH_TYPE);
+    String username = config.get(AuthorizationPropertiesMeta.RANGER_USERNAME);
+    // Apache Ranger Password should be minimum 8 characters with min one 
alphabet and one numeric.
+    String password = config.get(AuthorizationPropertiesMeta.RANGER_PASSWORD);
+    rangerServiceName = 
config.get(AuthorizationPropertiesMeta.RANGER_SERVICE_NAME);
+    check(rangerUrl != null, "Ranger admin URL is required");
+    check(authType != null, "Ranger auth type is required");
+    check(username != null, "Ranger username is required");
+    check(password != null, "Ranger password is required");
+    check(rangerServiceName != null, "Ranger service name is required");
+
+    rangerClient = new RangerClientExt(rangerUrl, authType, username, 
password);
+  }
+
+  /**
+   * Ranger hive's privilege have `select`, `update`, `create`, `drop`, 
`alter`, `index`, `lock`,
+   * `read`, `write`, `repladmin`, `serviceadmin`, `refresh` and `all`.
+   *
+   * <p>Reference: 
ranger/agents-common/src/main/resources/service-defs/ranger-servicedef-hive.json
+   */
+  @Override
+  protected void initMapPrivileges() {
+    mapPrivileges =
+        ImmutableMap.<Privilege.Name, Set<String>>builder()
+            .put(
+                Privilege.Name.CREATE_SCHEMA,
+                ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .put(
+                Privilege.Name.CREATE_TABLE, 
ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_CREATE))
+            .put(
+                Privilege.Name.MODIFY_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_UPDATE,
+                    RangerDefines.ACCESS_TYPE_HIVE_DROP,
+                    RangerDefines.ACCESS_TYPE_HIVE_ALTER,
+                    RangerDefines.ACCESS_TYPE_HIVE_WRITE))
+            .put(
+                Privilege.Name.SELECT_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_READ, 
RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .build();
+  }
+
+  /**
+   * Because Ranger does not have Role concept, Each metadata object will have 
a unique Ranger
+   * policy. we can use one or more Ranger policy to simulate the role. <br>
+   * 1. Create a policy for each metadata object. <br>
+   * 2. Save role name in the Policy properties. <br>
+   * 3. Set `MANAGED_BY_GRAVITINO` label in the policy. <br>
+   * 4. For easy manage, each privilege will create a RangerPolicyItemAccess 
in the policy. <br>
+   * 5. The policy will only have one user, the user is the {OWNER} of the 
policy. <br>
+   * 6. The policy will not have group. <br>
+   */
+  @Override
+  public Boolean onRoleCreated(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.addSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleAcquired(Role role) throws RuntimeException {
+    Boolean findAll;
+    try {
+      findAll =
+          role.securableObjects().stream()
+                  .filter(
+                      securableObject -> {
+                        RangerPolicy policy = findManagedPolicy(securableObject);
+                        return policy != null;
+                      })
+                  .count()
+              == role.securableObjects().size();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return findAll;

Review Comment:
   ```suggestion
       return role.securableObjects().stream().allMatch(securableObject -> {
         RangerPolicy policy = findManagedPolicy(securableObject);
         return policy != null;
       });
   ```
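
For context on the suggestion above: `filter(...).count() == size()` visits every element, while `allMatch` on a sequential stream stops at the first element that fails the predicate. The sketch below shows the difference with a hypothetical lookup that counts its calls; `findPolicy` is a stand-in for illustration and not the plugin's `findManagedPolicy`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class AllMatchSketch {
  /** Hypothetical lookup: returns null for names starting with "missing", and counts calls. */
  static String findPolicy(String name, AtomicInteger lookups) {
    lookups.incrementAndGet();
    return name.startsWith("missing") ? null : "policy-for-" + name;
  }

  /** Shape of the original code: count the matches and compare with the list size. */
  static boolean countBased(List<String> objects, AtomicInteger lookups) {
    return objects.stream().filter(o -> findPolicy(o, lookups) != null).count() == objects.size();
  }

  /** Shape of the suggested code: stop at the first object without a policy. */
  static boolean allMatchBased(List<String> objects, AtomicInteger lookups) {
    return objects.stream().allMatch(o -> findPolicy(o, lookups) != null);
  }

  public static void main(String[] args) {
    List<String> objects = Arrays.asList("missing_table", "table_b", "table_c");
    AtomicInteger countLookups = new AtomicInteger();
    AtomicInteger allMatchLookups = new AtomicInteger();
    System.out.println(countBased(objects, countLookups) + " after " + countLookups + " lookups");
    System.out.println(allMatchBased(objects, allMatchLookups) + " after " + allMatchLookups + " lookups");
  }
}
```

Both forms return the same result, including `true` for an empty list; the difference only matters when each lookup is expensive, for example a remote call.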



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHiveAuthorizationPlugin.java:
##########
@@ -0,0 +1,854 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.RoleChange;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.authorization.ranger.defines.VXGroup;
+import org.apache.gravitino.authorization.ranger.defines.VXGroupList;
+import org.apache.gravitino.authorization.ranger.defines.VXUser;
+import org.apache.gravitino.authorization.ranger.defines.VXUserList;
+import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.ranger.RangerServiceException;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RangerHiveAuthorizationPlugin is a plugin for Apache Ranger to manage the 
Hive authorization of
+ * the Apache Gravitino.
+ */
+public class RangerHiveAuthorizationPlugin extends RangerAuthorizationPlugin {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RangerHiveAuthorizationPlugin.class);
+
+  public RangerHiveAuthorizationPlugin(String catalogProvider, Map<String, 
String> config) {
+    super();
+    this.catalogProvider = catalogProvider;
+    String rangerUrl = 
config.get(AuthorizationPropertiesMeta.RANGER_ADMIN_URL);
+    String authType = config.get(AuthorizationPropertiesMeta.RANGER_AUTH_TYPE);
+    String username = config.get(AuthorizationPropertiesMeta.RANGER_USERNAME);
+    // Apache Ranger Password should be minimum 8 characters with min one 
alphabet and one numeric.
+    String password = config.get(AuthorizationPropertiesMeta.RANGER_PASSWORD);
+    rangerServiceName = 
config.get(AuthorizationPropertiesMeta.RANGER_SERVICE_NAME);
+    check(rangerUrl != null, "Ranger admin URL is required");
+    check(authType != null, "Ranger auth type is required");
+    check(username != null, "Ranger username is required");
+    check(password != null, "Ranger password is required");
+    check(rangerServiceName != null, "Ranger service name is required");
+
+    rangerClient = new RangerClientExt(rangerUrl, authType, username, 
password);
+  }
+
+  /**
+   * Ranger hive's privilege have `select`, `update`, `create`, `drop`, 
`alter`, `index`, `lock`,
+   * `read`, `write`, `repladmin`, `serviceadmin`, `refresh` and `all`.
+   *
+   * <p>Reference: 
ranger/agents-common/src/main/resources/service-defs/ranger-servicedef-hive.json
+   */
+  @Override
+  protected void initMapPrivileges() {
+    mapPrivileges =
+        ImmutableMap.<Privilege.Name, Set<String>>builder()
+            .put(
+                Privilege.Name.CREATE_SCHEMA,
+                ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .put(
+                Privilege.Name.CREATE_TABLE, 
ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_CREATE))
+            .put(
+                Privilege.Name.MODIFY_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_UPDATE,
+                    RangerDefines.ACCESS_TYPE_HIVE_DROP,
+                    RangerDefines.ACCESS_TYPE_HIVE_ALTER,
+                    RangerDefines.ACCESS_TYPE_HIVE_WRITE))
+            .put(
+                Privilege.Name.SELECT_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_READ, 
RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .build();
+  }
+
+  /**
+   * Because Ranger does not have Role concept, Each metadata object will have 
a unique Ranger
+   * policy. we can use one or more Ranger policy to simulate the role. <br>
+   * 1. Create a policy for each metadata object. <br>
+   * 2. Save role name in the Policy properties. <br>
+   * 3. Set `MANAGED_BY_GRAVITINO` label in the policy. <br>
+   * 4. For easy manage, each privilege will create a RangerPolicyItemAccess 
in the policy. <br>
+   * 5. The policy will only have one user, the user is the {OWNER} of the 
policy. <br>
+   * 6. The policy will not have group. <br>
+   */
+  @Override
+  public Boolean onRoleCreated(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.addSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleAcquired(Role role) throws RuntimeException {
+    Boolean findAll;
+    try {
+      findAll =
+          role.securableObjects().stream()
+                  .filter(
+                      securableObject -> {
+                        RangerPolicy policy = 
findManagedPolicy(securableObject);
+                        return policy != null;
+                      })
+                  .count()
+              == role.securableObjects().size();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return findAll;
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple securable objects, so we 
didn't directly
+   * remove the policy. <br>
+   * Reference: https://github.com/apache/gravitino/issues/4509
+   */
+  @Override
+  public Boolean onRoleDeleted(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.removeSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleUpdated(Role role, RoleChange... changes) throws RuntimeException {
+    for (RoleChange change : changes) {
+      boolean execResult;
+      if (change instanceof RoleChange.AddSecurableObject) {
+        execResult = doAddSecurableObject((RoleChange.AddSecurableObject) change);
+      } else if (change instanceof RoleChange.RemoveSecurableObject) {
+        execResult =
+            doRemoveSecurableObject(role.name(), (RoleChange.RemoveSecurableObject) change);
+      } else if (change instanceof RoleChange.UpdateSecurableObject) {
+        execResult =
+            doUpdateSecurableObject(role.name(), (RoleChange.UpdateSecurableObject) change);
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported role change type: "
+                + (change == null ? "null" : change.getClass().getSimpleName()));
+      }

Review Comment:
   Can we merge all the changes and update them all in one operation?
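
One possible reading of this request is to group the changes by target object first, so that each affected policy is modified in memory and written back once. The sketch below shows only the grouping step; `Change` is a hypothetical, simplified stand-in for the `RoleChange` subclasses, and the actual policy write is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BatchedRoleChanges {
  /** Hypothetical, simplified change; the real RoleChange subclasses carry SecurableObject instances. */
  static final class Change {
    final String kind; // "ADD", "REMOVE" or "UPDATE"
    final String objectFullName;

    Change(String kind, String objectFullName) {
      this.kind = kind;
      this.objectFullName = objectFullName;
    }
  }

  /** Groups changes by target object, keeping first-seen object order and per-object change order. */
  static Map<String, List<Change>> groupByObject(List<Change> changes) {
    Map<String, List<Change>> grouped = new LinkedHashMap<>();
    for (Change change : changes) {
      grouped.computeIfAbsent(change.objectFullName, k -> new ArrayList<>()).add(change);
    }
    return grouped;
  }

  public static void main(String[] args) {
    List<Change> changes =
        Arrays.asList(
            new Change("ADD", "catalog.db1"),
            new Change("UPDATE", "catalog.db1.tab1"),
            new Change("REMOVE", "catalog.db1"));
    // Each map entry would translate into a single policy update instead of one per change.
    groupByObject(changes)
        .forEach((name, group) -> System.out.println(name + ": " + group.size() + " change(s)"));
  }
}
```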



##########
api/src/main/java/org/apache/gravitino/authorization/RoleChange.java:
##########
@@ -152,4 +165,88 @@ public String toString() {
       return "REMOVESECURABLEOBJECT " + securableObject;
     }
   }
+
+  /**
+   * A UpdateSecurableObject to update securable object's privilege from role. <br>

Review Comment:
   An UpdateSecurableObject is used to update a securable object's privileges in a role.



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHiveAuthorizationPlugin.java:
##########
@@ -0,0 +1,854 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.RoleChange;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.authorization.ranger.defines.VXGroup;
+import org.apache.gravitino.authorization.ranger.defines.VXGroupList;
+import org.apache.gravitino.authorization.ranger.defines.VXUser;
+import org.apache.gravitino.authorization.ranger.defines.VXUserList;
+import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.ranger.RangerServiceException;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RangerHiveAuthorizationPlugin is a plugin for Apache Ranger to manage the 
Hive authorization of
+ * the Apache Gravitino.
+ */
+public class RangerHiveAuthorizationPlugin extends RangerAuthorizationPlugin {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RangerHiveAuthorizationPlugin.class);
+
+  public RangerHiveAuthorizationPlugin(String catalogProvider, Map<String, 
String> config) {
+    super();
+    this.catalogProvider = catalogProvider;
+    String rangerUrl = 
config.get(AuthorizationPropertiesMeta.RANGER_ADMIN_URL);
+    String authType = config.get(AuthorizationPropertiesMeta.RANGER_AUTH_TYPE);
+    String username = config.get(AuthorizationPropertiesMeta.RANGER_USERNAME);
+    // Apache Ranger Password should be minimum 8 characters with min one 
alphabet and one numeric.
+    String password = config.get(AuthorizationPropertiesMeta.RANGER_PASSWORD);
+    rangerServiceName = 
config.get(AuthorizationPropertiesMeta.RANGER_SERVICE_NAME);
+    check(rangerUrl != null, "Ranger admin URL is required");
+    check(authType != null, "Ranger auth type is required");
+    check(username != null, "Ranger username is required");
+    check(password != null, "Ranger password is required");
+    check(rangerServiceName != null, "Ranger service name is required");
+
+    rangerClient = new RangerClientExt(rangerUrl, authType, username, 
password);
+  }
+
+  /**
+   * Ranger hive's privilege have `select`, `update`, `create`, `drop`, 
`alter`, `index`, `lock`,
+   * `read`, `write`, `repladmin`, `serviceadmin`, `refresh` and `all`.
+   *
+   * <p>Reference: 
ranger/agents-common/src/main/resources/service-defs/ranger-servicedef-hive.json
+   */
+  @Override
+  protected void initMapPrivileges() {
+    mapPrivileges =
+        ImmutableMap.<Privilege.Name, Set<String>>builder()
+            .put(
+                Privilege.Name.CREATE_SCHEMA,
+                ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .put(
+                Privilege.Name.CREATE_TABLE, 
ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_CREATE))
+            .put(
+                Privilege.Name.MODIFY_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_UPDATE,
+                    RangerDefines.ACCESS_TYPE_HIVE_DROP,
+                    RangerDefines.ACCESS_TYPE_HIVE_ALTER,
+                    RangerDefines.ACCESS_TYPE_HIVE_WRITE))
+            .put(
+                Privilege.Name.SELECT_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_READ, 
RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .build();
+  }
+
+  /**
+   * Because Ranger does not have Role concept, Each metadata object will have 
a unique Ranger
+   * policy. we can use one or more Ranger policy to simulate the role. <br>
+   * 1. Create a policy for each metadata object. <br>
+   * 2. Save role name in the Policy properties. <br>
+   * 3. Set `MANAGED_BY_GRAVITINO` label in the policy. <br>
+   * 4. For easy manage, each privilege will create a RangerPolicyItemAccess 
in the policy. <br>
+   * 5. The policy will only have one user, the user is the {OWNER} of the 
policy. <br>
+   * 6. The policy will not have group. <br>
+   */
+  @Override
+  public Boolean onRoleCreated(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.addSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleAcquired(Role role) throws RuntimeException {
+    Boolean findAll;
+    try {
+      findAll =
+          role.securableObjects().stream()
+                  .filter(
+                      securableObject -> {
+                        RangerPolicy policy = 
findManagedPolicy(securableObject);
+                        return policy != null;
+                      })
+                  .count()
+              == role.securableObjects().size();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return findAll;
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple securable objects, so we 
didn't directly
+   * remove the policy. <br>
+   * Reference: https://github.com/apache/gravitino/issues/4509
+   */
+  @Override
+  public Boolean onRoleDeleted(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> RoleChange.removeSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleUpdated(Role role, RoleChange... changes) throws RuntimeException {
+    for (RoleChange change : changes) {
+      boolean execResult;
+      if (change instanceof RoleChange.AddSecurableObject) {
+        execResult = doAddSecurableObject((RoleChange.AddSecurableObject) change);
+      } else if (change instanceof RoleChange.RemoveSecurableObject) {
+        execResult =
+            doRemoveSecurableObject(role.name(), (RoleChange.RemoveSecurableObject) change);
+      } else if (change instanceof RoleChange.UpdateSecurableObject) {
+        execResult =
+            doUpdateSecurableObject(role.name(), (RoleChange.UpdateSecurableObject) change);
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported role change type: "
+                + (change == null ? "null" : change.getClass().getSimpleName()));
+      }
+      if (!execResult) {
+        return Boolean.FALSE;
+      }
+    }
+
+    return Boolean.TRUE;
+  }
+
+  @VisibleForTesting
+  public List<Privilege> getAllPrivileges() {
+    return mapPrivileges.keySet().stream().map(Privileges::allow).collect(Collectors.toList());
+  }
+
+  @Override
+  public Boolean onOwnerSet(MetadataObject metadataObject, Owner preOwner, Owner newOwner)

Review Comment:
   Can we ensure that the update process is atomic?
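
   As written, `onRoleUpdated` returns `Boolean.FALSE` on the first failing change, so changes applied before it stay in Ranger. If the underlying Ranger calls cannot be grouped into a single transaction, one option is compensation: record an undo action for each applied change and run the recorded actions in reverse order on failure. This is best-effort rollback, not true atomicity. A minimal, self-contained sketch of the idea, where `CompensatingUpdate`, `Step`, and `applyAll` are hypothetical names (not part of Gravitino or Ranger) and the undo actions would have to be supplied by the plugin:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of Gravitino or Ranger.
final class CompensatingUpdate {

  /** Pairs a change with the action that reverts it (both supplied by the caller). */
  static final class Step {
    final BooleanSupplier apply;
    final Runnable undo;

    Step(BooleanSupplier apply, Runnable undo) {
      this.apply = apply;
      this.undo = undo;
    }
  }

  /** Applies the steps in order; on failure, reverts the applied ones in reverse order. */
  static boolean applyAll(List<Step> steps) {
    Deque<Runnable> undoStack = new ArrayDeque<>();
    for (Step step : steps) {
      boolean ok;
      try {
        ok = step.apply.getAsBoolean();
      } catch (RuntimeException e) {
        rollback(undoStack);
        throw e;
      }
      if (!ok) {
        rollback(undoStack);
        return false;
      }
      // Remember how to revert this step only after it succeeded.
      undoStack.push(step.undo);
    }
    return true;
  }

  private static void rollback(Deque<Runnable> undoStack) {
    // pop() returns the most recently pushed undo action first (LIFO).
    while (!undoStack.isEmpty()) {
      undoStack.pop().run();
    }
  }
}
```

   A rollback step can itself fail, so the caller would still need to log such cases and possibly reconcile later.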



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHiveAuthorizationPlugin.java:
##########
@@ -0,0 +1,854 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.RoleChange;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.authorization.ranger.defines.VXGroup;
+import org.apache.gravitino.authorization.ranger.defines.VXGroupList;
+import org.apache.gravitino.authorization.ranger.defines.VXUser;
+import org.apache.gravitino.authorization.ranger.defines.VXUserList;
+import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.ranger.RangerServiceException;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RangerHiveAuthorizationPlugin is a plugin for Apache Ranger to manage the 
Hive authorization of
+ * the Apache Gravitino.
+ */
+public class RangerHiveAuthorizationPlugin extends RangerAuthorizationPlugin {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RangerHiveAuthorizationPlugin.class);
+
+  public RangerHiveAuthorizationPlugin(String catalogProvider, Map<String, 
String> config) {
+    super();
+    this.catalogProvider = catalogProvider;
+    String rangerUrl = 
config.get(AuthorizationPropertiesMeta.RANGER_ADMIN_URL);
+    String authType = config.get(AuthorizationPropertiesMeta.RANGER_AUTH_TYPE);
+    String username = config.get(AuthorizationPropertiesMeta.RANGER_USERNAME);
+    // Apache Ranger Password should be minimum 8 characters with min one 
alphabet and one numeric.
+    String password = config.get(AuthorizationPropertiesMeta.RANGER_PASSWORD);
+    rangerServiceName = 
config.get(AuthorizationPropertiesMeta.RANGER_SERVICE_NAME);
+    check(rangerUrl != null, "Ranger admin URL is required");
+    check(authType != null, "Ranger auth type is required");
+    check(username != null, "Ranger username is required");
+    check(password != null, "Ranger password is required");
+    check(rangerServiceName != null, "Ranger service name is required");
+
+    rangerClient = new RangerClientExt(rangerUrl, authType, username, 
password);
+  }
+
+  /**
+   * Ranger hive's privilege have `select`, `update`, `create`, `drop`, 
`alter`, `index`, `lock`,
+   * `read`, `write`, `repladmin`, `serviceadmin`, `refresh` and `all`.
+   *
+   * <p>Reference: 
ranger/agents-common/src/main/resources/service-defs/ranger-servicedef-hive.json
+   */
+  @Override
+  protected void initMapPrivileges() {
+    mapPrivileges =
+        ImmutableMap.<Privilege.Name, Set<String>>builder()
+            .put(
+                Privilege.Name.CREATE_SCHEMA,
+                ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .put(
+                Privilege.Name.CREATE_TABLE, 
ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_CREATE))
+            .put(
+                Privilege.Name.MODIFY_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_UPDATE,
+                    RangerDefines.ACCESS_TYPE_HIVE_DROP,
+                    RangerDefines.ACCESS_TYPE_HIVE_ALTER,
+                    RangerDefines.ACCESS_TYPE_HIVE_WRITE))
+            .put(
+                Privilege.Name.SELECT_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_READ, 
RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .build();
+  }
+
+  /**
+   * Because Ranger does not have Role concept, Each metadata object will have 
a unique Ranger
+   * policy. we can use one or more Ranger policy to simulate the role. <br>
+   * 1. Create a policy for each metadata object. <br>
+   * 2. Save role name in the Policy properties. <br>
+   * 3. Set `MANAGED_BY_GRAVITINO` label in the policy. <br>
+   * 4. For easy manage, each privilege will create a RangerPolicyItemAccess 
in the policy. <br>
+   * 5. The policy will only have one user, the user is the {OWNER} of the 
policy. <br>
+   * 6. The policy will not have group. <br>
+   */
+  @Override
+  public Boolean onRoleCreated(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.addSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleAcquired(Role role) throws RuntimeException {
+    Boolean findAll;
+    try {
+      findAll =
+          role.securableObjects().stream()
+                  .filter(
+                      securableObject -> {
+                        RangerPolicy policy = 
findManagedPolicy(securableObject);
+                        return policy != null;
+                      })
+                  .count()
+              == role.securableObjects().size();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return findAll;
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple securable objects, so we 
didn't directly
+   * remove the policy. <br>
+   * Reference: https://github.com/apache/gravitino/issues/4509
+   */
+  @Override
+  public Boolean onRoleDeleted(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.removeSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleUpdated(Role role, RoleChange... changes) throws 
RuntimeException {
+    for (RoleChange change : changes) {
+      boolean execResult;
+      if (change instanceof RoleChange.AddSecurableObject) {
+        execResult = doAddSecurableObject((RoleChange.AddSecurableObject) 
change);
+      } else if (change instanceof RoleChange.RemoveSecurableObject) {
+        execResult =
+            doRemoveSecurableObject(role.name(), 
(RoleChange.RemoveSecurableObject) change);
+      } else if (change instanceof RoleChange.UpdateSecurableObject) {
+        execResult =
+            doUpdateSecurableObject(role.name(), 
(RoleChange.UpdateSecurableObject) change);
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported role change type: "
+                + (change == null ? "null" : 
change.getClass().getSimpleName()));
+      }
+      if (!execResult) {
+        return Boolean.FALSE;
+      }
+    }
+
+    return Boolean.TRUE;
+  }
+
+  @VisibleForTesting
+  public List<Privilege> getAllPrivileges() {
+    return 
mapPrivileges.keySet().stream().map(Privileges::allow).collect(Collectors.toList());
+  }
+
+  @Override
+  public Boolean onOwnerSet(MetadataObject metadataObject, Owner preOwner, 
Owner newOwner)
+      throws RuntimeException {
+    List<Privilege> allPrivileges = getAllPrivileges();
+
+    SecurableObject securableObjects =
+        SecurableObjects.parse(metadataObject.fullName(), 
metadataObject.type(), allPrivileges);
+
+    AuditInfo auditInfo =
+        AuditInfo.builder()
+            .withCreator(PrincipalUtils.getCurrentUserName())
+            .withCreateTime(Instant.now())
+            .build();
+
+    RoleEntity role =
+        RoleEntity.builder()
+            .withId(0L)
+            .withName(OWNER_ROLE_NAME)
+            .withAuditInfo(auditInfo)
+            .withSecurableObjects(Lists.newArrayList(securableObjects))
+            .build();
+
+    Boolean execResult = onRoleCreated(role);
+    if (!execResult) {
+      return Boolean.FALSE;
+    }
+    if (preOwner != null) {
+      if (preOwner.type() == Owner.Type.USER) {
+        UserEntity userEntity =
+            UserEntity.builder()
+                .withId(0L)
+                .withName(preOwner.name())
+                .withRoleNames(Collections.emptyList())
+                .withRoleIds(Collections.emptyList())
+                .withAuditInfo(auditInfo)
+                .build();
+        execResult = onRevokedRolesFromUser(Lists.newArrayList(role), userEntity);
+        if (!execResult) {
+          return Boolean.FALSE;

Review Comment:
   I believe we should log which step failed, along with the failure details.
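
   One way to do this without repeating the logging at every `return Boolean.FALSE` is a small wrapper that names the step. The sketch below is only an illustration: `StepLogging` and `runStep` are hypothetical names, and it uses `java.util.logging` to stay self-contained, whereas the plugin itself would use its existing SLF4J `LOG`.

```java
import java.util.function.BooleanSupplier;
import java.util.logging.Logger;

// Hypothetical helper, not part of Gravitino.
final class StepLogging {
  private static final Logger LOG = Logger.getLogger(StepLogging.class.getName());

  /** Runs one step and logs its name and target when it reports failure. */
  static boolean runStep(String stepName, String target, BooleanSupplier step) {
    boolean ok = step.getAsBoolean();
    if (!ok) {
      LOG.warning(String.format("Step '%s' failed for '%s'", stepName, target));
    }
    return ok;
  }
}
```

   In `onOwnerSet`, each call such as `onRevokedRolesFromUser(...)` could then be wrapped with a step name like "revoke owner role from previous owner" and `preOwner.name()` as the target.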



##########
api/src/main/java/org/apache/gravitino/authorization/RoleChange.java:
##########
@@ -152,4 +165,88 @@ public String toString() {
       return "REMOVESECURABLEOBJECT " + securableObject;
     }
   }
+
+  /**
+   * A UpdateSecurableObject to update securable object's privilege from role. <br>
+   * The securable object's metadata entity must same as new securable object's metadata entity.

Review Comment:
   must be the same as...



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHiveAuthorizationPlugin.java:
##########
@@ -0,0 +1,854 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.RoleChange;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.authorization.ranger.defines.VXGroup;
+import org.apache.gravitino.authorization.ranger.defines.VXGroupList;
+import org.apache.gravitino.authorization.ranger.defines.VXUser;
+import org.apache.gravitino.authorization.ranger.defines.VXUserList;
+import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.ranger.RangerServiceException;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RangerHiveAuthorizationPlugin is a plugin for Apache Ranger to manage the 
Hive authorization of
+ * the Apache Gravitino.
+ */
+public class RangerHiveAuthorizationPlugin extends RangerAuthorizationPlugin {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RangerHiveAuthorizationPlugin.class);
+
+  public RangerHiveAuthorizationPlugin(String catalogProvider, Map<String, 
String> config) {
+    super();
+    this.catalogProvider = catalogProvider;
+    String rangerUrl = 
config.get(AuthorizationPropertiesMeta.RANGER_ADMIN_URL);
+    String authType = config.get(AuthorizationPropertiesMeta.RANGER_AUTH_TYPE);
+    String username = config.get(AuthorizationPropertiesMeta.RANGER_USERNAME);
+    // Apache Ranger Password should be minimum 8 characters with min one 
alphabet and one numeric.
+    String password = config.get(AuthorizationPropertiesMeta.RANGER_PASSWORD);
+    rangerServiceName = 
config.get(AuthorizationPropertiesMeta.RANGER_SERVICE_NAME);
+    check(rangerUrl != null, "Ranger admin URL is required");
+    check(authType != null, "Ranger auth type is required");
+    check(username != null, "Ranger username is required");
+    check(password != null, "Ranger password is required");
+    check(rangerServiceName != null, "Ranger service name is required");
+
+    rangerClient = new RangerClientExt(rangerUrl, authType, username, 
password);
+  }
+
+  /**
+   * Ranger hive's privilege have `select`, `update`, `create`, `drop`, 
`alter`, `index`, `lock`,
+   * `read`, `write`, `repladmin`, `serviceadmin`, `refresh` and `all`.
+   *
+   * <p>Reference: 
ranger/agents-common/src/main/resources/service-defs/ranger-servicedef-hive.json
+   */
+  @Override
+  protected void initMapPrivileges() {
+    mapPrivileges =
+        ImmutableMap.<Privilege.Name, Set<String>>builder()
+            .put(
+                Privilege.Name.CREATE_SCHEMA,
+                ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .put(
+                Privilege.Name.CREATE_TABLE, 
ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_CREATE))
+            .put(
+                Privilege.Name.MODIFY_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_UPDATE,
+                    RangerDefines.ACCESS_TYPE_HIVE_DROP,
+                    RangerDefines.ACCESS_TYPE_HIVE_ALTER,
+                    RangerDefines.ACCESS_TYPE_HIVE_WRITE))
+            .put(
+                Privilege.Name.SELECT_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_READ, 
RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .build();
+  }
+
+  /**
+   * Because Ranger does not have Role concept, Each metadata object will have 
a unique Ranger
+   * policy. we can use one or more Ranger policy to simulate the role. <br>
+   * 1. Create a policy for each metadata object. <br>
+   * 2. Save role name in the Policy properties. <br>
+   * 3. Set `MANAGED_BY_GRAVITINO` label in the policy. <br>
+   * 4. For easy manage, each privilege will create a RangerPolicyItemAccess 
in the policy. <br>
+   * 5. The policy will only have one user, the user is the {OWNER} of the 
policy. <br>
+   * 6. The policy will not have group. <br>
+   */
+  @Override
+  public Boolean onRoleCreated(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.addSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleAcquired(Role role) throws RuntimeException {
+    Boolean findAll;
+    try {
+      findAll =
+          role.securableObjects().stream()
+                  .filter(
+                      securableObject -> {
+                        RangerPolicy policy = 
findManagedPolicy(securableObject);
+                        return policy != null;
+                      })
+                  .count()
+              == role.securableObjects().size();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return findAll;
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple securable objects, so we 
didn't directly
+   * remove the policy. <br>
+   * Reference: https://github.com/apache/gravitino/issues/4509
+   */
+  @Override
+  public Boolean onRoleDeleted(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.removeSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleUpdated(Role role, RoleChange... changes) throws 
RuntimeException {
+    for (RoleChange change : changes) {
+      boolean execResult;
+      if (change instanceof RoleChange.AddSecurableObject) {
+        execResult = doAddSecurableObject((RoleChange.AddSecurableObject) 
change);
+      } else if (change instanceof RoleChange.RemoveSecurableObject) {
+        execResult =
+            doRemoveSecurableObject(role.name(), 
(RoleChange.RemoveSecurableObject) change);
+      } else if (change instanceof RoleChange.UpdateSecurableObject) {
+        execResult =
+            doUpdateSecurableObject(role.name(), 
(RoleChange.UpdateSecurableObject) change);
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported role change type: "
+                + (change == null ? "null" : 
change.getClass().getSimpleName()));
+      }
+      if (!execResult) {
+        return Boolean.FALSE;
+      }
+    }
+
+    return Boolean.TRUE;
+  }
+
+  @VisibleForTesting
+  public List<Privilege> getAllPrivileges() {
+    return 
mapPrivileges.keySet().stream().map(Privileges::allow).collect(Collectors.toList());
+  }
+
+  @Override
+  public Boolean onOwnerSet(MetadataObject metadataObject, Owner preOwner, 
Owner newOwner)
+      throws RuntimeException {
+    List<Privilege> allPrivileges = getAllPrivileges();
+
+    SecurableObject securableObjects =
+        SecurableObjects.parse(metadataObject.fullName(), 
metadataObject.type(), allPrivileges);
+
+    AuditInfo auditInfo =
+        AuditInfo.builder()
+            .withCreator(PrincipalUtils.getCurrentUserName())
+            .withCreateTime(Instant.now())
+            .build();
+
+    RoleEntity role =
+        RoleEntity.builder()
+            .withId(0L)
+            .withName(OWNER_ROLE_NAME)
+            .withAuditInfo(auditInfo)
+            .withSecurableObjects(Lists.newArrayList(securableObjects))
+            .build();
+
+    Boolean execResult = onRoleCreated(role);
+    if (!execResult) {
+      return Boolean.FALSE;
+    }
+    if (preOwner != null) {
+      if (preOwner.type() == Owner.Type.USER) {
+        UserEntity userEntity =
+            UserEntity.builder()
+                .withId(0L)
+                .withName(preOwner.name())
+                .withRoleNames(Collections.emptyList())
+                .withRoleIds(Collections.emptyList())
+                .withAuditInfo(auditInfo)
+                .build();
+        execResult = onRevokedRolesFromUser(Lists.newArrayList(role), 
userEntity);
+        if (!execResult) {
+          return Boolean.FALSE;
+        }
+      } else {
+        GroupEntity groupEntity =
+            GroupEntity.builder()
+                .withId(0L)
+                .withName(preOwner.name())
+                .withRoleNames(Collections.emptyList())
+                .withRoleIds(Collections.emptyList())
+                .withAuditInfo(auditInfo)
+                .build();
+        execResult = onRevokedRolesFromGroup(Lists.newArrayList(role), 
groupEntity);
+        if (!execResult) {
+          return Boolean.FALSE;
+        }
+      }
+    }
+    if (newOwner != null) {
+      if (newOwner.type() == Owner.Type.USER) {
+        UserEntity userEntity =
+            UserEntity.builder()
+                .withId(0L)
+                .withName(newOwner.name())
+                .withRoleNames(Collections.emptyList())
+                .withRoleIds(Collections.emptyList())
+                .withAuditInfo(auditInfo)
+                .build();
+        execResult = onGrantedRolesToUser(Lists.newArrayList(role), 
userEntity);
+        if (!execResult) {
+          return Boolean.FALSE;
+        }
+      } else {
+        GroupEntity groupEntity =
+            GroupEntity.builder()
+                .withId(0L)
+                .withName(newOwner.name())
+                .withRoleNames(Collections.emptyList())
+                .withRoleIds(Collections.emptyList())
+                .withAuditInfo(auditInfo)
+                .build();
+        execResult = onGrantedRolesToGroup(Lists.newArrayList(role), 
groupEntity);
+        if (!execResult) {
+          return Boolean.FALSE;
+        }
+      }
+    }
+    return Boolean.TRUE;
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple Gravitino securable 
objects, <br>
+   * So we need to find the corresponding policy item mapping to set the user.
+   */
+  @Override
+  public Boolean onGrantedRolesToUser(List<Role> roles, User user) throws 
RuntimeException {
+    AtomicReference<Boolean> execResult = new AtomicReference<>(Boolean.TRUE);
+    for (Role role : roles) {
+      role.securableObjects()
+          .forEach(
+              securableObject -> {
+                RangerPolicy policy = findManagedPolicy(securableObject);
+                if (policy == null) {
+                  LOG.warn("The policy is not exist for the securable 
object({})", securableObject);
+                  execResult.set(Boolean.FALSE);
+                  return;
+                }
+
+                securableObject
+                    .privileges()
+                    .forEach(
+                        privilege -> {
+                          // Convert Gravitino privilege to Ranger privilege
+                          mapPrivileges
+                              .getOrDefault(privilege.name(), 
Collections.emptySet())
+                              .forEach(
+                                  // Use the Ranger privilege name to search
+                                  rangerPrivilegeName -> {
+                                    policy
+                                        /**
+                                         * TODO: Maybe we need to add the user 
to the
+                                         * Deny/DataMask/RowFilter policy item 
in the future.
+                                         */
+                                        .getPolicyItems()
+                                        .forEach(
+                                            policyItem -> {
+                                              if 
(policyItem.getAccesses().stream()
+                                                  .anyMatch(
+                                                      policyItemAccess ->
+                                                          policyItemAccess
+                                                              .getType()
+                                                              
.equals(rangerPrivilegeName))) {
+                                                // If the user is not exist in 
the policy item, then
+                                                // add it.
+                                                if 
(!policyItem.getUsers().contains(user.name())) {
+                                                  
policyItem.getUsers().add(user.name());
+                                                }
+                                              }
+                                            });
+                                    try {
+                                      
rangerClient.updatePolicy(policy.getId(), policy);
+                                    } catch (RangerServiceException e) {
+                                      throw new RuntimeException(e);
+                                    }
+                                  });
+                        });
+              });
+      if (!execResult.get()) {
+        return Boolean.FALSE;
+      }
+    }
+
+    return Boolean.TRUE;
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple Gravitino securable 
objects, <br>
+   * So we need to find the corresponding policy item mapping to remove the 
user.
+   */
+  @Override
+  public Boolean onRevokedRolesFromUser(List<Role> roles, User user) throws 
RuntimeException {
+    AtomicReference<Boolean> result = new AtomicReference<>(Boolean.TRUE);
+    for (Role role : roles) {
+      role.securableObjects()
+          .forEach(
+              securableObject -> {
+                RangerPolicy policy = findManagedPolicy(securableObject);
+                if (policy == null) {
+                  LOG.warn("The policy is not exist for the securable 
object({})", securableObject);
+                  result.set(Boolean.FALSE);
+                  return;
+                }
+
+                securableObject
+                    .privileges()
+                    .forEach(
+                        privilege -> {
+                          // Convert Gravitino privilege to Ranger privilege
+                          mapPrivileges
+                              .getOrDefault(privilege.name(), 
Collections.emptySet())
+                              .forEach(
+                                  // Use the Ranger privilege name to search
+                                  rangerPrivilegeName -> {
+                                    policy
+                                        // TODO: Maybe we need to add the user 
to the
+                                        // Deny/DataMask/RowFilter policy item 
in the future.
+                                        .getPolicyItems()
+                                        .forEach(
+                                            policyItem -> {
+                                              if 
(policyItem.getAccesses().stream()
+                                                  .anyMatch(
+                                                      policyItemAccess ->
+                                                          policyItemAccess
+                                                              .getType()
+                                                              
.equals(rangerPrivilegeName))) {
+                                                // If the user is exist in the 
policy item, then
+                                                // remove it.
+                                                
policyItem.getUsers().removeIf(user.name()::equals);
+                                              }
+                                            });
+                                    try {
+                                      
rangerClient.updatePolicy(policy.getId(), policy);
+                                    } catch (RangerServiceException e) {
+                                      throw new RuntimeException(e);
+                                    }
+                                  });
+                        });
+              });
+      if (!result.get()) {

Review Comment:
   If the code reaches this point, the value of `result.get()` can be false. Am I right?
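
   For context on the control flow: a `return` inside a `forEach` lambda only ends the lambda call for the current element. The iteration continues with the remaining elements, and the flag is checked only after `forEach` has finished. So when one securable object has no policy, the other objects of the same role are still updated in Ranger before `Boolean.FALSE` is returned. A self-contained sketch of that behavior, with hypothetical names (`LambdaReturnDemo`, `processAll`) and an empty string standing in for a securable object without a policy:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; it mirrors only the control flow, not the Ranger calls.
final class LambdaReturnDemo {

  /** Adds every non-empty item to `processed`; reports false if any item was empty. */
  static boolean processAll(List<String> items, List<String> processed) {
    AtomicReference<Boolean> result = new AtomicReference<>(Boolean.TRUE);
    items.forEach(
        item -> {
          if (item.isEmpty()) {
            result.set(Boolean.FALSE);
            return; // leaves the lambda for this element only; forEach keeps going
          }
          processed.add(item);
        });
    // Reached after all elements were visited, with the flag possibly FALSE.
    return result.get();
  }
}
```

   If the intent is to stop at the first missing policy, a plain `for` loop with an early `return Boolean.FALSE` would express that directly.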



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHiveAuthorizationPlugin.java:
##########
@@ -0,0 +1,854 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.RoleChange;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.authorization.ranger.defines.VXGroup;
+import org.apache.gravitino.authorization.ranger.defines.VXGroupList;
+import org.apache.gravitino.authorization.ranger.defines.VXUser;
+import org.apache.gravitino.authorization.ranger.defines.VXUserList;
+import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.ranger.RangerServiceException;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RangerHiveAuthorizationPlugin is a plugin for Apache Ranger to manage the Hive authorization of
+ * the Apache Gravitino.
+ */
+public class RangerHiveAuthorizationPlugin extends RangerAuthorizationPlugin {
+  private static final Logger LOG = LoggerFactory.getLogger(RangerHiveAuthorizationPlugin.class);
+
+  public RangerHiveAuthorizationPlugin(String catalogProvider, Map<String, String> config) {
+    super();
+    this.catalogProvider = catalogProvider;
+    String rangerUrl = config.get(AuthorizationPropertiesMeta.RANGER_ADMIN_URL);
+    String authType = config.get(AuthorizationPropertiesMeta.RANGER_AUTH_TYPE);
+    String username = config.get(AuthorizationPropertiesMeta.RANGER_USERNAME);
+    // Apache Ranger Password should be minimum 8 characters with min one alphabet and one numeric.
+    String password = config.get(AuthorizationPropertiesMeta.RANGER_PASSWORD);
+    rangerServiceName = config.get(AuthorizationPropertiesMeta.RANGER_SERVICE_NAME);
+    check(rangerUrl != null, "Ranger admin URL is required");
+    check(authType != null, "Ranger auth type is required");
+    check(username != null, "Ranger username is required");
+    check(password != null, "Ranger password is required");
+    check(rangerServiceName != null, "Ranger service name is required");
+
+    rangerClient = new RangerClientExt(rangerUrl, authType, username, password);
+  }
+
+  /**
+   * Ranger hive's privilege have `select`, `update`, `create`, `drop`, `alter`, `index`, `lock`,
+   * `read`, `write`, `repladmin`, `serviceadmin`, `refresh` and `all`.
+   *
+   * <p>Reference: ranger/agents-common/src/main/resources/service-defs/ranger-servicedef-hive.json
+   */
+  @Override
+  protected void initMapPrivileges() {
+    mapPrivileges =
+        ImmutableMap.<Privilege.Name, Set<String>>builder()
+            .put(
+                Privilege.Name.CREATE_SCHEMA,
+                ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .put(
+                Privilege.Name.CREATE_TABLE, ImmutableSet.of(RangerDefines.ACCESS_TYPE_HIVE_CREATE))
+            .put(
+                Privilege.Name.MODIFY_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_UPDATE,
+                    RangerDefines.ACCESS_TYPE_HIVE_DROP,
+                    RangerDefines.ACCESS_TYPE_HIVE_ALTER,
+                    RangerDefines.ACCESS_TYPE_HIVE_WRITE))
+            .put(
+                Privilege.Name.SELECT_TABLE,
+                ImmutableSet.of(
+                    RangerDefines.ACCESS_TYPE_HIVE_READ, RangerDefines.ACCESS_TYPE_HIVE_SELECT))
+            .build();
+  }
+
+  /**
+   * Because Ranger does not have Role concept, Each metadata object will have a unique Ranger
+   * policy. we can use one or more Ranger policy to simulate the role. <br>
+   * 1. Create a policy for each metadata object. <br>
+   * 2. Save role name in the Policy properties. <br>
+   * 3. Set `MANAGED_BY_GRAVITINO` label in the policy. <br>
+   * 4. For easy manage, each privilege will create a RangerPolicyItemAccess in the policy. <br>
+   * 5. The policy will only have one user, the user is the {OWNER} of the policy. <br>
+   * 6. The policy will not have group. <br>
+   */
+  @Override
+  public Boolean onRoleCreated(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> RoleChange.addSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleAcquired(Role role) throws RuntimeException {
+    Boolean findAll;
+    try {
+      findAll =
+          role.securableObjects().stream()
+                  .filter(
+                      securableObject -> {
+                        RangerPolicy policy = findManagedPolicy(securableObject);
+                        return policy != null;
+                      })
+                  .count()
+              == role.securableObjects().size();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return findAll;
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple securable objects, so we didn't directly
+   * remove the policy. <br>
+   * Reference: https://github.com/apache/gravitino/issues/4509
+   */
+  @Override
+  public Boolean onRoleDeleted(Role role) throws RuntimeException {
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> RoleChange.removeSecurableObject(securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleUpdated(Role role, RoleChange... changes) throws RuntimeException {
+    for (RoleChange change : changes) {
+      boolean execResult;
+      if (change instanceof RoleChange.AddSecurableObject) {
+        execResult = doAddSecurableObject((RoleChange.AddSecurableObject) change);
+      } else if (change instanceof RoleChange.RemoveSecurableObject) {
+        execResult =
+            doRemoveSecurableObject(role.name(), (RoleChange.RemoveSecurableObject) change);
+      } else if (change instanceof RoleChange.UpdateSecurableObject) {
+        execResult =
+            doUpdateSecurableObject(role.name(), (RoleChange.UpdateSecurableObject) change);
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported role change type: "
+                + (change == null ? "null" : change.getClass().getSimpleName()));
+      }
+      if (!execResult) {
+        return Boolean.FALSE;
+      }
+    }
+
+    return Boolean.TRUE;
+  }
+
+  @VisibleForTesting
+  public List<Privilege> getAllPrivileges() {
+    return mapPrivileges.keySet().stream().map(Privileges::allow).collect(Collectors.toList());
+  }
+
+  @Override
+  public Boolean onOwnerSet(MetadataObject metadataObject, Owner preOwner, Owner newOwner)
+      throws RuntimeException {
+    List<Privilege> allPrivileges = getAllPrivileges();
+
+    SecurableObject securableObjects =
+        SecurableObjects.parse(metadataObject.fullName(), metadataObject.type(), allPrivileges);
+
+    AuditInfo auditInfo =
+        AuditInfo.builder()
+            .withCreator(PrincipalUtils.getCurrentUserName())
+            .withCreateTime(Instant.now())
+            .build();
+
+    RoleEntity role =
+        RoleEntity.builder()
+            .withId(0L)
+            .withName(OWNER_ROLE_NAME)
+            .withAuditInfo(auditInfo)
+            .withSecurableObjects(Lists.newArrayList(securableObjects))
+            .build();
+
+    Boolean execResult = onRoleCreated(role);
+    if (!execResult) {
+      return Boolean.FALSE;
+    }
+    if (preOwner != null) {
+      if (preOwner.type() == Owner.Type.USER) {
+        UserEntity userEntity =
+            UserEntity.builder()
+                .withId(0L)
+                .withName(preOwner.name())
+                .withRoleNames(Collections.emptyList())
+                .withRoleIds(Collections.emptyList())
+                .withAuditInfo(auditInfo)
+                .build();
+        execResult = onRevokedRolesFromUser(Lists.newArrayList(role), userEntity);
+        if (!execResult) {
+          return Boolean.FALSE;
+        }
+      } else {
+        GroupEntity groupEntity =
+            GroupEntity.builder()
+                .withId(0L)
+                .withName(preOwner.name())
+                .withRoleNames(Collections.emptyList())
+                .withRoleIds(Collections.emptyList())
+                .withAuditInfo(auditInfo)
+                .build();
+        execResult = onRevokedRolesFromGroup(Lists.newArrayList(role), groupEntity);
+        if (!execResult) {
+          return Boolean.FALSE;
+        }
+      }
+    }
+    if (newOwner != null) {
+      if (newOwner.type() == Owner.Type.USER) {
+        UserEntity userEntity =
+            UserEntity.builder()
+                .withId(0L)
+                .withName(newOwner.name())
+                .withRoleNames(Collections.emptyList())
+                .withRoleIds(Collections.emptyList())
+                .withAuditInfo(auditInfo)
+                .build();
+        execResult = onGrantedRolesToUser(Lists.newArrayList(role), userEntity);
+        if (!execResult) {
+          return Boolean.FALSE;
+        }
+      } else {
+        GroupEntity groupEntity =
+            GroupEntity.builder()
+                .withId(0L)
+                .withName(newOwner.name())
+                .withRoleNames(Collections.emptyList())
+                .withRoleIds(Collections.emptyList())
+                .withAuditInfo(auditInfo)
+                .build();
+        execResult = onGrantedRolesToGroup(Lists.newArrayList(role), groupEntity);
+        if (!execResult) {
+          return Boolean.FALSE;
+        }
+      }
+    }
+    return Boolean.TRUE;
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple Gravitino securable objects, <br>
+   * So we need to find the corresponding policy item mapping to set the user.
+   */
+  @Override
+  public Boolean onGrantedRolesToUser(List<Role> roles, User user) throws RuntimeException {
+    AtomicReference<Boolean> execResult = new AtomicReference<>(Boolean.TRUE);
+    for (Role role : roles) {
+      role.securableObjects()
+          .forEach(
+              securableObject -> {
+                RangerPolicy policy = findManagedPolicy(securableObject);
+                if (policy == null) {
+                  LOG.warn("The policy is not exist for the securable object({})", securableObject);

Review Comment:
   is -> does (the message should read "The policy does not exist for the securable object({})")



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
