This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d24bd25dc [#8943] feat(authz): support policy access control (#9252)
5d24bd25dc is described below

commit 5d24bd25dc50eaf372d73c58701a51927066e3b3
Author: yangyang zhong <[email protected]>
AuthorDate: Fri Nov 28 10:37:25 2025 +0800

    [#8943] feat(authz): support policy access control (#9252)
    
    ### What changes were proposed in this pull request?
    
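    Support access control for policies: the creator of a policy is set as
    its owner, and policy operations are rejected for users who lack the
    required privileges.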
    
    ### Why are the changes needed?
    
    Fix: #8943
    
    ### Does this PR introduce _any_ user-facing change?
    
    None
    
    ### How was this patch tested?
    
    Tested with the integration test
    org.apache.gravitino.client.integration.test.authorization.PolicyAuthorizationIT.
---
 .../test/authorization/PolicyAuthorizationIT.java  | 387 +++++++++++++++++++++
 .../java/org/apache/gravitino/GravitinoEnv.java    |   4 +-
 .../authorization/AuthorizationRequestContext.java |  10 +
 .../authorization/AuthorizationUtils.java          |   3 +-
 .../authorization/GravitinoAuthorizer.java         |   7 +-
 .../apache/gravitino/cache/ReverseIndexRules.java  |   8 +
 .../gravitino/hook/PolicyHookDispatcher.java       | 128 +++++++
 .../RelationalEntityStoreIdResolver.java           |   9 +-
 .../relational/mapper/PolicyMetaMapper.java        |  29 ++
 .../mapper/PolicyMetaSQLProviderFactory.java       |   9 +
 .../provider/base/PolicyMetaBaseSQLProvider.java   |  26 ++
 .../relational/service/MetadataObjectService.java  |  20 +-
 .../relational/service/PolicyMetaService.java      |  21 ++
 .../apache/gravitino/utils/MetadataObjectUtil.java |  14 +
 .../apache/gravitino/utils/NameIdentifierUtil.java |  12 +
 .../server/authorization/MetadataAuthzHelper.java  |   1 +
 .../server/authorization/MetadataIdConverter.java  |  19 +-
 .../authorization/PassThroughAuthorizer.java       |   6 +-
 .../annotations/AuthorizationRequest.java          |   3 +-
 .../AuthorizationExpressionConstants.java          |   5 +
 .../AuthorizationExpressionConverter.java          |  11 +-
 .../AuthorizationExpressionEvaluator.java          |   6 +
 .../authorization/jcasbin/JcasbinAuthorizer.java   |  37 +-
 .../authorization/MockGravitinoAuthorizer.java     |   6 +-
 .../authorization/TestPassThroughAuthorizer.java   |   4 +-
 .../TestAuthorizationExpressionConverter.java      |   6 +-
 .../jcasbin/TestJcasbinAuthorizer.java             |   4 +-
 .../web/filter/GravitinoInterceptionService.java   |   7 +-
 .../gravitino/server/web/filter/ParameterUtil.java |  10 +
 ...a => AssociatePolicyAuthorizationExecutor.java} |  60 ++--
 .../AssociateTagAuthorizationExecutor.java         |  14 +-
 .../authorization/AuthorizeExecutorFactory.java    |  12 +-
 .../authorization/CommonAuthorizerExecutor.java    |  13 +-
 .../web/rest/MetadataObjectPolicyOperations.java   |  52 ++-
 .../server/web/rest/PolicyOperations.java          |  62 +++-
 .../filter/TestGravitinoInterceptionService.java   |   6 +-
 .../server/web/rest/TestPolicyOperations.java      |   3 +-
 37 files changed, 913 insertions(+), 121 deletions(-)

diff --git a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/PolicyAuthorizationIT.java b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/PolicyAuthorizationIT.java
new file mode 100644
index 0000000000..211137b493
--- /dev/null
+++ b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/PolicyAuthorizationIT.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.client.integration.test.authorization;
+
+import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.ForbiddenException;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.container.HiveContainer;
+import org.apache.gravitino.policy.PolicyChange;
+import org.apache.gravitino.policy.PolicyContents;
+import org.apache.gravitino.policy.SupportsPolicies;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
+
+@Tag("gravitino-docker-test")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class PolicyAuthorizationIT extends BaseRestApiAuthorizationIT {
+
+  private static final String CATALOG = "catalog";
+  private static final String SCHEMA = "schema";
+  private static final ContainerSuite containerSuite = 
ContainerSuite.getInstance();
+  private static String hmsUri;
+  private static String role = "role";
+
+  @BeforeAll
+  public void startIntegrationTest() throws Exception {
+    containerSuite.startHiveContainer();
+    super.startIntegrationTest();
+    hmsUri =
+        String.format(
+            "thrift://%s:%d",
+            containerSuite.getHiveContainer().getContainerIpAddress(),
+            HiveContainer.HIVE_METASTORE_PORT);
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("metastore.uris", hmsUri);
+    client
+        .loadMetalake(METALAKE)
+        .createCatalog(CATALOG, Catalog.Type.RELATIONAL, "hive", "comment", 
properties)
+        .asSchemas()
+        .createSchema(SCHEMA, "test", new HashMap<>());
+    // try to load the schema as normal user, expect failure
+    assertThrows(
+        "Can not access metadata.{" + CATALOG + "." + SCHEMA + "}.",
+        RuntimeException.class,
+        () -> {
+          normalUserClient
+              .loadMetalake(METALAKE)
+              .loadCatalog(CATALOG)
+              .asSchemas()
+              .loadSchema(SCHEMA);
+        });
+    // grant tester privilege
+    List<SecurableObject> securableObjects = new ArrayList<>();
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    SecurableObject catalogObject =
+        SecurableObjects.ofCatalog(CATALOG, 
ImmutableList.of(Privileges.UseCatalog.allow()));
+    securableObjects.add(catalogObject);
+    gravitinoMetalake.createRole(role, new HashMap<>(), securableObjects);
+    gravitinoMetalake.grantRolesToUser(ImmutableList.of(role), NORMAL_USER);
+    // normal user can load the catalog but not the schema
+    Catalog catalogLoadByNormalUser = 
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG);
+    assertEquals(CATALOG, catalogLoadByNormalUser.name());
+    assertThrows(
+        "Can not access metadata.{" + CATALOG + "." + SCHEMA + "}.",
+        ForbiddenException.class,
+        () -> {
+          catalogLoadByNormalUser.asSchemas().loadSchema(SCHEMA);
+        });
+    TableCatalog tableCatalog = 
client.loadMetalake(METALAKE).loadCatalog(CATALOG).asTableCatalog();
+    tableCatalog.createTable(
+        NameIdentifier.of(SCHEMA, "table1"), createColumns(), "test", new 
HashMap<>());
+  }
+
+  private Column[] createColumns() {
+    return new Column[] {Column.of("col1", Types.StringType.get())};
+  }
+
+  @AfterAll
+  public void stopIntegrationTest() throws IOException, InterruptedException {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    gravitinoMetalake.loadCatalog(CATALOG).asSchemas().dropSchema(SCHEMA, 
true);
+    gravitinoMetalake.dropCatalog(CATALOG, true);
+    super.stopIntegrationTest();
+  }
+
+  @Test
+  @Order(1)
+  public void createPolicy() {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    Set<MetadataObject.Type> supportedTypes = 
ImmutableSet.of(MetadataObject.Type.TABLE);
+    gravitinoMetalake.createPolicy(
+        "policy1", "custom", "policy1", true, PolicyContents.custom(null, 
supportedTypes, null));
+    GravitinoMetalake metalakeLoadByNormalUser = 
normalUserClient.loadMetalake(METALAKE);
+    assertThrows(
+        "Can not access metadata.{" + CATALOG + "." + SCHEMA + "}.",
+        ForbiddenException.class,
+        () -> {
+          metalakeLoadByNormalUser.createPolicy(
+              "policy2",
+              "custom",
+              "policy2",
+              true,
+              PolicyContents.custom(null, supportedTypes, null));
+        });
+    gravitinoMetalake.grantPrivilegesToRole(
+        role,
+        MetadataObjects.of(ImmutableList.of(METALAKE), 
MetadataObject.Type.METALAKE),
+        ImmutableSet.of(Privileges.CreatePolicy.allow()));
+    metalakeLoadByNormalUser.createPolicy(
+        "policy2", "custom", "policy2", true, PolicyContents.custom(null, 
supportedTypes, null));
+    gravitinoMetalake.createPolicy(
+        "policy3", "custom", "policy3", true, PolicyContents.custom(null, 
supportedTypes, null));
+  }
+
+  @Test
+  @Order(2)
+  public void listPolicy() {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    String[] policies = gravitinoMetalake.listPolicies();
+    Assertions.assertArrayEquals(new String[] {"policy1", "policy2", 
"policy3"}, policies);
+    policies = normalUserClient.loadMetalake(METALAKE).listPolicies();
+    Assertions.assertArrayEquals(new String[] {"policy2"}, policies);
+  }
+
+  @Test
+  @Order(3)
+  public void testGetPolicy() {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    gravitinoMetalake.getPolicy("policy1");
+    gravitinoMetalake.getPolicy("policy2");
+    gravitinoMetalake.getPolicy("policy3");
+    GravitinoMetalake metalakeLoadByNormalUser = 
normalUserClient.loadMetalake(METALAKE);
+    assertThrows(
+        "Can not access metadata.{" + CATALOG + "." + SCHEMA + "}.",
+        ForbiddenException.class,
+        () -> {
+          metalakeLoadByNormalUser.getPolicy("policy1");
+        });
+    metalakeLoadByNormalUser.getPolicy("policy2");
+    assertThrows(
+        "Can not access metadata.{" + CATALOG + "." + SCHEMA + "}.",
+        ForbiddenException.class,
+        () -> {
+          metalakeLoadByNormalUser.getPolicy("policy3");
+        });
+
+    gravitinoMetalake.grantPrivilegesToRole(
+        role,
+        MetadataObjects.of(ImmutableList.of("policy3"), 
MetadataObject.Type.POLICY),
+        ImmutableSet.of(Privileges.ApplyPolicy.allow()));
+    metalakeLoadByNormalUser.getPolicy("policy3");
+    String[] policies = metalakeLoadByNormalUser.listPolicies();
+    Assertions.assertArrayEquals(new String[] {"policy2", "policy3"}, 
policies);
+  }
+
+  @Test
+  @Order(4)
+  public void testAlterPolicy() {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    GravitinoMetalake metalakeLoadByNormalUser = 
normalUserClient.loadMetalake(METALAKE);
+    gravitinoMetalake.alterPolicy("policy1", 
PolicyChange.updateComment("222"));
+    assertThrows(
+        "Can not access metadata.{" + CATALOG + "." + SCHEMA + "}.",
+        ForbiddenException.class,
+        () -> {
+          metalakeLoadByNormalUser.alterPolicy("policy3", 
PolicyChange.updateComment("222"));
+        });
+    metalakeLoadByNormalUser.alterPolicy("policy2", 
PolicyChange.updateComment("222"));
+  }
+
+  @Test
+  @Order(6)
+  public void testAssociatePolicy() {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    GravitinoMetalake metalakeLoadByNormalUser = 
normalUserClient.loadMetalake(METALAKE);
+    SupportsPolicies supportsPolicies =
+        gravitinoMetalake
+            .loadCatalog(CATALOG)
+            .asTableCatalog()
+            .loadTable(NameIdentifier.of(SCHEMA, "table1"))
+            .supportsPolicies();
+    gravitinoMetalake.grantPrivilegesToRole(
+        role,
+        MetadataObjects.of(ImmutableList.of(CATALOG, SCHEMA), 
MetadataObject.Type.SCHEMA),
+        ImmutableList.of(Privileges.SelectTable.allow(), 
Privileges.UseSchema.allow()));
+    SupportsPolicies supportsPoliciesByNormalUser =
+        metalakeLoadByNormalUser
+            .loadCatalog(CATALOG)
+            .asTableCatalog()
+            .loadTable(NameIdentifier.of(SCHEMA, "table1"))
+            .supportsPolicies();
+    supportsPolicies.associatePolicies(new String[] {"policy1"}, new String[] 
{});
+    assertThrows(
+        "Can not access metadata.",
+        ForbiddenException.class,
+        () -> {
+          supportsPoliciesByNormalUser.associatePolicies(new String[] 
{"policy1"}, new String[] {});
+        });
+    gravitinoMetalake.grantPrivilegesToRole(
+        role,
+        MetadataObjects.of(ImmutableList.of(METALAKE), 
MetadataObject.Type.METALAKE),
+        ImmutableSet.of(Privileges.CreateTable.allow()));
+    TableCatalog tableCatalog =
+        
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTableCatalog();
+    tableCatalog.createTable(
+        NameIdentifier.of(SCHEMA, "table2"), createColumns(), "test", new 
HashMap<>());
+    tableCatalog.createTable(
+        NameIdentifier.of(SCHEMA, "table3"), createColumns(), "test", new 
HashMap<>());
+    metalakeLoadByNormalUser
+        .loadCatalog(CATALOG)
+        .asTableCatalog()
+        .loadTable(NameIdentifier.of(SCHEMA, "table2"))
+        .supportsPolicies()
+        .associatePolicies(new String[] {"policy2"}, new String[] {});
+
+    metalakeLoadByNormalUser
+        .loadCatalog(CATALOG)
+        .asTableCatalog()
+        .loadTable(NameIdentifier.of(SCHEMA, "table3"))
+        .supportsPolicies()
+        .associatePolicies(new String[] {"policy2"}, new String[] {});
+  }
+
+  @Test
+  @Order(7)
+  public void testListPolicyForTable() {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    GravitinoMetalake gravitinoMetalakeLoadByNormalUser = 
normalUserClient.loadMetalake(METALAKE);
+    String[] policies =
+        gravitinoMetalake
+            .loadCatalog(CATALOG)
+            .asTableCatalog()
+            .loadTable(NameIdentifier.of(SCHEMA, "table1"))
+            .supportsPolicies()
+            .listPolicies();
+    Assertions.assertArrayEquals(new String[] {"policy1"}, policies);
+    policies =
+        gravitinoMetalake
+            .loadCatalog(CATALOG)
+            .asTableCatalog()
+            .loadTable(NameIdentifier.of(SCHEMA, "table2"))
+            .supportsPolicies()
+            .listPolicies();
+    Assertions.assertArrayEquals(new String[] {"policy2"}, policies);
+    policies =
+        gravitinoMetalake
+            .loadCatalog(CATALOG)
+            .asTableCatalog()
+            .loadTable(NameIdentifier.of(SCHEMA, "table3"))
+            .supportsPolicies()
+            .listPolicies();
+    Assertions.assertArrayEquals(new String[] {"policy2"}, policies);
+    policies =
+        gravitinoMetalakeLoadByNormalUser
+            .loadCatalog(CATALOG)
+            .asTableCatalog()
+            .loadTable(NameIdentifier.of(SCHEMA, "table1"))
+            .supportsPolicies()
+            .listPolicies();
+    Assertions.assertArrayEquals(new String[] {}, policies);
+    policies =
+        gravitinoMetalakeLoadByNormalUser
+            .loadCatalog(CATALOG)
+            .asTableCatalog()
+            .loadTable(NameIdentifier.of(SCHEMA, "table2"))
+            .supportsPolicies()
+            .listPolicies();
+    Assertions.assertArrayEquals(new String[] {"policy2"}, policies);
+    policies =
+        gravitinoMetalakeLoadByNormalUser
+            .loadCatalog(CATALOG)
+            .asTableCatalog()
+            .loadTable(NameIdentifier.of(SCHEMA, "table3"))
+            .supportsPolicies()
+            .listPolicies();
+    Assertions.assertArrayEquals(new String[] {"policy2"}, policies);
+  }
+
+  @Test
+  @Order(8)
+  public void testGetPolicyForTable() {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    GravitinoMetalake gravitinoMetalakeLoadByNormalUser = 
normalUserClient.loadMetalake(METALAKE);
+    gravitinoMetalake
+        .loadCatalog(CATALOG)
+        .asTableCatalog()
+        .loadTable(NameIdentifier.of(SCHEMA, "table1"))
+        .supportsPolicies()
+        .getPolicy("policy1");
+    gravitinoMetalake
+        .loadCatalog(CATALOG)
+        .asTableCatalog()
+        .loadTable(NameIdentifier.of(SCHEMA, "table2"))
+        .supportsPolicies()
+        .getPolicy("policy2");
+    gravitinoMetalake
+        .loadCatalog(CATALOG)
+        .asTableCatalog()
+        .loadTable(NameIdentifier.of(SCHEMA, "table3"))
+        .supportsPolicies()
+        .getPolicy("policy2");
+    assertThrows(
+        "Can not access metadata.",
+        ForbiddenException.class,
+        () -> {
+          gravitinoMetalakeLoadByNormalUser
+              .loadCatalog(CATALOG)
+              .asTableCatalog()
+              .loadTable(NameIdentifier.of(SCHEMA, "table1"))
+              .supportsPolicies()
+              .getPolicy("policy1");
+        });
+
+    gravitinoMetalakeLoadByNormalUser
+        .loadCatalog(CATALOG)
+        .asTableCatalog()
+        .loadTable(NameIdentifier.of(SCHEMA, "table2"))
+        .supportsPolicies()
+        .getPolicy("policy2");
+    gravitinoMetalakeLoadByNormalUser
+        .loadCatalog(CATALOG)
+        .asTableCatalog()
+        .loadTable(NameIdentifier.of(SCHEMA, "table3"))
+        .supportsPolicies()
+        .getPolicy("policy2");
+  }
+
+  @Test
+  @Order(9)
+  public void testDropPolicy() {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    GravitinoMetalake metalakeLoadByNormalUser = 
normalUserClient.loadMetalake(METALAKE);
+    assertThrows(
+        "Can not access metadata.",
+        ForbiddenException.class,
+        () -> {
+          metalakeLoadByNormalUser.deletePolicy("policy1");
+        });
+    gravitinoMetalake.deletePolicy("policy1");
+    metalakeLoadByNormalUser.deletePolicy("policy2");
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java 
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index 68c74a0800..318d07a624 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -55,6 +55,7 @@ import org.apache.gravitino.hook.CatalogHookDispatcher;
 import org.apache.gravitino.hook.FilesetHookDispatcher;
 import org.apache.gravitino.hook.MetalakeHookDispatcher;
 import org.apache.gravitino.hook.ModelHookDispatcher;
+import org.apache.gravitino.hook.PolicyHookDispatcher;
 import org.apache.gravitino.hook.SchemaHookDispatcher;
 import org.apache.gravitino.hook.TableHookDispatcher;
 import org.apache.gravitino.hook.TagHookDispatcher;
@@ -606,8 +607,9 @@ public class GravitinoEnv {
         new TagEventDispatcher(eventBus, new TagManager(idGenerator, 
entityStore));
     this.tagDispatcher = new TagHookDispatcher(tagEventDispatcher);
 
-    this.policyDispatcher =
+    PolicyEventDispatcher policyEventDispatcher =
         new PolicyEventDispatcher(eventBus, new PolicyManager(idGenerator, 
entityStore));
+    this.policyDispatcher = new PolicyHookDispatcher(policyEventDispatcher);
 
     this.jobOperationDispatcher =
         new JobEventDispatcher(eventBus, new JobManager(config, entityStore, 
idGenerator));
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationRequestContext.java
 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationRequestContext.java
index 804e6ae50b..1e1b635f63 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationRequestContext.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationRequestContext.java
@@ -36,6 +36,8 @@ public class AuthorizationRequestContext {
   /** Used to determine whether the role has already been loaded. */
   private final AtomicBoolean hasLoadRole = new AtomicBoolean();
 
+  private String originalAuthorizationExpression;
+
   /**
    * check allow
    *
@@ -84,6 +86,14 @@ public class AuthorizationRequestContext {
     hasLoadRole.set(true);
   }
 
+  public String getOriginalAuthorizationExpression() {
+    return originalAuthorizationExpression;
+  }
+
+  public void setOriginalAuthorizationExpression(String 
originalAuthorizationExpression) {
+    this.originalAuthorizationExpression = originalAuthorizationExpression;
+  }
+
   public static class AuthorizationKey {
     private Principal principal;
     private String metalake;
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
index 1d5dd79133..6fcd13121c 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
@@ -359,7 +359,8 @@ public class AuthorizationUtils {
   private static boolean needApplyAuthorization(MetadataObject.Type type) {
     return type != MetadataObject.Type.ROLE
         && type != MetadataObject.Type.METALAKE
-        && type != MetadataObject.Type.TAG;
+        && type != MetadataObject.Type.TAG
+        && type != MetadataObject.Type.POLICY;
   }
 
   private static void callAuthorizationPluginImpl(
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
 
b/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
index 7dcef770bd..7965f173ad 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
@@ -65,9 +65,14 @@ public interface GravitinoAuthorizer extends Closeable {
    * @param principal the user principal
    * @param metalake the metalake
    * @param metadataObject the metadataObject.
+   * @param requestContext authorization request context
    * @return authorization result.
    */
-  boolean isOwner(Principal principal, String metalake, MetadataObject 
metadataObject);
+  boolean isOwner(
+      Principal principal,
+      String metalake,
+      MetadataObject metadataObject,
+      AuthorizationRequestContext requestContext);
 
   /**
    * Determine whether the user is the service admin.
diff --git 
a/core/src/main/java/org/apache/gravitino/cache/ReverseIndexRules.java 
b/core/src/main/java/org/apache/gravitino/cache/ReverseIndexRules.java
index ca7ad074a4..ace2fb044d 100644
--- a/core/src/main/java/org/apache/gravitino/cache/ReverseIndexRules.java
+++ b/core/src/main/java/org/apache/gravitino/cache/ReverseIndexRules.java
@@ -157,6 +157,14 @@ public class ReverseIndexRules {
                                 nsFileset.level(0),
                                 nsFileset.level(1));
                         break;
+                      case TAG:
+                        entityType = Entity.EntityType.TAG;
+                        namespace = 
NamespaceUtil.ofTag(roleEntity.namespace().level(0));
+                        break;
+                      case POLICY:
+                        entityType = Entity.EntityType.POLICY;
+                        namespace = 
NamespaceUtil.ofPolicy(roleEntity.namespace().level(0));
+                        break;
                       default:
                         throw new UnsupportedOperationException(
                             "Don't support securable object type: " + 
securableObject.type());
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/PolicyHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/PolicyHookDispatcher.java
new file mode 100644
index 0000000000..f0f996176a
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/hook/PolicyHookDispatcher.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hook;
+
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.exceptions.NoSuchPolicyException;
+import org.apache.gravitino.exceptions.PolicyAlreadyExistsException;
+import org.apache.gravitino.meta.PolicyEntity;
+import org.apache.gravitino.policy.Policy;
+import org.apache.gravitino.policy.PolicyChange;
+import org.apache.gravitino.policy.PolicyContent;
+import org.apache.gravitino.policy.PolicyDispatcher;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+public class PolicyHookDispatcher implements PolicyDispatcher {
+
+  private final PolicyDispatcher dispatcher;
+
+  public PolicyHookDispatcher(PolicyDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public String[] listPolicies(String metalake) {
+    return dispatcher.listPolicies(metalake);
+  }
+
+  @Override
+  public PolicyEntity[] listPolicyInfos(String metalake) {
+    return dispatcher.listPolicyInfos(metalake);
+  }
+
+  @Override
+  public PolicyEntity getPolicy(String metalake, String policyName) throws 
NoSuchPolicyException {
+    return dispatcher.getPolicy(metalake, policyName);
+  }
+
+  @Override
+  public PolicyEntity createPolicy(
+      String metalake,
+      String name,
+      Policy.BuiltInType type,
+      String comment,
+      boolean enabled,
+      PolicyContent content)
+      throws PolicyAlreadyExistsException {
+    PolicyEntity policy = dispatcher.createPolicy(metalake, name, type, 
comment, enabled, content);
+
+    // Set the creator as the owner of the policy.
+    OwnerDispatcher ownerDispatcher = 
GravitinoEnv.getInstance().ownerDispatcher();
+    if (ownerDispatcher != null) {
+      ownerDispatcher.setOwner(
+          metalake,
+          NameIdentifierUtil.toMetadataObject(
+              NameIdentifierUtil.ofPolicy(metalake, name), 
Entity.EntityType.POLICY),
+          PrincipalUtils.getCurrentUserName(),
+          Owner.Type.USER);
+    }
+    return policy;
+  }
+
+  @Override
+  public PolicyEntity alterPolicy(String metalake, String policyName, 
PolicyChange... changes) {
+    return dispatcher.alterPolicy(metalake, policyName, changes);
+  }
+
+  @Override
+  public void enablePolicy(String metalake, String policyName) throws 
NoSuchPolicyException {
+    dispatcher.enablePolicy(metalake, policyName);
+  }
+
+  @Override
+  public void disablePolicy(String metalake, String policyName) throws 
NoSuchPolicyException {
+    dispatcher.disablePolicy(metalake, policyName);
+  }
+
+  @Override
+  public boolean deletePolicy(String metalake, String policyName) {
+    return dispatcher.deletePolicy(metalake, policyName);
+  }
+
+  @Override
+  public MetadataObject[] listMetadataObjectsForPolicy(String metalake, String 
policyName) {
+    return dispatcher.listMetadataObjectsForPolicy(metalake, policyName);
+  }
+
+  @Override
+  public PolicyEntity[] listPolicyInfosForMetadataObject(
+      String metalake, MetadataObject metadataObject) {
+    return dispatcher.listPolicyInfosForMetadataObject(metalake, 
metadataObject);
+  }
+
+  @Override
+  public String[] associatePoliciesForMetadataObject(
+      String metalake,
+      MetadataObject metadataObject,
+      String[] policiesToAdd,
+      String[] policiesToRemove) {
+    return dispatcher.associatePoliciesForMetadataObject(
+        metalake, metadataObject, policiesToAdd, policiesToRemove);
+  }
+
+  @Override
+  public PolicyEntity getPolicyForMetadataObject(
+      String metalake, MetadataObject metadataObject, String policyName) {
+    return dispatcher.getPolicyForMetadataObject(metalake, metadataObject, 
policyName);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStoreIdResolver.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStoreIdResolver.java
index 9f46e47196..5794eeb358 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStoreIdResolver.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStoreIdResolver.java
@@ -31,6 +31,7 @@ import 
org.apache.gravitino.storage.relational.service.FilesetMetaService;
 import org.apache.gravitino.storage.relational.service.GroupMetaService;
 import org.apache.gravitino.storage.relational.service.MetalakeMetaService;
 import org.apache.gravitino.storage.relational.service.ModelMetaService;
+import org.apache.gravitino.storage.relational.service.PolicyMetaService;
 import org.apache.gravitino.storage.relational.service.RoleMetaService;
 import org.apache.gravitino.storage.relational.service.SchemaMetaService;
 import org.apache.gravitino.storage.relational.service.TableColumnMetaService;
@@ -47,7 +48,8 @@ public class RelationalEntityStoreIdResolver implements 
EntityIdResolver {
           Entity.EntityType.ROLE,
           Entity.EntityType.USER,
           Entity.EntityType.GROUP,
-          Entity.EntityType.TAG);
+          Entity.EntityType.TAG,
+          Entity.EntityType.POLICY);
   private static final Set<Entity.EntityType> 
ENTITY_TYPES_REQURING_CATALOG_IDS =
       ImmutableSet.of(Entity.EntityType.CATALOG);
   private static final Set<Entity.EntityType> ENTITY_TYPES_REQURING_SCHEMA_IDS 
=
@@ -133,6 +135,11 @@ public class RelationalEntityStoreIdResolver implements 
EntityIdResolver {
             TagMetaService.getInstance().getTagIdByTagName(metalakeId, 
nameIdentifier.name());
         return new NamespacedEntityId(tagId, metalakeId);
 
+      case POLICY:
+        long policyId =
+            PolicyMetaService.getInstance()
+                .getPolicyIdByPolicyName(metalakeId, nameIdentifier.name());
+        return new NamespacedEntityId(policyId, metalakeId);
       default:
         throw new IllegalArgumentException("Unsupported entity type: " + type);
     }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
index a3493be788..7531f2a52f 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
@@ -132,4 +132,33 @@ public interface PolicyMetaMapper {
       method = "deletePolicyMetasByLegacyTimeline")
   Integer deletePolicyMetasByLegacyTimeline(
       @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+
+  /**
+   * Selects one policy row by its metalake id and policy name.
+   *
+   * <p>The SQL text comes from {@code
+   * PolicyMetaSQLProviderFactory.selectPolicyMetaByMetalakeIdAndName}; the {@code @Result} entries
+   * map each snake_case column onto the {@code PolicyPO} property of the same meaning.
+   *
+   * @param metalakeId id of the metalake, exposed to the SQL as the {@code metalakeId} parameter
+   * @param policyName name of the policy, exposed to the SQL as the {@code policyName} parameter
+   * @return the matching row; {@code PolicyMetaService#getPolicyIdByPolicyName} treats a {@code
+   *     null} result as "no such policy"
+   */
+  @Results({
+    @Result(property = "policyId", column = "policy_id"),
+    @Result(property = "policyName", column = "policy_name"),
+    @Result(property = "policyType", column = "policy_type"),
+    @Result(property = "metalakeId", column = "metalake_id"),
+    @Result(property = "auditInfo", column = "audit_info"),
+    @Result(property = "currentVersion", column = "current_version"),
+    @Result(property = "lastVersion", column = "last_version"),
+    @Result(property = "deletedAt", column = "deleted_at")
+  })
+  @SelectProvider(
+      type = PolicyMetaSQLProviderFactory.class,
+      method = "selectPolicyMetaByMetalakeIdAndName")
+  PolicyPO selectPolicyMetaByMetalakeIdAndName(
+      @Param("metalakeId") long metalakeId, @Param("policyName") String 
policyName);
+
+  /**
+   * Selects one policy row by its policy id.
+   *
+   * <p>The SQL text comes from {@code PolicyMetaSQLProviderFactory.selectPolicyByPolicyId}; the
+   * {@code @Result} entries map each snake_case column onto the corresponding {@code PolicyPO}
+   * property.
+   *
+   * @param policyId id of the policy, exposed to the SQL as the {@code policyId} parameter
+   * @return the matching row. NOTE(review): presumably {@code null} when no row matches, so
+   *     callers should guard before dereferencing; confirm against the MyBatis single-result
+   *     contract
+   */
+  @Results({
+    @Result(property = "policyId", column = "policy_id"),
+    @Result(property = "policyName", column = "policy_name"),
+    @Result(property = "policyType", column = "policy_type"),
+    @Result(property = "metalakeId", column = "metalake_id"),
+    @Result(property = "auditInfo", column = "audit_info"),
+    @Result(property = "currentVersion", column = "current_version"),
+    @Result(property = "lastVersion", column = "last_version"),
+    @Result(property = "deletedAt", column = "deleted_at")
+  })
+  @SelectProvider(type = PolicyMetaSQLProviderFactory.class, method = 
"selectPolicyByPolicyId")
+  PolicyPO selectPolicyByPolicyId(@Param("policyId") Long policyId);
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaSQLProviderFactory.java
index 593e0b40f5..e715d6c5ee 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaSQLProviderFactory.java
@@ -91,6 +91,15 @@ public class PolicyMetaSQLProviderFactory {
     return getProvider().softDeletePolicyMetasByMetalakeId(metalakeId);
   }
 
+  /**
+   * Returns the SQL for fetching one policy by metalake id and policy name, as produced by the
+   * provider that {@code getProvider()} selects.
+   *
+   * @param metalakeId id of the metalake that owns the policy
+   * @param policyName name of the policy
+   * @return the SQL statement text
+   */
+  public static String selectPolicyMetaByMetalakeIdAndName(
+      @Param("metalakeId") Long metalakeId, @Param("policyName") String policyName) {
+    PolicyMetaBaseSQLProvider provider = getProvider();
+    return provider.selectPolicyMetaByMetalakeIdAndName(metalakeId, policyName);
+  }
+
+  /**
+   * Returns the SQL for fetching one policy by its id, as produced by the provider that {@code
+   * getProvider()} selects.
+   *
+   * @param policyId id of the policy
+   * @return the SQL statement text
+   */
+  public static String selectPolicyByPolicyId(@Param("policyId") Long policyId) {
+    PolicyMetaBaseSQLProvider provider = getProvider();
+    return provider.selectPolicyByPolicyId(policyId);
+  }
+
   static class PolicyMetaMySQLProvider extends PolicyMetaBaseSQLProvider {}
 
   static class PolicyMetaH2Provider extends PolicyMetaBaseSQLProvider {}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetaBaseSQLProvider.java
index e6a42bd939..5d37abbdbd 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetaBaseSQLProvider.java
@@ -171,4 +171,30 @@ public class PolicyMetaBaseSQLProvider {
         + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
         + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
   }
+
+  /**
+   * Builds the statement that reads one policy row by id, restricted to rows whose {@code
+   * deleted_at} is 0.
+   *
+   * @param policyId id of the policy; referenced in the SQL as {@code #{policyId}}
+   * @return the SQL statement text
+   */
+  public String selectPolicyByPolicyId(@Param("policyId") Long policyId) {
+    String columns =
+        "SELECT pm.policy_id, pm.policy_name, pm.policy_type, pm.metalake_id,"
+            + " pm.audit_info, pm.current_version, pm.last_version, pm.deleted_at";
+    // The alias keeps its trailing space so the assembled text matches the established statement.
+    String source = " FROM " + POLICY_META_TABLE_NAME + " pm ";
+    String filter = " WHERE pm.policy_id = #{policyId} AND pm.deleted_at = 0 ";
+    return columns + source + filter;
+  }
+
+  /**
+   * Builds the statement that reads one policy row by metalake id and policy name, restricted to
+   * rows whose {@code deleted_at} is 0.
+   *
+   * @param metalakeId id of the metalake; referenced in the SQL as {@code #{metalakeId}}
+   * @param policyName name of the policy; referenced in the SQL as {@code #{policyName}}
+   * @return the SQL statement text
+   */
+  public String selectPolicyMetaByMetalakeIdAndName(
+      @Param("metalakeId") Long metalakeId, @Param("policyName") String policyName) {
+    String columns =
+        "SELECT pm.policy_id, pm.policy_name, pm.policy_type, pm.metalake_id,"
+            + " pm.audit_info, pm.current_version, pm.last_version, pm.deleted_at";
+    // The alias keeps its trailing space so the assembled text matches the established statement.
+    String source = " FROM " + POLICY_META_TABLE_NAME + " pm ";
+    String filter =
+        " WHERE pm.metalake_id = #{metalakeId}"
+            + " AND pm.policy_name = #{policyName}"
+            + " AND pm.deleted_at = 0 ";
+    return columns + source + filter;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
index 51c812f5cc..75682d843f 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
@@ -37,6 +37,7 @@ import 
org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.PolicyMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
 import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
@@ -84,7 +85,24 @@ public class MetadataObjectService {
               MetadataObject.Type.COLUMN,
               MetadataObjectService::getColumnObjectsFullName,
               MetadataObject.Type.TAG,
-              MetadataObjectService::getTagObjectsFullName);
+              MetadataObjectService::getTagObjectsFullName,
+              MetadataObject.Type.POLICY,
+              MetadataObjectService::getPolicyObjectsFullName);
+
+  /**
+   * Resolves policy ids to policy names.
+   *
+   * <p>Each distinct id is looked up through {@link PolicyMetaMapper#selectPolicyByPolicyId}. An id
+   * whose lookup yields no row is left out of the result rather than failing the whole batch with
+   * a {@link NullPointerException}, and a repeated id is queried only once so that {@code
+   * Collectors.toMap} cannot fail on a duplicate key.
+   *
+   * @param policyIds ids of the policies to resolve; may be {@code null} or empty
+   * @return a map from policy id to policy name holding only the ids that were found; empty when
+   *     {@code policyIds} is {@code null} or empty
+   */
+  private static Map<Long, String> getPolicyObjectsFullName(List<Long> policyIds) {
+    if (policyIds == null || policyIds.isEmpty()) {
+      return Map.of();
+    }
+    // NOTE(review): assumes callers read a missing key as "object no longer exists" - confirm
+    // against the consumers of the full-name getter map.
+    return policyIds.stream()
+        .distinct()
+        .map(
+            policyId ->
+                SessionUtils.getWithoutCommit(
+                    PolicyMetaMapper.class,
+                    policyMetaMapper -> policyMetaMapper.selectPolicyByPolicyId(policyId)))
+        // The select only matches rows with deleted_at = 0, so a dropped policy comes back null.
+        .filter(policyPO -> policyPO != null)
+        .collect(
+            Collectors.toMap(
+                policyPO -> policyPO.getPolicyId(), policyPO -> policyPO.getPolicyName()));
+  }
 
   private static Map<Long, String> getTagObjectsFullName(List<Long> tagIds) {
     if (tagIds == null || tagIds.isEmpty()) {
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
index 5735ef8ab5..7ab4de4872 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
@@ -461,4 +461,25 @@ public class PolicyMetaService {
         PolicyMetaMapper.class,
         mapper -> mapper.listPolicyPOsByMetalakeAndPolicyNames(metalakeName, 
policyNames));
   }
+
+  /**
+   * Resolves the id of a policy from its name within one metalake.
+   *
+   * @param metalakeId id of the metalake that owns the policy
+   * @param policyName name of the policy to look up
+   * @return id of the matching policy
+   * @throws NoSuchEntityException if the mapper returns no row for the given metalake id and name
+   */
+  public long getPolicyIdByPolicyName(long metalakeId, String policyName) {
+    PolicyPO found =
+        SessionUtils.getWithoutCommit(
+            PolicyMetaMapper.class,
+            dao -> dao.selectPolicyMetaByMetalakeIdAndName(metalakeId, policyName));
+    if (found != null) {
+      return found.getPolicyId();
+    }
+    // No row came back: report it with the shared "no such entity" message.
+    throw new NoSuchEntityException(
+        NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+        Entity.EntityType.POLICY.name().toLowerCase(),
+        policyName);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java 
b/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java
index eb1ea37578..c33a0786a5 100644
--- a/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.utils;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
@@ -32,6 +34,7 @@ import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.authorization.AuthorizationUtils;
 import org.apache.gravitino.exceptions.IllegalMetadataObjectException;
 import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
+import org.apache.gravitino.exceptions.NoSuchPolicyException;
 import org.apache.gravitino.exceptions.NoSuchRoleException;
 import org.apache.gravitino.exceptions.NoSuchTagException;
 
@@ -51,6 +54,7 @@ public class MetadataObjectUtil {
           .put(MetadataObject.Type.ROLE, Entity.EntityType.ROLE)
           .put(MetadataObject.Type.MODEL, Entity.EntityType.MODEL)
           .put(MetadataObject.Type.TAG, Entity.EntityType.TAG)
+          .put(MetadataObject.Type.POLICY, Entity.EntityType.POLICY)
           .build();
 
   private MetadataObjectUtil() {}
@@ -106,6 +110,8 @@ public class MetadataObjectUtil {
         return AuthorizationUtils.ofRole(metalakeName, metadataObject.name());
       case TAG:
         return NameIdentifierUtil.ofTag(metalakeName, metadataObject.name());
+      case POLICY:
+        return NameIdentifierUtil.ofPolicy(metalakeName, 
metadataObject.name());
       case CATALOG:
       case SCHEMA:
       case TABLE:
@@ -205,6 +211,14 @@ public class MetadataObjectUtil {
           throw exceptionToThrowSupplier.get();
         }
         break;
+      case POLICY:
+        NameIdentifierUtil.checkPolicy(identifier);
+        try {
+          env.policyDispatcher().getPolicy(metalake, object.fullName());
+        } catch (NoSuchPolicyException nsr) {
+          throw checkNotNull(exceptionToThrowSupplier).get();
+        }
+        break;
       default:
         throw new IllegalArgumentException(
             String.format("Doesn't support the type %s", object.type()));
diff --git 
a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java 
b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
index 9ebd62d6fe..73b23868c3 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
@@ -420,6 +420,15 @@ public class NameIdentifierUtil {
         namespace);
   }
 
+  /**
+   * Check the given {@link NameIdentifier} is a policy identifier: it must be non-null and its
+   * namespace must be non-null with exactly three levels.
+   *
+   * @param ident The policy {@link NameIdentifier} to check.
+   */
+  public static void checkPolicy(NameIdentifier ident) {
+    NameIdentifier.check(ident != null, "Policy identifier must not be null");
+    Namespace policyNamespace = ident.namespace();
+    boolean hasThreeLevels =
+        policyNamespace != null && !policyNamespace.isEmpty() && policyNamespace.length() == 3;
+    Namespace.check(
+        hasThreeLevels,
+        "Policy namespace must be 3 level, the input namespace is %s",
+        policyNamespace);
+  }
+
   /**
    * Check the given {@link NameIdentifier} is a catalog identifier. Throw an 
{@link
    * IllegalNameIdentifierException} if it's not.
@@ -569,6 +578,9 @@ public class NameIdentifierUtil {
       case TAG:
         checkTag(ident);
         return MetadataObjects.of(null, ident.name(), MetadataObject.Type.TAG);
+      case POLICY:
+        checkPolicy(ident);
+        return MetadataObjects.of(null, ident.name(), 
MetadataObject.Type.POLICY);
       default:
         throw new IllegalArgumentException(
             "Entity type " + entityType + " is not supported to convert to 
MetadataObject");
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
index ebeb2c7200..35ebe9b860 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
@@ -214,6 +214,7 @@ public class MetadataAuthzHelper {
       AuthorizationRequestContext authorizationRequestContext,
       Function<E, Map<Entity.EntityType, NameIdentifier>> 
extractMetadataNamesMap) {
     Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    authorizationRequestContext.setOriginalAuthorizationExpression(expression);
     List<CompletableFuture<E>> futures = new ArrayList<>();
     for (E entity : entities) {
       futures.add(
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataIdConverter.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataIdConverter.java
index b2ad1ea4fa..c563c45373 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataIdConverter.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataIdConverter.java
@@ -51,19 +51,6 @@ import org.apache.gravitino.utils.MetadataObjectUtil;
 /** It is used to convert MetadataObject to MetadataId */
 public class MetadataIdConverter {
 
-  // Maps metadata type to entity type
-  private static final Map<MetadataObject.Type, Entity.EntityType> 
METADATA_TO_ENTITY_TYPE_MAPPING =
-      ImmutableMap.of(
-          MetadataObject.Type.METALAKE, Entity.EntityType.METALAKE,
-          MetadataObject.Type.CATALOG, Entity.EntityType.CATALOG,
-          MetadataObject.Type.SCHEMA, Entity.EntityType.SCHEMA,
-          MetadataObject.Type.TABLE, Entity.EntityType.TABLE,
-          MetadataObject.Type.MODEL, Entity.EntityType.MODEL,
-          MetadataObject.Type.FILESET, Entity.EntityType.FILESET,
-          MetadataObject.Type.TOPIC, Entity.EntityType.TOPIC,
-          MetadataObject.Type.COLUMN, Entity.EntityType.COLUMN,
-          MetadataObject.Type.ROLE, Entity.EntityType.ROLE,
-          MetadataObject.Type.TAG, Entity.EntityType.TAG);
   // Maps metadata type to capability scope
   private static final Map<MetadataObject.Type, Capability.Scope> 
METADATA_SCOPE_MAPPING =
       ImmutableMap.of(
@@ -111,7 +98,7 @@ public class MetadataIdConverter {
     NameIdentifier normalizedIdent =
         normalizeCaseSensitive(ident, 
METADATA_SCOPE_MAPPING.get(metadataType), catalogManager);
 
-    Entity.EntityType entityType = getEntityType(metadataType);
+    Entity.EntityType entityType = 
MetadataObjectUtil.toEntityType(metadataType);
 
     Entity entity;
     try {
@@ -135,10 +122,6 @@ public class MetadataIdConverter {
     return CapabilityHelpers.applyCaseSensitive(ident, scope, capability);
   }
 
-  private static Entity.EntityType getEntityType(MetadataObject.Type 
metadataType) {
-    return METADATA_TO_ENTITY_TYPE_MAPPING.get(metadataType);
-  }
-
   private static Long extractIdFromEntity(Entity entity) {
     Preconditions.checkArgument(
         entity instanceof HasIdentifier, "Entity must implement HasIdentifier 
interface");
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/PassThroughAuthorizer.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/PassThroughAuthorizer.java
index 3d4452c65e..50b88c00aa 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/PassThroughAuthorizer.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/PassThroughAuthorizer.java
@@ -56,7 +56,11 @@ public class PassThroughAuthorizer implements 
GravitinoAuthorizer {
   }
 
   @Override
-  public boolean isOwner(Principal principal, String metalake, MetadataObject 
metadataObject) {
+  public boolean isOwner(
+      Principal principal,
+      String metalake,
+      MetadataObject metadataObject,
+      AuthorizationRequestContext requestContext) {
     return true;
   }
 
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/annotations/AuthorizationRequest.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/annotations/AuthorizationRequest.java
index ee2d43d2c1..ee75d4f08b 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/annotations/AuthorizationRequest.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/annotations/AuthorizationRequest.java
@@ -30,6 +30,7 @@ public @interface AuthorizationRequest {
 
   enum RequestType {
     COMMON,
-    ASSOCIATE_TAG
+    ASSOCIATE_TAG,
+    ASSOCIATE_POLICY
   }
 }
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConstants.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConstants.java
index 55851ea1f8..6b68921b8a 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConstants.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConstants.java
@@ -113,4 +113,9 @@ public class AuthorizationExpressionConstants {
 
   public static final String applyTagAuthorizationExpression =
       "METALAKE::OWNER || TAG::OWNER || ANY_APPLY_TAG";
+
+  // Grants access to the metalake owner, the policy owner, or any principal matching
+  // ANY_APPLY_POLICY. The value ends with a line terminator.
+  public static final String loadPolicyAuthorizationExpression =
+      "METALAKE::OWNER || POLICY::OWNER || ANY_APPLY_POLICY\n";
 }
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
index 6e4616b535..424e910f35 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
@@ -91,7 +91,9 @@ public class AuthorizationExpressionConverter {
             String privilegeOrExpression = matcher.group(2);
             String replacement;
             if (AuthConstants.OWNER.equals(privilegeOrExpression)) {
-              replacement = 
String.format("authorizer.isOwner(principal,METALAKE_NAME,%s)", type);
+              replacement =
+                  String.format(
+                      
"authorizer.isOwner(principal,METALAKE_NAME,%s,authorizationContext)", type);
             } else if (privilegeOrExpression.startsWith(DENY_PREFIX)) {
               String privilege = privilegeOrExpression.substring(5);
               replacement =
@@ -162,7 +164,7 @@ public class AuthorizationExpressionConverter {
         """
               ( entityType == 'CATALOG' && (%s)) ||
               ( entityType == 'SCHEMA' && (%s)) ||
-              ( entityType == 'TABLE' && (%s)) ||
+               ( entityType == 'TABLE' && (%s)) ||
               ( entityType == 'MODEL' && (%s)) ||
               ( entityType == 'FILESET' && (%s)) ||
               ( entityType == 'TOPIC' && (%s)) ||
@@ -276,6 +278,11 @@ public class AuthorizationExpressionConverter {
         expression.replaceAll(
             "ANY_APPLY_TAG",
             "((ANY(APPLY_TAG, METALAKE, TAG))" + "&& !(ANY(DENY_APPLY_TAG, 
METALAKE, TAG)))");
+    expression =
+        expression.replaceAll(
+            "ANY_APPLY_POLICY",
+            "((ANY(APPLY_POLICY, METALAKE, POLICY))"
+                + "&& !(ANY(DENY_APPLY_POLICY, METALAKE, POLICY)))");
     expression =
         expression.replaceAll(
             CAN_SET_OWNER,
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionEvaluator.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionEvaluator.java
index f32ec86385..59d5375774 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionEvaluator.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionEvaluator.java
@@ -34,6 +34,8 @@ import org.apache.gravitino.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.server.authorization.GravitinoAuthorizerProvider;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Evaluate the runtime result of the AuthorizationExpression. */
 public class AuthorizationExpressionEvaluator {
@@ -41,6 +43,9 @@ public class AuthorizationExpressionEvaluator {
   private final String ognlAuthorizationExpression;
   private final GravitinoAuthorizer authorizer;
 
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(AuthorizationExpressionEvaluator.class);
+
   /**
    * Use {@link AuthorizationExpressionConverter} to convert the authorization 
expression into an
    * OGNL expression, and then call {@link GravitinoAuthorizer} to perform 
permission verification.
@@ -49,6 +54,7 @@ public class AuthorizationExpressionEvaluator {
    */
   public AuthorizationExpressionEvaluator(String expression) {
     this(expression, 
GravitinoAuthorizerProvider.getInstance().getGravitinoAuthorizer());
+    LOGGER.debug("Authorization expression: {}", expression);
   }
 
   /**
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
index 14f2fd04c9..d7ea252d07 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
@@ -139,12 +139,13 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
                     authorizationKey.getPrivilege().name(),
                     requestContext));
     LOG.debug(
-        "principal {},metalake {},metadata object {},privilege {}, result {}",
+        "Authorization expression: {},privilege {}, result {}\n, principal 
{},metalake {},metadata object {}",
+        requestContext.getOriginalAuthorizationExpression(),
+        privilege,
+        result,
         principal,
         metalake,
-        metadataObject,
-        privilege,
-        result);
+        metadataObject);
     return result;
   }
 
@@ -169,17 +170,22 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
                     authorizationKey.getPrivilege().name(),
                     requestContext));
     LOG.debug(
-        "principal {},metalake {},metadata object {},privilege {},deny result 
{}",
+        "Authorization expression: {},privilege {},deny result {}\n, principal 
{},metalake {},metadata object {}",
+        requestContext.getOriginalAuthorizationExpression(),
+        privilege,
+        result,
         principal,
         metalake,
-        metadataObject,
-        privilege,
-        result);
+        metadataObject);
     return result;
   }
 
   @Override
-  public boolean isOwner(Principal principal, String metalake, MetadataObject 
metadataObject) {
+  public boolean isOwner(
+      Principal principal,
+      String metalake,
+      MetadataObject metadataObject,
+      AuthorizationRequestContext requestContext) {
     Long userId;
     boolean result;
     try {
@@ -194,12 +200,13 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
       result = false;
     }
     LOG.debug(
-        "principal {},metalake {},metadata object {},privilege {},owner result 
{}",
+        "Authorization expression: {},privilege {},owner result {}\n,principal 
{},metalake {},metadata object {}",
+        requestContext.getOriginalAuthorizationExpression(),
+        "OWNER",
+        result,
         principal,
         metalake,
-        metadataObject,
-        "OWNER",
-        result);
+        metadataObject);
     return result;
   }
 
@@ -269,14 +276,14 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
     MetadataObject metalakeObject =
         MetadataObjects.of(ImmutableList.of(metalake), 
MetadataObject.Type.METALAKE);
     // metalake owner can set owner in metalake.
-    if (isOwner(currentPrincipal, metalake, metalakeObject)) {
+    if (isOwner(currentPrincipal, metalake, metalakeObject, requestContext)) {
       return true;
     }
     MetadataObject.Type metadataType = 
MetadataObject.Type.valueOf(type.toUpperCase());
     MetadataObject metadataObject =
         MetadataObjects.of(Arrays.asList(fullName.split("\\.")), metadataType);
     do {
-      if (isOwner(currentPrincipal, metalake, metadataObject)) {
+      if (isOwner(currentPrincipal, metalake, metadataObject, requestContext)) 
{
         MetadataObject.Type tempType = metadataObject.type();
         if (tempType == MetadataObject.Type.SCHEMA) {
           // schema owner need use catalog privilege
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/MockGravitinoAuthorizer.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/MockGravitinoAuthorizer.java
index 34b6210b5c..f9ef64bab0 100644
--- 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/MockGravitinoAuthorizer.java
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/MockGravitinoAuthorizer.java
@@ -70,7 +70,11 @@ public class MockGravitinoAuthorizer implements 
GravitinoAuthorizer {
   }
 
   @Override
-  public boolean isOwner(Principal principal, String metalake, MetadataObject 
metadataObject) {
+  public boolean isOwner(
+      Principal principal,
+      String metalake,
+      MetadataObject metadataObject,
+      AuthorizationRequestContext requestContext) {
     if (!("tester".equals(principal.getName()) && 
"metalakeWithOwner".equals(metalake))) {
       return false;
     }
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/TestPassThroughAuthorizer.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/TestPassThroughAuthorizer.java
index e8158ab3f7..f5290d01fa 100644
--- 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/TestPassThroughAuthorizer.java
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/TestPassThroughAuthorizer.java
@@ -56,7 +56,9 @@ public class TestPassThroughAuthorizer {
               Privilege.Name.CREATE_SCHEMA,
               new AuthorizationRequestContext()));
 
-      Assertions.assertTrue(passThroughAuthorizer.isOwner(principal, 
"metalake", metadataObject));
+      Assertions.assertTrue(
+          passThroughAuthorizer.isOwner(
+              principal, "metalake", metadataObject, new 
AuthorizationRequestContext()));
       Assertions.assertTrue(passThroughAuthorizer.isServiceAdmin());
       Assertions.assertTrue(passThroughAuthorizer.isMetalakeUser("metalake"));
       
Assertions.assertTrue(passThroughAuthorizer.isSelf(Entity.EntityType.USER, 
null));
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/expression/TestAuthorizationExpressionConverter.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/expression/TestAuthorizationExpressionConverter.java
index 29a8cfca77..da033695c9 100644
--- 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/expression/TestAuthorizationExpressionConverter.java
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/expression/TestAuthorizationExpressionConverter.java
@@ -84,15 +84,15 @@ public class TestAuthorizationExpressionConverter {
     Assertions.assertEquals(
         "authorizer.authorize(principal,METALAKE_NAME,CATALOG,"
             + 
"@org.apache.gravitino.authorization.Privilege$Name@CREATE_SCHEMA,authorizationContext)
 "
-            + "|| authorizer.isOwner(principal,METALAKE_NAME,SCHEMA)",
+            + "|| 
authorizer.isOwner(principal,METALAKE_NAME,SCHEMA,authorizationContext)",
         createTableOgnlExpression);
 
     String expressionWithOwner2 = "(ANY(OWNER, METALAKE, CATALOG)) && 
CATALOG::USE_CATALOG)";
     String useCatalogOgnExpression =
         
AuthorizationExpressionConverter.convertToOgnlExpression(expressionWithOwner2);
     Assertions.assertEquals(
-        "(authorizer.isOwner(principal,METALAKE_NAME,METALAKE) "
-            + "|| authorizer.isOwner(principal,METALAKE_NAME,CATALOG))"
+        
"(authorizer.isOwner(principal,METALAKE_NAME,METALAKE,authorizationContext) "
+            + "|| 
authorizer.isOwner(principal,METALAKE_NAME,CATALOG,authorizationContext))"
             + " && authorizer.authorize(principal,METALAKE_NAME,CATALOG"
             + 
",@org.apache.gravitino.authorization.Privilege$Name@USE_CATALOG,authorizationContext))",
         useCatalogOgnExpression);
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
index 2fd4cca724..3533dfab53 100644
--- 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
@@ -258,10 +258,12 @@ public class TestJcasbinAuthorizer {
   }
 
   private Boolean doAuthorizeOwner(Principal currentPrincipal) {
+    AuthorizationRequestContext authorizationRequestContext = new 
AuthorizationRequestContext();
     return jcasbinAuthorizer.isOwner(
         currentPrincipal,
         "testMetalake",
-        MetadataObjects.of(null, "testCatalog", MetadataObject.Type.CATALOG));
+        MetadataObjects.of(null, "testCatalog", MetadataObject.Type.CATALOG),
+        authorizationRequestContext);
   }
 
   private static UserEntity getUserEntity() {
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
index c0438a5354..9d463c99a9 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
@@ -51,12 +51,14 @@ import 
org.apache.gravitino.server.web.filter.authorization.AuthorizeExecutorFac
 import org.apache.gravitino.server.web.rest.CatalogOperations;
 import org.apache.gravitino.server.web.rest.FilesetOperations;
 import org.apache.gravitino.server.web.rest.GroupOperations;
+import org.apache.gravitino.server.web.rest.MetadataObjectPolicyOperations;
 import org.apache.gravitino.server.web.rest.MetadataObjectTagOperations;
 import org.apache.gravitino.server.web.rest.MetalakeOperations;
 import org.apache.gravitino.server.web.rest.ModelOperations;
 import org.apache.gravitino.server.web.rest.OwnerOperations;
 import org.apache.gravitino.server.web.rest.PartitionOperations;
 import org.apache.gravitino.server.web.rest.PermissionOperations;
+import org.apache.gravitino.server.web.rest.PolicyOperations;
 import org.apache.gravitino.server.web.rest.RoleOperations;
 import org.apache.gravitino.server.web.rest.SchemaOperations;
 import org.apache.gravitino.server.web.rest.StatisticOperations;
@@ -97,7 +99,9 @@ public class GravitinoInterceptionService implements 
InterceptionService {
             StatisticOperations.class.getName(),
             PartitionOperations.class.getName(),
             MetadataObjectTagOperations.class.getName(),
-            TagOperations.class.getName()));
+            TagOperations.class.getName(),
+            PolicyOperations.class.getName(),
+            MetadataObjectPolicyOperations.class.getName()));
   }
 
   @Override
@@ -179,6 +183,7 @@ public class GravitinoInterceptionService implements 
InterceptionService {
                 extractAuthorizationRequestTypeFromParameters(parameters);
             executor =
                 AuthorizeExecutorFactory.create(
+                    expression,
                     requestType,
                     metadataContext,
                     authorizationExpressionEvaluator,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/ParameterUtil.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/ParameterUtil.java
index ac4f548026..068a9af7e5 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/ParameterUtil.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/ParameterUtil.java
@@ -159,6 +159,11 @@ public class ParameterUtil {
                   Entity.EntityType.TAG,
                   NameIdentifierUtil.ofTag(metalake, 
entities.get(Entity.EntityType.TAG)));
               break;
+            case POLICY:
+              nameIdentifierMap.put(
+                  Entity.EntityType.POLICY,
+                  NameIdentifierUtil.ofPolicy(metalake, 
entities.get(Entity.EntityType.POLICY)));
+              break;
             default:
               break;
           }
@@ -198,6 +203,11 @@ public class ParameterUtil {
           Entity.EntityType.TAG,
           NameIdentifierUtil.ofTag(NameIdentifierUtil.getMetalake(metalake), 
name));
       return;
+    } else if (type == Entity.EntityType.POLICY) {
+      metadataNames.put(
+          Entity.EntityType.POLICY,
+          
NameIdentifierUtil.ofPolicy(NameIdentifierUtil.getMetalake(metalake), name));
+      return;
     }
     throw new UnsupportedOperationException(
         "Unsupported to build NameIdentifier for batch authorization target");
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociateTagAuthorizationExecutor.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociatePolicyAuthorizationExecutor.java
similarity index 58%
copy from 
server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociateTagAuthorizationExecutor.java
copy to 
server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociatePolicyAuthorizationExecutor.java
index d01023cede..cc8c055d99 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociateTagAuthorizationExecutor.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociatePolicyAuthorizationExecutor.java
@@ -28,36 +28,31 @@ import java.util.Optional;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.authorization.AuthorizationRequestContext;
-import org.apache.gravitino.dto.requests.TagsAssociateRequest;
+import org.apache.gravitino.dto.requests.PoliciesAssociateRequest;
 import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionEvaluator;
-import org.apache.gravitino.server.web.rest.MetadataObjectTagOperations;
 
 /**
  * Metadata object authorization for {@link
- * MetadataObjectTagOperations#associateTagsForObject(String, String, String, 
TagsAssociateRequest)}
+ * 
org.apache.gravitino.server.web.rest.MetadataObjectPolicyOperations#associatePoliciesForObject(String,
+ * String, String, PoliciesAssociateRequest)}
  */
-public class AssociateTagAuthorizationExecutor implements 
AuthorizationExecutor {
+public class AssociatePolicyAuthorizationExecutor extends 
CommonAuthorizerExecutor
+    implements AuthorizationExecutor {
 
   private final Parameter[] parameters;
   private final Object[] args;
-  private final Map<Entity.EntityType, NameIdentifier> metadataContext;
-  private final AuthorizationExpressionEvaluator 
authorizationExpressionEvaluator;
-  private final Map<String, Object> pathParams;
-  private final String entityType;
 
-  public AssociateTagAuthorizationExecutor(
+  public AssociatePolicyAuthorizationExecutor(
+      String expression,
       Parameter[] parameters,
       Object[] args,
       Map<Entity.EntityType, NameIdentifier> metadataContext,
       AuthorizationExpressionEvaluator authorizationExpressionEvaluator,
       Map<String, Object> pathParams,
       String entityType) {
+    super(expression, metadataContext, authorizationExpressionEvaluator, 
pathParams, entityType);
     this.parameters = parameters;
     this.args = args;
-    this.metadataContext = metadataContext;
-    this.authorizationExpressionEvaluator = authorizationExpressionEvaluator;
-    this.pathParams = pathParams;
-    this.entityType = entityType;
   }
 
   @Override
@@ -68,46 +63,47 @@ public class AssociateTagAuthorizationExecutor implements 
AuthorizationExecutor
     }
 
     AuthorizationRequestContext context = new AuthorizationRequestContext();
+    context.setOriginalAuthorizationExpression(expression);
     Entity.EntityType targetType =
-        Entity.EntityType.TAG; // Tags are the only supported batch target here
+        Entity.EntityType.POLICY; // policies are the only supported batch 
target here
     Preconditions.checkArgument(
-        request instanceof TagsAssociateRequest,
-        "Only tag can use AssociateTagAuthorizationExecutor, please contact 
the administrator.");
-    TagsAssociateRequest tagsAssociateRequest = (TagsAssociateRequest) request;
-    tagsAssociateRequest.validate();
-    // Authorize both 'tagsToAdd' and 'tagsToRemove' fields.
-    return authorizeTag(tagsAssociateRequest.getTagsToAdd(), context, 
targetType)
-        && authorizeTag(tagsAssociateRequest.getTagsToRemove(), context, 
targetType);
+        request instanceof PoliciesAssociateRequest,
+        "Only policy can use AssociatePolicyAuthorizationExecutor, please 
contact the administrator.");
+    PoliciesAssociateRequest policiesAssociateRequest = 
(PoliciesAssociateRequest) request;
+    policiesAssociateRequest.validate();
+    // Authorize both 'policiesToAdd' and 'policiesToRemove' fields.
+    return authorizePolicy(policiesAssociateRequest.getPoliciesToAdd(), 
context, targetType)
+        && authorizePolicy(policiesAssociateRequest.getPoliciesToRemove(), 
context, targetType);
   }
 
   /**
-   * Performs batch authorization for a given field (e.g., "tagsToAdd" or 
"tagsToRemove") containing
-   * an array of tag names.
+   * Performs batch authorization for a given field (e.g., "policiesToAdd" or 
"policiesToRemove")
+   * containing an array of policy names.
    *
-   * @param tagNames tagNames
+   * @param policies policies
    * @param context The shared authorization request context.
    * @param targetType The entity type being authorized (expected to be POLICY).
-   * @return {@code true} if all tags in the field pass authorization; {@code 
false} otherwise.
+   * @return {@code true} if all policies in the field pass authorization; 
{@code false} otherwise.
    */
-  private boolean authorizeTag(
-      String[] tagNames, AuthorizationRequestContext context, 
Entity.EntityType targetType) {
+  private boolean authorizePolicy(
+      String[] policies, AuthorizationRequestContext context, 
Entity.EntityType targetType) {
 
     // Treat null or empty arrays as no-op (implicitly authorized)
-    if (tagNames == null) {
+    if (policies == null) {
       return true;
     }
 
-    for (String tagName : tagNames) {
-      // Use a fresh context copy for each tag to avoid cross-contamination
+    for (String policy : policies) {
+      // Use a fresh context copy for each policy to avoid cross-contamination
       Map<Entity.EntityType, NameIdentifier> currentContext = new 
HashMap<>(this.metadataContext);
-      buildNameIdentifierForBatchAuthorization(currentContext, tagName, 
targetType);
+      buildNameIdentifierForBatchAuthorization(currentContext, policy, 
targetType);
 
       boolean authorized =
           authorizationExpressionEvaluator.evaluate(
               currentContext, pathParams, context, 
Optional.ofNullable(entityType));
 
       if (!authorized) {
-        return false; // Fail fast on first unauthorized tag
+        return false; // Fail fast on first unauthorized policy
       }
     }
     return true;
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociateTagAuthorizationExecutor.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociateTagAuthorizationExecutor.java
index d01023cede..09694e96a7 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociateTagAuthorizationExecutor.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociateTagAuthorizationExecutor.java
@@ -36,28 +36,23 @@ import 
org.apache.gravitino.server.web.rest.MetadataObjectTagOperations;
  * Metadata object authorization for {@link
  * MetadataObjectTagOperations#associateTagsForObject(String, String, String, 
TagsAssociateRequest)}
  */
-public class AssociateTagAuthorizationExecutor implements 
AuthorizationExecutor {
+public class AssociateTagAuthorizationExecutor extends CommonAuthorizerExecutor
+    implements AuthorizationExecutor {
 
   private final Parameter[] parameters;
   private final Object[] args;
-  private final Map<Entity.EntityType, NameIdentifier> metadataContext;
-  private final AuthorizationExpressionEvaluator 
authorizationExpressionEvaluator;
-  private final Map<String, Object> pathParams;
-  private final String entityType;
 
   public AssociateTagAuthorizationExecutor(
+      String expression,
       Parameter[] parameters,
       Object[] args,
       Map<Entity.EntityType, NameIdentifier> metadataContext,
       AuthorizationExpressionEvaluator authorizationExpressionEvaluator,
       Map<String, Object> pathParams,
       String entityType) {
+    super(expression, metadataContext, authorizationExpressionEvaluator, 
pathParams, entityType);
     this.parameters = parameters;
     this.args = args;
-    this.metadataContext = metadataContext;
-    this.authorizationExpressionEvaluator = authorizationExpressionEvaluator;
-    this.pathParams = pathParams;
-    this.entityType = entityType;
   }
 
   @Override
@@ -68,6 +63,7 @@ public class AssociateTagAuthorizationExecutor implements 
AuthorizationExecutor
     }
 
     AuthorizationRequestContext context = new AuthorizationRequestContext();
+    context.setOriginalAuthorizationExpression(expression);
     Entity.EntityType targetType =
         Entity.EntityType.TAG; // Tags are the only supported batch target here
     Preconditions.checkArgument(
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AuthorizeExecutorFactory.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AuthorizeExecutorFactory.java
index d6b8cee810..2fbe7a69db 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AuthorizeExecutorFactory.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AuthorizeExecutorFactory.java
@@ -27,6 +27,7 @@ import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpress
 public class AuthorizeExecutorFactory {
 
   public static AuthorizationExecutor create(
+      String expression,
       AuthorizationRequest.RequestType requestType,
       Map<Entity.EntityType, NameIdentifier> metadataContext,
       AuthorizationExpressionEvaluator authorizationExpressionEvaluator,
@@ -36,8 +37,17 @@ public class AuthorizeExecutorFactory {
       Object[] args) {
     return switch (requestType) {
       case COMMON -> new CommonAuthorizerExecutor(
-          metadataContext, authorizationExpressionEvaluator, pathParams, 
entityType);
+          expression, metadataContext, authorizationExpressionEvaluator, 
pathParams, entityType);
       case ASSOCIATE_TAG -> new AssociateTagAuthorizationExecutor(
+          expression,
+          parameters,
+          args,
+          metadataContext,
+          authorizationExpressionEvaluator,
+          pathParams,
+          entityType);
+      case ASSOCIATE_POLICY -> new AssociatePolicyAuthorizationExecutor(
+          expression,
           parameters,
           args,
           metadataContext,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/CommonAuthorizerExecutor.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/CommonAuthorizerExecutor.java
index 0017ee8859..9c15a1c52d 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/CommonAuthorizerExecutor.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/CommonAuthorizerExecutor.java
@@ -26,16 +26,19 @@ import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpress
 
 public class CommonAuthorizerExecutor implements AuthorizationExecutor {
 
-  private Map<Entity.EntityType, NameIdentifier> metadataContext;
-  private AuthorizationExpressionEvaluator authorizationExpressionEvaluator;
-  private Map<String, Object> pathParams;
-  String entityType;
+  protected Map<Entity.EntityType, NameIdentifier> metadataContext;
+  protected AuthorizationExpressionEvaluator authorizationExpressionEvaluator;
+  protected Map<String, Object> pathParams;
+  protected String entityType;
+  protected String expression;
 
   public CommonAuthorizerExecutor(
+      String expression,
       Map<Entity.EntityType, NameIdentifier> metadataContext,
       AuthorizationExpressionEvaluator authorizationExpressionEvaluator,
       Map<String, Object> pathParams,
       String entityType) {
+    this.expression = expression;
     this.metadataContext = metadataContext;
     this.authorizationExpressionEvaluator = authorizationExpressionEvaluator;
     this.pathParams = pathParams;
@@ -43,7 +46,7 @@ public class CommonAuthorizerExecutor implements 
AuthorizationExecutor {
   }
 
   @Override
-  public boolean execute() {
+  public boolean execute() throws Exception {
     return authorizationExpressionEvaluator.evaluate(
         metadataContext,
         pathParams,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectPolicyOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectPolicyOperations.java
index d1d303a35e..9af1682477 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectPolicyOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectPolicyOperations.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.server.web.rest;
 
+import static 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants.CAN_ACCESS_METADATA;
+
 import com.codahale.metrics.annotation.ResponseMetered;
 import com.codahale.metrics.annotation.Timed;
 import java.util.Arrays;
@@ -37,6 +39,7 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.gravitino.Entity;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.dto.policy.PolicyDTO;
@@ -48,7 +51,15 @@ import org.apache.gravitino.exceptions.NoSuchPolicyException;
 import org.apache.gravitino.meta.PolicyEntity;
 import org.apache.gravitino.metrics.MetricNames;
 import org.apache.gravitino.policy.PolicyDispatcher;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationFullName;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationObjectType;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationRequest;
+import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants;
 import org.apache.gravitino.server.web.Utils;
+import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.glassfish.jersey.internal.guava.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,11 +82,16 @@ public class MetadataObjectPolicyOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "get-object-policy." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "get-object-policy", absolute = true)
+  @AuthorizationExpression(
+      expression =
+          "METALAKE::OWNER || ((POLICY::OWNER || ANY_APPLY_POLICY) && 
(CAN_ACCESS_METADATA))")
   public Response getPolicyForObject(
-      @PathParam("metalake") String metalake,
-      @PathParam("type") String type,
-      @PathParam("fullName") String fullName,
-      @PathParam("policy") String policyName) {
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      @PathParam("type") @AuthorizationObjectType String type,
+      @PathParam("fullName") @AuthorizationFullName String fullName,
+      @PathParam("policy") @AuthorizationMetadata(type = 
Entity.EntityType.POLICY)
+          String policyName) {
     LOG.info(
         "Received get policy {} request for object type: {}, full name: {} 
under metalake: {}",
         policyName,
@@ -138,10 +154,12 @@ public class MetadataObjectPolicyOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "list-object-policies." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "list-object-policies", absolute = true)
+  @AuthorizationExpression(expression = CAN_ACCESS_METADATA)
   public Response listPoliciesForMetadataObject(
-      @PathParam("metalake") String metalake,
-      @PathParam("type") String type,
-      @PathParam("fullName") String fullName,
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      @PathParam("type") @AuthorizationObjectType String type,
+      @PathParam("fullName") @AuthorizationFullName String fullName,
       @QueryParam("details") @DefaultValue("false") boolean verbose) {
     LOG.info(
         "Received list policy {} request for object type: {}, full name: {} 
under metalake: {}",
@@ -161,6 +179,13 @@ public class MetadataObjectPolicyOperations {
             Set<PolicyDTO> policies = Sets.newHashSet();
             PolicyEntity[] nonInheritedPolicies =
                 policyDispatcher.listPolicyInfosForMetadataObject(metalake, 
object);
+            nonInheritedPolicies =
+                MetadataAuthzHelper.filterByExpression(
+                    metalake,
+                    
AuthorizationExpressionConstants.loadPolicyAuthorizationExpression,
+                    Entity.EntityType.POLICY,
+                    nonInheritedPolicies,
+                    (policyEntity -> NameIdentifierUtil.ofPolicy(metalake, 
policyEntity.name())));
             if (ArrayUtils.isNotEmpty(nonInheritedPolicies)) {
               Collections.addAll(
                   policies,
@@ -214,11 +239,16 @@ public class MetadataObjectPolicyOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "associate-object-policies." + 
MetricNames.HTTP_PROCESS_DURATION, absolute = true)
   @ResponseMetered(name = "associate-object-policies", absolute = true)
+  @AuthorizationExpression(
+      expression =
+          "METALAKE::OWNER || ((POLICY::OWNER || ANY_APPLY_POLICY) && 
(CAN_ACCESS_METADATA))")
   public Response associatePoliciesForObject(
-      @PathParam("metalake") String metalake,
-      @PathParam("type") String type,
-      @PathParam("fullName") String fullName,
-      PoliciesAssociateRequest request) {
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      @PathParam("type") @AuthorizationObjectType String type,
+      @PathParam("fullName") @AuthorizationFullName String fullName,
+      @AuthorizationRequest(type = 
AuthorizationRequest.RequestType.ASSOCIATE_POLICY)
+          PoliciesAssociateRequest request) {
     LOG.info(
         "Received associate policies request for object type: {}, full name: 
{} under metalake: {}",
         type,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/PolicyOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/PolicyOperations.java
index dd5d61fc55..a263e6b471 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/PolicyOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/PolicyOperations.java
@@ -39,6 +39,7 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
+import org.apache.gravitino.Entity;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.dto.policy.PolicyDTO;
 import org.apache.gravitino.dto.requests.PolicyCreateRequest;
@@ -58,7 +59,12 @@ import org.apache.gravitino.metrics.MetricNames;
 import org.apache.gravitino.policy.Policy;
 import org.apache.gravitino.policy.PolicyChange;
 import org.apache.gravitino.policy.PolicyDispatcher;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
+import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants;
 import org.apache.gravitino.server.web.Utils;
+import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,8 +86,10 @@ public class PolicyOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "list-policies." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "list-policies", absolute = true)
+  @AuthorizationExpression(expression = "")
   public Response listPolicies(
-      @PathParam("metalake") String metalake,
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
       @QueryParam("details") @DefaultValue("false") boolean verbose) {
     LOG.info(
         "Received list policy {} request for metalake: {}", verbose ? "infos" 
: "names", metalake);
@@ -96,14 +104,26 @@ public class PolicyOperations {
                   Arrays.stream(policies)
                       .map(p -> toDTO(p, Optional.empty()))
                       .toArray(PolicyDTO[]::new);
-
+              policyDTOs =
+                  MetadataAuthzHelper.filterByExpression(
+                      metalake,
+                      
AuthorizationExpressionConstants.loadPolicyAuthorizationExpression,
+                      Entity.EntityType.POLICY,
+                      policyDTOs,
+                      (policyDTO -> NameIdentifierUtil.ofPolicy(metalake, 
policyDTO.name())));
               LOG.info("List {} policies info under metalake: {}", 
policyDTOs.length, metalake);
               return Utils.ok(new PolicyListResponse(policyDTOs));
 
             } else {
               String[] policyNames = policyDispatcher.listPolicies(metalake);
               policyNames = policyNames == null ? new String[0] : policyNames;
-
+              policyNames =
+                  MetadataAuthzHelper.filterByExpression(
+                      metalake,
+                      
AuthorizationExpressionConstants.loadPolicyAuthorizationExpression,
+                      Entity.EntityType.POLICY,
+                      policyNames,
+                      (policyName -> NameIdentifierUtil.ofPolicy(metalake, 
policyName)));
               LOG.info("List {} policies under metalake: {}", 
policyNames.length, metalake);
               return Utils.ok(new NameListResponse(policyNames));
             }
@@ -117,8 +137,11 @@ public class PolicyOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "create-policy." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "create-policy", absolute = true)
+  @AuthorizationExpression(expression = "METALAKE::OWNER || 
METALAKE::CREATE_POLICY")
   public Response createPolicy(
-      @PathParam("metalake") String metalake, PolicyCreateRequest request) {
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      PolicyCreateRequest request) {
     LOG.info("Received create policy request under metalake: {}", metalake);
 
     try {
@@ -149,8 +172,12 @@ public class PolicyOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "get-policy." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "get-policy", absolute = true)
+  @AuthorizationExpression(
+      expression = 
AuthorizationExpressionConstants.loadPolicyAuthorizationExpression)
   public Response getPolicy(
-      @PathParam("metalake") String metalake, @PathParam("policy") String 
name) {
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      @PathParam("policy") @AuthorizationMetadata(type = 
Entity.EntityType.POLICY) String name) {
     LOG.info("Received get policy request for policy: {} under metalake: {}", 
name, metalake);
 
     try {
@@ -171,9 +198,11 @@ public class PolicyOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "alter-policy." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "alter-policy", absolute = true)
+  @AuthorizationExpression(expression = "METALAKE::OWNER || POLICY::OWNER")
   public Response alterPolicy(
-      @PathParam("metalake") String metalake,
-      @PathParam("policy") String name,
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      @PathParam("policy") @AuthorizationMetadata(type = 
Entity.EntityType.POLICY) String name,
       PolicyUpdatesRequest request) {
     LOG.info("Received alter policy request for policy: {} under metalake: 
{}", name, metalake);
 
@@ -202,9 +231,11 @@ public class PolicyOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "set-policy." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "set-policy", absolute = true)
+  @AuthorizationExpression(expression = "METALAKE::OWNER || POLICY::OWNER")
   public Response setPolicy(
-      @PathParam("metalake") String metalake,
-      @PathParam("policy") String name,
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      @PathParam("policy") @AuthorizationMetadata(type = 
Entity.EntityType.POLICY) String name,
       PolicySetRequest request) {
     LOG.info("Received set policy request for policy: {} under metalake: {}", 
name, metalake);
 
@@ -238,8 +269,11 @@ public class PolicyOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "delete-policy." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "delete-policy", absolute = true)
+  @AuthorizationExpression(expression = "METALAKE::OWNER || POLICY::OWNER")
   public Response deletePolicy(
-      @PathParam("metalake") String metalake, @PathParam("policy") String 
name) {
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      @PathParam("policy") @AuthorizationMetadata(type = 
Entity.EntityType.POLICY) String name) {
     LOG.info("Received delete policy request for policy: {} under metalake: 
{}", name, metalake);
 
     try {
@@ -265,8 +299,13 @@ public class PolicyOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "list-objects-for-policy." + 
MetricNames.HTTP_PROCESS_DURATION, absolute = true)
   @ResponseMetered(name = "list-objects-for-policy", absolute = true)
+  @AuthorizationExpression(
+      expression = 
AuthorizationExpressionConstants.loadPolicyAuthorizationExpression)
   public Response listMetadataObjectsForPolicy(
-      @PathParam("metalake") String metalake, @PathParam("policy") String 
policyName) {
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      @PathParam("policy") @AuthorizationMetadata(type = 
Entity.EntityType.POLICY)
+          String policyName) {
     LOG.info("Received list objects for policy: {} under metalake: {}", 
policyName, metalake);
 
     try {
@@ -275,6 +314,7 @@ public class PolicyOperations {
           () -> {
             MetadataObject[] objects =
                 policyDispatcher.listMetadataObjectsForPolicy(metalake, 
policyName);
+            // TODO filter by can-access-metadata, MetadataFilterHelper can 
not support now
             objects = objects == null ? new MetadataObject[0] : objects;
 
             LOG.info(
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/filter/TestGravitinoInterceptionService.java
 
b/server/src/test/java/org/apache/gravitino/server/web/filter/TestGravitinoInterceptionService.java
index 7a5826df7a..a96160da84 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/filter/TestGravitinoInterceptionService.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/filter/TestGravitinoInterceptionService.java
@@ -299,7 +299,11 @@ public class TestGravitinoInterceptionService {
     }
 
     @Override
-    public boolean isOwner(Principal principal, String metalake, 
MetadataObject metadataObject) {
+    public boolean isOwner(
+        Principal principal,
+        String metalake,
+        MetadataObject metadataObject,
+        AuthorizationRequestContext requestContext) {
       return false;
     }
 
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestPolicyOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestPolicyOperations.java
index a9899aecac..698954057a 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestPolicyOperations.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestPolicyOperations.java
@@ -66,12 +66,11 @@ import org.apache.gravitino.rest.RESTUtils;
 import org.glassfish.jersey.client.HttpUrlConnectorProvider;
 import org.glassfish.jersey.internal.inject.AbstractBinder;
 import org.glassfish.jersey.server.ResourceConfig;
-import org.glassfish.jersey.test.JerseyTest;
 import org.glassfish.jersey.test.TestProperties;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-public class TestPolicyOperations extends JerseyTest {
+public class TestPolicyOperations extends BaseOperationsTest {
 
   private static class MockServletRequestFactory extends 
ServletRequestFactoryBase {
 


Reply via email to