This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch branch-metadata-authz
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-metadata-authz by this 
push:
     new f79c1ae871 [#7542] feat(authz): Support table authorization (#7554)
f79c1ae871 is described below

commit f79c1ae87195aac09d5ababb7db24ce4019b30ad
Author: Yunchi Pang <[email protected]>
AuthorDate: Wed Jul 9 05:02:26 2025 -0700

    [#7542] feat(authz): Support table authorization (#7554)
    
    <!--
    1. Title: [#<issue>] <type>(<scope>): <subject>
       Examples:
         - "[#123] feat(operator): support xxx"
         - "[#233] fix: check null before access result in xxx"
         - "[MINOR] refactor: fix typo in variable name"
         - "[MINOR] docs: fix typo in README"
         - "[#255] test: fix flaky test NameOfTheTest"
       Reference: https://www.conventionalcommits.org/en/v1.0.0/
    2. If the PR is unfinished, please mark this PR as draft.
    -->
    
    ### What changes were proposed in this pull request?
    
    Support table authorization.
    
    ### Why are the changes needed?
    
    Fix: #7542
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    
    
`org.apache.gravitino.client.integration.test.authorization.TableAuthorizationIT`
    
    ---------
    
    Co-authored-by: yangyang zhong <[email protected]>
    Co-authored-by: [email protected] <[email protected]>
---
 .../test/authorization/TableAuthorizationIT.java   | 249 ++++++++++++++++++++
 .../AuthorizationExpressionConverter.java          |  29 +++
 .../TestAuthorizationExpressionConverter.java      |   8 +-
 server/build.gradle.kts                            |   1 +
 .../web/filter/GravitinoInterceptionService.java   |  10 +-
 .../server/web/rest/CatalogOperations.java         |  16 +-
 .../server/web/rest/SchemaOperations.java          |  26 +--
 .../gravitino/server/web/rest/TableOperations.java |  75 ++++--
 .../server/web/rest/TestTableOperations.java       |   3 +-
 .../MockAuthorizationExpressionEvaluator.java      |  74 ++++++
 .../TestCatalogAuthorizationExpression.java        | 121 ++++++++++
 .../TestSchemaAuthorizationExpression.java         | 137 +++++++++++
 .../TestTableAuthorizationExpression.java          | 252 +++++++++++++++++++++
 13 files changed, 953 insertions(+), 48 deletions(-)

diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TableAuthorizationIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TableAuthorizationIT.java
new file mode 100644
index 0000000000..6042c0c33a
--- /dev/null
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TableAuthorizationIT.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client.integration.test.authorization;
+
+import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.container.HiveContainer;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+@Tag("gravitino-docker-test")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class TableAuthorizationIT extends BaseRestApiAuthorizationIT {
+
+  private static final String CATALOG = "catalog";
+  private static final String SCHEMA = "schema";
+  private static final ContainerSuite containerSuite = 
ContainerSuite.getInstance();
+  private static String hmsUri;
+  private static String role = "role";
+
+  @BeforeAll
+  public void startIntegrationTest() throws Exception {
+    containerSuite.startHiveContainer();
+    super.startIntegrationTest();
+    hmsUri =
+        String.format(
+            "thrift://%s:%d",
+            containerSuite.getHiveContainer().getContainerIpAddress(),
+            HiveContainer.HIVE_METASTORE_PORT);
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("metastore.uris", hmsUri);
+    client
+        .loadMetalake(METALAKE)
+        .createCatalog(CATALOG, Catalog.Type.RELATIONAL, "hive", "comment", 
properties)
+        .asSchemas()
+        .createSchema(SCHEMA, "test", new HashMap<>());
+    // try to load the schema as normal user, expect failure
+    assertThrows(
+        "Can not access metadata {" + CATALOG + "." + SCHEMA + "}.",
+        RuntimeException.class,
+        () -> {
+          normalUserClient
+              .loadMetalake(METALAKE)
+              .loadCatalog(CATALOG)
+              .asSchemas()
+              .loadSchema(SCHEMA);
+        });
+    // grant tester privilege
+    List<SecurableObject> securableObjects = new ArrayList<>();
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    SecurableObject catalogObject =
+        SecurableObjects.ofCatalog(CATALOG, 
ImmutableList.of(Privileges.UseCatalog.allow()));
+    securableObjects.add(catalogObject);
+    gravitinoMetalake.createRole(role, new HashMap<>(), securableObjects);
+    gravitinoMetalake.grantRolesToUser(ImmutableList.of(role), NORMAL_USER);
+    // normal user can load the catalog but not the schema
+    Catalog catalogLoadByNormalUser = 
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG);
+    assertEquals(CATALOG, catalogLoadByNormalUser.name());
+    assertThrows(
+        "Can not access metadata {" + CATALOG + "." + SCHEMA + "}.",
+        RuntimeException.class,
+        () -> {
+          catalogLoadByNormalUser.asSchemas().loadSchema(SCHEMA);
+        });
+  }
+
+  private Column[] createColumns() {
+    return new Column[] {Column.of("col1", Types.StringType.get())};
+  }
+
+  @Test
+  @Order(1)
+  public void testCreateTable() {
+    // owner can create table
+    TableCatalog tableCatalog = 
client.loadMetalake(METALAKE).loadCatalog(CATALOG).asTableCatalog();
+    tableCatalog.createTable(
+        NameIdentifier.of(SCHEMA, "table1"), createColumns(), "test", new 
HashMap<>());
+    // normal user cannot create table
+    TableCatalog tableCatalogNormalUser =
+        
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTableCatalog();
+    assertThrows(
+        "Can not access metadata {" + CATALOG + "." + SCHEMA + "}.",
+        RuntimeException.class,
+        () -> {
+          tableCatalogNormalUser.createTable(
+              NameIdentifier.of(SCHEMA, "table2"), createColumns(), "test2", 
new HashMap<>());
+        });
+    // grant privileges
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    gravitinoMetalake.grantPrivilegesToRole(
+        role,
+        MetadataObjects.of(CATALOG, SCHEMA, MetadataObject.Type.SCHEMA),
+        ImmutableList.of(Privileges.UseSchema.allow(), 
Privileges.CreateTable.allow()));
+    // normal user can now create table
+    tableCatalogNormalUser.createTable(
+        NameIdentifier.of(SCHEMA, "table2"), createColumns(), "test2", new 
HashMap<>());
+    tableCatalogNormalUser.createTable(
+        NameIdentifier.of(SCHEMA, "table3"), createColumns(), "test2", new 
HashMap<>());
+  }
+
+  @Test
+  @Order(2)
+  public void testListTable() {
+    TableCatalog tableCatalog = 
client.loadMetalake(METALAKE).loadCatalog(CATALOG).asTableCatalog();
+    NameIdentifier[] tablesList = 
tableCatalog.listTables(Namespace.of(SCHEMA));
+    assertArrayEquals(
+        new NameIdentifier[] {
+          NameIdentifier.of(SCHEMA, "table1"),
+          NameIdentifier.of(SCHEMA, "table2"),
+          NameIdentifier.of(SCHEMA, "table3")
+        },
+        tablesList);
+    // normal user can only see tables they have privilege for
+    TableCatalog tableCatalogNormalUser =
+        
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTableCatalog();
+    NameIdentifier[] tablesListNormalUser = 
tableCatalogNormalUser.listTables(Namespace.of(SCHEMA));
+    assertArrayEquals(
+        new NameIdentifier[] {
+          NameIdentifier.of(SCHEMA, "table2"), NameIdentifier.of(SCHEMA, 
"table3")
+        },
+        tablesListNormalUser);
+  }
+
+  @Test
+  @Order(3)
+  public void testLoadTable() {
+    TableCatalog tableCatalogNormalUser =
+        
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTableCatalog();
+    // normal user can load table2 and table3, but not table1
+    assertThrows(
+        String.format("Can not access metadata {%s.%s.%s}.", CATALOG, SCHEMA, 
"table1"),
+        RuntimeException.class,
+        () -> {
+          tableCatalogNormalUser.loadTable(NameIdentifier.of(CATALOG, SCHEMA, 
"table1"));
+        });
+    Table table2 = tableCatalogNormalUser.loadTable(NameIdentifier.of(SCHEMA, 
"table2"));
+    assertEquals("table2", table2.name());
+    Table table3 = tableCatalogNormalUser.loadTable(NameIdentifier.of(SCHEMA, 
"table3"));
+    assertEquals("table3", table3.name());
+
+    // grant normal user privilege to use table1
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    gravitinoMetalake.grantPrivilegesToRole(
+        role,
+        MetadataObjects.of(ImmutableList.of(CATALOG, SCHEMA, "table1"), 
MetadataObject.Type.TABLE),
+        ImmutableList.of(Privileges.SelectTable.allow()));
+    tableCatalogNormalUser.loadTable(NameIdentifier.of(SCHEMA, "table1"));
+  }
+
+  @Test
+  @Order(4)
+  public void testAlterTable() {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    TableCatalog tableCatalogNormalUser =
+        
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTableCatalog();
+
+    // normal user cannot alter table1 (no privilege)
+    assertThrows(
+        String.format("Can not access metadata {%s.%s.%s}.", CATALOG, SCHEMA, 
"table1"),
+        RuntimeException.class,
+        () -> {
+          tableCatalogNormalUser.alterTable(
+              NameIdentifier.of(SCHEMA, "table1"), 
TableChange.setProperty("key", "value"));
+        });
+    // grant normal user owner privilege on table1
+    gravitinoMetalake.setOwner(
+        MetadataObjects.of(ImmutableList.of(CATALOG, SCHEMA, "table1"), 
MetadataObject.Type.TABLE),
+        NORMAL_USER,
+        Owner.Type.USER);
+    tableCatalogNormalUser.alterTable(
+        NameIdentifier.of(SCHEMA, "table1"), TableChange.setProperty("key", 
"value"));
+  }
+
+  @Test
+  @Order(5)
+  public void testDropTable() {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    TableCatalog tableCatalogNormalUser =
+        
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTableCatalog();
+    // reset owner
+    gravitinoMetalake.setOwner(
+        MetadataObjects.of(ImmutableList.of(CATALOG, SCHEMA, "table1"), 
MetadataObject.Type.TABLE),
+        USER,
+        Owner.Type.USER);
+    // normal user cannot drop table1
+    assertThrows(
+        String.format("Can not access metadata {%s.%s.%s}.", CATALOG, SCHEMA, 
"table1"),
+        RuntimeException.class,
+        () -> {
+          tableCatalogNormalUser.dropTable(NameIdentifier.of(SCHEMA, 
"table1"));
+        });
+    // normal user can drop table2 and table3 (they created them)
+    tableCatalogNormalUser.dropTable(NameIdentifier.of(SCHEMA, "table2"));
+    tableCatalogNormalUser.dropTable(NameIdentifier.of(SCHEMA, "table3"));
+
+    // owner can drop table1
+    TableCatalog tableCatalog = 
client.loadMetalake(METALAKE).loadCatalog(CATALOG).asTableCatalog();
+    tableCatalog.dropTable(NameIdentifier.of(SCHEMA, "table1"));
+    // check tables are dropped
+    NameIdentifier[] tablesList = 
tableCatalog.listTables(Namespace.of(SCHEMA));
+    assertArrayEquals(new NameIdentifier[] {}, tablesList);
+    NameIdentifier[] tablesListNormalUser = 
tableCatalogNormalUser.listTables(Namespace.of(SCHEMA));
+    assertArrayEquals(new NameIdentifier[] {}, tablesListNormalUser);
+  }
+}
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
index d1fbc22faf..bc4c2d5d67 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
@@ -54,6 +54,7 @@ public class AuthorizationExpressionConverter {
    * @return an OGNL expression used to call GravitinoAuthorizer
    */
   public static String convertToOgnlExpression(String authorizationExpression) 
{
+    authorizationExpression = replaceAnyPrivilege(authorizationExpression);
     authorizationExpression = replaceAnyExpressions(authorizationExpression);
     return EXPRESSION_CACHE.computeIfAbsent(
         authorizationExpression,
@@ -118,4 +119,32 @@ public class AuthorizationExpressionConverter {
     matcher.appendTail(result);
     return result.toString();
   }
+
+  /**
+   * Replace any privilege expression to any expression
+   *
+   * @param expression authorization expression
+   * @return authorization expression
+   */
+  public static String replaceAnyPrivilege(String expression) {
+    expression = expression.replaceAll("ANY_USE_CATALOG", "(ANY(USE_CATALOG, 
METALAKE, CATALOG))");
+    expression =
+        expression.replaceAll("ANY_USE_SCHEMA", "(ANY(USE_SCHEMA, METALAKE, 
CATALOG, SCHEMA))");
+    expression =
+        expression.replaceAll("ANY_CREATE_SCHEMA", "(ANY(CREATE_SCHEMA, 
METALAKE, CATALOG))");
+    expression =
+        expression.replaceAll(
+            "ANY_SELECT_TABLE", "(ANY(SELECT_TABLE, METALAKE, CATALOG, SCHEMA, 
TABLE))");
+    expression =
+        expression.replaceAll(
+            "ANY_MODIFY_TABLE", "(ANY(MODIFY_TABLE, METALAKE, CATALOG, SCHEMA, 
TABLE))");
+    expression =
+        expression.replaceAll(
+            "ANY_CREATE_TABLE", "(ANY(CREATE_TABLE, METALAKE, CATALOG, SCHEMA, 
TABLE))");
+    expression =
+        expression.replaceAll(
+            "SCHEMA_OWNER_WITH_USE_CATALOG",
+            "SCHEMA::OWNER && (ANY(USE_CATALOG, METALAKE, CATALOG))");
+    return expression;
+  }
 }
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/expression/TestAuthorizationExpressionConverter.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/expression/TestAuthorizationExpressionConverter.java
index 2bc9b6a51e..d512cdeaf0 100644
--- 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/expression/TestAuthorizationExpressionConverter.java
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/expression/TestAuthorizationExpressionConverter.java
@@ -42,7 +42,7 @@ public class TestAuthorizationExpressionConverter {
     assertFalse(ANY_PATTERN.matcher("ANY").matches());
     assertFalse(ANY_PATTERN.matcher("ANYOWNER,METALAKE,CATALOG").matches());
     assertFalse(ANY_PATTERN.matcher("ANY(OWNER,METALAKE,CATALOG").matches());
-    assertTrue(ANY_PATTERN.matcher("ANY(OWNER,METALAKE,CATALOG)").matches());
+    assertTrue(ANY_PATTERN.matcher("ANY(OWNER, METALAKE, CATALOG)").matches());
     
assertTrue(ANY_PATTERN.matcher("ANY(USE_CATALOG,METALAKE,CATALOG,SCHEMA)").matches());
   }
 
