This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 75ec8aed8 [#4903] feat(core,server): Support to grant or revoke 
privileges for a role (#5021)
75ec8aed8 is described below

commit 75ec8aed8a9bff37a7d577f5cfd942b53b7ac6ef
Author: roryqi <[email protected]>
AuthorDate: Wed Oct 9 11:26:57 2024 +0800

    [#4903] feat(core,server): Support to grant or revoke privileges for a role 
(#5021)
    
    ### What changes were proposed in this pull request?
    
    Supports to grant or revoke privileges for  a role
    
    ### Why are the changes needed?
    
    Fix: #4903
    
    ### Does this PR introduce _any_ user-facing change?
    I will add the document later.
    
    ### How was this patch tested?
    Add the UT.
---
 .../gravitino/authorization/SecurableObjects.java  |   4 +-
 .../exceptions/IllegalPrivilegeException.java      |  62 +++++
 .../org/apache/gravitino/client/DTOConverters.java |  25 +-
 .../org/apache/gravitino/client/ErrorHandlers.java |  17 +-
 .../apache/gravitino/client/GravitinoClient.java   |  43 +++
 .../apache/gravitino/client/GravitinoMetalake.java |  87 ++++++
 .../apache/gravitino/client/TestPermission.java    | 117 +++++++-
 .../test/authorization/AccessControlIT.java        | 143 ++++++++++
 .../dto/requests/PrivilegeGrantRequest.java        |  68 +++++
 .../dto/requests/PrivilegeRevokeRequest.java       |  68 +++++
 .../gravitino/dto/requests/RoleCreateRequest.java  |   5 +-
 .../gravitino/dto/responses/ErrorResponse.java     |  17 +-
 .../authorization/AccessControlDispatcher.java     |  30 ++
 .../authorization/AccessControlManager.java        |  15 +
 .../authorization/AuthorizationUtils.java          |  32 ++-
 .../gravitino/authorization/PermissionManager.java | 301 ++++++++++++++++++++-
 .../hook/AccessControlHookDispatcher.java          |  15 +
 .../gravitino/storage/relational/JDBCBackend.java  |   2 +
 .../storage/relational/mapper/RoleMetaMapper.java  |   4 +
 .../mapper/RoleMetaSQLProviderFactory.java         |   5 +
 .../relational/mapper/SecurableObjectMapper.java   |   6 +
 .../mapper/SecurableObjectSQLProviderFactory.java  |   5 +
 .../provider/base/RoleMetaBaseSQLProvider.java     |  19 ++
 .../base/SecurableObjectBaseSQLProvider.java       |  17 ++
 .../SecurableObjectPostgreSQLProvider.java         |  20 ++
 .../relational/service/RoleMetaService.java        |  90 ++++++
 .../storage/relational/utils/POConverters.java     |  21 ++
 .../TestAccessControlManagerForPermissions.java    | 116 ++++++++
 .../relational/service/TestRoleMetaService.java    | 176 ++++++++++++
 .../org/apache/gravitino/server/web/Utils.java     |   8 +-
 .../web/mapper/JsonMappingExceptionMapper.java     |   2 +-
 .../web/mapper/JsonParseExceptionMapper.java       |   2 +-
 .../server/web/rest/ExceptionHandlers.java         |  18 ++
 .../server/web/rest/PermissionOperations.java      |  98 +++++++
 .../gravitino/server/web/rest/RoleOperations.java  |   2 +
 .../server/web/rest/TestPermissionOperations.java  | 172 ++++++++++++
 .../server/web/rest/TestRoleOperations.java        |   3 +-
 37 files changed, 1797 insertions(+), 38 deletions(-)

diff --git 
a/api/src/main/java/org/apache/gravitino/authorization/SecurableObjects.java 
b/api/src/main/java/org/apache/gravitino/authorization/SecurableObjects.java
index 2ec594bcd..e6ca3a851 100644
--- a/api/src/main/java/org/apache/gravitino/authorization/SecurableObjects.java
+++ b/api/src/main/java/org/apache/gravitino/authorization/SecurableObjects.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.authorization;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
@@ -125,7 +126,8 @@ public class SecurableObjects {
 
     SecurableObjectImpl(String parent, String name, Type type, List<Privilege> 
privileges) {
       super(parent, name, type);
-      this.privileges = ImmutableList.copyOf(privileges);
+      // Remove duplicated privileges
+      this.privileges = ImmutableList.copyOf(Sets.newHashSet(privileges));
     }
 
     @Override
diff --git 
a/api/src/main/java/org/apache/gravitino/exceptions/IllegalPrivilegeException.java
 
b/api/src/main/java/org/apache/gravitino/exceptions/IllegalPrivilegeException.java
new file mode 100644
index 000000000..0fead4d96
--- /dev/null
+++ 
b/api/src/main/java/org/apache/gravitino/exceptions/IllegalPrivilegeException.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.exceptions;
+
+import com.google.errorprone.annotations.FormatMethod;
+import com.google.errorprone.annotations.FormatString;
+
+/** An exception thrown when a privilege is invalid. */
+public class IllegalPrivilegeException extends IllegalArgumentException {
+  /**
+   * Constructs a new exception with the specified detail message.
+   *
+   * @param message the detail message.
+   * @param args the arguments to the message.
+   */
+  @FormatMethod
+  public IllegalPrivilegeException(@FormatString String message, Object... 
args) {
+    super(String.format(message, args));
+  }
+
+  /**
+   * Constructs a new exception with the specified detail message and cause.
+   *
+   * @param cause the cause.
+   * @param message the detail message.
+   * @param args the arguments to the message.
+   */
+  @FormatMethod
+  public IllegalPrivilegeException(Throwable cause, @FormatString String 
message, Object... args) {
+    super(String.format(message, args), cause);
+  }
+
+  /**
+   * Constructs a new exception with the specified cause.
+   *
+   * @param cause the cause.
+   */
+  public IllegalPrivilegeException(Throwable cause) {
+    super(cause);
+  }
+
+  /** Constructs a new exception with the specified detail message and cause. 
*/
+  public IllegalPrivilegeException() {
+    super();
+  }
+}
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/DTOConverters.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/DTOConverters.java
index 6b259405d..20aa19319 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/DTOConverters.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/DTOConverters.java
@@ -20,11 +20,14 @@ package org.apache.gravitino.client;
 
 import static org.apache.gravitino.dto.util.DTOConverters.toFunctionArg;
 
+import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.CatalogChange;
 import org.apache.gravitino.MetalakeChange;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.SchemaChange;
+import org.apache.gravitino.authorization.Privilege;
 import org.apache.gravitino.authorization.SecurableObject;
 import org.apache.gravitino.dto.AuditDTO;
 import org.apache.gravitino.dto.CatalogDTO;
@@ -302,19 +305,21 @@ class DTOConverters {
     return SecurableObjectDTO.builder()
         .withFullName(securableObject.fullName())
         .withType(securableObject.type())
-        .withPrivileges(
-            securableObject.privileges().stream()
-                .map(
-                    privilege -> {
-                      return PrivilegeDTO.builder()
-                          .withCondition(privilege.condition())
-                          .withName(privilege.name())
-                          .build();
-                    })
-                .toArray(PrivilegeDTO[]::new))
+        .withPrivileges(toPrivileges(securableObject.privileges()).toArray(new 
PrivilegeDTO[0]))
         .build();
   }
 
+  static List<PrivilegeDTO> toPrivileges(List<Privilege> privileges) {
+    return privileges.stream()
+        .map(
+            privilege ->
+                PrivilegeDTO.builder()
+                    .withCondition(privilege.condition())
+                    .withName(privilege.name())
+                    .build())
+        .collect(Collectors.toList());
+  }
+
   static TagUpdateRequest toTagUpdateRequest(TagChange change) {
     if (change instanceof TagChange.RenameTag) {
       return new TagUpdateRequest.RenameTagRequest(((TagChange.RenameTag) 
change).getNewName());
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
index 7414f4c4e..5ab440092 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
@@ -31,6 +31,7 @@ import 
org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
 import org.apache.gravitino.exceptions.ConnectionFailedException;
 import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.GroupAlreadyExistsException;
+import org.apache.gravitino.exceptions.IllegalPrivilegeException;
 import org.apache.gravitino.exceptions.MetalakeAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
@@ -626,7 +627,11 @@ public class ErrorHandlers {
 
       switch (errorResponse.getCode()) {
         case ErrorConstants.ILLEGAL_ARGUMENTS_CODE:
-          throw new IllegalArgumentException(errorMessage);
+          if 
(errorResponse.getType().equals(IllegalPrivilegeException.class.getSimpleName()))
 {
+            throw new IllegalPrivilegeException(errorMessage);
+          } else {
+            throw new IllegalArgumentException(errorMessage);
+          }
 
         case ErrorConstants.NOT_FOUND_CODE:
           if 
(errorResponse.getType().equals(NoSuchMetalakeException.class.getSimpleName())) 
{
@@ -669,7 +674,11 @@ public class ErrorHandlers {
 
       switch (errorResponse.getCode()) {
         case ErrorConstants.ILLEGAL_ARGUMENTS_CODE:
-          throw new IllegalArgumentException(errorMessage);
+          if 
(errorResponse.getType().equals(IllegalPrivilegeException.class.getSimpleName()))
 {
+            throw new IllegalPrivilegeException(errorMessage);
+          } else {
+            throw new IllegalArgumentException(errorMessage);
+          }
 
         case ErrorConstants.NOT_FOUND_CODE:
           if 
(errorResponse.getType().equals(NoSuchMetalakeException.class.getSimpleName())) 
{
@@ -680,6 +689,10 @@ public class ErrorHandlers {
             throw new NoSuchGroupException(errorMessage);
           } else if 
(errorResponse.getType().equals(NoSuchRoleException.class.getSimpleName())) {
             throw new NoSuchRoleException(errorMessage);
+          } else if (errorResponse
+              .getType()
+              .equals(NoSuchMetadataObjectException.class.getSimpleName())) {
+            throw new NoSuchMetadataObjectException(errorMessage);
           } else {
             throw new NotFoundException(errorMessage);
           }
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoClient.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoClient.java
index a8a46ff8f..217b8b35a 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoClient.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoClient.java
@@ -29,11 +29,13 @@ import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.SupportsCatalogs;
 import org.apache.gravitino.authorization.Group;
 import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privilege;
 import org.apache.gravitino.authorization.Role;
 import org.apache.gravitino.authorization.SecurableObject;
 import org.apache.gravitino.authorization.User;
 import org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
 import org.apache.gravitino.exceptions.GroupAlreadyExistsException;
+import org.apache.gravitino.exceptions.IllegalPrivilegeException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchGroupException;
 import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
@@ -389,6 +391,47 @@ public class GravitinoClient extends GravitinoClientBase
     return getMetalake().listRoleNames();
   }
 
+  /**
+   * Grant privileges to a role.
+   *
+   * @param role The name of the role.
+   * @param privileges The privileges to grant.
+   * @param object The object is associated with privileges to grant.
+   * @return The role after granted.
+   * @throws NoSuchRoleException If the role with the given name does not 
exist.
+   * @throws NoSuchMetadataObjectException If the metadata object with the 
given name does not
+   *     exist.
+   * @throws NoSuchMetalakeException If the Metalake with the given name does 
not exist.
+   * @throws IllegalPrivilegeException If any privilege can't be bind to the 
metadata object.
+   * @throws RuntimeException If granting roles to a role encounters storage 
issues.
+   */
+  public Role grantPrivilegesToRole(String role, MetadataObject object, 
List<Privilege> privileges)
+      throws NoSuchRoleException, NoSuchMetadataObjectException, 
NoSuchMetalakeException,
+          IllegalPrivilegeException {
+    return getMetalake().grantPrivilegesToRole(role, object, privileges);
+  }
+
+  /**
+   * Revoke privileges from a role.
+   *
+   * @param role The name of the role.
+   * @param privileges The privileges to revoke.
+   * @param object The object is associated with privileges to revoke.
+   * @return The role after revoked.
+   * @throws NoSuchRoleException If the role with the given name does not 
exist.
+   * @throws NoSuchMetadataObjectException If the metadata object with the 
given name does not
+   *     exist.
+   * @throws NoSuchMetalakeException If the Metalake with the given name does 
not exist.
+   * @throws IllegalPrivilegeException If any privilege can't be bind to the 
metadata object.
+   * @throws RuntimeException If revoking privileges from a role encounters 
storage issues.
+   */
+  public Role revokePrivilegesFromRole(
+      String role, MetadataObject object, List<Privilege> privileges)
+      throws NoSuchRoleException, NoSuchMetadataObjectException, 
NoSuchMetalakeException,
+          IllegalPrivilegeException {
+    return getMetalake().revokePrivilegesFromRole(role, object, privileges);
+  }
+
   /**
    * Creates a new builder for constructing a GravitinoClient.
    *
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoMetalake.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoMetalake.java
index 4905681b7..46f75aa01 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoMetalake.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoMetalake.java
@@ -37,6 +37,7 @@ import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.SupportsCatalogs;
 import org.apache.gravitino.authorization.Group;
 import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privilege;
 import org.apache.gravitino.authorization.Role;
 import org.apache.gravitino.authorization.SecurableObject;
 import org.apache.gravitino.authorization.SupportsRoles;
@@ -49,6 +50,8 @@ import org.apache.gravitino.dto.requests.CatalogUpdateRequest;
 import org.apache.gravitino.dto.requests.CatalogUpdatesRequest;
 import org.apache.gravitino.dto.requests.GroupAddRequest;
 import org.apache.gravitino.dto.requests.OwnerSetRequest;
+import org.apache.gravitino.dto.requests.PrivilegeGrantRequest;
+import org.apache.gravitino.dto.requests.PrivilegeRevokeRequest;
 import org.apache.gravitino.dto.requests.RoleCreateRequest;
 import org.apache.gravitino.dto.requests.RoleGrantRequest;
 import org.apache.gravitino.dto.requests.RoleRevokeRequest;
@@ -75,6 +78,7 @@ import org.apache.gravitino.dto.responses.UserListResponse;
 import org.apache.gravitino.dto.responses.UserResponse;
 import org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
 import org.apache.gravitino.exceptions.GroupAlreadyExistsException;
+import org.apache.gravitino.exceptions.IllegalPrivilegeException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchGroupException;
 import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
@@ -894,6 +898,89 @@ public class GravitinoMetalake extends MetalakeDTO
     return resp.getGroup();
   }
 
+  /**
+   * Grant privileges to a role.
+   *
+   * @param role The name of the role.
+   * @param privileges The privileges to grant.
+   * @param object The object is associated with privileges to grant.
+   * @return The role after granted.
+   * @throws NoSuchRoleException If the role with the given name does not 
exist.
+   * @throws NoSuchMetadataObjectException If the metadata object with the 
given name does not
+   *     exist.
+   * @throws NoSuchMetalakeException If the Metalake with the given name does 
not exist.
+   * @throws IllegalPrivilegeException If any privilege can't be bind to the 
metadata object.
+   * @throws RuntimeException If granting privileges to a role encounters 
storage issues.
+   */
+  public Role grantPrivilegesToRole(String role, MetadataObject object, 
List<Privilege> privileges)
+      throws NoSuchRoleException, NoSuchMetadataObjectException, 
NoSuchMetalakeException,
+          IllegalPrivilegeException {
+    PrivilegeGrantRequest request =
+        new PrivilegeGrantRequest(DTOConverters.toPrivileges(privileges));
+    request.validate();
+
+    RoleResponse resp =
+        restClient.put(
+            String.format(
+                API_PERMISSION_PATH,
+                this.name(),
+                String.format(
+                    "roles/%s/%s/%s/grant",
+                    RESTUtils.encodeString(role),
+                    object.type().name().toLowerCase(Locale.ROOT),
+                    object.fullName())),
+            request,
+            RoleResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.permissionOperationErrorHandler());
+
+    resp.validate();
+
+    return resp.getRole();
+  }
+
+  /**
+   * Revoke privileges from a role.
+   *
+   * @param role The name of the role.
+   * @param privileges The privileges to revoke.
+   * @param object The object is associated with privileges to revoke.
+   * @return The role after revoked.
+   * @throws NoSuchRoleException If the role with the given name does not 
exist.
+   * @throws NoSuchMetadataObjectException If the metadata object with the 
given name does not
+   *     exist.
+   * @throws NoSuchMetalakeException If the Metalake with the given name does 
not exist.
+   * @throws IllegalPrivilegeException If any privilege can't be bind to the 
metadata object.
+   * @throws RuntimeException If revoking privileges from a role encounters 
storage issues.
+   */
+  public Role revokePrivilegesFromRole(
+      String role, MetadataObject object, List<Privilege> privileges)
+      throws NoSuchRoleException, NoSuchMetadataObjectException, 
NoSuchMetalakeException,
+          IllegalPrivilegeException {
+    PrivilegeRevokeRequest request =
+        new PrivilegeRevokeRequest(DTOConverters.toPrivileges(privileges));
+    request.validate();
+
+    RoleResponse resp =
+        restClient.put(
+            String.format(
+                API_PERMISSION_PATH,
+                this.name(),
+                String.format(
+                    "roles/%s/%s/%s/revoke",
+                    RESTUtils.encodeString(role),
+                    object.type().name().toLowerCase(Locale.ROOT),
+                    object.fullName())),
+            request,
+            RoleResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.permissionOperationErrorHandler());
+
+    resp.validate();
+
+    return resp.getRole();
+  }
+
   /**
    * Get the owner of a metadata object.
    *
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestPermission.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestPermission.java
index 70912e9b5..006a47b43 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestPermission.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestPermission.java
@@ -24,17 +24,28 @@ import static 
org.apache.hc.core5.http.HttpStatus.SC_SERVER_ERROR;
 import com.google.common.collect.Lists;
 import java.time.Instant;
 import java.util.List;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.SecurableObject;
 import org.apache.gravitino.authorization.User;
 import org.apache.gravitino.dto.AuditDTO;
 import org.apache.gravitino.dto.MetalakeDTO;
 import org.apache.gravitino.dto.authorization.GroupDTO;
+import org.apache.gravitino.dto.authorization.PrivilegeDTO;
+import org.apache.gravitino.dto.authorization.RoleDTO;
+import org.apache.gravitino.dto.authorization.SecurableObjectDTO;
 import org.apache.gravitino.dto.authorization.UserDTO;
+import org.apache.gravitino.dto.requests.PrivilegeRevokeRequest;
 import org.apache.gravitino.dto.requests.RoleGrantRequest;
 import org.apache.gravitino.dto.requests.RoleRevokeRequest;
 import org.apache.gravitino.dto.responses.ErrorResponse;
 import org.apache.gravitino.dto.responses.GroupResponse;
 import org.apache.gravitino.dto.responses.MetalakeResponse;
+import org.apache.gravitino.dto.responses.RoleResponse;
 import org.apache.gravitino.dto.responses.UserResponse;
 import org.apache.hc.core5.http.HttpStatus;
 import org.apache.hc.core5.http.Method;
@@ -174,8 +185,112 @@ public class TestPermission extends TestBase {
 
     // test Exception
     ErrorResponse errResp = ErrorResponse.internalError("internal error");
-    buildMockResource(Method.DELETE, groupPath, null, errResp, 
SC_SERVER_ERROR);
+    buildMockResource(Method.PUT, groupPath, null, errResp, SC_SERVER_ERROR);
     Assertions.assertThrows(
         RuntimeException.class, () -> 
gravitinoClient.revokeRolesFromGroup(roles, group));
   }
+
+  @Test
+  public void testGrantPrivilegeToRole() throws Exception {
+    String role = "role";
+    String rolePath =
+        String.format(
+            API_PERMISSION_PATH,
+            metalakeName,
+            String.format("roles/%s/%s/%s/grant", role, "metalake", 
metalakeName));
+    RoleDTO roleDTO =
+        RoleDTO.builder()
+            .withName(role)
+            .withSecurableObjects(
+                new SecurableObjectDTO[] {
+                  SecurableObjectDTO.builder()
+                      .withFullName(metalakeName)
+                      .withType(MetadataObject.Type.METALAKE)
+                      .withPrivileges(
+                          new PrivilegeDTO[] {
+                            PrivilegeDTO.builder()
+                                .withName(Privilege.Name.CREATE_TABLE)
+                                .withCondition(Privilege.Condition.ALLOW)
+                                .build()
+                          })
+                      .build()
+                })
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+    RoleResponse response = new RoleResponse(roleDTO);
+
+    PrivilegeRevokeRequest request =
+        new PrivilegeRevokeRequest(
+            Lists.newArrayList(
+                PrivilegeDTO.builder()
+                    .withName(Privilege.Name.CREATE_TABLE)
+                    .withCondition(Privilege.Condition.ALLOW)
+                    .build()));
+
+    buildMockResource(Method.PUT, rolePath, request, response, SC_OK);
+    MetadataObject object = MetadataObjects.of(null, metalakeName, 
MetadataObject.Type.METALAKE);
+    Role grantedRole =
+        gravitinoClient.grantPrivilegesToRole(
+            role, object, Lists.newArrayList(Privileges.CreateTable.allow()));
+    Assertions.assertEquals(grantedRole.name(), role);
+    Assertions.assertEquals(1, grantedRole.securableObjects().size());
+    SecurableObject securableObject = grantedRole.securableObjects().get(0);
+    Assertions.assertEquals(metalakeName, securableObject.name());
+    Assertions.assertEquals(MetadataObject.Type.METALAKE, 
securableObject.type());
+    Assertions.assertEquals(
+        Privilege.Name.CREATE_TABLE, 
securableObject.privileges().get(0).name());
+    Assertions.assertEquals(
+        Privilege.Condition.ALLOW, 
securableObject.privileges().get(0).condition());
+
+    // test Exception
+    ErrorResponse errResp = ErrorResponse.internalError("internal error");
+    buildMockResource(Method.PUT, rolePath, null, errResp, SC_SERVER_ERROR);
+    Assertions.assertThrows(
+        RuntimeException.class,
+        () ->
+            gravitinoClient.grantPrivilegesToRole(
+                role, object, 
Lists.newArrayList(Privileges.CreateTable.allow())));
+  }
+
+  @Test
+  public void testRevokePrivilegeFromRole() throws Exception {
+    String role = "role";
+    String rolePath =
+        String.format(
+            API_PERMISSION_PATH,
+            metalakeName,
+            String.format("roles/%s/%s/%s/revoke", role, "metalake", 
metalakeName));
+    RoleDTO roleDTO =
+        RoleDTO.builder()
+            .withName(role)
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .withSecurableObjects(new SecurableObjectDTO[0])
+            .build();
+    RoleResponse response = new RoleResponse(roleDTO);
+
+    PrivilegeRevokeRequest request =
+        new PrivilegeRevokeRequest(
+            Lists.newArrayList(
+                PrivilegeDTO.builder()
+                    .withName(Privilege.Name.CREATE_TABLE)
+                    .withCondition(Privilege.Condition.ALLOW)
+                    .build()));
+
+    buildMockResource(Method.PUT, rolePath, request, response, SC_OK);
+    MetadataObject object = MetadataObjects.of(null, metalakeName, 
MetadataObject.Type.METALAKE);
+    Role revokedRole =
+        gravitinoClient.revokePrivilegesFromRole(
+            role, object, Lists.newArrayList(Privileges.CreateTable.allow()));
+    Assertions.assertEquals(revokedRole.name(), role);
+    Assertions.assertTrue(revokedRole.securableObjects().isEmpty());
+
+    // test Exception
+    ErrorResponse errResp = ErrorResponse.internalError("internal error");
+    buildMockResource(Method.PUT, rolePath, null, errResp, SC_SERVER_ERROR);
+    Assertions.assertThrows(
+        RuntimeException.class,
+        () ->
+            gravitinoClient.revokePrivilegesFromRole(
+                role, object, 
Lists.newArrayList(Privileges.CreateTable.allow())));
+  }
 }
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/AccessControlIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/AccessControlIT.java
index 76cd938fa..e62cebcfd 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/AccessControlIT.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/AccessControlIT.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Configs;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Schema;
 import org.apache.gravitino.auth.AuthConstants;
@@ -40,6 +42,7 @@ import org.apache.gravitino.authorization.SecurableObjects;
 import org.apache.gravitino.authorization.User;
 import org.apache.gravitino.client.GravitinoMetalake;
 import org.apache.gravitino.exceptions.GroupAlreadyExistsException;
+import org.apache.gravitino.exceptions.IllegalPrivilegeException;
 import org.apache.gravitino.exceptions.NoSuchGroupException;
 import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
 import org.apache.gravitino.exceptions.NoSuchRoleException;
@@ -214,6 +217,47 @@ public class AccessControlIT extends AbstractIT {
         NoSuchMetadataObjectException.class,
         () -> metalake.createRole("not-existed", properties, 
Lists.newArrayList(catalogObject)));
 
+    // Create a role with duplicated securable objects
+    SecurableObject duplicatedMetalakeObject =
+        SecurableObjects.ofMetalake(
+            metalakeName, 
Lists.newArrayList(Privileges.CreateCatalog.allow()));
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            metalake.createRole(
+                roleName,
+                properties,
+                Lists.newArrayList(metalakeObject, duplicatedMetalakeObject)));
+
+    // Create a role with wrong privilege
+    SecurableObject wrongPrivilegeObject =
+        SecurableObjects.ofCatalog("wrong", 
Lists.newArrayList(Privileges.CreateCatalog.allow()));
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            metalake.createRole(
+                "not-existed", properties, 
Lists.newArrayList(wrongPrivilegeObject)));
+
+    // Create a role with wrong privilege
+    SecurableObject wrongCatalogObject =
+        SecurableObjects.ofCatalog(
+            "fileset_catalog", 
Lists.newArrayList(Privileges.SelectTable.allow()));
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            metalake.createRole("not-existed", properties, 
Lists.newArrayList(wrongCatalogObject)));
+
+    // Create a role with duplicated privilege
+    SecurableObject duplicatedCatalogObject =
+        SecurableObjects.ofCatalog(
+            "fileset_catalog",
+            Lists.newArrayList(Privileges.SelectTable.allow(), 
Privileges.SelectTable.deny()));
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            metalake.createRole(
+                "not-existed", properties, 
Lists.newArrayList(duplicatedCatalogObject)));
+
     // Get a role
     role = metalake.getRole(roleName);
 
@@ -282,9 +326,15 @@ public class AccessControlIT extends AbstractIT {
     Assertions.assertEquals(createdPrivilege.name(), 
Privilege.Name.CREATE_CATALOG);
     Assertions.assertEquals(createdPrivilege.condition(), 
Privilege.Condition.ALLOW);
 
+    // create empty objects role
+    Role emptyObjectsRole = metalake.createRole("empty", properties, 
Collections.emptyList());
+    Assertions.assertEquals("empty", emptyObjectsRole.name());
+    Assertions.assertEquals(properties, role.properties());
+
     // Delete a role
     Assertions.assertTrue(metalake.deleteRole(roleName));
     Assertions.assertFalse(metalake.deleteRole(roleName));
+    metalake.deleteRole(emptyObjectsRole.name());
   }
 
   @Test
@@ -397,6 +447,99 @@ public class AccessControlIT extends AbstractIT {
     metalake.deleteRole(roleName);
   }
 
+  @Test
+  void testManageRolePermissions() {
+    String roleName = "role#123";
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("k1", "v1");
+    metalake.createRole(roleName, properties, Lists.newArrayList());
+    MetadataObject metadataObject =
+        MetadataObjects.of(null, metalakeName, MetadataObject.Type.METALAKE);
+
+    // grant a privilege
+    Role role =
+        metalake.grantPrivilegesToRole(
+            roleName, metadataObject, 
Lists.newArrayList(Privileges.CreateCatalog.allow()));
+    Assertions.assertEquals(1, role.securableObjects().size());
+
+    // grant a wrong privilege
+    MetadataObject catalog = MetadataObjects.of(null, "catalog", 
MetadataObject.Type.CATALOG);
+    Assertions.assertThrows(
+        IllegalPrivilegeException.class,
+        () ->
+            metalake.grantPrivilegesToRole(
+                roleName, catalog, 
Lists.newArrayList(Privileges.CreateCatalog.allow())));
+
+    // grant a wrong catalog type privilege
+    MetadataObject wrongCatalog =
+        MetadataObjects.of(null, "fileset_catalog", 
MetadataObject.Type.CATALOG);
+    Assertions.assertThrows(
+        IllegalPrivilegeException.class,
+        () ->
+            metalake.grantPrivilegesToRole(
+                roleName, wrongCatalog, 
Lists.newArrayList(Privileges.SelectTable.allow())));
+
+    // grant a duplicated privilege
+    MetadataObject duplicatedCatalog =
+        MetadataObjects.of(null, "fileset_catalog", 
MetadataObject.Type.CATALOG);
+    Assertions.assertThrows(
+        IllegalPrivilegeException.class,
+        () ->
+            metalake.grantPrivilegesToRole(
+                roleName,
+                duplicatedCatalog,
+                Lists.newArrayList(Privileges.SelectTable.allow(), 
Privileges.SelectTable.deny())));
+
+    // repeat to grant a privilege
+    role =
+        metalake.grantPrivilegesToRole(
+            roleName, metadataObject, 
Lists.newArrayList(Privileges.CreateCatalog.allow()));
+    Assertions.assertEquals(1, role.securableObjects().size());
+
+    // grant a not-existing role
+    Assertions.assertThrows(
+        NoSuchRoleException.class,
+        () ->
+            metalake.grantPrivilegesToRole(
+                "not-exist", metadataObject, 
Lists.newArrayList(Privileges.CreateCatalog.allow())));
+
+    // revoke a privilege
+    role =
+        metalake.revokePrivilegesFromRole(
+            roleName, metadataObject, 
Lists.newArrayList(Privileges.CreateCatalog.allow()));
+    Assertions.assertTrue(role.securableObjects().isEmpty());
+
+    // revoke a wrong privilege
+    Assertions.assertThrows(
+        IllegalPrivilegeException.class,
+        () ->
+            metalake.revokePrivilegesFromRole(
+                roleName, catalog, 
Lists.newArrayList(Privileges.CreateCatalog.allow())));
+
+    // revoke a wrong catalog type privilege
+    Assertions.assertThrows(
+        IllegalPrivilegeException.class,
+        () ->
+            metalake.revokePrivilegesFromRole(
+                roleName, wrongCatalog, 
Lists.newArrayList(Privileges.SelectTable.allow())));
+
+    // repeat to revoke a privilege
+    role =
+        metalake.revokePrivilegesFromRole(
+            roleName, metadataObject, 
Lists.newArrayList(Privileges.CreateCatalog.allow()));
+    Assertions.assertTrue(role.securableObjects().isEmpty());
+
+    // revoke a not-existing role
+    Assertions.assertThrows(
+        NoSuchRoleException.class,
+        () ->
+            metalake.revokePrivilegesFromRole(
+                "not-exist", metadataObject, 
Lists.newArrayList(Privileges.CreateCatalog.allow())));
+
+    // Cleanup
+    metalake.deleteRole(roleName);
+  }
+
   private static void assertSecurableObjects(
       List<SecurableObject> expect, List<SecurableObject> actual) {
     Assertions.assertEquals(expect.size(), actual.size());
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/requests/PrivilegeGrantRequest.java
 
b/common/src/main/java/org/apache/gravitino/dto/requests/PrivilegeGrantRequest.java
new file mode 100644
index 000000000..26122ae51
--- /dev/null
+++ 
b/common/src/main/java/org/apache/gravitino/dto/requests/PrivilegeGrantRequest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.requests;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.extern.jackson.Jacksonized;
+import org.apache.gravitino.dto.authorization.PrivilegeDTO;
+import org.apache.gravitino.rest.RESTRequest;
+
+/** Request to grant a privilege to a role. */
+@Getter
+@EqualsAndHashCode
+@ToString
+@Builder
+@Jacksonized
+public class PrivilegeGrantRequest implements RESTRequest {
+
+  @JsonProperty("privileges")
+  private final List<PrivilegeDTO> privileges;
+
+  /**
+   * Constructor for privilegeGrantRequest.
+   *
+   * @param privileges The privileges for the PrivilegeGrantRequest.
+   */
+  public PrivilegeGrantRequest(List<PrivilegeDTO> privileges) {
+    this.privileges = privileges;
+  }
+
+  /** Default constructor for PrivilegeGrantRequest. */
+  public PrivilegeGrantRequest() {
+    this(null);
+  }
+
+  /**
+   * Validates the fields of the request.
+   *
+   * @throws IllegalArgumentException if the privileges field is not set or 
empty.
+   */
+  @Override
+  public void validate() throws IllegalArgumentException {
+    Preconditions.checkArgument(
+        privileges != null && !privileges.isEmpty(),
+        "\"privileges\" field is required and cannot be empty");
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/requests/PrivilegeRevokeRequest.java
 
b/common/src/main/java/org/apache/gravitino/dto/requests/PrivilegeRevokeRequest.java
new file mode 100644
index 000000000..10da859d0
--- /dev/null
+++ 
b/common/src/main/java/org/apache/gravitino/dto/requests/PrivilegeRevokeRequest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.requests;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.extern.jackson.Jacksonized;
+import org.apache.gravitino.dto.authorization.PrivilegeDTO;
+import org.apache.gravitino.rest.RESTRequest;
+
+/** Request to revoke a privilege from a role. */
+@Getter
+@EqualsAndHashCode
+@ToString
+@Builder
+@Jacksonized
+public class PrivilegeRevokeRequest implements RESTRequest {
+
+  @JsonProperty("privileges")
+  private final List<PrivilegeDTO> privileges;
+
+  /**
+   * Constructor for privilegeRevokeRequest.
+   *
+   * @param privileges The privileges for the PrivilegeRevokeRequest.
+   */
+  public PrivilegeRevokeRequest(List<PrivilegeDTO> privileges) {
+    this.privileges = privileges;
+  }
+
+  /** Default constructor for PrivilegeRevokeRequest. */
+  public PrivilegeRevokeRequest() {
+    this(null);
+  }
+
+  /**
+   * Validates the fields of the request.
+   *
+   * @throws IllegalArgumentException if the privileges field is not set or 
empty.
+   */
+  @Override
+  public void validate() throws IllegalArgumentException {
+    Preconditions.checkArgument(
+        privileges != null && !privileges.isEmpty(),
+        "\"privileges\" field is required and cannot be empty");
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/requests/RoleCreateRequest.java 
b/common/src/main/java/org/apache/gravitino/dto/requests/RoleCreateRequest.java
index b4c3f24ae..9d85c0c6e 100644
--- 
a/common/src/main/java/org/apache/gravitino/dto/requests/RoleCreateRequest.java
+++ 
b/common/src/main/java/org/apache/gravitino/dto/requests/RoleCreateRequest.java
@@ -78,9 +78,6 @@ public class RoleCreateRequest implements RESTRequest {
   public void validate() throws IllegalArgumentException {
     Preconditions.checkArgument(
         StringUtils.isNotBlank(name), "\"name\" field is required and cannot 
be empty");
-
-    Preconditions.checkArgument(
-        securableObjects != null && securableObjects.length != 0,
-        "\"securableObjects\" can't null or empty");
+    Preconditions.checkArgument(securableObjects != null, 
"\"securableObjects\" can't null ");
   }
 }
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/responses/ErrorResponse.java 
b/common/src/main/java/org/apache/gravitino/dto/responses/ErrorResponse.java
index f38fabc81..86e619bc7 100644
--- a/common/src/main/java/org/apache/gravitino/dto/responses/ErrorResponse.java
+++ b/common/src/main/java/org/apache/gravitino/dto/responses/ErrorResponse.java
@@ -119,11 +119,20 @@ public class ErrorResponse extends BaseResponse {
    * @return The new instance.
    */
   public static ErrorResponse illegalArguments(String message, Throwable 
throwable) {
+    return illegalArguments(IllegalArgumentException.class.getSimpleName(), 
message, throwable);
+  }
+
+  /**
+   * Create a new illegal arguments error instance of {@link ErrorResponse}.
+   *
+   * @param type The type of the error.
+   * @param message The message of the error.
+   * @param throwable The throwable that caused the error.
+   * @return The new instance.
+   */
+  public static ErrorResponse illegalArguments(String type, String message, 
Throwable throwable) {
     return new ErrorResponse(
-        ErrorConstants.ILLEGAL_ARGUMENTS_CODE,
-        IllegalArgumentException.class.getSimpleName(),
-        message,
-        getStackTrace(throwable));
+        ErrorConstants.ILLEGAL_ARGUMENTS_CODE, type, message, 
getStackTrace(throwable));
   }
 
   /**
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/AccessControlDispatcher.java
 
b/core/src/main/java/org/apache/gravitino/authorization/AccessControlDispatcher.java
index a3919512b..73004280b 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/AccessControlDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/AccessControlDispatcher.java
@@ -278,4 +278,34 @@ public interface AccessControlDispatcher {
    */
   String[] listRoleNamesByObject(String metalake, MetadataObject object)
       throws NoSuchMetalakeException, NoSuchMetadataObjectException;
+
+  /**
+   * Grant privileges to a role.
+   *
+   * @param role The name of the role.
+   * @param privileges The privileges to grant.
+   * @param object The object is associated with privileges to grant.
+   * @return The role after granted.
+   * @throws NoSuchRoleException If the role with the given name does not 
exist.
+   * @throws NoSuchMetalakeException If the Metalake with the given name does 
not exist.
+   * @throws RuntimeException If granting roles to a role encounters storage 
issues.
+   */
+  Role grantPrivilegeToRole(
+      String metalake, String role, MetadataObject object, List<Privilege> 
privileges)
+      throws NoSuchGroupException, NoSuchRoleException;
+
+  /**
+   * Revoke privileges from a role.
+   *
+   * @param role The name of the role.
+   * @param privileges The privileges to revoke.
+   * @param object The object is associated with privileges to revoke.
+   * @return The role after revoked.
+   * @throws NoSuchRoleException If the role with the given name does not 
exist.
+   * @throws NoSuchMetalakeException If the Metalake with the given name does 
not exist.
+   * @throws RuntimeException If revoking privileges from a role encounters 
storage issues.
+   */
+  Role revokePrivilegesFromRole(
+      String metalake, String role, MetadataObject object, List<Privilege> 
privileges)
+      throws NoSuchMetalakeException, NoSuchRoleException;
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/AccessControlManager.java
 
b/core/src/main/java/org/apache/gravitino/authorization/AccessControlManager.java
index 75b0f9f1e..c9adf314a 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/AccessControlManager.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/AccessControlManager.java
@@ -150,6 +150,7 @@ public class AccessControlManager implements 
AccessControlDispatcher {
     return roleManager.getRole(metalake, role);
   }
 
+  @Override
   public boolean deleteRole(String metalake, String role) throws 
NoSuchMetalakeException {
     return roleManager.deleteRole(metalake, role);
   }
@@ -164,4 +165,18 @@ public class AccessControlManager implements 
AccessControlDispatcher {
       throws NoSuchMetalakeException, NoSuchMetadataObjectException {
     return roleManager.listRoleNamesByObject(metalake, object);
   }
+
+  @Override
+  public Role grantPrivilegeToRole(
+      String metalake, String role, MetadataObject object, List<Privilege> 
privileges)
+      throws NoSuchRoleException, NoSuchMetalakeException {
+    return permissionManager.grantPrivilegesToRole(metalake, role, object, 
privileges);
+  }
+
+  @Override
+  public Role revokePrivilegesFromRole(
+      String metalake, String role, MetadataObject object, List<Privilege> 
privileges)
+      throws NoSuchRoleException, NoSuchMetalakeException {
+    return permissionManager.revokePrivilegesFromRole(metalake, role, object, 
privileges);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
index 5a4a62cd6..66019ee04 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
@@ -22,6 +22,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.collect.Sets;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import java.util.function.Consumer;
@@ -38,6 +39,7 @@ import org.apache.gravitino.connector.BaseCatalog;
 import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
 import org.apache.gravitino.dto.authorization.PrivilegeDTO;
 import org.apache.gravitino.dto.util.DTOConverters;
+import org.apache.gravitino.exceptions.IllegalPrivilegeException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
@@ -54,6 +56,7 @@ public class AuthorizationUtils {
   static final String ROLE_DOES_NOT_EXIST_MSG = "Role %s does not exist in th 
metalake %s";
   private static final Logger LOG = 
LoggerFactory.getLogger(AuthorizationUtils.class);
   private static final String METALAKE_DOES_NOT_EXIST_MSG = "Metalake %s does 
not exist";
+
   private static final Set<Privilege.Name> FILESET_PRIVILEGES =
       Sets.immutableEnumSet(
           Privilege.Name.CREATE_FILESET, Privilege.Name.WRITE_FILESET, 
Privilege.Name.READ_FILESET);
@@ -220,7 +223,7 @@ public class AuthorizationUtils {
     Supplier<NoSuchMetadataObjectException> exceptionToThrowSupplier =
         () ->
             new NoSuchMetadataObjectException(
-                "Securable object %s doesn't exist", object.fullName());
+                "Securable object %s type %s doesn't exist", 
object.fullName(), object.type());
 
     switch (object.type()) {
       case METALAKE:
@@ -265,14 +268,26 @@ public class AuthorizationUtils {
     }
   }
 
+  public static void checkDuplicatedNamePrivilege(Collection<Privilege> 
privileges) {
+    Set<Privilege.Name> privilegeNameSet = Sets.newHashSet();
+    for (Privilege privilege : privileges) {
+      if (privilegeNameSet.contains(privilege.name())) {
+        throw new IllegalPrivilegeException(
+            "Doesn't support duplicated privilege name %s with different 
condition",
+            privilege.name());
+      }
+      privilegeNameSet.add(privilege.name());
+    }
+  }
+
   public static void checkPrivilege(
       PrivilegeDTO privilegeDTO, MetadataObject object, String metalake) {
     Privilege privilege = DTOConverters.fromPrivilegeDTO(privilegeDTO);
+
     if (!privilege.canBindTo(object.type())) {
-      throw new IllegalArgumentException(
-          String.format(
-              "Securable object %s type %s don't support binding privilege %s",
-              object.fullName(), object.type(), privilege));
+      throw new IllegalPrivilegeException(
+          "Securable object %s type %s don't support binding privilege %s",
+          object.fullName(), object.type(), privilege);
     }
 
     if (object.type() == MetadataObject.Type.CATALOG
@@ -309,10 +324,9 @@ public class AuthorizationUtils {
       NameIdentifier catalogIdent, Catalog.Type type, Privilege privilege) {
     Catalog catalog = 
GravitinoEnv.getInstance().catalogDispatcher().loadCatalog(catalogIdent);
     if (catalog.type() != type) {
-      throw new IllegalArgumentException(
-          String.format(
-              "Catalog %s type %s don't support privilege %s",
-              catalogIdent, catalog.type(), privilege));
+      throw new IllegalPrivilegeException(
+          "Catalog %s type %s doesn't support privilege %s",
+          catalogIdent, catalog.type(), privilege);
     }
   }
 
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/PermissionManager.java 
b/core/src/main/java/org/apache/gravitino/authorization/PermissionManager.java
index edb02cdce..056b18f40 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/PermissionManager.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/PermissionManager.java
@@ -19,6 +19,7 @@
 package org.apache.gravitino.authorization;
 
 import static 
org.apache.gravitino.authorization.AuthorizationUtils.GROUP_DOES_NOT_EXIST_MSG;
+import static 
org.apache.gravitino.authorization.AuthorizationUtils.ROLE_DOES_NOT_EXIST_MSG;
 import static 
org.apache.gravitino.authorization.AuthorizationUtils.USER_DOES_NOT_EXIST_MSG;
 
 import com.google.common.collect.Lists;
@@ -26,12 +27,15 @@ import java.io.IOException;
 import java.time.Instant;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.exceptions.NoSuchGroupException;
+import org.apache.gravitino.exceptions.NoSuchRoleException;
 import org.apache.gravitino.exceptions.NoSuchUserException;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.GroupEntity;
@@ -373,11 +377,304 @@ class PermissionManager {
     }
   }
 
-  private List<String> toRoleNames(List<RoleEntity> roleEntities) {
-    return 
roleEntities.stream().map(RoleEntity::name).collect(Collectors.toList());
+  Role grantPrivilegesToRole(
+      String metalake, String role, MetadataObject object, List<Privilege> 
privileges) {
+    try {
+      AuthorizationPluginCallbackWrapper authorizationPluginCallbackWrapper =
+          new AuthorizationPluginCallbackWrapper();
+
+      Role updatedRole =
+          store.update(
+              AuthorizationUtils.ofRole(metalake, role),
+              RoleEntity.class,
+              Entity.EntityType.ROLE,
+              roleEntity -> {
+                List<SecurableObject> grantedSecurableObjects =
+                    generateNewSecurableObjects(
+                        roleEntity.securableObjects(),
+                        object,
+                        targetObject -> {
+                          if (targetObject == null) {
+                            return createNewSecurableObject(
+                                metalake,
+                                role,
+                                object,
+                                privileges,
+                                roleEntity,
+                                authorizationPluginCallbackWrapper);
+                          } else {
+                            return updateGrantedSecurableObject(
+                                metalake,
+                                role,
+                                object,
+                                privileges,
+                                roleEntity,
+                                targetObject,
+                                authorizationPluginCallbackWrapper);
+                          }
+                        });
+
+                AuditInfo auditInfo =
+                    AuditInfo.builder()
+                        .withCreator(roleEntity.auditInfo().creator())
+                        .withCreateTime(roleEntity.auditInfo().createTime())
+                        
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                        .withLastModifiedTime(Instant.now())
+                        .build();
+
+                return RoleEntity.builder()
+                    .withId(roleEntity.id())
+                    .withName(roleEntity.name())
+                    .withNamespace(roleEntity.namespace())
+                    .withProperties(roleEntity.properties())
+                    .withAuditInfo(auditInfo)
+                    .withSecurableObjects(grantedSecurableObjects)
+                    .build();
+              });
+
+      // Execute the authorization plugin callback
+      authorizationPluginCallbackWrapper.execute();
+      return updatedRole;
+    } catch (NoSuchEntityException nse) {
+      LOG.error("Failed to grant, role {} does not exist in the metalake {}", 
role, metalake, nse);
+      throw new NoSuchRoleException(ROLE_DOES_NOT_EXIST_MSG, role, metalake);
+    } catch (IOException ioe) {
+      LOG.error("Grant privileges to {} failed due to storage issues", role, 
ioe);
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  private static SecurableObject updateGrantedSecurableObject(
+      String metalake,
+      String role,
+      MetadataObject object,
+      List<Privilege> privileges,
+      RoleEntity roleEntity,
+      SecurableObject targetObject,
+      AuthorizationPluginCallbackWrapper authorizationPluginCallbackWrapper) {
+    // Removed duplicated privileges by set
+    Set<Privilege> updatePrivileges = Sets.newHashSet();
+    updatePrivileges.addAll(targetObject.privileges());
+    // If old object contains all the privileges to grant, the object doesn't
+    // need to update.
+    if (updatePrivileges.containsAll(privileges)) {
+      return targetObject;
+    } else {
+      updatePrivileges.addAll(privileges);
+      AuthorizationUtils.checkDuplicatedNamePrivilege(privileges);
+
+      SecurableObject newSecurableObject =
+          SecurableObjects.parse(
+              targetObject.fullName(), targetObject.type(), 
Lists.newArrayList(updatePrivileges));
+
+      // We set authorization callback here, we won't execute this callback in 
this place.
+      // We will execute the callback after we execute the SQL transaction.
+      authorizationPluginCallbackWrapper.setCallback(
+          () ->
+              AuthorizationUtils.callAuthorizationPluginForMetadataObject(
+                  metalake,
+                  object,
+                  authorizationPlugin -> {
+                    authorizationPlugin.onRoleUpdated(
+                        roleEntity,
+                        RoleChange.updateSecurableObject(role, targetObject, 
newSecurableObject));
+                  }));
+
+      return newSecurableObject;
+    }
+  }
+
+  Role revokePrivilegesFromRole(
+      String metalake, String role, MetadataObject object, List<Privilege> 
privileges) {
+    try {
+      AuthorizationPluginCallbackWrapper authorizationCallbackWrapper =
+          new AuthorizationPluginCallbackWrapper();
+
+      RoleEntity updatedRole =
+          store.update(
+              AuthorizationUtils.ofRole(metalake, role),
+              RoleEntity.class,
+              Entity.EntityType.ROLE,
+              roleEntity -> {
+                List<SecurableObject> revokedSecurableObjects =
+                    generateNewSecurableObjects(
+                        roleEntity.securableObjects(),
+                        object,
+                        targetObject -> {
+                          // If the securable object doesn't exist, we do 
nothing except for
+                          // logging.
+                          if (targetObject == null) {
+                            LOG.warn(
+                                "Securable object {} type {} doesn't exist in 
the role {}",
+                                object.fullName(),
+                                object.type(),
+                                role);
+                            return null;
+                          } else {
+                            // If the securable object exists, we remove the 
privileges of the
+                            // securable object and will remove duplicated 
privileges at the same
+                            // time.
+                            return updateRevokedSecurableObject(
+                                metalake,
+                                role,
+                                object,
+                                privileges,
+                                roleEntity,
+                                targetObject,
+                                authorizationCallbackWrapper);
+                          }
+                        });
+
+                AuditInfo auditInfo =
+                    AuditInfo.builder()
+                        .withCreator(roleEntity.auditInfo().creator())
+                        .withCreateTime(roleEntity.auditInfo().createTime())
+                        
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                        .withLastModifiedTime(Instant.now())
+                        .build();
+
+                return RoleEntity.builder()
+                    .withId(roleEntity.id())
+                    .withName(roleEntity.name())
+                    .withNamespace(roleEntity.namespace())
+                    .withProperties(roleEntity.properties())
+                    .withAuditInfo(auditInfo)
+                    .withSecurableObjects(revokedSecurableObjects)
+                    .build();
+              });
+
+      authorizationCallbackWrapper.execute();
+
+      return updatedRole;
+    } catch (NoSuchEntityException nse) {
+      LOG.error("Failed to revoke, role {} does not exist in the metalake {}", 
role, metalake, nse);
+      throw new NoSuchRoleException(ROLE_DOES_NOT_EXIST_MSG, role, metalake);
+    } catch (IOException ioe) {
+      LOG.error("Revoke privileges from {} failed due to storage issues", 
role, ioe);
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  private static SecurableObject createNewSecurableObject(
+      String metalake,
+      String role,
+      MetadataObject object,
+      List<Privilege> privileges,
+      RoleEntity roleEntity,
+      AuthorizationPluginCallbackWrapper authorizationPluginCallbackWrapper) {
+    // Add a new securable object if there doesn't exist the object in the role
+    SecurableObject securableObject =
+        SecurableObjects.parse(object.fullName(), object.type(), 
Lists.newArrayList(privileges));
+
+    // We set authorization callback here, we won't execute this callback in 
this place.
+    // We will execute the callback after we execute the SQL transaction.
+    authorizationPluginCallbackWrapper.setCallback(
+        () ->
+            AuthorizationUtils.callAuthorizationPluginForMetadataObject(
+                metalake,
+                object,
+                authorizationPlugin -> {
+                  authorizationPlugin.onRoleUpdated(
+                      roleEntity, RoleChange.addSecurableObject(role, 
securableObject));
+                }));
+
+    return securableObject;
+  }
+
+  private static SecurableObject updateRevokedSecurableObject(
+      String metalake,
+      String role,
+      MetadataObject object,
+      List<Privilege> privileges,
+      RoleEntity roleEntity,
+      SecurableObject targetObject,
+      AuthorizationPluginCallbackWrapper authorizationCallbackWrapper) {
+    // Use set to deduplicate the privileges
+    Set<Privilege> updatePrivileges = Sets.newHashSet();
+    updatePrivileges.addAll(targetObject.privileges());
+    privileges.forEach(updatePrivileges::remove);
+
+    // If the object still contains privilege, we should update the object
+    // with new privileges
+    if (!updatePrivileges.isEmpty()) {
+      SecurableObject newSecurableObject =
+          SecurableObjects.parse(
+              targetObject.fullName(), targetObject.type(), 
Lists.newArrayList(updatePrivileges));
+
+      // We set authorization callback here, we won't execute this callback in 
this place.
+      // We will execute the callback after we execute the SQL transaction.
+      authorizationCallbackWrapper.setCallback(
+          () ->
+              AuthorizationUtils.callAuthorizationPluginForMetadataObject(
+                  metalake,
+                  object,
+                  authorizationPlugin -> {
+                    authorizationPlugin.onRoleUpdated(
+                        roleEntity,
+                        RoleChange.updateSecurableObject(role, targetObject, 
newSecurableObject));
+                  }));
+
+      return newSecurableObject;
+    } else {
+      // If the object doesn't contain any privilege, we remove this object.
+      // We set authorization callback here, we won't execute this callback in 
this place.
+      // We will execute the callback after we execute the SQL transaction.
+      authorizationCallbackWrapper.setCallback(
+          () ->
+              AuthorizationUtils.callAuthorizationPluginForMetadataObject(
+                  metalake,
+                  object,
+                  authorizationPlugin -> {
+                    authorizationPlugin.onRoleUpdated(
+                        roleEntity, RoleChange.removeSecurableObject(role, 
targetObject));
+                  }));
+      // If we return null, the newly generated objects won't contain this 
object, the storage will
+      // delete this object.
+      return null;
+    }
+  }
+
+  // This method will generate all the securable objects after granting or 
revoking privileges
+  private List<SecurableObject> generateNewSecurableObjects(
+      List<SecurableObject> securableObjects,
+      MetadataObject targetObject,
+      Function<SecurableObject, SecurableObject> objectUpdater) {
+
+    // Find a matched securable object to update privileges
+    List<SecurableObject> updateSecurableObjects = 
Lists.newArrayList(securableObjects);
+    SecurableObject matchedSecurableObject =
+        
securableObjects.stream().filter(targetObject::equals).findFirst().orElse(null);
+    if (matchedSecurableObject != null) {
+      updateSecurableObjects.remove(matchedSecurableObject);
+    }
+
+    // Apply the updates for the matched object
+    SecurableObject newSecurableObject = 
objectUpdater.apply(matchedSecurableObject);
+    if (newSecurableObject != null) {
+      updateSecurableObjects.add(newSecurableObject);
+    }
+    return updateSecurableObjects;
+  }
+
+  private static class AuthorizationPluginCallbackWrapper {
+    private Runnable callback;
+
+    public void setCallback(Runnable callback) {
+      this.callback = callback;
+    }
+
+    public void execute() {
+      if (callback != null) {
+        callback.run();
+      }
+    }
   }
 
   private List<Long> toRoleIds(List<RoleEntity> roleEntities) {
     return 
roleEntities.stream().map(RoleEntity::id).collect(Collectors.toList());
   }
+
+  private List<String> toRoleNames(List<RoleEntity> roleEntities) {
+    return 
roleEntities.stream().map(RoleEntity::name).collect(Collectors.toList());
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/AccessControlHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/AccessControlHookDispatcher.java
index e16974764..36f75cfad 100644
--- 
a/core/src/main/java/org/apache/gravitino/hook/AccessControlHookDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/hook/AccessControlHookDispatcher.java
@@ -28,6 +28,7 @@ import org.apache.gravitino.authorization.AuthorizationUtils;
 import org.apache.gravitino.authorization.Group;
 import org.apache.gravitino.authorization.Owner;
 import org.apache.gravitino.authorization.OwnerManager;
+import org.apache.gravitino.authorization.Privilege;
 import org.apache.gravitino.authorization.Role;
 import org.apache.gravitino.authorization.SecurableObject;
 import org.apache.gravitino.authorization.User;
@@ -180,4 +181,18 @@ public class AccessControlHookDispatcher implements 
AccessControlDispatcher {
       throws NoSuchMetalakeException, NoSuchMetadataObjectException {
     return dispatcher.listRoleNamesByObject(metalake, object);
   }
+
+  @Override
+  public Role grantPrivilegeToRole(
+      String metalake, String role, MetadataObject object, List<Privilege> 
privileges)
+      throws NoSuchMetalakeException, NoSuchRoleException {
+    return dispatcher.grantPrivilegeToRole(metalake, role, object, privileges);
+  }
+
+  @Override
+  public Role revokePrivilegesFromRole(
+      String metalake, String role, MetadataObject object, List<Privilege> 
privileges)
+      throws NoSuchMetalakeException, NoSuchRoleException {
+    return dispatcher.revokePrivilegesFromRole(metalake, role, object, 
privileges);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index c1f72c360..1ed138555 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -176,6 +176,8 @@ public class JDBCBackend implements RelationalBackend {
         return (E) UserMetaService.getInstance().updateUser(ident, updater);
       case GROUP:
         return (E) GroupMetaService.getInstance().updateGroup(ident, updater);
+      case ROLE:
+        return (E) RoleMetaService.getInstance().updateRole(ident, updater);
       case TAG:
         return (E) TagMetaService.getInstance().updateTag(ident, updater);
       default:
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/RoleMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/RoleMetaMapper.java
index 6b155f498..8f0220131 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/RoleMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/RoleMetaMapper.java
@@ -76,6 +76,10 @@ public interface RoleMetaMapper {
       method = "insertRoleMetaOnDuplicateKeyUpdate")
   void insertRoleMetaOnDuplicateKeyUpdate(@Param("roleMeta") RolePO rolePO);
 
+  @UpdateProvider(type = RoleMetaSQLProviderFactory.class, method = 
"updateRoleMeta")
+  Integer updateRoleMeta(
+      @Param("newRoleMeta") RolePO newRolePO, @Param("oldRoleMeta") RolePO 
oldRolePO);
+
   @UpdateProvider(type = RoleMetaSQLProviderFactory.class, method = 
"softDeleteRoleMetaByRoleId")
   void softDeleteRoleMetaByRoleId(Long roleId);
 
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/RoleMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/RoleMetaSQLProviderFactory.java
index 65e68975d..b101af552 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/RoleMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/RoleMetaSQLProviderFactory.java
@@ -84,6 +84,11 @@ public class RoleMetaSQLProviderFactory {
     return getProvider().insertRoleMetaOnDuplicateKeyUpdate(rolePO);
   }
 
+  public static String updateRoleMeta(
+      @Param("newRoleMeta") RolePO newRolePO, @Param("oldRoleMeta") RolePO 
oldRolePO) {
+    return getProvider().updateRoleMeta(newRolePO, oldRolePO);
+  }
+
   public static String softDeleteRoleMetaByRoleId(Long roleId) {
     return getProvider().softDeleteRoleMetaByRoleId(roleId);
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SecurableObjectMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SecurableObjectMapper.java
index e5fa90cc9..a5160f905 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SecurableObjectMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SecurableObjectMapper.java
@@ -45,6 +45,12 @@ public interface SecurableObjectMapper {
   void batchInsertSecurableObjects(
       @Param("securableObjects") List<SecurableObjectPO> securableObjectPOs);
 
+  @UpdateProvider(
+      type = SecurableObjectSQLProviderFactory.class,
+      method = "batchSoftDeleteSecurableObjects")
+  void batchSoftDeleteSecurableObjects(
+      @Param("securableObjects") List<SecurableObjectPO> securableObjectPOs);
+
   @UpdateProvider(
       type = SecurableObjectSQLProviderFactory.class,
       method = "softDeleteSecurableObjectsByRoleId")
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SecurableObjectSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SecurableObjectSQLProviderFactory.java
index 8f3d6d9d4..4f664dabc 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SecurableObjectSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SecurableObjectSQLProviderFactory.java
@@ -57,6 +57,11 @@ public class SecurableObjectSQLProviderFactory {
     return getProvider().batchInsertSecurableObjects(securableObjectPOs);
   }
 
+  public static String batchSoftDeleteSecurableObjects(
+      @Param("securableObjects") List<SecurableObjectPO> securableObjectPOs) {
+    return getProvider().batchSoftDeleteSecurableObjects(securableObjectPOs);
+  }
+
   public static String softDeleteSecurableObjectsByRoleId(@Param("roleId") 
Long roleId) {
     return getProvider().softDeleteSecurableObjectsByRoleId(roleId);
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/RoleMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/RoleMetaBaseSQLProvider.java
index b2dc12137..d28e26f3a 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/RoleMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/RoleMetaBaseSQLProvider.java
@@ -151,6 +151,25 @@ public class RoleMetaBaseSQLProvider {
         + " deleted_at = #{roleMeta.deletedAt}";
   }
 
+  public String updateRoleMeta(
+      @Param("newRoleMeta") RolePO newRolePO, @Param("oldRoleMeta") RolePO 
oldRolePO) {
+    return "UPDATE "
+        + ROLE_TABLE_NAME
+        + " SET role_name = #{newRoleMeta.roleName},"
+        + " metalake_id = #{newRoleMeta.metalakeId},"
+        + " properties = #{newRoleMeta.properties},"
+        + " audit_info = #{newRoleMeta.auditInfo},"
+        + " current_version = #{newRoleMeta.currentVersion},"
+        + " last_version = #{newRoleMeta.lastVersion},"
+        + " deleted_at = #{newRoleMeta.deletedAt}"
+        + " WHERE role_id = #{oldRoleMeta.roleId}"
+        + " AND role_name = #{oldRoleMeta.roleName}"
+        + " AND metalake_id = #{oldRoleMeta.metalakeId}"
+        + " AND current_version = #{oldRoleMeta.currentVersion}"
+        + " AND last_version = #{oldRoleMeta.lastVersion}"
+        + " AND deleted_at = 0";
+  }
+
   public String softDeleteRoleMetaByRoleId(Long roleId) {
     return "UPDATE "
         + ROLE_TABLE_NAME
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/SecurableObjectBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/SecurableObjectBaseSQLProvider.java
index 012d9ed95..5561f7cbb 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/SecurableObjectBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/SecurableObjectBaseSQLProvider.java
@@ -48,6 +48,23 @@ public class SecurableObjectBaseSQLProvider {
         + "</script>";
   }
 
+  public String batchSoftDeleteSecurableObjects(
+      @Param("securableObjects") List<SecurableObjectPO> securableObjectPOs) {
+    return "<script>"
+        + "UPDATE "
+        + SECURABLE_OBJECT_TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE FALSE "
+        + "<foreach collection='securableObjects' item='item' separator=' '>"
+        + " OR (metadata_object_id = #{item.metadataObjectId} AND"
+        + " role_id = #{item.roleId} AND deleted_at = 0 AND"
+        + " privilege_names = #{item.privilegeNames} AND"
+        + " privilege_conditions = #{item.privilegeConditions})"
+        + "</foreach>"
+        + "</script>";
+  }
+
   public String softDeleteSecurableObjectsByRoleId(@Param("roleId") Long 
roleId) {
     return "UPDATE "
         + SECURABLE_OBJECT_TABLE_NAME
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java
index e09249c91..20867ee42 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java
@@ -21,9 +21,29 @@ package 
org.apache.gravitino.storage.relational.mapper.provider.postgresql;
 import static 
org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper.ROLE_TABLE_NAME;
 import static 
org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper.SECURABLE_OBJECT_TABLE_NAME;
 
+import java.util.List;
 import 
org.apache.gravitino.storage.relational.mapper.provider.base.SecurableObjectBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.SecurableObjectPO;
+import org.apache.ibatis.annotations.Param;
 
 public class SecurableObjectPostgreSQLProvider extends 
SecurableObjectBaseSQLProvider {
+  public String batchSoftDeleteSecurableObjects(
+      @Param("securableObjects") List<SecurableObjectPO> securableObjectPOs) {
+    return "<script>"
+        + "UPDATE "
+        + SECURABLE_OBJECT_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE FALSE "
+        + "<foreach collection='securableObjects' item='item' separator=' '>"
+        + " OR (metadata_object_id = #{item.metadataObjectId} AND"
+        + " role_id = #{item.roleId} AND deleted_at = 0 AND"
+        + " privilege_names = #{item.privilegeNames} AND"
+        + " privilege_conditions = #{item.privilegeConditions})"
+        + "</foreach>"
+        + "</script>";
+  }
+
   @Override
   public String softDeleteSecurableObjectsByRoleId(Long roleId) {
     return "UPDATE "
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
index 915a14950..0236b01fa 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
@@ -18,12 +18,19 @@
  */
 package org.apache.gravitino.storage.relational.service;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
+import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
@@ -160,6 +167,89 @@ public class RoleMetaService {
     }
   }
 
+  public <E extends Entity & HasIdentifier> RoleEntity updateRole(
+      NameIdentifier identifier, Function<E, E> updater) throws IOException {
+    AuthorizationUtils.checkRole(identifier);
+
+    try {
+      String metalake = identifier.namespace().level(0);
+      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
+
+      RolePO rolePO = getRolePOByMetalakeIdAndName(metalakeId, 
identifier.name());
+      RoleEntity oldRoleEntity =
+          POConverters.fromRolePO(
+              rolePO, listSecurableObjects(rolePO), 
AuthorizationUtils.ofRoleNamespace(metalake));
+
+      RoleEntity newRoleEntity = (RoleEntity) updater.apply((E) oldRoleEntity);
+
+      Preconditions.checkArgument(
+          Objects.equals(oldRoleEntity.id(), newRoleEntity.id()),
+          "The updated role entity id: %s should be same with the role entity 
id before: %s",
+          newRoleEntity.id(),
+          oldRoleEntity.id());
+
+      Set<SecurableObject> oldObjects = new 
HashSet<>(oldRoleEntity.securableObjects());
+      Set<SecurableObject> newObjects = new 
HashSet<>(newRoleEntity.securableObjects());
+      Set<SecurableObject> insertObjects = Sets.difference(newObjects, 
oldObjects);
+      Set<SecurableObject> deleteObjects = Sets.difference(oldObjects, 
newObjects);
+
+      if (insertObjects.isEmpty() && deleteObjects.isEmpty()) {
+        return newRoleEntity;
+      }
+
+      List<SecurableObjectPO> deleteSecurableObjectPOs =
+          toSecurableObjectPOs(deleteObjects, oldRoleEntity, metalakeId);
+
+      List<SecurableObjectPO> insertSecurableObjectPOs =
+          toSecurableObjectPOs(insertObjects, oldRoleEntity, metalakeId);
+
+      SessionUtils.doMultipleWithCommit(
+          () ->
+              SessionUtils.doWithoutCommit(
+                  RoleMetaMapper.class,
+                  mapper ->
+                      mapper.updateRoleMeta(
+                          POConverters.updateRolePOWithVersion(rolePO, 
newRoleEntity), rolePO)),
+          () -> {
+            if (deleteSecurableObjectPOs.isEmpty()) {
+              return;
+            }
+
+            SessionUtils.doWithoutCommit(
+                SecurableObjectMapper.class,
+                mapper -> 
mapper.batchSoftDeleteSecurableObjects(deleteSecurableObjectPOs));
+          },
+          () -> {
+            if (insertSecurableObjectPOs.isEmpty()) {
+              return;
+            }
+
+            SessionUtils.doWithoutCommit(
+                SecurableObjectMapper.class,
+                mapper -> 
mapper.batchInsertSecurableObjects(insertSecurableObjectPOs));
+          });
+
+      return newRoleEntity;
+    } catch (RuntimeException re) {
+      ExceptionUtils.checkSQLException(re, Entity.EntityType.ROLE, 
identifier.toString());
+      throw re;
+    }
+  }
+
+  private List<SecurableObjectPO> toSecurableObjectPOs(
+      Set<SecurableObject> deleteObjects, RoleEntity oldRoleEntity, Long 
metalakeId) {
+    List<SecurableObjectPO> securableObjectPOs = Lists.newArrayList();
+    for (SecurableObject object : deleteObjects) {
+      SecurableObjectPO.Builder objectBuilder =
+          POConverters.initializeSecurablePOBuilderWithVersion(
+              oldRoleEntity.id(), object, getEntityType(object));
+      objectBuilder.withMetadataObjectId(
+          MetadataObjectService.getMetadataObjectId(metalakeId, 
object.fullName(), object.type()));
+      securableObjectPOs.add(objectBuilder.build());
+    }
+    return securableObjectPOs;
+  }
+
   public RoleEntity getRoleByIdentifier(NameIdentifier identifier) {
     AuthorizationUtils.checkRole(identifier);
 
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
index f6392127b..f09f6751c 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
@@ -1069,6 +1069,27 @@ public class POConverters {
     }
   }
 
+  public static RolePO updateRolePOWithVersion(RolePO oldRolePO, RoleEntity 
newRole) {
+    Long lastVersion = oldRolePO.getLastVersion();
+    // TODO: set the version to the last version + 1 when having some fields 
need be multiple
+    // version
+    Long nextVersion = lastVersion;
+    try {
+      return RolePO.builder()
+          .withRoleId(oldRolePO.getRoleId())
+          .withRoleName(newRole.name())
+          .withMetalakeId(oldRolePO.getMetalakeId())
+          
.withProperties(JsonUtils.anyFieldMapper().writeValueAsString(newRole.properties()))
+          
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(newRole.auditInfo()))
+          .withCurrentVersion(nextVersion)
+          .withLastVersion(nextVersion)
+          .withDeletedAt(DEFAULT_DELETED_AT)
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to serialize json object:", e);
+    }
+  }
+
   public static TagEntity fromTagPO(TagPO tagPO, Namespace namespace) {
     try {
       return TagEntity.builder()
diff --git 
a/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManagerForPermissions.java
 
b/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManagerForPermissions.java
index ce3c4e90c..e7e792536 100644
--- 
a/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManagerForPermissions.java
+++ 
b/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManagerForPermissions.java
@@ -28,11 +28,14 @@ import java.io.IOException;
 import java.time.Instant;
 import java.util.List;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.catalog.CatalogManager;
 import org.apache.gravitino.connector.BaseCatalog;
@@ -117,6 +120,34 @@ public class TestAccessControlManagerForPermissions {
           .withAuditInfo(auditInfo)
           .build();
 
+  private static RoleEntity grantedRoleEntity =
+      RoleEntity.builder()
+          .withNamespace(
+              Namespace.of(METALAKE, Entity.SYSTEM_CATALOG_RESERVED_NAME, 
Entity.ROLE_SCHEMA_NAME))
+          .withId(1L)
+          .withName("grantedRole")
+          .withProperties(Maps.newHashMap())
+          .withSecurableObjects(
+              Lists.newArrayList(
+                  SecurableObjects.ofCatalog(
+                      CATALOG, 
Lists.newArrayList(Privileges.UseCatalog.allow()))))
+          .withAuditInfo(auditInfo)
+          .build();
+
+  private static RoleEntity revokedRoleEntity =
+      RoleEntity.builder()
+          .withNamespace(
+              Namespace.of(METALAKE, Entity.SYSTEM_CATALOG_RESERVED_NAME, 
Entity.ROLE_SCHEMA_NAME))
+          .withId(1L)
+          .withName("revokedRole")
+          .withProperties(Maps.newHashMap())
+          .withSecurableObjects(
+              Lists.newArrayList(
+                  SecurableObjects.ofCatalog(
+                      CATALOG, 
Lists.newArrayList(Privileges.UseCatalog.allow()))))
+          .withAuditInfo(auditInfo)
+          .build();
+
   @BeforeAll
   public static void setUp() throws Exception {
     config = new Config(false) {};
@@ -130,6 +161,8 @@ public class TestAccessControlManagerForPermissions {
     entityStore.put(userEntity, true);
     entityStore.put(groupEntity, true);
     entityStore.put(roleEntity, true);
+    entityStore.put(grantedRoleEntity, true);
+    entityStore.put(revokedRoleEntity, true);
 
     accessControlManager = new AccessControlManager(entityStore, new 
RandomIdGenerator(), config);
 
@@ -139,6 +172,8 @@ public class TestAccessControlManagerForPermissions {
     FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", 
catalogManager, true);
     BaseCatalog catalog = Mockito.mock(BaseCatalog.class);
     Mockito.when(catalogManager.loadCatalog(any())).thenReturn(catalog);
+    Mockito.when(catalogManager.listCatalogsInfo(Mockito.any()))
+        .thenReturn(new Catalog[] {catalog});
     authorizationPlugin = Mockito.mock(AuthorizationPlugin.class);
     
Mockito.when(catalog.getAuthorizationPlugin()).thenReturn(authorizationPlugin);
   }
@@ -309,4 +344,85 @@ public class TestAccessControlManagerForPermissions {
         NoSuchGroupException.class,
         () -> accessControlManager.revokeRolesFromGroup(METALAKE, ROLE, 
notExist));
   }
+
+  @Test
+  public void testGrantPrivilegeToRole() {
+    reset(authorizationPlugin);
+    String notExist = "not-exist";
+
+    Role role =
+        accessControlManager.grantPrivilegeToRole(
+            METALAKE,
+            "grantedRole",
+            MetadataObjects.of(null, METALAKE, MetadataObject.Type.METALAKE),
+            Lists.newArrayList(Privileges.CreateTable.allow()));
+
+    List<SecurableObject> objects = role.securableObjects();
+
+    // Test authorization plugin
+    verify(authorizationPlugin).onRoleUpdated(any(), any());
+
+    Assertions.assertEquals(2, objects.size());
+
+    // Repeat to grant
+    role =
+        accessControlManager.grantPrivilegeToRole(
+            METALAKE,
+            "grantedRole",
+            MetadataObjects.of(null, METALAKE, MetadataObject.Type.METALAKE),
+            Lists.newArrayList(Privileges.CreateTable.allow()));
+    objects = role.securableObjects();
+
+    Assertions.assertEquals(2, objects.size());
+
+    // Throw NoSuchRoleException
+    Assertions.assertThrows(
+        NoSuchRoleException.class,
+        () ->
+            accessControlManager.grantPrivilegeToRole(
+                METALAKE,
+                notExist,
+                MetadataObjects.of(null, METALAKE, 
MetadataObject.Type.METALAKE),
+                Lists.newArrayList(Privileges.CreateTable.allow())));
+  }
+
+  @Test
+  public void testRevokePrivilegeFromRole() {
+    reset(authorizationPlugin);
+    String notExist = "not-exist";
+
+    Role role =
+        accessControlManager.revokePrivilegesFromRole(
+            METALAKE,
+            "revokedRole",
+            MetadataObjects.of(null, CATALOG, MetadataObject.Type.CATALOG),
+            Lists.newArrayList(Privileges.UseCatalog.allow()));
+
+    // Test authorization plugin
+    verify(authorizationPlugin).onRoleUpdated(any(), any());
+
+    List<SecurableObject> objects = role.securableObjects();
+
+    Assertions.assertTrue(objects.isEmpty());
+
+    // repeat to revoke
+    role =
+        accessControlManager.revokePrivilegesFromRole(
+            METALAKE,
+            "revokedRole",
+            MetadataObjects.of(null, CATALOG, MetadataObject.Type.CATALOG),
+            Lists.newArrayList(Privileges.UseCatalog.allow()));
+    objects = role.securableObjects();
+    Assertions.assertTrue(objects.isEmpty());
+
+    // Throw NoSuchRoleException
+    Assertions.assertThrows(
+        NoSuchRoleException.class,
+        () ->
+            accessControlManager.revokePrivilegesFromRole(
+                METALAKE,
+                notExist,
+                MetadataObjects.of(null, METALAKE, 
MetadataObject.Type.METALAKE),
+                Lists.newArrayList(Privileges.CreateTable.allow())));
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestRoleMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestRoleMetaService.java
index 1f818b112..14ba3254d 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestRoleMetaService.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestRoleMetaService.java
@@ -27,8 +27,10 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Instant;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.function.Function;
 import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.authorization.AuthorizationUtils;
@@ -559,6 +561,180 @@ class TestRoleMetaService extends TestJDBCBackend {
             .isEmpty());
   }
 
+  @Test
+  void testUpdateRole() throws IOException {
+    AuditInfo auditInfo =
+        
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
+    backend.insert(metalake, false);
+
+    CatalogEntity catalog =
+        createCatalog(
+            RandomIdGenerator.INSTANCE.nextId(), Namespace.of("metalake"), 
"catalog", auditInfo);
+    backend.insert(catalog, false);
+
+    RoleMetaService roleMetaService = RoleMetaService.getInstance();
+    RoleEntity roleEntity =
+        createRoleEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            AuthorizationUtils.ofRoleNamespace(metalakeName),
+            "role1",
+            auditInfo,
+            "catalog");
+    roleMetaService.insertRole(roleEntity, false);
+
+    // grant privileges to the role
+    Function<RoleEntity, RoleEntity> grantUpdater =
+        role -> {
+          AuditInfo updateAuditInfo =
+              AuditInfo.builder()
+                  .withCreator(role.auditInfo().creator())
+                  .withCreateTime(role.auditInfo().createTime())
+                  .withLastModifier("grantRole")
+                  .withLastModifiedTime(Instant.now())
+                  .build();
+
+          List<SecurableObject> securableObjects = 
Lists.newArrayList(role.securableObjects());
+          securableObjects.add(
+              SecurableObjects.ofMetalake(
+                  metalakeName, 
Lists.newArrayList(Privileges.CreateTable.allow())));
+
+          return RoleEntity.builder()
+              .withId(role.id())
+              .withName(role.name())
+              .withNamespace(role.namespace())
+              .withProperties(ImmutableMap.of("k1", "v1"))
+              .withSecurableObjects(securableObjects)
+              .withAuditInfo(updateAuditInfo)
+              .build();
+        };
+
+    
Assertions.assertNotNull(roleMetaService.updateRole(roleEntity.nameIdentifier(),
 grantUpdater));
+    RoleEntity grantRole = 
roleMetaService.getRoleByIdentifier(roleEntity.nameIdentifier());
+
+    Assertions.assertEquals(grantRole.id(), roleEntity.id());
+    Assertions.assertEquals(grantRole.name(), roleEntity.name());
+    Assertions.assertEquals("creator", grantRole.auditInfo().creator());
+    Assertions.assertEquals("grantRole", grantRole.auditInfo().lastModifier());
+    Assertions.assertEquals(
+        Lists.newArrayList(
+            SecurableObjects.ofCatalog(
+                "catalog", Lists.newArrayList(Privileges.UseCatalog.allow())),
+            SecurableObjects.ofMetalake(
+                metalakeName, 
Lists.newArrayList(Privileges.CreateTable.allow()))),
+        grantRole.securableObjects());
+
+    // revoke privileges from the role
+    Function<RoleEntity, RoleEntity> revokeUpdater =
+        role -> {
+          AuditInfo updateAuditInfo =
+              AuditInfo.builder()
+                  .withCreator(role.auditInfo().creator())
+                  .withCreateTime(role.auditInfo().createTime())
+                  .withLastModifier("revokeRole")
+                  .withLastModifiedTime(Instant.now())
+                  .build();
+
+          List<SecurableObject> securableObjects = 
Lists.newArrayList(role.securableObjects());
+          securableObjects.remove(0);
+
+          return RoleEntity.builder()
+              .withId(role.id())
+              .withName(role.name())
+              .withNamespace(role.namespace())
+              .withAuditInfo(updateAuditInfo)
+              .withProperties(role.properties())
+              .withSecurableObjects(securableObjects)
+              .withAuditInfo(updateAuditInfo)
+              .build();
+        };
+    roleMetaService.updateRole(roleEntity.nameIdentifier(), revokeUpdater);
+
+    RoleEntity revokeRole = 
roleMetaService.getRoleByIdentifier(roleEntity.nameIdentifier());
+    Assertions.assertEquals(revokeRole.id(), roleEntity.id());
+    Assertions.assertEquals(revokeRole.name(), roleEntity.name());
+    Assertions.assertEquals("creator", revokeRole.auditInfo().creator());
+    Assertions.assertEquals("revokeRole", 
revokeRole.auditInfo().lastModifier());
+    Assertions.assertEquals(
+        Lists.newArrayList(
+            SecurableObjects.ofMetalake(
+                metalakeName, 
Lists.newArrayList(Privileges.CreateTable.allow()))),
+        revokeRole.securableObjects());
+
+    // grant and revoke privileges for the role
+    Function<RoleEntity, RoleEntity> grantRevokeUpdater =
+        role -> {
+          AuditInfo updateAuditInfo =
+              AuditInfo.builder()
+                  .withCreator(role.auditInfo().creator())
+                  .withCreateTime(role.auditInfo().createTime())
+                  .withLastModifier("grantRevokeRole")
+                  .withLastModifiedTime(Instant.now())
+                  .build();
+
+          List<SecurableObject> securableObjects = 
Lists.newArrayList(role.securableObjects());
+          securableObjects.remove(0);
+          securableObjects.add(
+              SecurableObjects.ofCatalog(
+                  "catalog", 
Lists.newArrayList(Privileges.CreateTable.allow())));
+
+          return RoleEntity.builder()
+              .withId(role.id())
+              .withName(role.name())
+              .withNamespace(role.namespace())
+              .withAuditInfo(updateAuditInfo)
+              .withProperties(role.properties())
+              .withSecurableObjects(securableObjects)
+              .withAuditInfo(updateAuditInfo)
+              .build();
+        };
+    roleMetaService.updateRole(roleEntity.nameIdentifier(), 
grantRevokeUpdater);
+
+    RoleEntity grantRevokeRole = 
roleMetaService.getRoleByIdentifier(roleEntity.nameIdentifier());
+    Assertions.assertEquals(grantRevokeRole.id(), roleEntity.id());
+    Assertions.assertEquals(grantRevokeRole.name(), roleEntity.name());
+    Assertions.assertEquals("creator", grantRevokeRole.auditInfo().creator());
+    Assertions.assertEquals("grantRevokeRole", 
grantRevokeRole.auditInfo().lastModifier());
+    Assertions.assertEquals(
+        Lists.newArrayList(
+            SecurableObjects.ofCatalog(
+                "catalog", 
Lists.newArrayList(Privileges.CreateTable.allow()))),
+        grantRevokeRole.securableObjects());
+
+    // revoke multiple securable objects
+    roleMetaService.updateRole(roleEntity.nameIdentifier(), grantUpdater);
+    Function<RoleEntity, RoleEntity> revokeMultipleUpdater =
+        role -> {
+          AuditInfo updateAuditInfo =
+              AuditInfo.builder()
+                  .withCreator(role.auditInfo().creator())
+                  .withCreateTime(role.auditInfo().createTime())
+                  .withLastModifier("revokeMultiple")
+                  .withLastModifiedTime(Instant.now())
+                  .build();
+
+          return RoleEntity.builder()
+              .withId(role.id())
+              .withName(role.name())
+              .withNamespace(role.namespace())
+              .withAuditInfo(updateAuditInfo)
+              .withProperties(role.properties())
+              .withSecurableObjects(Collections.emptyList())
+              .withAuditInfo(updateAuditInfo)
+              .build();
+        };
+
+    roleMetaService.updateRole(roleEntity.nameIdentifier(), 
revokeMultipleUpdater);
+    RoleEntity revokeMultipleRole =
+        roleMetaService.getRoleByIdentifier(roleEntity.nameIdentifier());
+    Assertions.assertEquals(revokeMultipleRole.id(), roleEntity.id());
+    Assertions.assertEquals(revokeMultipleRole.name(), roleEntity.name());
+    Assertions.assertEquals("creator", 
revokeMultipleRole.auditInfo().creator());
+    Assertions.assertEquals("revokeMultiple", 
revokeMultipleRole.auditInfo().lastModifier());
+    Assertions.assertTrue(revokeMultipleRole.securableObjects().isEmpty());
+  }
+
   @Test
   void deleteMetalakeCascade() throws IOException {
     AuditInfo auditInfo =
diff --git a/server/src/main/java/org/apache/gravitino/server/web/Utils.java 
b/server/src/main/java/org/apache/gravitino/server/web/Utils.java
index 5a0ece332..7226eb363 100644
--- a/server/src/main/java/org/apache/gravitino/server/web/Utils.java
+++ b/server/src/main/java/org/apache/gravitino/server/web/Utils.java
@@ -53,12 +53,16 @@ public class Utils {
   }
 
   public static Response illegalArguments(String message) {
-    return illegalArguments(message, null);
+    return illegalArguments(IllegalArgumentException.class.getSimpleName(), 
message, null);
   }
 
   public static Response illegalArguments(String message, Throwable throwable) 
{
+    return illegalArguments(throwable.getClass().getSimpleName(), message, 
throwable);
+  }
+
+  public static Response illegalArguments(String type, String message, 
Throwable throwable) {
     return Response.status(Response.Status.BAD_REQUEST)
-        .entity(ErrorResponse.illegalArguments(message, throwable))
+        .entity(ErrorResponse.illegalArguments(type, message, throwable))
         .type(MediaType.APPLICATION_JSON)
         .build();
   }
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/mapper/JsonMappingExceptionMapper.java
 
b/server/src/main/java/org/apache/gravitino/server/web/mapper/JsonMappingExceptionMapper.java
index d2151385d..df972d0ff 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/mapper/JsonMappingExceptionMapper.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/mapper/JsonMappingExceptionMapper.java
@@ -34,6 +34,6 @@ public class JsonMappingExceptionMapper implements 
ExceptionMapper<JsonMappingEx
   @Override
   public Response toResponse(JsonMappingException e) {
     String errorMsg = "Malformed json request";
-    return Utils.illegalArguments(errorMsg, e);
+    return 
Utils.illegalArguments(IllegalArgumentException.class.getSimpleName(), 
errorMsg, e);
   }
 }
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/mapper/JsonParseExceptionMapper.java
 
b/server/src/main/java/org/apache/gravitino/server/web/mapper/JsonParseExceptionMapper.java
index 08ba8d083..1cf10c83f 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/mapper/JsonParseExceptionMapper.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/mapper/JsonParseExceptionMapper.java
@@ -33,6 +33,6 @@ public class JsonParseExceptionMapper implements 
ExceptionMapper<JsonParseExcept
   @Override
   public Response toResponse(JsonParseException e) {
     String errorMsg = "Malformed json request";
-    return Utils.illegalArguments(errorMsg, e);
+    return 
Utils.illegalArguments(IllegalArgumentException.class.getSimpleName(), 
errorMsg, e);
   }
 }
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
index 346c39a2c..4748cf23d 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
@@ -143,6 +143,11 @@ public class ExceptionHandlers {
     return OwnerExceptionHandler.INSTANCE.handle(type, name, metalake, e);
   }
 
+  public static Response handleRolePermissionOperationException(
+      OperationType type, String name, String parent, Exception e) {
+    return RolePermissionOperationHandler.INSTANCE.handle(type, name, parent, 
e);
+  }
+
   private static class PartitionExceptionHandler extends BaseExceptionHandler {
 
     private static final ExceptionHandler INSTANCE = new 
PartitionExceptionHandler();
@@ -508,6 +513,19 @@ public class ExceptionHandlers {
     }
   }
 
+  private static class RolePermissionOperationHandler extends 
BasePermissionExceptionHandler {
+
+    private static final ExceptionHandler INSTANCE = new 
RolePermissionOperationHandler();
+
+    @Override
+    protected String getPermissionErrorMsg(
+        String object, String operation, String parent, String reason) {
+      return String.format(
+          "Failed to operate object(%s) operation [%s] under role [%s], reason 
[%s]",
+          object, operation, parent, reason);
+    }
+  }
+
   private abstract static class BasePermissionExceptionHandler extends 
BaseExceptionHandler {
 
     protected abstract String getPermissionErrorMsg(
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/PermissionOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/PermissionOperations.java
index 089cf72df..94ace77aa 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/PermissionOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/PermissionOperations.java
@@ -20,6 +20,8 @@ package org.apache.gravitino.server.web.rest;
 
 import com.codahale.metrics.annotation.ResponseMetered;
 import com.codahale.metrics.annotation.Timed;
+import java.util.Locale;
+import java.util.stream.Collectors;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
@@ -29,12 +31,18 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.authorization.AccessControlDispatcher;
 import org.apache.gravitino.authorization.AuthorizationUtils;
+import org.apache.gravitino.dto.authorization.PrivilegeDTO;
+import org.apache.gravitino.dto.requests.PrivilegeGrantRequest;
+import org.apache.gravitino.dto.requests.PrivilegeRevokeRequest;
 import org.apache.gravitino.dto.requests.RoleGrantRequest;
 import org.apache.gravitino.dto.requests.RoleRevokeRequest;
 import org.apache.gravitino.dto.responses.GroupResponse;
+import org.apache.gravitino.dto.responses.RoleResponse;
 import org.apache.gravitino.dto.responses.UserResponse;
 import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.lock.LockType;
@@ -185,4 +193,94 @@ public class PermissionOperations {
           OperationType.REVOKE, StringUtils.join(request.getRoleNames()), 
group, e);
     }
   }
+
+  @PUT
+  @Path("roles/{role}/{type}/{fullName}/grant/")
+  @Produces("application/vnd.gravitino.v1+json")
+  @Timed(name = "grant-privilege-to-role." + 
MetricNames.HTTP_PROCESS_DURATION, absolute = true)
+  @ResponseMetered(name = "grant-privilege-to-role", absolute = true)
+  public Response grantPrivilegeToRole(
+      @PathParam("metalake") String metalake,
+      @PathParam("role") String role,
+      @PathParam("type") String type,
+      @PathParam("fullName") String fullName,
+      PrivilegeGrantRequest privilegeGrantRequest) {
+    try {
+      MetadataObject object =
+          MetadataObjects.parse(
+              fullName, 
MetadataObject.Type.valueOf(type.toUpperCase(Locale.ROOT)));
+
+      return Utils.doAs(
+          httpRequest,
+          () -> {
+            for (PrivilegeDTO privilegeDTO : 
privilegeGrantRequest.getPrivileges()) {
+              AuthorizationUtils.checkPrivilege(privilegeDTO, object, 
metalake);
+            }
+
+            AuthorizationUtils.checkSecurableObject(metalake, object);
+            return TreeLockUtils.doWithTreeLock(
+                AuthorizationUtils.ofRole(metalake, role),
+                LockType.WRITE,
+                () ->
+                    Utils.ok(
+                        new RoleResponse(
+                            DTOConverters.toDTO(
+                                accessControlManager.grantPrivilegeToRole(
+                                    metalake,
+                                    role,
+                                    object,
+                                    
privilegeGrantRequest.getPrivileges().stream()
+                                        .map(DTOConverters::fromPrivilegeDTO)
+                                        .collect(Collectors.toList()))))));
+          });
+    } catch (Exception e) {
+      return ExceptionHandlers.handleRolePermissionOperationException(
+          OperationType.GRANT, fullName, role, e);
+    }
+  }
+
+  @PUT
+  @Path("roles/{role}/{type}/{fullName}/revoke/")
+  @Produces("application/vnd.gravitino.v1+json")
+  @Timed(name = "revoke-privilege-from-role." + 
MetricNames.HTTP_PROCESS_DURATION, absolute = true)
+  @ResponseMetered(name = "revoke-privilege-from-role", absolute = true)
+  public Response revokePrivilegeFromRole(
+      @PathParam("metalake") String metalake,
+      @PathParam("role") String role,
+      @PathParam("type") String type,
+      @PathParam("fullName") String fullName,
+      PrivilegeRevokeRequest privilegeRevokeRequest) {
+    try {
+      MetadataObject object =
+          MetadataObjects.parse(
+              fullName, 
MetadataObject.Type.valueOf(type.toUpperCase(Locale.ROOT)));
+
+      return Utils.doAs(
+          httpRequest,
+          () -> {
+            for (PrivilegeDTO privilegeDTO : 
privilegeRevokeRequest.getPrivileges()) {
+              AuthorizationUtils.checkPrivilege(privilegeDTO, object, 
metalake);
+            }
+
+            AuthorizationUtils.checkSecurableObject(metalake, object);
+            return TreeLockUtils.doWithTreeLock(
+                AuthorizationUtils.ofRole(metalake, role),
+                LockType.WRITE,
+                () ->
+                    Utils.ok(
+                        new RoleResponse(
+                            DTOConverters.toDTO(
+                                accessControlManager.revokePrivilegesFromRole(
+                                    metalake,
+                                    role,
+                                    object,
+                                    
privilegeRevokeRequest.getPrivileges().stream()
+                                        .map(DTOConverters::fromPrivilegeDTO)
+                                        .collect(Collectors.toList()))))));
+          });
+    } catch (Exception e) {
+      return ExceptionHandlers.handleRolePermissionOperationException(
+          OperationType.REVOKE, fullName, role, e);
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/RoleOperations.java 
b/server/src/main/java/org/apache/gravitino/server/web/rest/RoleOperations.java
index 9810ad759..e20f6b451 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/RoleOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/RoleOperations.java
@@ -137,6 +137,8 @@ public class RoleOperations {
                 metadataObjects.add(metadataObject);
               }
 
+              Set<Privilege> privileges = Sets.newHashSet(object.privileges());
+              AuthorizationUtils.checkDuplicatedNamePrivilege(privileges);
               for (Privilege privilege : object.privileges()) {
                 AuthorizationUtils.checkPrivilege((PrivilegeDTO) privilege, 
object, metalake);
               }
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestPermissionOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestPermissionOperations.java
index 76dba3edd..e927a0a4e 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestPermissionOperations.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestPermissionOperations.java
@@ -39,20 +39,31 @@ import org.apache.gravitino.Config;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.authorization.AccessControlManager;
 import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.SecurableObjects;
 import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.dto.authorization.PrivilegeDTO;
+import org.apache.gravitino.dto.requests.PrivilegeGrantRequest;
+import org.apache.gravitino.dto.requests.PrivilegeRevokeRequest;
 import org.apache.gravitino.dto.requests.RoleGrantRequest;
 import org.apache.gravitino.dto.requests.RoleRevokeRequest;
 import org.apache.gravitino.dto.responses.ErrorConstants;
 import org.apache.gravitino.dto.responses.ErrorResponse;
 import org.apache.gravitino.dto.responses.GroupResponse;
+import org.apache.gravitino.dto.responses.RoleResponse;
 import org.apache.gravitino.dto.responses.UserResponse;
+import org.apache.gravitino.exceptions.IllegalPrivilegeException;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
 import org.apache.gravitino.exceptions.NoSuchRoleException;
 import org.apache.gravitino.exceptions.NoSuchUserException;
 import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.metalake.MetalakeDispatcher;
 import org.apache.gravitino.rest.RESTUtils;
 import org.glassfish.hk2.utilities.binding.AbstractBinder;
 import org.glassfish.jersey.server.ResourceConfig;
@@ -66,6 +77,7 @@ import org.mockito.Mockito;
 public class TestPermissionOperations extends JerseyTest {
 
   private static final AccessControlManager manager = 
mock(AccessControlManager.class);
+  private static final MetalakeDispatcher metalakeDispatcher = 
mock(MetalakeDispatcher.class);
 
   private static class MockServletRequestFactory extends 
ServletRequestFactoryBase {
     @Override
@@ -84,6 +96,8 @@ public class TestPermissionOperations extends JerseyTest {
     Mockito.doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
     FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new 
LockManager(config), true);
     FieldUtils.writeField(GravitinoEnv.getInstance(), 
"accessControlDispatcher", manager, true);
+    FieldUtils.writeField(
+        GravitinoEnv.getInstance(), "metalakeDispatcher", metalakeDispatcher, 
true);
   }
 
   @Override
@@ -394,4 +408,162 @@ public class TestPermissionOperations extends JerseyTest {
     Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE, 
errorResponse.getCode());
     Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResponse.getType());
   }
+
+  @Test
+  public void testGrantPrivilegesToRole() {
+    RoleEntity roleEntity =
+        RoleEntity.builder()
+            .withId(1L)
+            .withName("role")
+            .withSecurableObjects(
+                Lists.newArrayList(
+                    SecurableObjects.ofMetalake(
+                        "metalake1", 
Lists.newArrayList(Privileges.CreateTable.allow()))))
+            .withAuditInfo(
+                
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+    when(manager.grantPrivilegeToRole(any(), any(), any(), 
any())).thenReturn(roleEntity);
+    when(metalakeDispatcher.metalakeExists(any())).thenReturn(true);
+    PrivilegeGrantRequest request =
+        new PrivilegeGrantRequest(
+            Lists.newArrayList(
+                PrivilegeDTO.builder()
+                    .withName(Privilege.Name.CREATE_TABLE)
+                    .withCondition(Privilege.Condition.ALLOW)
+                    .build()));
+
+    Response resp =
+        
target("/metalakes/metalake1/permissions/roles/role1/metalake/metalake1/grant")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .put(Entity.entity(request, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp.getStatus());
+    RoleResponse grantResponse = resp.readEntity(RoleResponse.class);
+    Assertions.assertEquals(0, grantResponse.getCode());
+
+    Role role = grantResponse.getRole();
+    Assertions.assertEquals(roleEntity.name(), role.name());
+    Assertions.assertEquals(1, role.securableObjects().size());
+    Assertions.assertEquals("metalake1", 
role.securableObjects().get(0).name());
+    Assertions.assertEquals(
+        Privilege.Name.CREATE_TABLE, 
role.securableObjects().get(0).privileges().get(0).name());
+    Assertions.assertEquals(
+        Privilege.Condition.ALLOW, 
role.securableObjects().get(0).privileges().get(0).condition());
+    Assertions.assertEquals(roleEntity.properties(), role.properties());
+
+    doThrow(new RuntimeException("mock error"))
+        .when(manager)
+        .grantPrivilegeToRole(any(), any(), any(), any());
+    Response resp3 =
+        
target("/metalakes/metalake1/permissions/roles/role1/metalake/metalake1/grant")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .put(Entity.entity(request, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
resp3.getStatus());
+
+    ErrorResponse errorResponse = resp3.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE, 
errorResponse.getCode());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResponse.getType());
+
+    // Test with wrong binding privileges
+    PrivilegeGrantRequest wrongPriRequest =
+        new PrivilegeGrantRequest(
+            Lists.newArrayList(
+                PrivilegeDTO.builder()
+                    .withName(Privilege.Name.CREATE_CATALOG)
+                    .withCondition(Privilege.Condition.ALLOW)
+                    .build()));
+
+    Response wrongPrivilegeResp =
+        
target("/metalakes/metalake1/permissions/roles/role1/catalog/catalog1/grant")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .put(Entity.entity(wrongPriRequest, 
MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(
+        Response.Status.BAD_REQUEST.getStatusCode(), 
wrongPrivilegeResp.getStatus());
+
+    ErrorResponse wrongPriErrorResp = 
wrongPrivilegeResp.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.ILLEGAL_ARGUMENTS_CODE, 
wrongPriErrorResp.getCode());
+    Assertions.assertEquals(
+        IllegalPrivilegeException.class.getSimpleName(), 
wrongPriErrorResp.getType());
+  }
+
+  @Test
+  public void testRevokePrivilegesFromRole() {
+    RoleEntity roleEntity =
+        RoleEntity.builder()
+            .withId(1L)
+            .withName("role")
+            .withSecurableObjects(Lists.newArrayList())
+            .withAuditInfo(
+                
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+    when(manager.revokePrivilegesFromRole(any(), any(), any(), 
any())).thenReturn(roleEntity);
+    when(metalakeDispatcher.metalakeExists(any())).thenReturn(true);
+    PrivilegeRevokeRequest request =
+        new PrivilegeRevokeRequest(
+            Lists.newArrayList(
+                PrivilegeDTO.builder()
+                    .withName(Privilege.Name.CREATE_TABLE)
+                    .withCondition(Privilege.Condition.ALLOW)
+                    .build()));
+
+    Response resp =
+        
target("/metalakes/metalake1/permissions/roles/role1/metalake/metalake1/revoke")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .put(Entity.entity(request, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp.getStatus());
+    RoleResponse revokeResponse = resp.readEntity(RoleResponse.class);
+    Assertions.assertEquals(0, revokeResponse.getCode());
+
+    Role role = revokeResponse.getRole();
+    Assertions.assertEquals(roleEntity.name(), role.name());
+    Assertions.assertEquals(roleEntity.securableObjects(), 
role.securableObjects());
+    Assertions.assertEquals(roleEntity.properties(), role.properties());
+
+    doThrow(new RuntimeException("mock error"))
+        .when(manager)
+        .revokePrivilegesFromRole(any(), any(), any(), any());
+    Response resp3 =
+        
target("/metalakes/metalake1/permissions/roles/role1/metalake/metalake1/revoke")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .put(Entity.entity(request, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
resp3.getStatus());
+
+    ErrorResponse errorResponse = resp3.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE, 
errorResponse.getCode());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResponse.getType());
+
+    // Test with wrong binding privileges
+    PrivilegeRevokeRequest wrongPriRequest =
+        new PrivilegeRevokeRequest(
+            Lists.newArrayList(
+                PrivilegeDTO.builder()
+                    .withName(Privilege.Name.CREATE_CATALOG)
+                    .withCondition(Privilege.Condition.ALLOW)
+                    .build()));
+
+    Response wrongPrivilegeResp =
+        
target("/metalakes/metalake1/permissions/roles/role1/catalog/catalog1/revoke")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .put(Entity.entity(wrongPriRequest, 
MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(
+        Response.Status.BAD_REQUEST.getStatusCode(), 
wrongPrivilegeResp.getStatus());
+
+    ErrorResponse wrongPriErrorResp = 
wrongPrivilegeResp.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.ILLEGAL_ARGUMENTS_CODE, 
wrongPriErrorResp.getCode());
+    Assertions.assertEquals(
+        IllegalPrivilegeException.class.getSimpleName(), 
wrongPriErrorResp.getType());
+  }
 }
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestRoleOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestRoleOperations.java
index 576746489..265fb6073 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestRoleOperations.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestRoleOperations.java
@@ -59,6 +59,7 @@ import org.apache.gravitino.dto.responses.ErrorResponse;
 import org.apache.gravitino.dto.responses.NameListResponse;
 import org.apache.gravitino.dto.responses.RoleResponse;
 import org.apache.gravitino.dto.util.DTOConverters;
+import org.apache.gravitino.exceptions.IllegalPrivilegeException;
 import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
 import org.apache.gravitino.exceptions.NoSuchRoleException;
@@ -281,7 +282,7 @@ public class TestRoleOperations extends JerseyTest {
     ErrorResponse wrongPriErrorResp = 
wrongPrivilegeResp.readEntity(ErrorResponse.class);
     Assertions.assertEquals(ErrorConstants.ILLEGAL_ARGUMENTS_CODE, 
wrongPriErrorResp.getCode());
     Assertions.assertEquals(
-        IllegalArgumentException.class.getSimpleName(), 
wrongPriErrorResp.getType());
+        IllegalPrivilegeException.class.getSimpleName(), 
wrongPriErrorResp.getType());
 
     // Test with empty securable objects request
     RoleCreateRequest emptyObjectRequest =

Reply via email to