This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new f1ca524dc2 [#8942]feat(authz): support tag access control (#9018)
f1ca524dc2 is described below

commit f1ca524dc2508cbc8b1c8904fd757878ef266217
Author: yangyang zhong <[email protected]>
AuthorDate: Fri Nov 21 19:13:31 2025 +0800

    [#8942]feat(authz): support tag access control (#9018)
    
    ### What changes were proposed in this pull request?
    
    support tag access control
    
    ### Why are the changes needed?
    
    Fix: #8942
    
    ### Does this PR introduce _any_ user-facing change?
    
    None
    
    ### How was this patch tested?
    
    
    
org.apache.gravitino.client.integration.test.authorization.TagOperationsAuthorizationIT
---
 .../apache/gravitino/authorization/Privileges.java |   4 +-
 .../test/authorization/TableAuthorizationIT.java   |  10 +
 .../TagOperationsAuthorizationIT.java              | 300 +++++++++++++++++++++
 .../java/org/apache/gravitino/GravitinoEnv.java    |   7 +-
 .../authorization/AuthorizationUtils.java          |   4 +-
 .../apache/gravitino/hook/TagHookDispatcher.java   | 113 ++++++++
 .../storage/relational/mapper/TagMetaMapper.java   |   9 +
 .../mapper/TagMetaSQLProviderFactory.java          |   9 +
 .../provider/base/TagMetaBaseSQLProvider.java      |  29 ++
 .../relational/service/MetadataObjectService.java  |  44 ++-
 .../storage/relational/service/TagMetaService.java |   7 +
 .../apache/gravitino/utils/MetadataObjectUtil.java |  13 +-
 .../apache/gravitino/utils/NameIdentifierUtil.java |  13 +-
 .../gravitino/utils/TestNameIdentifierUtil.java    |  12 +-
 .../server/authorization/MetadataFilterHelper.java |   3 +
 .../server/authorization/MetadataIdConverter.java  |   3 +-
 .../annotations/AuthorizationRequest.java          |  35 +++
 .../AuthorizationExpressionConstants.java          |  14 +
 .../AuthorizationExpressionConverter.java          |  10 +-
 .../authorization/jcasbin/JcasbinAuthorizer.java   |   2 +-
 .../web/filter/GravitinoInterceptionService.java   | 206 ++++----------
 .../gravitino/server/web/filter/ParameterUtil.java | 205 ++++++++++++++
 .../AssociateTagAuthorizationExecutor.java         | 115 ++++++++
 .../authorization/AuthorizationExecutor.java       |  23 ++
 .../authorization/AuthorizeExecutorFactory.java    |  49 ++++
 .../authorization/CommonAuthorizerExecutor.java    |  53 ++++
 .../server/web/rest/ExceptionHandlers.java         |   2 +
 .../web/rest/MetadataObjectTagOperations.java      |  63 +++--
 .../gravitino/server/web/rest/OwnerOperations.java |   3 +-
 .../gravitino/server/web/rest/TagOperations.java   |  85 ++++--
 .../web/rest/TestMetadataObjectTagOperations.java  |   3 +-
 .../server/web/rest/TestTagOperations.java         |   3 +-
 32 files changed, 1228 insertions(+), 223 deletions(-)

diff --git 
a/api/src/main/java/org/apache/gravitino/authorization/Privileges.java 
b/api/src/main/java/org/apache/gravitino/authorization/Privileges.java
index 7a85f12237..11697521a0 100644
--- a/api/src/main/java/org/apache/gravitino/authorization/Privileges.java
+++ b/api/src/main/java/org/apache/gravitino/authorization/Privileges.java
@@ -935,8 +935,8 @@ public class Privileges {
   /** The privilege to apply tag to object. */
   public static final class ApplyTag extends GenericPrivilege<ApplyTag> {
 
-    private static final ApplyTag ALLOW_INSTANCE = new 
ApplyTag(Condition.ALLOW, Name.CREATE_TAG);
-    private static final ApplyTag DENY_INSTANCE = new ApplyTag(Condition.DENY, 
Name.CREATE_TAG);
+    private static final ApplyTag ALLOW_INSTANCE = new 
ApplyTag(Condition.ALLOW, Name.APPLY_TAG);
+    private static final ApplyTag DENY_INSTANCE = new ApplyTag(Condition.DENY, 
Name.APPLY_TAG);
 
     /**
      * Constructor for GenericPrivilege.
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TableAuthorizationIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TableAuthorizationIT.java
index 0b21e21a65..0df6a82dc4 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TableAuthorizationIT.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TableAuthorizationIT.java
@@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -47,6 +48,7 @@ import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableCatalog;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Order;
@@ -110,6 +112,14 @@ public class TableAuthorizationIT extends 
BaseRestApiAuthorizationIT {
         });
   }
 
+  @AfterAll
+  public void stopIntegrationTest() throws IOException, InterruptedException {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    gravitinoMetalake.loadCatalog(CATALOG).asSchemas().dropSchema(SCHEMA, 
true);
+    gravitinoMetalake.dropCatalog(CATALOG, true);
+    super.stopIntegrationTest();
+  }
+
   private Column[] createColumns() {
     return new Column[] {Column.of("col1", Types.StringType.get())};
   }
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TagOperationsAuthorizationIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TagOperationsAuthorizationIT.java
new file mode 100644
index 0000000000..3d7d5eb961
--- /dev/null
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TagOperationsAuthorizationIT.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.client.integration.test.authorization;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.ForbiddenException;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.container.HiveContainer;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.tag.SupportsTags;
+import org.apache.gravitino.tag.TagChange;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+@Tag("gravitino-docker-test")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class TagOperationsAuthorizationIT extends BaseRestApiAuthorizationIT {
+
+  private static final String CATALOG = "catalog";
+  private static final String SCHEMA = "schema";
+  private static final ContainerSuite containerSuite = 
ContainerSuite.getInstance();
+  private static String hmsUri;
+  private static String role = "role";
+
+  @BeforeAll
+  public void startIntegrationTest() throws Exception {
+    containerSuite.startHiveContainer();
+    super.startIntegrationTest();
+    hmsUri =
+        String.format(
+            "thrift://%s:%d",
+            containerSuite.getHiveContainer().getContainerIpAddress(),
+            HiveContainer.HIVE_METASTORE_PORT);
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("metastore.uris", hmsUri);
+    client
+        .loadMetalake(METALAKE)
+        .createCatalog(CATALOG, Catalog.Type.RELATIONAL, "hive", "comment", 
properties)
+        .asSchemas()
+        .createSchema(SCHEMA, "test", new HashMap<>());
+    // try to load the schema as normal user, expect failure
+    assertThrows(
+        "Can not access metadata.{" + CATALOG + "." + SCHEMA + "}.",
+        RuntimeException.class,
+        () -> {
+          normalUserClient
+              .loadMetalake(METALAKE)
+              .loadCatalog(CATALOG)
+              .asSchemas()
+              .loadSchema(SCHEMA);
+        });
+    // grant tester privilege
+    List<SecurableObject> securableObjects = new ArrayList<>();
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    SecurableObject catalogObject =
+        SecurableObjects.ofCatalog(CATALOG, 
ImmutableList.of(Privileges.UseCatalog.allow()));
+    securableObjects.add(catalogObject);
+    gravitinoMetalake.createRole(role, new HashMap<>(), securableObjects);
+    gravitinoMetalake.grantRolesToUser(ImmutableList.of(role), NORMAL_USER);
+    // normal user can load the catalog but not the schema
+    Catalog catalogLoadByNormalUser = 
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG);
+    assertEquals(CATALOG, catalogLoadByNormalUser.name());
+    assertThrows(
+        "Can not access metadata.{" + CATALOG + "." + SCHEMA + "}.",
+        ForbiddenException.class,
+        () -> {
+          catalogLoadByNormalUser.asSchemas().loadSchema(SCHEMA);
+        });
+    TableCatalog tableCatalog = 
client.loadMetalake(METALAKE).loadCatalog(CATALOG).asTableCatalog();
+    tableCatalog.createTable(
+        NameIdentifier.of(SCHEMA, "table1"), createColumns(), "test", new 
HashMap<>());
+  }
+
+  @AfterAll
+  public void stopIntegrationTest() throws IOException, InterruptedException {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    gravitinoMetalake.loadCatalog(CATALOG).asSchemas().dropSchema(SCHEMA, 
true);
+    gravitinoMetalake.dropCatalog(CATALOG, true);
+    super.stopIntegrationTest();
+  }
+
+  @Test
+  @Order(1)
+  public void createTag() {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    gravitinoMetalake.createTag("tag1", "tag1", Map.of());
+    assertThrows(
+        "Can not access metadata.",
+        ForbiddenException.class,
+        () -> {
+          normalUserClient.loadMetalake(METALAKE).createTag("tag2", "tag2", 
Map.of());
+        });
+    gravitinoMetalake.grantPrivilegesToRole(
+        role,
+        MetadataObjects.of(ImmutableList.of(METALAKE), 
MetadataObject.Type.METALAKE),
+        ImmutableSet.of(Privileges.CreateTag.allow()));
+    normalUserClient.loadMetalake(METALAKE).createTag("tag2", "tag2", 
Map.of());
+    client.loadMetalake(METALAKE).createTag("tag3", "tag3", Map.of());
+  }
+
+  @Test
+  @Order(2)
+  public void testAlterTag() {
+    GravitinoMetalake gravitinoMetalakeLoadByNormalUser = 
normalUserClient.loadMetalake(METALAKE);
+    assertThrows(
+        "Can not access metadata.",
+        ForbiddenException.class,
+        () -> {
+          gravitinoMetalakeLoadByNormalUser.alterTag("tag1", 
TagChange.setProperty("k1", "v1"));
+        });
+    gravitinoMetalakeLoadByNormalUser.alterTag("tag2", 
TagChange.setProperty("k1", "v1"));
+  }
+
+  @Test
+  @Order(3)
+  public void testLoadTag() {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    String[] tagsLoadByAdmin = gravitinoMetalake.listTags();
+    assertArrayEquals(new String[] {"tag1", "tag2", "tag3"}, tagsLoadByAdmin);
+    GravitinoMetalake gravitinoMetalakeLoadByNormalUser = 
normalUserClient.loadMetalake(METALAKE);
+    String[] tagsLoadByNormalUser = 
gravitinoMetalakeLoadByNormalUser.listTags();
+    assertArrayEquals(new String[] {"tag2"}, tagsLoadByNormalUser);
+  }
+
+  @Test
+  @Order(4)
+  public void testGetTag() {
+    GravitinoMetalake gravitinoMetalakeLoadByNormalUser = 
normalUserClient.loadMetalake(METALAKE);
+    gravitinoMetalakeLoadByNormalUser.getTag("tag2");
+    assertThrows(
+        "Can not access metadata.",
+        ForbiddenException.class,
+        () -> {
+          gravitinoMetalakeLoadByNormalUser.getTag("tag1");
+        });
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    gravitinoMetalake.getTag("tag1");
+    gravitinoMetalake.getTag("tag2");
+    gravitinoMetalake.getTag("tag3");
+  }
+
+  @Test
+  @Order(6)
+  public void testAssociateTag() {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    SupportsTags supportsTags = 
gravitinoMetalake.loadCatalog(CATALOG).supportsTags();
+    supportsTags.associateTags(new String[] {"tag1"}, null);
+    assertThrows(
+        "Can not access metadata.",
+        ForbiddenException.class,
+        () -> {
+          normalUserClient
+              .loadMetalake(METALAKE)
+              .loadCatalog(CATALOG)
+              .asTableCatalog()
+              .loadTable(NameIdentifier.of(SCHEMA, "table1"))
+              .supportsTags()
+              .associateTags(new String[] {"tag1"}, null);
+        });
+    assertThrows(
+        "Can not access metadata.",
+        ForbiddenException.class,
+        () -> {
+          normalUserClient
+              .loadMetalake(METALAKE)
+              .loadCatalog(CATALOG)
+              .asTableCatalog()
+              .loadTable(NameIdentifier.of(SCHEMA, "table1"))
+              .supportsTags()
+              .associateTags(new String[] {"tag2"}, null);
+        });
+    gravitinoMetalake.grantPrivilegesToRole(
+        role,
+        MetadataObjects.of(ImmutableList.of(CATALOG, SCHEMA), 
MetadataObject.Type.SCHEMA),
+        ImmutableList.of(Privileges.SelectTable.allow(), 
Privileges.UseSchema.allow()));
+    assertThrows(
+        "Can not access metadata.",
+        ForbiddenException.class,
+        () -> {
+          normalUserClient
+              .loadMetalake(METALAKE)
+              .loadCatalog(CATALOG)
+              .asTableCatalog()
+              .loadTable(NameIdentifier.of(SCHEMA, "table1"))
+              .supportsTags()
+              .associateTags(new String[] {"tag1"}, null);
+        });
+    normalUserClient
+        .loadMetalake(METALAKE)
+        .loadCatalog(CATALOG)
+        .asTableCatalog()
+        .loadTable(NameIdentifier.of(SCHEMA, "table1"))
+        .supportsTags()
+        .associateTags(new String[] {"tag2"}, null);
+    client
+        .loadMetalake(METALAKE)
+        .loadCatalog(CATALOG)
+        .asTableCatalog()
+        .loadTable(NameIdentifier.of(SCHEMA, "table1"))
+        .supportsTags()
+        .associateTags(new String[] {"tag1"}, null);
+    gravitinoMetalake.grantPrivilegesToRole(
+        role,
+        MetadataObjects.of(ImmutableList.of("tag3"), MetadataObject.Type.TAG),
+        ImmutableList.of(Privileges.ApplyTag.allow()));
+    normalUserClient
+        .loadMetalake(METALAKE)
+        .loadCatalog(CATALOG)
+        .asTableCatalog()
+        .loadTable(NameIdentifier.of(SCHEMA, "table1"))
+        .supportsTags()
+        .associateTags(new String[] {"tag3"}, null);
+  }
+
+  @Test
+  @Order(7)
+  public void testListTagByMetadata() {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    SupportsTags tableSupportTag =
+        gravitinoMetalake
+            .loadCatalog(CATALOG)
+            .asTableCatalog()
+            .loadTable(NameIdentifier.of(SCHEMA, "table1"))
+            .supportsTags();
+    String[] tagsLoadByAdmin = tableSupportTag.listTags();
+    Arrays.sort(tagsLoadByAdmin);
+    assertArrayEquals(new String[] {"tag1", "tag2", "tag3"}, tagsLoadByAdmin);
+    GravitinoMetalake gravitinoMetalakeByNormalUser = 
normalUserClient.loadMetalake(METALAKE);
+    SupportsTags tableSupportTagByNormalUser =
+        gravitinoMetalakeByNormalUser
+            .loadCatalog(CATALOG)
+            .asTableCatalog()
+            .loadTable(NameIdentifier.of(SCHEMA, "table1"))
+            .supportsTags();
+    String[] tagsLoadByNormalUser = tableSupportTagByNormalUser.listTags();
+    Arrays.sort(tagsLoadByNormalUser);
+    assertArrayEquals(new String[] {"tag2", "tag3"}, tagsLoadByNormalUser);
+  }
+
+  @Test
+  @Order(8)
+  public void testDropTag() {
+    GravitinoMetalake gravitinoMetalakeLoadByNormalUser = 
normalUserClient.loadMetalake(METALAKE);
+    assertThrows(
+        "Can not access metadata.",
+        ForbiddenException.class,
+        () -> {
+          gravitinoMetalakeLoadByNormalUser.deleteTag("tag1");
+        });
+    gravitinoMetalakeLoadByNormalUser.deleteTag("tag2");
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    gravitinoMetalake.deleteTag("tag1");
+  }
+
+  private Column[] createColumns() {
+    return new Column[] {Column.of("col1", Types.StringType.get())};
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java 
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index 7e3f003ef2..fc37302436 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -57,6 +57,7 @@ import org.apache.gravitino.hook.MetalakeHookDispatcher;
 import org.apache.gravitino.hook.ModelHookDispatcher;
 import org.apache.gravitino.hook.SchemaHookDispatcher;
 import org.apache.gravitino.hook.TableHookDispatcher;
+import org.apache.gravitino.hook.TagHookDispatcher;
 import org.apache.gravitino.hook.TopicHookDispatcher;
 import org.apache.gravitino.job.JobManager;
 import org.apache.gravitino.job.JobOperationDispatcher;
@@ -601,8 +602,10 @@ public class GravitinoEnv {
     this.auxServiceManager.serviceInit(config);
 
     // Create and initialize Tag related modules
-    this.tagDispatcher = new TagEventDispatcher(eventBus, new 
TagManager(idGenerator, entityStore));
-    // Create and initialize Policy related modules
+    TagEventDispatcher tagEventDispatcher =
+        new TagEventDispatcher(eventBus, new TagManager(idGenerator, 
entityStore));
+    this.tagDispatcher = new TagHookDispatcher(tagEventDispatcher);
+
     this.policyDispatcher =
         new PolicyEventDispatcher(eventBus, new PolicyManager(idGenerator, 
entityStore));
 
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
index ff0c220c1a..1d5dd79133 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
@@ -357,7 +357,9 @@ public class AuthorizationUtils {
   }
 
   private static boolean needApplyAuthorization(MetadataObject.Type type) {
-    return type != MetadataObject.Type.ROLE && type != 
MetadataObject.Type.METALAKE;
+    return type != MetadataObject.Type.ROLE
+        && type != MetadataObject.Type.METALAKE
+        && type != MetadataObject.Type.TAG;
   }
 
   private static void callAuthorizationPluginImpl(
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/TagHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/TagHookDispatcher.java
new file mode 100644
index 0000000000..bc5dffd581
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/hook/TagHookDispatcher.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hook;
+
+import java.util.Map;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.AuthorizationUtils;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.exceptions.NoSuchTagException;
+import org.apache.gravitino.tag.Tag;
+import org.apache.gravitino.tag.TagChange;
+import org.apache.gravitino.tag.TagDispatcher;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+public class TagHookDispatcher implements TagDispatcher {
+
+  private final TagDispatcher dispatcher;
+
+  public TagHookDispatcher(TagDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public String[] listTags(String metalake) {
+    return dispatcher.listTags(metalake);
+  }
+
+  @Override
+  public Tag[] listTagsInfo(String metalake) {
+    return dispatcher.listTagsInfo(metalake);
+  }
+
+  @Override
+  public Tag getTag(String metalake, String name) throws NoSuchTagException {
+    return dispatcher.getTag(metalake, name);
+  }
+
+  @Override
+  public Tag createTag(
+      String metalake, String name, String comment, Map<String, String> 
properties) {
+    AuthorizationUtils.checkCurrentUser(metalake, 
PrincipalUtils.getCurrentUserName());
+    Tag tag = dispatcher.createTag(metalake, name, comment, properties);
+
+    // Set the creator as the owner of the catalog.
+    OwnerDispatcher ownerDispatcher = 
GravitinoEnv.getInstance().ownerDispatcher();
+    if (ownerDispatcher != null) {
+      ownerDispatcher.setOwner(
+          metalake,
+          NameIdentifierUtil.toMetadataObject(
+              NameIdentifierUtil.ofTag(metalake, name), Entity.EntityType.TAG),
+          PrincipalUtils.getCurrentUserName(),
+          Owner.Type.USER);
+    }
+
+    return tag;
+  }
+
+  @Override
+  public Tag alterTag(String metalake, String name, TagChange... changes) {
+    return dispatcher.alterTag(metalake, name, changes);
+  }
+
+  @Override
+  public boolean deleteTag(String metalake, String name) {
+    return dispatcher.deleteTag(metalake, name);
+  }
+
+  @Override
+  public MetadataObject[] listMetadataObjectsForTag(String metalake, String 
name) {
+    return dispatcher.listMetadataObjectsForTag(metalake, name);
+  }
+
+  @Override
+  public String[] listTagsForMetadataObject(String metalake, MetadataObject 
metadataObject) {
+    return dispatcher.listTagsForMetadataObject(metalake, metadataObject);
+  }
+
+  @Override
+  public Tag[] listTagsInfoForMetadataObject(String metalake, MetadataObject 
metadataObject) {
+    return dispatcher.listTagsInfoForMetadataObject(metalake, metadataObject);
+  }
+
+  @Override
+  public String[] associateTagsForMetadataObject(
+      String metalake, MetadataObject metadataObject, String[] tagsToAdd, 
String[] tagsToRemove) {
+    return dispatcher.associateTagsForMetadataObject(
+        metalake, metadataObject, tagsToAdd, tagsToRemove);
+  }
+
+  @Override
+  public Tag getTagForMetadataObject(String metalake, MetadataObject 
metadataObject, String name) {
+    return dispatcher.getTagForMetadataObject(metalake, metadataObject, name);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaMapper.java
index a76a459f42..5e980cb643 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaMapper.java
@@ -47,6 +47,12 @@ public interface TagMetaMapper {
   TagPO selectTagMetaByMetalakeAndName(
       @Param("metalakeName") String metalakeName, @Param("tagName") String 
tagName);
 
+  @SelectProvider(
+      type = TagMetaSQLProviderFactory.class,
+      method = "selectTagMetaByMetalakeIdAndName")
+  TagPO selectTagMetaByMetalakeIdAndName(
+      @Param("metalakeId") Long metalakeId, @Param("name") String tagName);
+
   @InsertProvider(type = TagMetaSQLProviderFactory.class, method = 
"insertTagMeta")
   void insertTagMeta(@Param("tagMeta") TagPO tagPO);
 
@@ -70,4 +76,7 @@ public interface TagMetaMapper {
   @DeleteProvider(type = TagMetaSQLProviderFactory.class, method = 
"deleteTagMetasByLegacyTimeline")
   Integer deleteTagMetasByLegacyTimeline(
       @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+
+  @SelectProvider(type = TagMetaSQLProviderFactory.class, method = 
"selectTagByTagId")
+  TagPO selectTagByTagId(@Param("tagId") Long tagId);
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaSQLProviderFactory.java
index fbaf9fec45..172434097e 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaSQLProviderFactory.java
@@ -96,4 +96,13 @@ public class TagMetaSQLProviderFactory {
       @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
     return getProvider().deleteTagMetasByLegacyTimeline(legacyTimeline, limit);
   }
+
+  public static String selectTagMetaByMetalakeIdAndName(
+      @Param("metalakeId") Long metalakeId, @Param("name") String name) {
+    return getProvider().selectTagMetaByMetalakeIdAndName(metalakeId, name);
+  }
+
+  public static String selectTagByTagId(@Param("tagId") Long tagId) {
+    return getProvider().selectTagByTagId(tagId);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TagMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TagMetaBaseSQLProvider.java
index 2be50a64ff..330b25d179 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TagMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TagMetaBaseSQLProvider.java
@@ -195,4 +195,33 @@ public class TagMetaBaseSQLProvider {
         + TAG_TABLE_NAME
         + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT 
#{limit}";
   }
+
+  public String selectTagMetaByMetalakeIdAndName(
+      @Param("metalakeId") Long metalakeId, @Param("name") String name) {
+    return "SELECT tag_id as tagId, tag_name as tagName,"
+        + " metalake_id as metalakeId,"
+        + " tag_comment as comment,"
+        + " properties as properties,"
+        + " audit_info as auditInfo,"
+        + " current_version as currentVersion,"
+        + " last_version as lastVersion,"
+        + " deleted_at as deletedAt"
+        + " FROM "
+        + TAG_TABLE_NAME
+        + " WHERE metalake_id = #{metalakeId} AND tag_name = #{name} and 
deleted_at = 0";
+  }
+
+  public String selectTagByTagId(@Param("tagId") Long tagId) {
+    return "SELECT tag_id as tagId, tag_name as tagName,"
+        + " metalake_id as metalakeId,"
+        + " tag_comment as comment,"
+        + " properties as properties,"
+        + " audit_info as auditInfo,"
+        + " current_version as currentVersion,"
+        + " last_version as lastVersion,"
+        + " deleted_at as deletedAt"
+        + " FROM "
+        + TAG_TABLE_NAME
+        + " WHERE tag_id = #{tagId} and deleted_at = 0";
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
index c3c3053eeb..b2dd95803e 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
@@ -41,6 +41,7 @@ import 
org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
 import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TagMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.TopicMetaMapper;
 import org.apache.gravitino.storage.relational.po.CatalogPO;
 import org.apache.gravitino.storage.relational.po.ColumnPO;
@@ -69,14 +70,38 @@ public class MetadataObjectService {
   static final Map<MetadataObject.Type, Function<List<Long>, Map<Long, 
String>>>
       TYPE_TO_FULLNAME_FUNCTION_MAP =
           ImmutableMap.of(
-              MetadataObject.Type.METALAKE, 
MetadataObjectService::getMetalakeObjectsFullName,
-              MetadataObject.Type.CATALOG, 
MetadataObjectService::getCatalogObjectsFullName,
-              MetadataObject.Type.SCHEMA, 
MetadataObjectService::getSchemaObjectsFullName,
-              MetadataObject.Type.TABLE, 
MetadataObjectService::getTableObjectsFullName,
-              MetadataObject.Type.FILESET, 
MetadataObjectService::getFilesetObjectsFullName,
-              MetadataObject.Type.MODEL, 
MetadataObjectService::getModelObjectsFullName,
-              MetadataObject.Type.TOPIC, 
MetadataObjectService::getTopicObjectsFullName,
-              MetadataObject.Type.COLUMN, 
MetadataObjectService::getColumnObjectsFullName);
+              MetadataObject.Type.METALAKE,
+              MetadataObjectService::getMetalakeObjectsFullName,
+              MetadataObject.Type.CATALOG,
+              MetadataObjectService::getCatalogObjectsFullName,
+              MetadataObject.Type.SCHEMA,
+              MetadataObjectService::getSchemaObjectsFullName,
+              MetadataObject.Type.TABLE,
+              MetadataObjectService::getTableObjectsFullName,
+              MetadataObject.Type.FILESET,
+              MetadataObjectService::getFilesetObjectsFullName,
+              MetadataObject.Type.MODEL,
+              MetadataObjectService::getModelObjectsFullName,
+              MetadataObject.Type.TOPIC,
+              MetadataObjectService::getTopicObjectsFullName,
+              MetadataObject.Type.COLUMN,
+              MetadataObjectService::getColumnObjectsFullName,
+              MetadataObject.Type.TAG,
+              MetadataObjectService::getTagObjectsFullName);
+
+  private static Map<Long, String> getTagObjectsFullName(List<Long> tagIds) {
+    if (tagIds == null || tagIds.isEmpty()) {
+      return Map.of();
+    }
+    return tagIds.stream()
+        .collect(
+            Collectors.toMap(
+                tagId -> tagId,
+                tagId ->
+                    SessionUtils.getWithoutCommit(
+                        TagMetaMapper.class,
+                        tagMetaMapper -> 
tagMetaMapper.selectTagByTagId(tagId).getTagName())));
+  }
 
   private MetadataObjectService() {}
 
@@ -128,6 +153,9 @@ public class MetadataObjectService {
       return 
RoleMetaService.getInstance().getRoleIdByMetalakeIdAndName(metalakeId, 
fullName);
     }
     List<String> names = DOT_SPLITTER.splitToList(fullName);
+    if (type == MetadataObject.Type.TAG) {
+      return TagMetaService.getInstance().getTagIdByTagName(metalakeId, 
fullName);
+    }
 
     long catalogId =
         
CatalogMetaService.getInstance().getCatalogIdByMetalakeIdAndName(metalakeId, 
names.get(0));
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TagMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TagMetaService.java
index fd3a6137a1..0087b1e16b 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TagMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TagMetaService.java
@@ -418,6 +418,13 @@ public class TagMetaService {
     return tagPO;
   }
 
+  public Long getTagIdByTagName(Long metalakeId, String tagName) {
+    return SessionUtils.getWithoutCommit(
+            TagMetaMapper.class,
+            mapper -> mapper.selectTagMetaByMetalakeIdAndName(metalakeId, 
tagName))
+        .getTagId();
+  }
+
   private List<TagPO> getTagPOsByMetalakeAndNames(String metalakeName, 
List<String> tagNames) {
     return SessionUtils.getWithoutCommit(
         TagMetaMapper.class,
diff --git 
a/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java 
b/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java
index d36591564e..fe67d06ae0 100644
--- a/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java
@@ -35,6 +35,7 @@ import org.apache.gravitino.authorization.AuthorizationUtils;
 import org.apache.gravitino.exceptions.IllegalMetadataObjectException;
 import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
 import org.apache.gravitino.exceptions.NoSuchRoleException;
+import org.apache.gravitino.exceptions.NoSuchTagException;
 
 public class MetadataObjectUtil {
 
@@ -51,6 +52,7 @@ public class MetadataObjectUtil {
           .put(MetadataObject.Type.COLUMN, Entity.EntityType.COLUMN)
           .put(MetadataObject.Type.ROLE, Entity.EntityType.ROLE)
           .put(MetadataObject.Type.MODEL, Entity.EntityType.MODEL)
+          .put(MetadataObject.Type.TAG, Entity.EntityType.TAG)
           .build();
 
   private MetadataObjectUtil() {}
@@ -104,6 +106,8 @@ public class MetadataObjectUtil {
         return NameIdentifierUtil.ofMetalake(metalakeName);
       case ROLE:
         return AuthorizationUtils.ofRole(metalakeName, metadataObject.name());
+      case TAG:
+        return NameIdentifierUtil.ofTag(metalakeName, metadataObject.name());
       case CATALOG:
       case SCHEMA:
       case TABLE:
@@ -191,7 +195,14 @@ public class MetadataObjectUtil {
           throw checkNotNull(exceptionToThrowSupplier).get();
         }
         break;
-
+      case TAG:
+        NameIdentifierUtil.checkTag(identifier);
+        try {
+          env.tagDispatcher().getTag(metalake, object.fullName());
+        } catch (NoSuchTagException nsr) {
+          throw checkNotNull(exceptionToThrowSupplier).get();
+        }
+        break;
       default:
         throw new IllegalArgumentException(
             String.format("Doesn't support the type %s", object.type()));
diff --git 
a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java 
b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
index 095fe6259b..d319d3b0e1 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
@@ -370,6 +370,15 @@ public class NameIdentifierUtil {
     NamespaceUtil.checkMetalake(ident.namespace());
   }
 
+  public static void checkTag(NameIdentifier ident) {
+    NameIdentifier.check(ident != null, "Tag identifier must not be null");
+    Namespace namespace = ident.namespace();
+    Namespace.check(
+        namespace != null && !namespace.isEmpty() && namespace.length() == 3,
+        "Tag namespace must be 3 level, the input namespace is %s",
+        namespace);
+  }
+
   /**
    * Check the given {@link NameIdentifier} is a catalog identifier. Throw an 
{@link
    * IllegalNameIdentifierException} if it's not.
@@ -516,7 +525,9 @@ public class NameIdentifierUtil {
       case ROLE:
         AuthorizationUtils.checkRole(ident);
         return MetadataObjects.of(null, ident.name(), 
MetadataObject.Type.ROLE);
-
+      case TAG:
+        checkTag(ident);
+        return MetadataObjects.of(null, ident.name(), MetadataObject.Type.TAG);
       default:
         throw new IllegalArgumentException(
             "Entity type " + entityType + " is not supported to convert to 
MetadataObject");
diff --git 
a/core/src/test/java/org/apache/gravitino/utils/TestNameIdentifierUtil.java 
b/core/src/test/java/org/apache/gravitino/utils/TestNameIdentifierUtil.java
index e13ec72211..27352df551 100644
--- a/core/src/test/java/org/apache/gravitino/utils/TestNameIdentifierUtil.java
+++ b/core/src/test/java/org/apache/gravitino/utils/TestNameIdentifierUtil.java
@@ -140,12 +140,12 @@ public class TestNameIdentifierUtil {
             IllegalArgumentException.class, () -> 
NameIdentifierUtil.toMetadataObject(null, null));
     assertTrue(e1.getMessage().contains("The identifier and entity type must 
not be null"));
 
-    // test unsupported
-    Throwable e2 =
-        assertThrows(
-            IllegalArgumentException.class,
-            () -> NameIdentifierUtil.toMetadataObject(fileset, 
Entity.EntityType.TAG));
-    assertTrue(e2.getMessage().contains("Entity type TAG is not supported"));
+    // test tag
+    MetadataObject expectedTagObject = MetadataObjects.parse("tag1", 
MetadataObject.Type.TAG);
+    MetadataObject tagObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofTag("metalake1", "tag1"), 
Entity.EntityType.TAG);
+    assertEquals(expectedTagObject, tagObject);
 
     // test model version
     Throwable e3 =
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
index da16a6967e..3d1707b3e7 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
@@ -291,6 +291,9 @@ public class MetadataFilterHelper {
       case USER:
         nameIdentifierMap.put(entityType, nameIdentifier);
         break;
+      case TAG:
+        nameIdentifierMap.put(entityType, nameIdentifier);
+        break;
       default:
         throw new IllegalArgumentException("Unsupported entity type: " + 
entityType);
     }
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataIdConverter.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataIdConverter.java
index ded3183752..b2ad1ea4fa 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataIdConverter.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataIdConverter.java
@@ -62,7 +62,8 @@ public class MetadataIdConverter {
           MetadataObject.Type.FILESET, Entity.EntityType.FILESET,
           MetadataObject.Type.TOPIC, Entity.EntityType.TOPIC,
           MetadataObject.Type.COLUMN, Entity.EntityType.COLUMN,
-          MetadataObject.Type.ROLE, Entity.EntityType.ROLE);
+          MetadataObject.Type.ROLE, Entity.EntityType.ROLE,
+          MetadataObject.Type.TAG, Entity.EntityType.TAG);
   // Maps metadata type to capability scope
   private static final Map<MetadataObject.Type, Capability.Scope> 
METADATA_SCOPE_MAPPING =
       ImmutableMap.of(
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/annotations/AuthorizationRequest.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/annotations/AuthorizationRequest.java
new file mode 100644
index 0000000000..ee2d43d2c1
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/annotations/AuthorizationRequest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.server.authorization.annotations;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target({ElementType.PARAMETER})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface AuthorizationRequest {
+
+  RequestType type() default RequestType.COMMON;
+
+  enum RequestType {
+    COMMON,
+    ASSOCIATE_TAG
+  }
+}
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConstants.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConstants.java
index b898e4bd60..b6370435ab 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConstants.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConstants.java
@@ -93,4 +93,18 @@ public class AuthorizationExpressionConstants {
           METALAKE::OWNER || METALAKE::MANAGE_GRANTS
           || ROLE::OWNER || ROLE::SELF
           """;
+
+  public static final String CAN_ACCESS_METADATA = "CAN_ACCESS_METADATA";
+
+  public static final String CAN_ACCESS_METADATA_AND_TAG =
+      """
+          METALAKE::OWNER ||
+          ((CAN_ACCESS_METADATA) && (TAG::OWNER || ANY_APPLY_TAG))
+          """;
+
+  public static final String loadTagAuthorizationExpression =
+      "METALAKE::OWNER || TAG::OWNER || ANY_APPLY_TAG";
+
+  public static final String applyTagAuthorizationExpression =
+      "METALAKE::OWNER || TAG::OWNER || ANY_APPLY_TAG";
 }
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
index 2f8aa045de..4804532e42 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
@@ -80,7 +80,7 @@ public class AuthorizationExpressionConverter {
     return EXPRESSION_CACHE.computeIfAbsent(
         authorizationExpression,
         (expression) -> {
-          String replacedExpression = replaceGetOwnerPrivilege(expression);
+          String replacedExpression = 
replaceCanAccessMetadataPrivilege(expression);
           replacedExpression = replaceAnyPrivilege(replacedExpression);
           replacedExpression = replaceAnyExpressions(replacedExpression);
           Matcher matcher = PATTERN.matcher(replacedExpression);
@@ -156,9 +156,9 @@ public class AuthorizationExpressionConverter {
     return result.toString();
   }
 
-  public static String replaceGetOwnerPrivilege(String expression) {
+  public static String replaceCanAccessMetadataPrivilege(String expression) {
     return expression.replaceAll(
-        "CAN_GET_OWNER",
+        AuthorizationExpressionConstants.CAN_ACCESS_METADATA,
         """
               ( entityType == 'CATALOG' && (%s)) ||
               ( entityType == 'SCHEMA' && (%s)) ||
@@ -272,6 +272,10 @@ public class AuthorizationExpressionConverter {
             "ANY_WRITE_FILESET",
             "((ANY(WRITE_FILESET, METALAKE, CATALOG, SCHEMA, FILESET))"
                 + "&& !(ANY(DENY_WRITE_FILESET, METALAKE, CATALOG, SCHEMA, 
FILESET)))");
+    expression =
+        expression.replaceAll(
+            "ANY_APPLY_TAG",
+            "((ANY(APPLY_TAG, METALAKE, TAG))" + "&& !(ANY(DENY_APPLY_TAG, 
METALAKE, TAG)))");
     expression =
         expression.replaceAll(
             CAN_SET_OWNER,
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
index 090ba1e0b7..ec20803119 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
@@ -194,7 +194,7 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
       result = false;
     }
     LOG.debug(
-        "principal {},metalake {},metadata object {},privilege {},deny result 
{}",
+        "principal {},metalake {},metadata object {},privilege {},owner result 
{}",
         principal,
         metalake,
         metadataObject,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
index 950fd6e989..23db4a1948 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
@@ -17,18 +17,19 @@
 
 package org.apache.gravitino.server.web.filter;
 
+import static 
org.apache.gravitino.server.web.filter.ParameterUtil.extractAuthorizationRequestTypeFromParameters;
+import static 
org.apache.gravitino.server.web.filter.ParameterUtil.extractMetadataObjectTypeFromParameters;
+import static 
org.apache.gravitino.server.web.filter.ParameterUtil.extractNameIdentifierFromParameters;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.lang.reflect.Parameter;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import javax.ws.rs.core.Response;
 import org.aopalliance.intercept.ConstructorInterceptor;
@@ -37,19 +38,17 @@ import org.aopalliance.intercept.MethodInvocation;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.MetadataObject;
-import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.authorization.AuthorizationRequestContext;
-import org.apache.gravitino.server.authorization.MetadataFilterHelper;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
-import 
org.apache.gravitino.server.authorization.annotations.AuthorizationFullName;
-import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
-import 
org.apache.gravitino.server.authorization.annotations.AuthorizationObjectType;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationRequest;
 import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionEvaluator;
 import org.apache.gravitino.server.web.Utils;
+import 
org.apache.gravitino.server.web.filter.authorization.AuthorizationExecutor;
+import 
org.apache.gravitino.server.web.filter.authorization.AuthorizeExecutorFactory;
 import org.apache.gravitino.server.web.rest.CatalogOperations;
 import org.apache.gravitino.server.web.rest.FilesetOperations;
 import org.apache.gravitino.server.web.rest.GroupOperations;
+import org.apache.gravitino.server.web.rest.MetadataObjectTagOperations;
 import org.apache.gravitino.server.web.rest.MetalakeOperations;
 import org.apache.gravitino.server.web.rest.ModelOperations;
 import org.apache.gravitino.server.web.rest.OwnerOperations;
@@ -59,10 +58,9 @@ import org.apache.gravitino.server.web.rest.RoleOperations;
 import org.apache.gravitino.server.web.rest.SchemaOperations;
 import org.apache.gravitino.server.web.rest.StatisticOperations;
 import org.apache.gravitino.server.web.rest.TableOperations;
+import org.apache.gravitino.server.web.rest.TagOperations;
 import org.apache.gravitino.server.web.rest.TopicOperations;
 import org.apache.gravitino.server.web.rest.UserOperations;
-import org.apache.gravitino.utils.MetadataObjectUtil;
-import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
 import org.glassfish.hk2.api.Descriptor;
 import org.glassfish.hk2.api.Filter;
@@ -94,7 +92,9 @@ public class GravitinoInterceptionService implements 
InterceptionService {
             RoleOperations.class.getName(),
             OwnerOperations.class.getName(),
             StatisticOperations.class.getName(),
-            PartitionOperations.class.getName()));
+            PartitionOperations.class.getName(),
+            MetadataObjectTagOperations.class.getName(),
+            TagOperations.class.getName()));
   }
 
   @Override
@@ -130,6 +130,7 @@ public class GravitinoInterceptionService implements 
InterceptionService {
         Parameter[] parameters = method.getParameters();
         AuthorizationExpression expressionAnnotation =
             method.getAnnotation(AuthorizationExpression.class);
+        AuthorizationExecutor executor;
         if (expressionAnnotation != null) {
           String expression = expressionAnnotation.expression();
           Object[] args = methodInvocation.getArguments();
@@ -139,28 +140,20 @@ public class GravitinoInterceptionService implements 
InterceptionService {
           Map<String, Object> pathParams = 
Utils.extractPathParamsFromParameters(parameters, args);
           AuthorizationExpressionEvaluator authorizationExpressionEvaluator =
               new AuthorizationExpressionEvaluator(expression);
-          boolean authorizeResult =
-              authorizationExpressionEvaluator.evaluate(
+          AuthorizationRequest.RequestType requestType =
+              extractAuthorizationRequestTypeFromParameters(parameters);
+          executor =
+              AuthorizeExecutorFactory.create(
+                  requestType,
                   metadataContext,
+                  authorizationExpressionEvaluator,
                   pathParams,
-                  new AuthorizationRequestContext(),
-                  Optional.ofNullable(entityType));
+                  entityType,
+                  parameters,
+                  args);
+          boolean authorizeResult = executor.execute();
           if (!authorizeResult) {
-            MetadataObject.Type type = 
expressionAnnotation.accessMetadataType();
-            NameIdentifier accessMetadataName =
-                metadataContext.get(Entity.EntityType.valueOf(type.name()));
-            String errorMessage = expressionAnnotation.errorMessage();
-            String currentUser = PrincipalUtils.getCurrentUserName();
-            String methodName = method.getName();
-
-            LOG.warn(
-                "Authorization failed - User: {}, Operation: {}, Metadata: {}, 
Expression: {}",
-                currentUser,
-                methodName,
-                accessMetadataName,
-                expression);
-
-            return buildNoAuthResponse(errorMessage, accessMetadataName, 
currentUser, methodName);
+            return buildNoAuthResponse(expressionAnnotation, metadataContext, 
method, expression);
           }
         }
         return methodInvocation.proceed();
@@ -178,6 +171,28 @@ public class GravitinoInterceptionService implements 
InterceptionService {
       }
     }
 
+    private Response buildNoAuthResponse(
+        AuthorizationExpression expressionAnnotation,
+        Map<Entity.EntityType, NameIdentifier> metadataContext,
+        Method method,
+        String expression) {
+      MetadataObject.Type type = expressionAnnotation.accessMetadataType();
+      NameIdentifier accessMetadataName =
+          metadataContext.get(Entity.EntityType.valueOf(type.name()));
+      String errorMessage = expressionAnnotation.errorMessage();
+      String currentUser = PrincipalUtils.getCurrentUserName();
+      String methodName = method.getName();
+
+      LOG.warn(
+          "Authorization failed - User: {}, Operation: {}, Metadata: {}, 
Expression: {}",
+          currentUser,
+          methodName,
+          accessMetadataName,
+          expression);
+
+      return buildNoAuthResponse(errorMessage, accessMetadataName, 
currentUser, methodName);
+    }
+
     private Response buildNoAuthResponse(
         String errorMessage,
         NameIdentifier accessMetadataName,
@@ -201,135 +216,10 @@ public class GravitinoInterceptionService implements 
InterceptionService {
       }
       return Utils.forbidden(contextualMessage, null);
     }
-
-    private Map<Entity.EntityType, NameIdentifier> 
extractNameIdentifierFromParameters(
-        Parameter[] parameters, Object[] args) {
-      Map<Entity.EntityType, String> entities = new HashMap<>();
-      Map<Entity.EntityType, NameIdentifier> nameIdentifierMap = new 
HashMap<>();
-      // Extract AuthorizationMetadata
-      for (int i = 0; i < parameters.length; i++) {
-        Parameter parameter = parameters[i];
-        AuthorizationMetadata authorizeResource =
-            parameter.getAnnotation(AuthorizationMetadata.class);
-        if (authorizeResource == null) {
-          continue;
-        }
-        Entity.EntityType type = authorizeResource.type();
-        entities.put(type, String.valueOf(args[i]));
-      }
-
-      String metalake = entities.get(Entity.EntityType.METALAKE);
-      String catalog = entities.get(Entity.EntityType.CATALOG);
-      String schema = entities.get(Entity.EntityType.SCHEMA);
-      String table = entities.get(Entity.EntityType.TABLE);
-      String topic = entities.get(Entity.EntityType.TOPIC);
-      String fileset = entities.get(Entity.EntityType.FILESET);
-
-      // Extract full name and types
-      String fullName = null;
-      String metadataObjectType = null;
-      for (int i = 0; i < parameters.length; i++) {
-        Parameter parameter = parameters[i];
-        AuthorizationFullName authorizeFullName =
-            parameter.getAnnotation(AuthorizationFullName.class);
-        if (authorizeFullName != null) {
-          fullName = String.valueOf(args[i]);
-        }
-
-        AuthorizationObjectType objectType = 
parameter.getAnnotation(AuthorizationObjectType.class);
-        if (objectType != null) {
-          metadataObjectType = String.valueOf(args[i]);
-        }
-      }
-
-      entities.forEach(
-          (type, metadata) -> {
-            switch (type) {
-              case CATALOG:
-                nameIdentifierMap.put(
-                    Entity.EntityType.CATALOG, 
NameIdentifierUtil.ofCatalog(metalake, catalog));
-                break;
-              case SCHEMA:
-                nameIdentifierMap.put(
-                    Entity.EntityType.SCHEMA,
-                    NameIdentifierUtil.ofSchema(metalake, catalog, schema));
-                break;
-              case TABLE:
-                nameIdentifierMap.put(
-                    Entity.EntityType.TABLE,
-                    NameIdentifierUtil.ofTable(metalake, catalog, schema, 
table));
-                break;
-              case TOPIC:
-                nameIdentifierMap.put(
-                    Entity.EntityType.TOPIC,
-                    NameIdentifierUtil.ofTopic(metalake, catalog, schema, 
topic));
-                break;
-              case FILESET:
-                nameIdentifierMap.put(
-                    Entity.EntityType.FILESET,
-                    NameIdentifierUtil.ofFileset(metalake, catalog, schema, 
fileset));
-                break;
-              case MODEL:
-                String model = entities.get(Entity.EntityType.MODEL);
-                nameIdentifierMap.put(
-                    Entity.EntityType.MODEL,
-                    NameIdentifierUtil.ofModel(metalake, catalog, schema, 
model));
-                break;
-              case METALAKE:
-                nameIdentifierMap.put(
-                    Entity.EntityType.METALAKE, 
NameIdentifierUtil.ofMetalake(metalake));
-                break;
-              case USER:
-                nameIdentifierMap.put(
-                    Entity.EntityType.USER,
-                    NameIdentifierUtil.ofUser(metadata, 
entities.get(Entity.EntityType.USER)));
-                break;
-              case GROUP:
-                nameIdentifierMap.put(
-                    Entity.EntityType.GROUP,
-                    NameIdentifierUtil.ofGroup(metalake, 
entities.get(Entity.EntityType.GROUP)));
-                break;
-              case ROLE:
-                nameIdentifierMap.put(
-                    Entity.EntityType.ROLE,
-                    NameIdentifierUtil.ofRole(metalake, 
entities.get(Entity.EntityType.ROLE)));
-                break;
-              default:
-                break;
-            }
-          });
-
-      // Extract fullName and metadataObjectType
-      if (fullName != null && metadataObjectType != null && metalake != null) {
-        MetadataObject.Type type =
-            
MetadataObject.Type.valueOf(metadataObjectType.toUpperCase(Locale.ROOT));
-        NameIdentifier nameIdentifier =
-            MetadataObjectUtil.toEntityIdent(metalake, 
MetadataObjects.parse(fullName, type));
-        nameIdentifierMap.putAll(
-            MetadataFilterHelper.spiltMetadataNames(
-                metalake, MetadataObjectUtil.toEntityType(type), 
nameIdentifier));
-      }
-
-      return nameIdentifierMap;
-    }
-  }
-
-  private static String extractMetadataObjectTypeFromParameters(
-      Parameter[] parameters, Object[] args) {
-    for (int i = 0; i < parameters.length; i++) {
-      Parameter parameter = parameters[i];
-      AuthorizationObjectType objectType = 
parameter.getAnnotation(AuthorizationObjectType.class);
-      if (objectType != null) {
-        return String.valueOf(args[i]).toUpperCase();
-      }
-    }
-    return null;
   }
 
-  private static class ClassListFilter implements Filter {
-    private final Set<String> targetClasses;
-
-    public ClassListFilter(Set<String> targetClasses) {
+  private record ClassListFilter(Set<String> targetClasses) implements Filter {
+    private ClassListFilter(Set<String> targetClasses) {
       this.targetClasses = new HashSet<>(targetClasses);
     }
 
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/ParameterUtil.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/ParameterUtil.java
new file mode 100644
index 0000000000..a1b898b34c
--- /dev/null
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/ParameterUtil.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.server.web.filter;
+
+import java.lang.reflect.Parameter;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationFullName;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationObjectType;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationRequest;
+import org.apache.gravitino.utils.MetadataObjectUtil;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+
+public class ParameterUtil {
+
+  private ParameterUtil() {}
+
+  public static AuthorizationRequest.RequestType 
extractAuthorizationRequestTypeFromParameters(
+      Parameter[] parameters) {
+    for (Parameter parameter : parameters) {
+      AuthorizationRequest authorizationRequest =
+          parameter.getAnnotation(AuthorizationRequest.class);
+      if (authorizationRequest != null) {
+        return authorizationRequest.type();
+      }
+    }
+    return AuthorizationRequest.RequestType.COMMON;
+  }
+
+  public static Object extractFromParameters(Parameter[] parameters, Object[] 
args) {
+    for (int i = 0; i < parameters.length; i++) {
+      Parameter parameter = parameters[i];
+      AuthorizationRequest authorizationBatchTarget =
+          parameter.getAnnotation(AuthorizationRequest.class);
+      if (authorizationBatchTarget == null) {
+        continue;
+      }
+      return args[i];
+    }
+    return null;
+  }
+
+  public static Map<Entity.EntityType, NameIdentifier> 
extractNameIdentifierFromParameters(
+      Parameter[] parameters, Object[] args) {
+    Map<Entity.EntityType, String> entities = new HashMap<>();
+    Map<Entity.EntityType, NameIdentifier> nameIdentifierMap = new HashMap<>();
+    // Extract AuthorizationMetadata
+    for (int i = 0; i < parameters.length; i++) {
+      Parameter parameter = parameters[i];
+      AuthorizationMetadata authorizeResource =
+          parameter.getAnnotation(AuthorizationMetadata.class);
+      if (authorizeResource == null) {
+        continue;
+      }
+      Entity.EntityType type = authorizeResource.type();
+      entities.put(type, String.valueOf(args[i]));
+    }
+
+    String metalake = entities.get(Entity.EntityType.METALAKE);
+    String catalog = entities.get(Entity.EntityType.CATALOG);
+    String schema = entities.get(Entity.EntityType.SCHEMA);
+    String table = entities.get(Entity.EntityType.TABLE);
+    String topic = entities.get(Entity.EntityType.TOPIC);
+    String fileset = entities.get(Entity.EntityType.FILESET);
+
+    // Extract full name and types
+    String fullName = null;
+    String metadataObjectType = null;
+    for (int i = 0; i < parameters.length; i++) {
+      Parameter parameter = parameters[i];
+      AuthorizationFullName authorizeFullName =
+          parameter.getAnnotation(AuthorizationFullName.class);
+      if (authorizeFullName != null) {
+        fullName = String.valueOf(args[i]);
+      }
+
+      AuthorizationObjectType objectType = 
parameter.getAnnotation(AuthorizationObjectType.class);
+      if (objectType != null) {
+        metadataObjectType = String.valueOf(args[i]);
+      }
+    }
+
+    entities.forEach(
+        (type, metadata) -> {
+          switch (type) {
+            case CATALOG:
+              nameIdentifierMap.put(
+                  Entity.EntityType.CATALOG, 
NameIdentifierUtil.ofCatalog(metalake, catalog));
+              break;
+            case SCHEMA:
+              nameIdentifierMap.put(
+                  Entity.EntityType.SCHEMA, 
NameIdentifierUtil.ofSchema(metalake, catalog, schema));
+              break;
+            case TABLE:
+              nameIdentifierMap.put(
+                  Entity.EntityType.TABLE,
+                  NameIdentifierUtil.ofTable(metalake, catalog, schema, 
table));
+              break;
+            case TOPIC:
+              nameIdentifierMap.put(
+                  Entity.EntityType.TOPIC,
+                  NameIdentifierUtil.ofTopic(metalake, catalog, schema, 
topic));
+              break;
+            case FILESET:
+              nameIdentifierMap.put(
+                  Entity.EntityType.FILESET,
+                  NameIdentifierUtil.ofFileset(metalake, catalog, schema, 
fileset));
+              break;
+            case MODEL:
+              String model = entities.get(Entity.EntityType.MODEL);
+              nameIdentifierMap.put(
+                  Entity.EntityType.MODEL,
+                  NameIdentifierUtil.ofModel(metalake, catalog, schema, 
model));
+              break;
+            case METALAKE:
+              nameIdentifierMap.put(
+                  Entity.EntityType.METALAKE, 
NameIdentifierUtil.ofMetalake(metalake));
+              break;
+            case USER:
+              nameIdentifierMap.put(
+                  Entity.EntityType.USER,
+                  NameIdentifierUtil.ofUser(metadata, 
entities.get(Entity.EntityType.USER)));
+              break;
+            case GROUP:
+              nameIdentifierMap.put(
+                  Entity.EntityType.GROUP,
+                  NameIdentifierUtil.ofGroup(metalake, 
entities.get(Entity.EntityType.GROUP)));
+              break;
+            case ROLE:
+              nameIdentifierMap.put(
+                  Entity.EntityType.ROLE,
+                  NameIdentifierUtil.ofRole(metalake, 
entities.get(Entity.EntityType.ROLE)));
+              break;
+            case TAG:
+              nameIdentifierMap.put(
+                  Entity.EntityType.TAG,
+                  NameIdentifierUtil.ofTag(metalake, 
entities.get(Entity.EntityType.TAG)));
+              break;
+            default:
+              break;
+          }
+        });
+
+    // Extract fullName and metadataObjectType
+    if (fullName != null && metadataObjectType != null && metalake != null) {
+      MetadataObject.Type type =
+          
MetadataObject.Type.valueOf(metadataObjectType.toUpperCase(Locale.ROOT));
+      NameIdentifier nameIdentifier =
+          MetadataObjectUtil.toEntityIdent(metalake, 
MetadataObjects.parse(fullName, type));
+      nameIdentifierMap.putAll(
+          MetadataFilterHelper.spiltMetadataNames(
+              metalake, MetadataObjectUtil.toEntityType(type), 
nameIdentifier));
+    }
+
+    return nameIdentifierMap;
+  }
+
+  public static String extractMetadataObjectTypeFromParameters(
+      Parameter[] parameters, Object[] args) {
+    for (int i = 0; i < parameters.length; i++) {
+      Parameter parameter = parameters[i];
+      AuthorizationObjectType objectType = 
parameter.getAnnotation(AuthorizationObjectType.class);
+      if (objectType != null) {
+        return String.valueOf(args[i]).toUpperCase();
+      }
+    }
+    return null;
+  }
+
+  public static void buildNameIdentifierForBatchAuthorization(
+      Map<Entity.EntityType, NameIdentifier> metadataNames, String name, 
Entity.EntityType type) {
+    NameIdentifier metalake = metadataNames.get(Entity.EntityType.METALAKE);
+    if (Objects.requireNonNull(type) == Entity.EntityType.TAG) {
+      metadataNames.put(
+          Entity.EntityType.TAG,
+          NameIdentifierUtil.ofTag(NameIdentifierUtil.getMetalake(metalake), 
name));
+      return;
+    }
+    throw new UnsupportedOperationException(
+        "Unsupported to build NameIdentifier for batch authorization target");
+  }
+}
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociateTagAuthorizationExecutor.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociateTagAuthorizationExecutor.java
new file mode 100644
index 0000000000..d01023cede
--- /dev/null
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociateTagAuthorizationExecutor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.server.web.filter.authorization;
+
+import static 
org.apache.gravitino.server.web.filter.ParameterUtil.buildNameIdentifierForBatchAuthorization;
+import static 
org.apache.gravitino.server.web.filter.ParameterUtil.extractFromParameters;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Parameter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.authorization.AuthorizationRequestContext;
+import org.apache.gravitino.dto.requests.TagsAssociateRequest;
+import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionEvaluator;
+import org.apache.gravitino.server.web.rest.MetadataObjectTagOperations;
+
+/**
+ * Metadata object authorization for {@link
+ * MetadataObjectTagOperations#associateTagsForObject(String, String, String, 
TagsAssociateRequest)}
+ */
+public class AssociateTagAuthorizationExecutor implements 
AuthorizationExecutor {
+
+  private final Parameter[] parameters;
+  private final Object[] args;
+  private final Map<Entity.EntityType, NameIdentifier> metadataContext;
+  private final AuthorizationExpressionEvaluator 
authorizationExpressionEvaluator;
+  private final Map<String, Object> pathParams;
+  private final String entityType;
+
+  public AssociateTagAuthorizationExecutor(
+      Parameter[] parameters,
+      Object[] args,
+      Map<Entity.EntityType, NameIdentifier> metadataContext,
+      AuthorizationExpressionEvaluator authorizationExpressionEvaluator,
+      Map<String, Object> pathParams,
+      String entityType) {
+    this.parameters = parameters;
+    this.args = args;
+    this.metadataContext = metadataContext;
+    this.authorizationExpressionEvaluator = authorizationExpressionEvaluator;
+    this.pathParams = pathParams;
+    this.entityType = entityType;
+  }
+
+  @Override
+  public boolean execute() throws Exception {
+    Object request = extractFromParameters(parameters, args);
+    if (request == null) {
+      return false;
+    }
+
+    AuthorizationRequestContext context = new AuthorizationRequestContext();
+    Entity.EntityType targetType =
+        Entity.EntityType.TAG; // Tags are the only supported batch target here
+    Preconditions.checkArgument(
+        request instanceof TagsAssociateRequest,
+        "Only tag can use AssociateTagAuthorizationExecutor, please contact 
the administrator.");
+    TagsAssociateRequest tagsAssociateRequest = (TagsAssociateRequest) request;
+    tagsAssociateRequest.validate();
+    // Authorize both 'tagsToAdd' and 'tagsToRemove' fields.
+    return authorizeTag(tagsAssociateRequest.getTagsToAdd(), context, 
targetType)
+        && authorizeTag(tagsAssociateRequest.getTagsToRemove(), context, 
targetType);
+  }
+
+  /**
+   * Performs batch authorization for a given field (e.g., "tagsToAdd" or 
"tagsToRemove") containing
+   * an array of tag names.
+   *
+   * @param tagNames tagNames
+   * @param context The shared authorization request context.
+   * @param targetType The entity type being authorized (expected to be TAG).
+   * @return {@code true} if all tags in the field pass authorization; {@code 
false} otherwise.
+   */
+  private boolean authorizeTag(
+      String[] tagNames, AuthorizationRequestContext context, 
Entity.EntityType targetType) {
+
+    // Treat null or empty arrays as no-op (implicitly authorized)
+    if (tagNames == null) {
+      return true;
+    }
+
+    for (String tagName : tagNames) {
+      // Use a fresh context copy for each tag to avoid cross-contamination
+      Map<Entity.EntityType, NameIdentifier> currentContext = new 
HashMap<>(this.metadataContext);
+      buildNameIdentifierForBatchAuthorization(currentContext, tagName, 
targetType);
+
+      boolean authorized =
+          authorizationExpressionEvaluator.evaluate(
+              currentContext, pathParams, context, 
Optional.ofNullable(entityType));
+
+      if (!authorized) {
+        return false; // Fail fast on first unauthorized tag
+      }
+    }
+    return true;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AuthorizationExecutor.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AuthorizationExecutor.java
new file mode 100644
index 0000000000..be9c78f96e
--- /dev/null
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AuthorizationExecutor.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.server.web.filter.authorization;
+
+public interface AuthorizationExecutor {
+
+  boolean execute() throws Exception;
+}
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AuthorizeExecutorFactory.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AuthorizeExecutorFactory.java
new file mode 100644
index 0000000000..d6b8cee810
--- /dev/null
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AuthorizeExecutorFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.server.web.filter.authorization;
+
+import java.lang.reflect.Parameter;
+import java.util.Map;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationRequest;
+import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionEvaluator;
+
+public class AuthorizeExecutorFactory {
+
+  public static AuthorizationExecutor create(
+      AuthorizationRequest.RequestType requestType,
+      Map<Entity.EntityType, NameIdentifier> metadataContext,
+      AuthorizationExpressionEvaluator authorizationExpressionEvaluator,
+      Map<String, Object> pathParams,
+      String entityType,
+      Parameter[] parameters,
+      Object[] args) {
+    return switch (requestType) {
+      case COMMON -> new CommonAuthorizerExecutor(
+          metadataContext, authorizationExpressionEvaluator, pathParams, 
entityType);
+      case ASSOCIATE_TAG -> new AssociateTagAuthorizationExecutor(
+          parameters,
+          args,
+          metadataContext,
+          authorizationExpressionEvaluator,
+          pathParams,
+          entityType);
+    };
+  }
+}
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/CommonAuthorizerExecutor.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/CommonAuthorizerExecutor.java
new file mode 100644
index 0000000000..0017ee8859
--- /dev/null
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/CommonAuthorizerExecutor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.server.web.filter.authorization;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.authorization.AuthorizationRequestContext;
+import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionEvaluator;
+
+public class CommonAuthorizerExecutor implements AuthorizationExecutor {
+
+  private Map<Entity.EntityType, NameIdentifier> metadataContext;
+  private AuthorizationExpressionEvaluator authorizationExpressionEvaluator;
+  private Map<String, Object> pathParams;
+  String entityType;
+
+  public CommonAuthorizerExecutor(
+      Map<Entity.EntityType, NameIdentifier> metadataContext,
+      AuthorizationExpressionEvaluator authorizationExpressionEvaluator,
+      Map<String, Object> pathParams,
+      String entityType) {
+    this.metadataContext = metadataContext;
+    this.authorizationExpressionEvaluator = authorizationExpressionEvaluator;
+    this.pathParams = pathParams;
+    this.entityType = entityType;
+  }
+
+  @Override
+  public boolean execute() {
+    return authorizationExpressionEvaluator.evaluate(
+        metadataContext,
+        pathParams,
+        new AuthorizationRequestContext(),
+        Optional.ofNullable(entityType));
+  }
+}
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
index 0ea4787626..d3ddc1087f 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
@@ -726,6 +726,8 @@ public class ExceptionHandlers {
       } else if (e instanceof NotInUseException) {
         return Utils.notInUse(errorMsg, e);
 
+      } else if (e instanceof ForbiddenException) {
+        return Utils.forbidden(errorMsg, e);
       } else {
         return super.handle(op, tag, parent, e);
       }
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectTagOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectTagOperations.java
index 2abc645e60..8438c1f583 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectTagOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectTagOperations.java
@@ -18,6 +18,9 @@
  */
 package org.apache.gravitino.server.web.rest;
 
+import static 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants.CAN_ACCESS_METADATA;
+import static 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants.CAN_ACCESS_METADATA_AND_TAG;
+
 import com.codahale.metrics.annotation.ResponseMetered;
 import com.codahale.metrics.annotation.Timed;
 import java.util.Arrays;
@@ -37,6 +40,7 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.gravitino.Entity;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.dto.requests.TagsAssociateRequest;
@@ -47,9 +51,17 @@ import org.apache.gravitino.dto.tag.TagDTO;
 import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.exceptions.NoSuchTagException;
 import org.apache.gravitino.metrics.MetricNames;
+import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationFullName;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationObjectType;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationRequest;
+import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants;
 import org.apache.gravitino.server.web.Utils;
 import org.apache.gravitino.tag.Tag;
 import org.apache.gravitino.tag.TagDispatcher;
+import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.glassfish.jersey.internal.guava.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,11 +89,14 @@ public class MetadataObjectTagOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "get-object-tag." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "get-object-tag", absolute = true)
+  @AuthorizationExpression(
+      expression = "METALAKE::OWNER || ((TAG::OWNER || ANY_APPLY_TAG) && 
CAN_ACCESS_METADATA)")
   public Response getTagForObject(
-      @PathParam("metalake") String metalake,
-      @PathParam("type") String type,
-      @PathParam("fullName") String fullName,
-      @PathParam("tag") String tagName) {
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      @PathParam("type") @AuthorizationObjectType String type,
+      @PathParam("fullName") @AuthorizationFullName String fullName,
+      @PathParam("tag") @AuthorizationMetadata(type = Entity.EntityType.TAG) 
String tagName) {
     LOG.info(
         "Received get tag {} request for object type: {}, full name: {} under 
metalake: {}",
         tagName,
@@ -143,10 +158,12 @@ public class MetadataObjectTagOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "list-object-tags." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "list-object-tags", absolute = true)
+  @AuthorizationExpression(expression = CAN_ACCESS_METADATA)
   public Response listTagsForMetadataObject(
-      @PathParam("metalake") String metalake,
-      @PathParam("type") String type,
-      @PathParam("fullName") String fullName,
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      @PathParam("type") @AuthorizationObjectType String type,
+      @PathParam("fullName") @AuthorizationFullName String fullName,
       @QueryParam("details") @DefaultValue("false") boolean verbose) {
     LOG.info(
         "Received list tag {} request for object type: {}, full name: {} under 
metalake: {}",
@@ -194,12 +211,26 @@ public class MetadataObjectTagOperations {
                   type,
                   fullName,
                   metalake);
-              return Utils.ok(new TagListResponse(tags.toArray(new 
TagDTO[0])));
+              TagDTO[] tagDTOS = tags.toArray(new TagDTO[0]);
+              tagDTOS =
+                  MetadataFilterHelper.filterByExpression(
+                      metalake,
+                      
AuthorizationExpressionConstants.loadTagAuthorizationExpression,
+                      Entity.EntityType.TAG,
+                      tagDTOS,
+                      tagDTO -> NameIdentifierUtil.ofTag(metalake, 
tagDTO.name()));
+              return Utils.ok(new TagListResponse(tagDTOS));
 
             } else {
               // We have used Set to avoid duplicate tag names
               String[] tagNames = 
tags.stream().map(TagDTO::name).toArray(String[]::new);
-
+              tagNames =
+                  MetadataFilterHelper.filterByExpression(
+                      metalake,
+                      
AuthorizationExpressionConstants.loadTagAuthorizationExpression,
+                      Entity.EntityType.TAG,
+                      tagNames,
+                      tagName -> NameIdentifierUtil.ofTag(metalake, tagName));
               LOG.info(
                   "List {} tags for object type: {}, full name: {} under 
metalake: {}",
                   tagNames.length,
@@ -219,17 +250,19 @@ public class MetadataObjectTagOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "associate-object-tags." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "associate-object-tags", absolute = true)
+  @AuthorizationExpression(expression = CAN_ACCESS_METADATA_AND_TAG)
   public Response associateTagsForObject(
-      @PathParam("metalake") String metalake,
-      @PathParam("type") String type,
-      @PathParam("fullName") String fullName,
-      TagsAssociateRequest request) {
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      @PathParam("type") @AuthorizationObjectType String type,
+      @PathParam("fullName") @AuthorizationFullName String fullName,
+      @AuthorizationRequest(type = 
AuthorizationRequest.RequestType.ASSOCIATE_TAG)
+          TagsAssociateRequest request) {
     LOG.info(
         "Received associate tags request for object type: {}, full name: {} 
under metalake: {}",
         type,
         fullName,
         metalake);
-
     try {
       return Utils.doAs(
           httpRequest,
@@ -242,7 +275,6 @@ public class MetadataObjectTagOperations {
                 tagDispatcher.associateTagsForMetadataObject(
                     metalake, object, request.getTagsToAdd(), 
request.getTagsToRemove());
             tagNames = tagNames == null ? new String[0] : tagNames;
-
             LOG.info(
                 "Associated tags: {} for object type: {}, full name: {} under 
metalake: {}",
                 Arrays.toString(tagNames),
@@ -251,7 +283,6 @@ public class MetadataObjectTagOperations {
                 metalake);
             return Utils.ok(new NameListResponse(tagNames));
           });
-
     } catch (Exception e) {
       return ExceptionHandlers.handleTagException(OperationType.ASSOCIATE, "", 
fullName, e);
     }
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/OwnerOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/OwnerOperations.java
index 15511a4590..c597aa1bea 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/OwnerOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/OwnerOperations.java
@@ -18,6 +18,7 @@
  */
 package org.apache.gravitino.server.web.rest;
 
+import static 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants.CAN_ACCESS_METADATA;
 import static 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConverter.CAN_SET_OWNER;
 
 import com.codahale.metrics.annotation.ResponseMetered;
@@ -72,7 +73,7 @@ public class OwnerOperations {
   @Timed(name = "get-object-owner." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "get-object-owner", absolute = true)
   @AuthorizationExpression(
-      expression = "CAN_GET_OWNER",
+      expression = CAN_ACCESS_METADATA,
       errorMessage = "Current user can not get this objects's owner")
   public Response getOwnerForObject(
       @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TagOperations.java 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TagOperations.java
index 1b91b23cfd..d4fb945589 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TagOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TagOperations.java
@@ -18,6 +18,9 @@
  */
 package org.apache.gravitino.server.web.rest;
 
+import static 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants.CAN_ACCESS_METADATA;
+import static 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants.CAN_ACCESS_METADATA_AND_TAG;
+
 import com.codahale.metrics.annotation.ResponseMetered;
 import com.codahale.metrics.annotation.Timed;
 import java.util.Arrays;
@@ -36,6 +39,7 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.gravitino.Entity;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.dto.requests.TagCreateRequest;
 import org.apache.gravitino.dto.requests.TagUpdateRequest;
@@ -50,10 +54,18 @@ import org.apache.gravitino.dto.tag.MetadataObjectDTO;
 import org.apache.gravitino.dto.tag.TagDTO;
 import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.metrics.MetricNames;
+import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationFullName;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationObjectType;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationRequest;
+import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants;
 import org.apache.gravitino.server.web.Utils;
 import org.apache.gravitino.tag.Tag;
 import org.apache.gravitino.tag.TagChange;
 import org.apache.gravitino.tag.TagDispatcher;
+import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,14 +108,26 @@ public class TagOperations {
                         .map(t -> DTOConverters.toDTO(t, Optional.empty()))
                         .toArray(TagDTO[]::new);
               }
-
+              tagDTOs =
+                  MetadataFilterHelper.filterByExpression(
+                      metalake,
+                      
AuthorizationExpressionConstants.loadTagAuthorizationExpression,
+                      Entity.EntityType.TAG,
+                      tagDTOs,
+                      tagDTO -> NameIdentifierUtil.ofTag(metalake, 
tagDTO.name()));
               LOG.info("List {} tags info under metalake: {}", tagDTOs.length, 
metalake);
               return Utils.ok(new TagListResponse(tagDTOs));
 
             } else {
               String[] tagNames = tagDispatcher.listTags(metalake);
               tagNames = tagNames == null ? new String[0] : tagNames;
-
+              tagNames =
+                  MetadataFilterHelper.filterByExpression(
+                      metalake,
+                      
AuthorizationExpressionConstants.loadTagAuthorizationExpression,
+                      Entity.EntityType.TAG,
+                      tagNames,
+                      tagName -> NameIdentifierUtil.ofTag(metalake, tagName));
               LOG.info("List {} tags under metalake: {}", tagNames.length, 
metalake);
               return Utils.ok(new NameListResponse(tagNames));
             }
@@ -117,7 +141,11 @@ public class TagOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "create-tag." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "create-tag", absolute = true)
-  public Response createTag(@PathParam("metalake") String metalake, 
TagCreateRequest request) {
+  @AuthorizationExpression(expression = "METALAKE::OWNER || 
METALAKE::CREATE_TAG")
+  public Response createTag(
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      TagCreateRequest request) {
     LOG.info("Received create tag request under metalake: {}", metalake);
 
     try {
@@ -143,7 +171,13 @@ public class TagOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "get-tag." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "get-tag", absolute = true)
-  public Response getTag(@PathParam("metalake") String metalake, 
@PathParam("tag") String name) {
+  @AuthorizationExpression(
+      expression = 
AuthorizationExpressionConstants.loadTagAuthorizationExpression,
+      accessMetadataType = MetadataObject.Type.TAG)
+  public Response getTag(
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      @PathParam("tag") @AuthorizationMetadata(type = Entity.EntityType.TAG) 
String name) {
     LOG.info("Received get tag request for tag: {} under metalake: {}", name, 
metalake);
 
     try {
@@ -164,9 +198,11 @@ public class TagOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "alter-tag." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "alter-tag", absolute = true)
+  @AuthorizationExpression(expression = "METALAKE::OWNER || TAG::OWNER")
   public Response alterTag(
-      @PathParam("metalake") String metalake,
-      @PathParam("tag") String name,
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      @PathParam("tag") @AuthorizationMetadata(type = Entity.EntityType.TAG) 
String name,
       TagUpdatesRequest request) {
     LOG.info("Received alter tag request for tag: {} under metalake: {}", 
name, metalake);
 
@@ -195,7 +231,11 @@ public class TagOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "delete-tag." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "delete-tag", absolute = true)
-  public Response deleteTag(@PathParam("metalake") String metalake, 
@PathParam("tag") String name) {
+  @AuthorizationExpression(expression = "METALAKE::OWNER || TAG::OWNER")
+  public Response deleteTag(
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      @PathParam("tag") @AuthorizationMetadata(type = Entity.EntityType.TAG) 
String name) {
     LOG.info("Received delete tag request for tag: {} under metalake: {}", 
name, metalake);
 
     try {
@@ -240,6 +280,7 @@ public class TagOperations {
 
             MetadataObjectDTO[] objectDTOs =
                 
Arrays.stream(objects).map(DTOConverters::toDTO).toArray(MetadataObjectDTO[]::new);
+            // TODO filter by can-access-metadata, MetadataFilterHelper can 
not support now
             return Utils.ok(new MetadataObjectListResponse(objectDTOs));
           });
 
@@ -258,10 +299,12 @@ public class TagOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "list-object-tags." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "list-object-tags", absolute = true)
+  @AuthorizationExpression(expression = CAN_ACCESS_METADATA)
   public Response listTagsForMetadataObject(
-      @PathParam("metalake") String metalake,
-      @PathParam("type") String type,
-      @PathParam("fullName") String fullName,
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      @PathParam("type") @AuthorizationObjectType String type,
+      @PathParam("fullName") @AuthorizationFullName String fullName,
       @QueryParam("details") @DefaultValue("false") boolean verbose) {
     MetadataObjectTagOperations metadataObjectTagOperations =
         new MetadataObjectTagOperations(tagDispatcher);
@@ -279,11 +322,14 @@ public class TagOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "get-object-tag." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "get-object-tag", absolute = true)
+  @AuthorizationExpression(
+      expression = "METALAKE::OWNER || ((TAG::OWNER || ANY_APPLY_TAG) && 
CAN_ACCESS_METADATA)")
   public Response getTagForObject(
-      @PathParam("metalake") String metalake,
-      @PathParam("type") String type,
-      @PathParam("fullName") String fullName,
-      @PathParam("tag") String tagName) {
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      @PathParam("type") @AuthorizationObjectType String type,
+      @PathParam("fullName") @AuthorizationFullName String fullName,
+      @PathParam("tag") @AuthorizationMetadata(type = Entity.EntityType.TAG) 
String tagName) {
     MetadataObjectTagOperations metadataObjectTagOperations =
         new MetadataObjectTagOperations(tagDispatcher);
     metadataObjectTagOperations.setHttpRequest(httpRequest);
@@ -300,11 +346,14 @@ public class TagOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "associate-object-tags." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "associate-object-tags", absolute = true)
+  @AuthorizationExpression(expression = CAN_ACCESS_METADATA_AND_TAG)
   public Response associateTagsForObject(
-      @PathParam("metalake") String metalake,
-      @PathParam("type") String type,
-      @PathParam("fullName") String fullName,
-      TagsAssociateRequest request) {
+      @PathParam("metalake") @AuthorizationMetadata(type = 
Entity.EntityType.METALAKE)
+          String metalake,
+      @PathParam("type") @AuthorizationObjectType String type,
+      @PathParam("fullName") @AuthorizationFullName String fullName,
+      @AuthorizationRequest(type = 
AuthorizationRequest.RequestType.ASSOCIATE_TAG)
+          TagsAssociateRequest request) {
     MetadataObjectTagOperations metadataObjectTagOperations =
         new MetadataObjectTagOperations(tagDispatcher);
     metadataObjectTagOperations.setHttpRequest(httpRequest);
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestMetadataObjectTagOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestMetadataObjectTagOperations.java
index f95f0ca5a1..f138539fbe 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestMetadataObjectTagOperations.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestMetadataObjectTagOperations.java
@@ -54,12 +54,11 @@ import org.apache.gravitino.tag.TagDispatcher;
 import org.apache.gravitino.tag.TagManager;
 import org.glassfish.jersey.internal.inject.AbstractBinder;
 import org.glassfish.jersey.server.ResourceConfig;
-import org.glassfish.jersey.test.JerseyTest;
 import org.glassfish.jersey.test.TestProperties;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-public class TestMetadataObjectTagOperations extends JerseyTest {
+public class TestMetadataObjectTagOperations extends BaseOperationsTest {
 
   private static class MockServletRequestFactory extends 
ServletRequestFactoryBase {
 
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTagOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTagOperations.java
index 50258239c6..75312f4443 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTagOperations.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTagOperations.java
@@ -64,12 +64,11 @@ import org.apache.gravitino.tag.TagDispatcher;
 import org.apache.gravitino.tag.TagManager;
 import org.glassfish.jersey.internal.inject.AbstractBinder;
 import org.glassfish.jersey.server.ResourceConfig;
-import org.glassfish.jersey.test.JerseyTest;
 import org.glassfish.jersey.test.TestProperties;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-public class TestTagOperations extends JerseyTest {
+public class TestTagOperations extends BaseOperationsTest {
 
   private static class MockServletRequestFactory extends 
ServletRequestFactoryBase {
 

Reply via email to