@@ -87,7 +87,7 @@ public class TestAuthorizationExpressionConverter {
             + "|| authorizer.isOwner(principal,METALAKE_NAME,SCHEMA)",
         createTableOgnlExpression);
 
-    String expressionWithOwner2 = "(ANY(OWNER,METALAKE,CATALOG)) && 
CATALOG::USE_CATALOG)";
+    String expressionWithOwner2 = "(ANY(OWNER, METALAKE, CATALOG)) && 
CATALOG::USE_CATALOG)";
     String useCatalogOgnExpression =
         
AuthorizationExpressionConverter.convertToOgnlExpression(expressionWithOwner2);
     Assertions.assertEquals(
@@ -113,11 +113,11 @@ public class TestAuthorizationExpressionConverter {
     Assertions.assertEquals(
         "METALAKE::OWNER || CATALOG::OWNER && CATALOG::OWNER",
         AuthorizationExpressionConverter.replaceAnyExpressions(
-            "ANY(OWNER,METALAKE,CATALOG) && CATALOG::OWNER"));
+            "ANY(OWNER, METALAKE, CATALOG) && CATALOG::OWNER"));
 
     Assertions.assertEquals(
         "(METALAKE::OWNER || CATALOG::OWNER) && CATALOG::USE_CATALOG",
         AuthorizationExpressionConverter.replaceAnyExpressions(
-            "(ANY(OWNER,METALAKE,CATALOG)) && CATALOG::USE_CATALOG"));
+            "(ANY(OWNER, METALAKE, CATALOG)) && CATALOG::USE_CATALOG"));
   }
 }
diff --git a/server/build.gradle.kts b/server/build.gradle.kts
index 5d54834b2d..e417e70927 100644
--- a/server/build.gradle.kts
+++ b/server/build.gradle.kts
@@ -61,6 +61,7 @@ dependencies {
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
   testImplementation(libs.mockito.inline)
+  testImplementation(libs.ognl)
 
   testRuntimeOnly(libs.junit.jupiter.engine)
 }
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
index a54b84eebd..c25743eb5a 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
@@ -41,6 +41,7 @@ import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpress
 import org.apache.gravitino.server.web.Utils;
 import org.apache.gravitino.server.web.rest.CatalogOperations;
 import org.apache.gravitino.server.web.rest.SchemaOperations;
+import org.apache.gravitino.server.web.rest.TableOperations;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.glassfish.hk2.api.Descriptor;
 import org.glassfish.hk2.api.Filter;
@@ -56,7 +57,10 @@ public class GravitinoInterceptionService implements 
InterceptionService {
   @Override
   public Filter getDescriptorFilter() {
     return new ClassListFilter(
-        ImmutableSet.of(CatalogOperations.class.getName(), 
SchemaOperations.class.getName()));
+        ImmutableSet.of(
+            CatalogOperations.class.getName(),
+            SchemaOperations.class.getName(),
+            TableOperations.class.getName()));
   }
 
   @Override
@@ -146,12 +150,12 @@ public class GravitinoInterceptionService implements 
InterceptionService {
                 break;
               case TABLE:
                 nameIdentifierMap.put(
-                    Entity.EntityType.SCHEMA,
+                    Entity.EntityType.TABLE,
                     NameIdentifierUtil.ofTable(metalake, catalog, schema, 
table));
                 break;
               case TOPIC:
                 nameIdentifierMap.put(
-                    Entity.EntityType.SCHEMA,
+                    Entity.EntityType.TOPIC,
                     NameIdentifierUtil.ofTopic(metalake, catalog, schema, 
topic));
                 break;
               case METALAKE:
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/CatalogOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/CatalogOperations.java
index 25565856c8..0b8e3900c5 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/CatalogOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/CatalogOperations.java
@@ -74,6 +74,9 @@ public class CatalogOperations {
 
   private final CatalogDispatcher catalogDispatcher;
 
+  private static final String loadCatalogAuthorizationExpression =
+      "ANY_USE_CATALOG || ANY(OWNER, METALAKE, CATALOG)";
+
   @Context private HttpServletRequest httpRequest;
 
   @Inject
@@ -109,8 +112,7 @@ public class CatalogOperations {
                             };
                         return MetadataFilterHelper.filterByExpression(
                                     metalake,
-                                    "ANY(USE_CATALOG,METALAKE,CATALOG) || "
-                                        + "ANY(OWNER,METALAKE,CATALOG)",
+                                    loadCatalogAuthorizationExpression,
                                     Entity.EntityType.CATALOG,
                                     nameIdentifiers)
                                 .length
@@ -126,7 +128,7 @@ public class CatalogOperations {
               idents =
                   MetadataFilterHelper.filterByExpression(
                       metalake,
-                      "ANY(USE_CATALOG,METALAKE,CATALOG) || " + 
"ANY(OWNER,METALAKE,CATALOG)",
+                      loadCatalogAuthorizationExpression,
                       Entity.EntityType.CATALOG,
                       idents);
               Response response = Utils.ok(new EntityListResponse(idents));
@@ -215,7 +217,7 @@ public class CatalogOperations {
   @Timed(name = "set-catalog." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "set-catalog", absolute = true)
   @AuthorizationExpression(
-      expression = "ANY(USE_CATALOG,METALAKE,CATALOG) || 
ANY(OWNER,METALAKE,CATALOG)",
+      expression = "ANY_USE_CATALOG || ANY(OWNER, METALAKE, CATALOG)",
       accessMetadataType = MetadataObject.Type.CATALOG)
   public Response setCatalog(
       @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
@@ -261,7 +263,7 @@ public class CatalogOperations {
   @Timed(name = "load-catalog." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "load-catalog", absolute = true)
   @AuthorizationExpression(
-      expression = "ANY(USE_CATALOG,METALAKE,CATALOG) || 
ANY(OWNER,METALAKE,CATALOG)",
+      expression = "ANY_USE_CATALOG || ANY(OWNER, METALAKE, CATALOG)",
       accessMetadataType = MetadataObject.Type.CATALOG)
   public Response loadCatalog(
       @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
@@ -288,7 +290,7 @@ public class CatalogOperations {
   @Timed(name = "alter-catalog." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "alter-catalog", absolute = true)
   @AuthorizationExpression(
-      expression = "METALAKE::OWNER || CATALOG::OWNER",
+      expression = "ANY(OWNER, METALAKE, CATALOG)",
       accessMetadataType = MetadataObject.Type.CATALOG)
   public Response alterCatalog(
       @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
@@ -325,7 +327,7 @@ public class CatalogOperations {
   @Timed(name = "drop-catalog." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "drop-catalog", absolute = true)
   @AuthorizationExpression(
-      expression = "METALAKE::OWNER || CATALOG::OWNER",
+      expression = "ANY(OWNER, METALAKE, CATALOG)",
       accessMetadataType = MetadataObject.Type.CATALOG)
   public Response dropCatalog(
       @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/SchemaOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/SchemaOperations.java
index 17b8fc97eb..939ef8a726 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/SchemaOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/SchemaOperations.java
@@ -66,6 +66,10 @@ public class SchemaOperations {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(SchemaOperations.class);
 
+  private static final String loadSchemaAuthorizationExpression =
+      " ANY(OWNER, METALAKE, CATALOG) || "
+          + "ANY_USE_CATALOG && (SCHEMA::OWNER || ANY_USE_SCHEMA) ";
+
   private final SchemaDispatcher dispatcher;
 
   @Context private HttpServletRequest httpRequest;
@@ -90,12 +94,7 @@ public class SchemaOperations {
             NameIdentifier[] idents = dispatcher.listSchemas(schemaNS);
             idents =
                 MetadataFilterHelper.filterByExpression(
-                    metalake,
-                    " ((ANY(USE_CATALOG,METALAKE,CATALOG)) && "
-                        + "(SCHEMA::OWNER || 
ANY(USE_SCHEMA,METALAKE,CATALOG,SCHEMA))) "
-                        + "|| ANY(OWNER,METALAKE,CATALOG) ",
-                    Entity.EntityType.SCHEMA,
-                    idents);
+                    metalake, loadSchemaAuthorizationExpression, 
Entity.EntityType.SCHEMA, idents);
             Response response = Utils.ok(new EntityListResponse(idents));
             LOG.info("List {} schemas in catalog {}.{}", idents.length, 
metalake, catalog);
             return response;
@@ -110,9 +109,7 @@ public class SchemaOperations {
   @Timed(name = "create-schema." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "create-schema", absolute = true)
   @AuthorizationExpression(
-      expression =
-          "( (ANY(USE_CATALOG,METALAKE,CATALOG)) && 
(ANY(CREATE_SCHEMA,METALAKE,CATALOG)) ) "
-              + "|| ANY(OWNER,METALAKE,CATALOG)",
+      expression = "ANY(OWNER, METALAKE, CATALOG) || ANY_USE_CATALOG && 
ANY_CREATE_SCHEMA",
       accessMetadataType = MetadataObject.Type.CATALOG)
   public Response createSchema(
       @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
@@ -147,10 +144,7 @@ public class SchemaOperations {
   @Timed(name = "load-schema." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "load-schema", absolute = true)
   @AuthorizationExpression(
-      expression =
-          "( (ANY(USE_CATALOG,METALAKE,CATALOG)) &&"
-              + " (ANY(USE_SCHEMA,METALAKE,CATALOG,SCHEMA) || SCHEMA::OWNER) ) 
"
-              + " || ANY(OWNER,METALAKE,CATALOG)",
+      expression = loadSchemaAuthorizationExpression,
       accessMetadataType = MetadataObject.Type.SCHEMA)
   public Response loadSchema(
       @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
@@ -182,8 +176,7 @@ public class SchemaOperations {
   @Timed(name = "alter-schema." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "alter-schema", absolute = true)
   @AuthorizationExpression(
-      expression =
-          "ANY(OWNER,METALAKE,CATALOG) || ((ANY(USE_CATALOG,METALAKE,CATALOG) 
&& SCHEMA::OWNER))",
+      expression = "ANY(OWNER, METALAKE, CATALOG) || 
SCHEMA_OWNER_WITH_USE_CATALOG",
       accessMetadataType = MetadataObject.Type.SCHEMA)
   public Response alterSchema(
       @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
@@ -220,8 +213,7 @@ public class SchemaOperations {
   @Timed(name = "drop-schema." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "drop-schema", absolute = true)
   @AuthorizationExpression(
-      expression =
-          "ANY(OWNER,METALAKE,CATALOG) || ((ANY(USE_CATALOG,METALAKE,CATALOG) 
&& SCHEMA::OWNER))",
+      expression = "ANY(OWNER, METALAKE, CATALOG) || 
SCHEMA_OWNER_WITH_USE_CATALOG",
       accessMetadataType = MetadataObject.Type.SCHEMA)
   public Response dropSchema(
       @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
index 41146d2546..e59756625b 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
@@ -36,6 +36,8 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.catalog.TableDispatcher;
@@ -49,6 +51,9 @@ import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.metrics.MetricNames;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import org.apache.gravitino.server.web.Utils;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
@@ -60,6 +65,11 @@ public class TableOperations {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TableOperations.class);
 
+  private static final String loadTableAuthorizationExpression =
+      "ANY(OWNER, METALAKE, CATALOG) ||"
+          + "SCHEMA_OWNER_WITH_USE_CATALOG ||"
+          + "ANY_USE_CATALOG && ANY_USE_SCHEMA  && (TABLE::OWNER || 
ANY_SELECT_TABLE || ANY_MODIFY_TABLE)";
+
   private final TableDispatcher dispatcher;
 
   @Context private HttpServletRequest httpRequest;
@@ -84,6 +94,9 @@ public class TableOperations {
           () -> {
             Namespace tableNS = NamespaceUtil.ofTable(metalake, catalog, 
schema);
             NameIdentifier[] idents = dispatcher.listTables(tableNS);
+            idents =
+                MetadataFilterHelper.filterByExpression(
+                    metalake, loadTableAuthorizationExpression, 
Entity.EntityType.TABLE, idents);
             Response response = Utils.ok(new EntityListResponse(idents));
             LOG.info(
                 "List {} tables under schema: {}.{}.{}", idents.length, 
metalake, catalog, schema);
@@ -99,10 +112,18 @@ public class TableOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "create-table." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "create-table", absolute = true)
+  @AuthorizationExpression(
+      expression =
+          "ANY(OWNER, METALAKE, CATALOG) || "
+              + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+              + "ANY_USE_CATALOG && ANY_USE_SCHEMA && ANY_CREATE_TABLE",
+      accessMetadataType = MetadataObject.Type.TABLE)
   public Response createTable(
-      @PathParam("metalake") String metalake,
-      @PathParam("catalog") String catalog,
-      @PathParam("schema") String schema,
+      @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
+          String metalake,
+      @PathParam("catalog") @AuthorizationMetadata(type = 
MetadataObject.Type.CATALOG)
+          String catalog,
+      @PathParam("schema") @AuthorizationMetadata(type = 
MetadataObject.Type.SCHEMA) String schema,
       TableCreateRequest request) {
     LOG.info(
         "Received create table request: {}.{}.{}.{}", metalake, catalog, 
schema, request.getName());
@@ -140,11 +161,19 @@ public class TableOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "load-table." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "load-table", absolute = true)
+  @AuthorizationExpression(
+      expression =
+          "ANY(OWNER, METALAKE, CATALOG) || "
+              + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+              + "ANY_USE_CATALOG && ANY_USE_SCHEMA  && (TABLE::OWNER || 
ANY_SELECT_TABLE|| ANY_MODIFY_TABLE)",
+      accessMetadataType = MetadataObject.Type.TABLE)
   public Response loadTable(
-      @PathParam("metalake") String metalake,
-      @PathParam("catalog") String catalog,
-      @PathParam("schema") String schema,
-      @PathParam("table") String table) {
+      @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
+          String metalake,
+      @PathParam("catalog") @AuthorizationMetadata(type = 
MetadataObject.Type.CATALOG)
+          String catalog,
+      @PathParam("schema") @AuthorizationMetadata(type = 
MetadataObject.Type.SCHEMA) String schema,
+      @PathParam("table") @AuthorizationMetadata(type = 
MetadataObject.Type.TABLE) String table) {
     LOG.info(
         "Received load table request for table: {}.{}.{}.{}", metalake, 
catalog, schema, table);
     try {
@@ -167,11 +196,19 @@ public class TableOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "alter-table." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "alter-table", absolute = true)
+  @AuthorizationExpression(
+      expression =
+          "ANY(OWNER, METALAKE, CATALOG) || "
+              + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+              + "ANY_USE_CATALOG && ANY_USE_SCHEMA  && (TABLE::OWNER || 
ANY_MODIFY_TABLE)",
+      accessMetadataType = MetadataObject.Type.TABLE)
   public Response alterTable(
-      @PathParam("metalake") String metalake,
-      @PathParam("catalog") String catalog,
-      @PathParam("schema") String schema,
-      @PathParam("table") String table,
+      @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
+          String metalake,
+      @PathParam("catalog") @AuthorizationMetadata(type = 
MetadataObject.Type.CATALOG)
+          String catalog,
+      @PathParam("schema") @AuthorizationMetadata(type = 
MetadataObject.Type.SCHEMA) String schema,
+      @PathParam("table") @AuthorizationMetadata(type = 
MetadataObject.Type.TABLE) String table,
       TableUpdatesRequest request) {
     LOG.info("Received alter table request: {}.{}.{}.{}", metalake, catalog, 
schema, table);
     try {
@@ -200,11 +237,19 @@ public class TableOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "drop-table." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "drop-table", absolute = true)
+  @AuthorizationExpression(
+      expression =
+          "ANY(OWNER, METALAKE, CATALOG) || "
+              + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+              + "ANY_USE_CATALOG && ANY_USE_SCHEMA  && TABLE::OWNER ",
+      accessMetadataType = MetadataObject.Type.TABLE)
   public Response dropTable(
-      @PathParam("metalake") String metalake,
-      @PathParam("catalog") String catalog,
-      @PathParam("schema") String schema,
-      @PathParam("table") String table,
+      @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
+          String metalake,
+      @PathParam("catalog") @AuthorizationMetadata(type = 
MetadataObject.Type.CATALOG)
+          String catalog,
+      @PathParam("schema") @AuthorizationMetadata(type = 
MetadataObject.Type.SCHEMA) String schema,
+      @PathParam("table") @AuthorizationMetadata(type = 
MetadataObject.Type.TABLE) String table,
       @QueryParam("purge") @DefaultValue("false") boolean purge) {
     LOG.info(
         "Received {} table request: {}.{}.{}.{}",
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTableOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTableOperations.java
index 47537813bf..33d4ebcbb8 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTableOperations.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTableOperations.java
@@ -88,14 +88,13 @@ import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.rest.RESTUtils;
 import org.glassfish.jersey.internal.inject.AbstractBinder;
 import org.glassfish.jersey.server.ResourceConfig;
-import org.glassfish.jersey.test.JerseyTest;
 import org.glassfish.jersey.test.TestProperties;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
-public class TestTableOperations extends JerseyTest {
+public class TestTableOperations extends BaseOperationsTest {
 
   private static class MockServletRequestFactory extends 
ServletRequestFactoryBase {
     @Override
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/MockAuthorizationExpressionEvaluator.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/MockAuthorizationExpressionEvaluator.java
new file mode 100644
index 0000000000..e782f1dadb
--- /dev/null
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/MockAuthorizationExpressionEvaluator.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.server.web.rest.authorization;
+
+import java.util.Set;
+import java.util.regex.Matcher;
+import ognl.Ognl;
+import ognl.OgnlContext;
+import ognl.OgnlException;
+import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConverter;
+
+/** MockAuthorizationExpressionEvaluator for test authorization in rest api. */
+public class MockAuthorizationExpressionEvaluator {
+
+  private final String ognlExpression;
+
+  public MockAuthorizationExpressionEvaluator(String expression) {
+    expression = 
AuthorizationExpressionConverter.replaceAnyPrivilege(expression);
+    expression = 
AuthorizationExpressionConverter.replaceAnyExpressions(expression);
+    Matcher matcher = 
AuthorizationExpressionConverter.PATTERN.matcher(expression);
+    StringBuffer result = new StringBuffer();
+    while (matcher.find()) {
+      String metadataPrivilege = matcher.group(0);
+      String replacement;
+      replacement = String.format("authorizer.authorize('%s')", 
metadataPrivilege);
+      matcher.appendReplacement(result, replacement);
+    }
+    matcher.appendTail(result);
+    this.ognlExpression = result.toString();
+  }
+
+  /**
+   * mock authorization with privilege
+   *
+   * @param mockPrivileges mock user has some privilege
+   * @return mock authorization result
+   * @throws OgnlException OgnlException
+   */
+  public boolean getResult(Set<String> mockPrivileges) throws OgnlException {
+    MockAuthorizer mockAuthorizer = new MockAuthorizer(mockPrivileges);
+    OgnlContext ognlContext = Ognl.createDefaultContext(null);
+    ognlContext.put("authorizer", mockAuthorizer);
+    Object value = Ognl.getValue(ognlExpression, ognlContext);
+    return (boolean) value;
+  }
+
+  private static final class MockAuthorizer {
+
+    private Set<String> mockPrivilege;
+
+    private MockAuthorizer(Set<String> mockPrivilege) {
+      this.mockPrivilege = mockPrivilege;
+    }
+
+    public boolean authorize(String metadataPrivilege) {
+      return mockPrivilege.contains(metadataPrivilege);
+    }
+  }
+}
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/TestCatalogAuthorizationExpression.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/TestCatalogAuthorizationExpression.java
new file mode 100644
index 0000000000..2cc3851255
--- /dev/null
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/TestCatalogAuthorizationExpression.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.server.web.rest.authorization;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.google.common.collect.ImmutableSet;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import ognl.OgnlException;
+import org.apache.gravitino.dto.requests.CatalogCreateRequest;
+import org.apache.gravitino.dto.requests.CatalogUpdatesRequest;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
+import org.apache.gravitino.server.web.rest.CatalogOperations;
+import org.junit.jupiter.api.Test;
+
+public class TestCatalogAuthorizationExpression {
+
+  @Test
+  public void testCreateCatalog() throws NoSuchMethodException, OgnlException {
+    Method method =
+        CatalogOperations.class.getMethod(
+            "createCatalog", String.class, CatalogCreateRequest.class);
+    AuthorizationExpression authorizationExpressionAnnotation =
+        method.getAnnotation(AuthorizationExpression.class);
+    String expression = authorizationExpressionAnnotation.expression();
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new MockAuthorizationExpressionEvaluator(expression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_METALAKE")));
+    
assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::CREATE_CATALOG")));
+  }
+
+  @Test
+  public void testLoadCatalog() throws NoSuchMethodException, OgnlException {
+    Method method = CatalogOperations.class.getMethod("loadCatalog", 
String.class, String.class);
+    AuthorizationExpression authorizationExpressionAnnotation =
+        method.getAnnotation(AuthorizationExpression.class);
+    String expression = authorizationExpressionAnnotation.expression();
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new MockAuthorizationExpressionEvaluator(expression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_SCHEMA")));
+    
assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("CATALOG::OWNER")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::CREATE_CATALOG")));
+    
assertTrue(mockEvaluator.getResult(ImmutableSet.of("CATALOG::USE_CATALOG")));
+  }
+
+  @Test
+  public void testListCatalog() throws NoSuchFieldException, 
IllegalAccessException, OgnlException {
+    Field loadTableAuthorizationExpressionField =
+        
CatalogOperations.class.getDeclaredField("loadCatalogAuthorizationExpression");
+    loadTableAuthorizationExpressionField.setAccessible(true);
+    String loadTableAuthExpression = (String) 
loadTableAuthorizationExpressionField.get(null);
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new MockAuthorizationExpressionEvaluator(loadTableAuthExpression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_SCHEMA")));
+    
assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("CATALOG::OWNER")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::CREATE_CATALOG")));
+    
assertTrue(mockEvaluator.getResult(ImmutableSet.of("CATALOG::USE_CATALOG")));
+  }
+
+  @Test
+  public void testAlterCatalog() throws NoSuchMethodException, OgnlException {
+    Method method =
+        CatalogOperations.class.getMethod(
+            "alterCatalog", String.class, String.class, 
CatalogUpdatesRequest.class);
+    AuthorizationExpression authorizationExpressionAnnotation =
+        method.getAnnotation(AuthorizationExpression.class);
+    String expression = authorizationExpressionAnnotation.expression();
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new MockAuthorizationExpressionEvaluator(expression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_SCHEMA")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("CATALOG::OWNER")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::CREATE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::USE_CATALOG")));
+  }
+
+  @Test
+  public void testDropCatalog() throws NoSuchMethodException, OgnlException {
+    Method method =
+        CatalogOperations.class.getMethod("dropCatalog", String.class, 
String.class, boolean.class);
+    AuthorizationExpression authorizationExpressionAnnotation =
+        method.getAnnotation(AuthorizationExpression.class);
+    String expression = authorizationExpressionAnnotation.expression();
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new MockAuthorizationExpressionEvaluator(expression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_SCHEMA")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("CATALOG::OWNER")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::CREATE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::USE_CATALOG")));
+  }
+}
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/TestSchemaAuthorizationExpression.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/TestSchemaAuthorizationExpression.java
new file mode 100644
index 0000000000..fe1a85fc9b
--- /dev/null
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/TestSchemaAuthorizationExpression.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.server.web.rest.authorization;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.google.common.collect.ImmutableSet;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import ognl.OgnlException;
+import org.apache.gravitino.dto.requests.SchemaCreateRequest;
+import org.apache.gravitino.dto.requests.SchemaUpdatesRequest;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
+import org.apache.gravitino.server.web.rest.SchemaOperations;
+import org.junit.jupiter.api.Test;
+
+public class TestSchemaAuthorizationExpression {
+
+  @Test
+  public void testCreateSchema() throws NoSuchMethodException, OgnlException {
+    Method method =
+        SchemaOperations.class.getMethod(
+            "createSchema", String.class, String.class, 
SchemaCreateRequest.class);
+    AuthorizationExpression authorizationExpressionAnnotation =
+        method.getAnnotation(AuthorizationExpression.class);
+    String expression = authorizationExpressionAnnotation.expression();
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new MockAuthorizationExpressionEvaluator(expression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_METALAKE")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::CREATE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::CREATE_SCHEMA")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of("METALAKE::CREATE_SCHEMA", 
"METALAKE::USE_CATALOG")));
+  }
+
+  @Test
+  public void testLoadSchema() throws NoSuchMethodException, OgnlException {
+    Method method =
+        SchemaOperations.class.getMethod("loadSchema", String.class, 
String.class, String.class);
+    AuthorizationExpression authorizationExpressionAnnotation =
+        method.getAnnotation(AuthorizationExpression.class);
+    String expression = authorizationExpressionAnnotation.expression();
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new MockAuthorizationExpressionEvaluator(expression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_SCHEMA")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("CATALOG::OWNER")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::CREATE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::USE_CATALOG")));
+    assertTrue(
+        mockEvaluator.getResult(ImmutableSet.of("CATALOG::USE_CATALOG", 
"METALAKE::USE_SCHEMA")));
+  }
+
+  @Test
+  public void testListSchema() throws NoSuchFieldException, 
IllegalAccessException, OgnlException {
+    Field loadTableAuthorizationExpressionField =
+        
SchemaOperations.class.getDeclaredField("loadSchemaAuthorizationExpression");
+    loadTableAuthorizationExpressionField.setAccessible(true);
+    String loadTableAuthExpression = (String) 
loadTableAuthorizationExpressionField.get(null);
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new MockAuthorizationExpressionEvaluator(loadTableAuthExpression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("CATALOG::OWNER")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::CREATE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::USE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_SCHEMA")));
+    assertTrue(
+        mockEvaluator.getResult(ImmutableSet.of("CATALOG::USE_CATALOG", 
"METALAKE::USE_SCHEMA")));
+  }
+
+  @Test
+  public void testAlterSchema() throws NoSuchMethodException, OgnlException {
+    Method method =
+        SchemaOperations.class.getMethod(
+            "alterSchema", String.class, String.class, String.class, 
SchemaUpdatesRequest.class);
+    AuthorizationExpression authorizationExpressionAnnotation =
+        method.getAnnotation(AuthorizationExpression.class);
+    String expression = authorizationExpressionAnnotation.expression();
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new MockAuthorizationExpressionEvaluator(expression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_SCHEMA")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("CATALOG::OWNER")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::CREATE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::USE_CATALOG")));
+    assertFalse(
+        mockEvaluator.getResult(ImmutableSet.of("CATALOG::USE_CATALOG", 
"METALAKE::USE_SCHEMA")));
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", 
"CATALOG::USE_CATALOG")));
+  }
+
+  @Test
+  public void testDropSchema() throws NoSuchMethodException, OgnlException {
+    Method method =
+        SchemaOperations.class.getMethod(
+            "dropSchema", String.class, String.class, String.class, 
boolean.class);
+    AuthorizationExpression authorizationExpressionAnnotation =
+        method.getAnnotation(AuthorizationExpression.class);
+    String expression = authorizationExpressionAnnotation.expression();
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new MockAuthorizationExpressionEvaluator(expression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_SCHEMA")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("CATALOG::OWNER")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::CREATE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::USE_CATALOG")));
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", 
"CATALOG::USE_CATALOG")));
+  }
+}
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/TestTableAuthorizationExpression.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/TestTableAuthorizationExpression.java
new file mode 100644
index 0000000000..2ab74c08b2
--- /dev/null
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/TestTableAuthorizationExpression.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.server.web.rest.authorization;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.google.common.collect.ImmutableSet;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import ognl.OgnlException;
+import org.apache.gravitino.dto.requests.TableCreateRequest;
+import org.apache.gravitino.dto.requests.TableUpdatesRequest;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
+import org.apache.gravitino.server.web.rest.TableOperations;
+import org.junit.jupiter.api.Test;
+
+public class TestTableAuthorizationExpression {
+
+  @Test
+  public void testCreateTable() throws NoSuchMethodException, OgnlException {
+    Method method =
+        TableOperations.class.getMethod(
+            "createTable", String.class, String.class, String.class, 
TableCreateRequest.class);
+    AuthorizationExpression authorizationExpressionAnnotation =
+        method.getAnnotation(AuthorizationExpression.class);
+    String expression = authorizationExpressionAnnotation.expression();
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new MockAuthorizationExpressionEvaluator(expression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("CATALOG::OWNER")));
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", 
"CATALOG::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", 
"METALAKE::USE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TABLE")));
+    assertFalse(
+        mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TABLE", 
"SCHEMA::USE_SCHEMA")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of("SCHEMA::CREATE_TABLE", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of(
+                "CATALOG::CREATE_TABLE", "CATALOG::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of(
+                "METALAKE::CREATE_TABLE", "METALAKE::USE_SCHEMA", 
"METALAKE::USE_CATALOG")));
+  }
+
+  @Test
+  public void testListTable() throws IllegalAccessException, OgnlException, 
NoSuchFieldException {
+    Field loadTableAuthorizationExpressionField =
+        
TableOperations.class.getDeclaredField("loadTableAuthorizationExpression");
+    loadTableAuthorizationExpressionField.setAccessible(true);
+    String loadTableAuthExpression = (String) 
loadTableAuthorizationExpressionField.get(null);
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new MockAuthorizationExpressionEvaluator(loadTableAuthExpression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("CATALOG::OWNER")));
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", 
"CATALOG::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", 
"METALAKE::USE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TABLE")));
+    assertFalse(
+        mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TABLE", 
"SCHEMA::USE_SCHEMA")));
+    assertFalse(
+        mockEvaluator.getResult(
+            ImmutableSet.of("SCHEMA::CREATE_TABLE", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::SELECT_TABLE")));
+    assertFalse(
+        mockEvaluator.getResult(ImmutableSet.of("SCHEMA::SELECT_TABLE", 
"SCHEMA::USE_SCHEMA")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of("SCHEMA::SELECT_TABLE", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of(
+                "CATALOG::SELECT_TABLE", "CATALOG::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of(
+                "METALAKE::SELECT_TABLE", "METALAKE::USE_SCHEMA", 
"METALAKE::USE_CATALOG")));
+  }
+
+  @Test
+  public void testLoadTable() throws NoSuchMethodException, OgnlException {
+    Method method =
+        TableOperations.class.getMethod(
+            "loadTable", String.class, String.class, String.class, 
String.class);
+    AuthorizationExpression authorizationExpressionAnnotation =
+        method.getAnnotation(AuthorizationExpression.class);
+    String expression = authorizationExpressionAnnotation.expression();
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new MockAuthorizationExpressionEvaluator(expression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("CATALOG::OWNER")));
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", 
"CATALOG::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", 
"METALAKE::USE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TABLE")));
+    assertFalse(
+        mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TABLE", 
"SCHEMA::USE_SCHEMA")));
+    assertFalse(
+        mockEvaluator.getResult(
+            ImmutableSet.of("SCHEMA::CREATE_TABLE", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::SELECT_TABLE")));
+    assertFalse(
+        mockEvaluator.getResult(ImmutableSet.of("SCHEMA::SELECT_TABLE", 
"SCHEMA::USE_SCHEMA")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of("SCHEMA::SELECT_TABLE", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of(
+                "CATALOG::SELECT_TABLE", "CATALOG::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of(
+                "METALAKE::SELECT_TABLE", "METALAKE::USE_SCHEMA", 
"METALAKE::USE_CATALOG")));
+  }
+
+  @Test
+  public void testAlterTable() throws NoSuchMethodException, OgnlException {
+    Method method =
+        TableOperations.class.getMethod(
+            "alterTable",
+            String.class,
+            String.class,
+            String.class,
+            String.class,
+            TableUpdatesRequest.class);
+    AuthorizationExpression authorizationExpressionAnnotation =
+        method.getAnnotation(AuthorizationExpression.class);
+    String expression = authorizationExpressionAnnotation.expression();
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new MockAuthorizationExpressionEvaluator(expression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("CATALOG::OWNER")));
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", 
"CATALOG::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", 
"METALAKE::USE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TABLE")));
+    assertFalse(
+        mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TABLE", 
"SCHEMA::USE_SCHEMA")));
+    assertFalse(
+        mockEvaluator.getResult(
+            ImmutableSet.of("SCHEMA::CREATE_TABLE", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::SELECT_TABLE")));
+    assertFalse(
+        mockEvaluator.getResult(ImmutableSet.of("SCHEMA::SELECT_TABLE", 
"SCHEMA::USE_SCHEMA")));
+    assertFalse(
+        mockEvaluator.getResult(
+            ImmutableSet.of("SCHEMA::SELECT_TABLE", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+    assertFalse(
+        mockEvaluator.getResult(
+            ImmutableSet.of(
+                "CATALOG::SELECT_TABLE", "CATALOG::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+    assertFalse(
+        mockEvaluator.getResult(
+            ImmutableSet.of(
+                "METALAKE::SELECT_TABLE", "METALAKE::USE_SCHEMA", 
"METALAKE::USE_CATALOG")));
+    assertFalse(
+        mockEvaluator.getResult(ImmutableSet.of("SCHEMA::MODIFY_TABLE", 
"SCHEMA::USE_SCHEMA")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of("SCHEMA::MODIFY_TABLE", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of(
+                "CATALOG::MODIFY_TABLE", "CATALOG::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of(
+                "METALAKE::MODIFY_TABLE", "METALAKE::USE_SCHEMA", 
"METALAKE::USE_CATALOG")));
+  }
+
+  @Test
+  public void testDropTable() throws NoSuchMethodException, OgnlException {
+    Method method =
+        TableOperations.class.getMethod(
+            "dropTable", String.class, String.class, String.class, 
String.class, boolean.class);
+    AuthorizationExpression authorizationExpressionAnnotation =
+        method.getAnnotation(AuthorizationExpression.class);
+    String expression = authorizationExpressionAnnotation.expression();
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new MockAuthorizationExpressionEvaluator(expression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("CATALOG::OWNER")));
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", 
"CATALOG::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", 
"METALAKE::USE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TABLE")));
+    assertFalse(
+        mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TABLE", 
"SCHEMA::USE_SCHEMA")));
+    assertFalse(
+        mockEvaluator.getResult(
+            ImmutableSet.of("SCHEMA::CREATE_TABLE", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::SELECT_TABLE")));
+    assertFalse(
+        mockEvaluator.getResult(ImmutableSet.of("SCHEMA::SELECT_TABLE", 
"SCHEMA::USE_SCHEMA")));
+    assertFalse(
+        mockEvaluator.getResult(
+            ImmutableSet.of("SCHEMA::SELECT_TABLE", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+    assertFalse(
+        mockEvaluator.getResult(
+            ImmutableSet.of(
+                "CATALOG::SELECT_TABLE", "CATALOG::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+    assertFalse(
+        mockEvaluator.getResult(
+            ImmutableSet.of(
+                "METALAKE::SELECT_TABLE", "METALAKE::USE_SCHEMA", 
"METALAKE::USE_CATALOG")));
+    assertFalse(
+        mockEvaluator.getResult(ImmutableSet.of("SCHEMA::MODIFY_TABLE", 
"SCHEMA::USE_SCHEMA")));
+    assertFalse(
+        mockEvaluator.getResult(
+            ImmutableSet.of("SCHEMA::MODIFY_TABLE", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+    assertFalse(
+        mockEvaluator.getResult(
+            ImmutableSet.of(
+                "CATALOG::MODIFY_TABLE", "CATALOG::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+    assertFalse(
+        mockEvaluator.getResult(
+            ImmutableSet.of(
+                "METALAKE::MODIFY_TABLE", "METALAKE::USE_SCHEMA", 
"METALAKE::USE_CATALOG")));
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of("TABLE::OWNER")));
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of("TABLE::OWNER", 
"SCHEMA::USE_SCHEMA")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of("TABLE::OWNER", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+  }
+}

Reply via email to