This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch internal-main
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit fbd5394239135c7e0ee377544e0a04d295f78234
Author: roryqi <[email protected]>
AuthorDate: Mon Dec 15 10:53:22 2025 +0800

    [#9294] improvement: Refactor the code to reduce duplicated code (#9467)
    
    ### What changes were proposed in this pull request?
    
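    Refactor the code to reduce duplication: move the shared name-identifier helpers
    into `NameIdentifierUtil`, reuse `MetadataObjectUtil` in `ReverseIndexRules`, and
    replace shaded third-party imports with the direct ones.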
    
    ### Why are the changes needed?
    
    Fix: #9294
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
---
 build.gradle.kts                                   |  29 ++--
 .../test/HadoopUserImpersonationIT.java            |   4 +-
 .../test/authorization/KerberosOperationsIT.java   |   2 +-
 .../test/authorization/MultiAuthOperationsIT.java  |   2 +-
 .../apache/gravitino/cache/ReverseIndexRules.java  |  76 +--------
 .../apache/gravitino/utils/NameIdentifierUtil.java | 171 +++++++++++++++++++++
 .../gravitino/job/local/TestLocalJobExecutor.java  |   2 +-
 .../TestLancePartitionStatisticStorage.java        |   2 +-
 .../gravitino/utils/TestNameIdentifierUtil.java    | 150 ++++++++++++++++++
 .../iceberg/service/rest/IcebergRestTestUtil.java  |   6 +-
 .../server/authorization/MetadataAuthzHelper.java  |  35 +----
 .../web/filter/GravitinoInterceptionService.java   |   3 +-
 .../gravitino/server/web/filter/ParameterUtil.java | 171 ++++++---------------
 .../AssociatePolicyAuthorizationExecutor.java      |   4 +-
 .../AssociateTagAuthorizationExecutor.java         |   4 +-
 .../authorization/AuthorizeExecutorFactory.java    |   3 +-
 .../authorization/CommonAuthorizerExecutor.java    |   9 +-
 .../authorization/RunJobAuthorizationExecutor.java |   6 +-
 .../integration/test/sql/SparkQueryRunner.java     |   2 +-
 .../integration/test/TrinoQueryITBase.java         |   2 +-
 20 files changed, 419 insertions(+), 264 deletions(-)

diff --git a/build.gradle.kts b/build.gradle.kts
index 5c21b39c98..e2b75f13f2 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -130,24 +130,41 @@ allprojects {
           "$1"
         )
         replaceRegex(
-          "Use Guava collect classes instead of any shadowed versions",
-          "import\\s+(?:.*\\.com\\.google\\.common\\.collect|org\\.glassfish\\.jersey\\.internal\\.guava)\\.(Sets|Maps|Lists|ImmutableList|ImmutableMap|ImmutableSet|ImmutableBiMap|Iterables|Iterators|Multimap|SetMultimap|ListMultimap|BiMap|Table);",
-          "import com.google.common.collect.${'$'}1;"
+          "Use Guava classes from collect/base packages instead of any shadowed versions",
+          "import\\s+.*\\.com\\.google\\.common\\.(collect|base)\\.([A-Z][a-zA-Z0-9_]*);",
+          "import com.google.common.${'$'}1.${'$'}2;"
+        )
+        // Jersey keeps its Guava copies in one flat package, so the target Guava package cannot be
+        // captured from the import; a combined rule would emit "com.google.common..X" for them.
+        replaceRegex(
+          "Use Guava collect classes instead of the Jersey internal versions",
+          "import\\s+org\\.glassfish\\.jersey\\.internal\\.guava\\.(Sets|Maps|Lists|ImmutableList|ImmutableMap|ImmutableSet|ImmutableBiMap|Iterables|Iterators|Multimap|SetMultimap|ListMultimap|BiMap|Table);",
+          "import com.google.common.collect.${'$'}1;"
+        )
+        replaceRegex(
+          "Use Guava base classes instead of the Jersey internal versions",
+          "import\\s+org\\.glassfish\\.jersey\\.internal\\.guava\\.(Preconditions|Strings|Optional|Predicate|Function|Supplier|Joiner|Splitter|Objects);",
+          "import com.google.common.base.${'$'}1;"
         )
         replaceRegex(
-          "Use Guava base classes instead of any shadowed versions",
-          "import\\s+(?:.*\\.com\\.google\\.common\\.base|org\\.glassfish\\.jersey\\.internal\\.guava)\\.(Preconditions|Strings|Optional|Predicate|Function|Supplier|Joiner|Splitter|Objects);",
-          "import com.google.common.base.${'$'}1;"
+          "Use Guava classes from all other packages instead of any shadowed versions",
+          "import\\s+.*\\.com\\.google\\.common\\.(io|util\\.concurrent|annotations|cache|primitives|hash|net|reflect)\\.([A-Z][a-zA-Z0-9_]*);",
+          "import com.google.common.${'$'}1.${'$'}2;"
         )
         replaceRegex(
-          "Use Guava io classes instead of any shadowed versions",
-          "import\\s+.*\\.com\\.google\\.common\\.io\\.(Files|Resources|ByteStreams|CharStreams);",
-          "import com.google.common.io.${'$'}1;"
+          "Use Apache Commons Lang3 instead of shadowed versions",
+          "import\\s+.*\\.org\\.apache\\.commons\\.lang3\\.([A-Z][a-zA-Z0-9_]*);",
+          "import org.apache.commons.lang3.${'$'}1;"
         )
         replaceRegex(
-          "Use Guava util.concurrent classes instead of any shadowed versions",
-          "import\\s+.*\\.com\\.google\\.common\\.util\\.concurrent\\.(ListenableFuture|Futures|MoreExecutors);",
-          "import com.google.common.util.concurrent.${'$'}1;"
+          "Use Apache Commons IO instead of shadowed versions",
+          "import\\s+.*\\.org\\.apache\\.commons\\.io\\.([A-Z][a-zA-Z0-9_]*);",
+          "import org.apache.commons.io.${'$'}1;"
+        )
+        replaceRegex(
+          "Use SLF4J Logger instead of other logging frameworks",
+          "import\\s+.*\\.(Logger|LoggerFactory);",
+          "import org.slf4j.${'$'}1;"
         )
 
         targetExclude("**/build/**", "**/.pnpm/***")
diff --git 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/integration/test/HadoopUserImpersonationIT.java
 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/integration/test/HadoopUserImpersonationIT.java
index 10665e3872..6a9b493d12 100644
--- 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/integration/test/HadoopUserImpersonationIT.java
+++ 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/integration/test/HadoopUserImpersonationIT.java
@@ -36,6 +36,8 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
+import org.apache.commons.lang3.JavaVersion;
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Schema;
@@ -63,8 +65,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.EnabledIf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.shaded.org.apache.commons.lang3.JavaVersion;
-import org.testcontainers.shaded.org.apache.commons.lang3.SystemUtils;
 
 @Tag("gravitino-docker-test")
 public class HadoopUserImpersonationIT extends BaseIT {
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/KerberosOperationsIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/KerberosOperationsIT.java
index 2cf313925c..d30877c5d5 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/KerberosOperationsIT.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/KerberosOperationsIT.java
@@ -24,6 +24,7 @@ import static 
org.apache.gravitino.server.authentication.KerberosConfig.PRINCIPA
 import static org.apache.hadoop.minikdc.MiniKdc.MAX_TICKET_LIFETIME;
 
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.File;
 import java.io.IOException;
 import java.util.Map;
@@ -41,7 +42,6 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import 
org.testcontainers.shaded.com.google.common.util.concurrent.Uninterruptibles;
 
 public class KerberosOperationsIT extends BaseIT {
 
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/MultiAuthOperationsIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/MultiAuthOperationsIT.java
index 01628d95d1..00b1d26793 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/MultiAuthOperationsIT.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/MultiAuthOperationsIT.java
@@ -24,6 +24,7 @@ import static 
org.apache.gravitino.server.authentication.KerberosConfig.PRINCIPA
 import static org.apache.hadoop.minikdc.MiniKdc.MAX_TICKET_LIFETIME;
 
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Uninterruptibles;
 import io.jsonwebtoken.Jwts;
 import io.jsonwebtoken.SignatureAlgorithm;
 import io.jsonwebtoken.security.Keys;
@@ -52,7 +53,6 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
-import 
org.testcontainers.shaded.com.google.common.util.concurrent.Uninterruptibles;
 
 @Tag("gravitino-docker-test")
 public class MultiAuthOperationsIT extends BaseIT {
diff --git 
a/core/src/main/java/org/apache/gravitino/cache/ReverseIndexRules.java 
b/core/src/main/java/org/apache/gravitino/cache/ReverseIndexRules.java
index d457daa092..1c9f4d6532 100644
--- a/core/src/main/java/org/apache/gravitino/cache/ReverseIndexRules.java
+++ b/core/src/main/java/org/apache/gravitino/cache/ReverseIndexRules.java
@@ -30,6 +30,7 @@ import org.apache.gravitino.meta.PolicyEntity;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.TagEntity;
 import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.utils.MetadataObjectUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
 
 /**
@@ -103,79 +104,10 @@ public class ReverseIndexRules {
               .securableObjects()
               .forEach(
                   securableObject -> {
-                    Namespace namespace = Namespace.empty();
-                    Entity.EntityType entityType = Entity.EntityType.METALAKE;
-                    switch (securableObject.type()) {
-                      case METALAKE:
-                        entityType = Entity.EntityType.METALAKE;
-                        namespace = NamespaceUtil.ofMetalake();
-                        break;
-                      case CATALOG:
-                        entityType = Entity.EntityType.CATALOG;
-                        namespace = 
NamespaceUtil.ofCatalog(roleEntity.namespace().level(0));
-                        break;
-                      case SCHEMA:
-                        entityType = Entity.EntityType.SCHEMA;
-                        Namespace nsSchema = 
Namespace.fromString(securableObject.parent());
-                        namespace =
-                            NamespaceUtil.ofSchema(
-                                roleEntity.namespace().level(0), 
nsSchema.level(0));
-                        break;
-                      case TABLE:
-                        entityType = Entity.EntityType.TABLE;
-                        Namespace nsTable = 
Namespace.fromString(securableObject.parent());
-                        namespace =
-                            NamespaceUtil.ofTable(
-                                roleEntity.namespace().level(0),
-                                nsTable.level(0),
-                                nsTable.level(1));
-                        break;
-                      case TOPIC:
-                        entityType = Entity.EntityType.TOPIC;
-                        Namespace nsTopic = 
Namespace.fromString(securableObject.parent());
-                        namespace =
-                            NamespaceUtil.ofTopic(
-                                roleEntity.namespace().level(0),
-                                nsTopic.level(0),
-                                nsTopic.level(1));
-                        break;
-                      case MODEL:
-                        entityType = Entity.EntityType.MODEL;
-                        Namespace nsModel = 
Namespace.fromString(securableObject.parent());
-                        namespace =
-                            NamespaceUtil.ofModel(
-                                roleEntity.namespace().level(0),
-                                nsModel.level(0),
-                                nsModel.level(1));
-                        break;
-                      case FILESET:
-                        entityType = Entity.EntityType.FILESET;
-                        Namespace nsFileset = 
Namespace.fromString(securableObject.parent());
-                        namespace =
-                            NamespaceUtil.ofFileset(
-                                roleEntity.namespace().level(0),
-                                nsFileset.level(0),
-                                nsFileset.level(1));
-                        break;
-                      case TAG:
-                        entityType = Entity.EntityType.TAG;
-                        namespace = 
NamespaceUtil.ofTag(roleEntity.namespace().level(0));
-                        break;
-                      case POLICY:
-                        entityType = Entity.EntityType.POLICY;
-                        namespace = 
NamespaceUtil.ofPolicy(roleEntity.namespace().level(0));
-                        break;
-                      case JOB_TEMPLATE:
-                        entityType = Entity.EntityType.JOB_TEMPLATE;
-                        namespace = 
NamespaceUtil.ofJobTemplate(roleEntity.namespace().level(0));
-                        break;
-                      default:
-                        throw new UnsupportedOperationException(
-                            "Don't support securable object type: " + 
securableObject.type());
-                    }
-                    Namespace securableObjectNamespace = 
Namespace.of(namespace.levels());
                     NameIdentifier securableObjectIdent =
-                        NameIdentifier.of(securableObjectNamespace, 
securableObject.name());
+                        MetadataObjectUtil.toEntityIdent(
+                            roleEntity.namespace().level(0), securableObject);
+                    EntityType entityType = 
MetadataObjectUtil.toEntityType(securableObject.type());
                     reverseIndexCache.put(securableObjectIdent, entityType, 
key);
                   });
         }
diff --git 
a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java 
b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
index b607e3bfd5..f79e948f22 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
@@ -29,8 +29,12 @@ import static org.apache.gravitino.Entity.EntityType.USER;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -700,6 +704,130 @@ public class NameIdentifierUtil {
     }
   }
 
+  /**
+   * Build a NameIdentifier for the given entity type and name, using the provided entity context.
+   * This method constructs the identifier hierarchically using the parent entity type
+   * relationships.
+   *
+   * <p>Examples:
+   *
+   * <pre>
+   * 1. Build a TABLE identifier:
+   *    Input:
+   *      type = Entity.EntityType.TABLE
+   *      name = "users"
+   *      entities = {
+   *        METALAKE -> "my_metalake",
+   *        CATALOG -> "my_catalog",
+   *        SCHEMA -> "my_schema"
+   *      }
+   *    Result: NameIdentifier.of("my_metalake", "my_catalog", "my_schema", "users")
+   *
+   * 2. Build a SCHEMA identifier:
+   *    Input:
+   *      type = Entity.EntityType.SCHEMA
+   *      name = "my_schema"
+   *      entities = {
+   *        METALAKE -> "my_metalake",
+   *        CATALOG -> "my_catalog"
+   *      }
+   *    Result: NameIdentifier.of("my_metalake", "my_catalog", "my_schema")
+   *
+   * 3. Build a USER identifier (virtual namespace type):
+   *    Input:
+   *      type = Entity.EntityType.USER
+   *      name = "john_doe"
+   *      entities = {
+   *        METALAKE -> "my_metalake"
+   *      }
+   *    Result: NameIdentifier.of(NamespaceUtil.ofUser("my_metalake"), "john_doe")
+   *            which creates a NameIdentifier with namespace ["my_metalake", "system", "user"]
+   *            and name "john_doe"
+   *
+   * 4. Build a COLUMN identifier:
+   *    Input:
+   *      type = Entity.EntityType.COLUMN
+   *      name = "id"
+   *      entities = {
+   *        METALAKE -> "my_metalake",
+   *        CATALOG -> "my_catalog",
+   *        SCHEMA -> "my_schema",
+   *        TABLE -> "users"
+   *      }
+   *    Result: NameIdentifier.of("my_metalake", "my_catalog", "my_schema", "users", "id")
+   * </pre>
+   *
+   * @param type The entity type
+   * @param name The entity name; not used when type is METALAKE
+   * @param entities Map from entity type to name for the metalake and every parent of the entity;
+   *     the METALAKE entry is read for every type
+   * @return The created NameIdentifier
+   * @throws IllegalArgumentException if the entity type is not supported or a required parent
+   *     entity below the metalake is missing from entities
+   */
+  public static NameIdentifier buildNameIdentifier(
+      Entity.EntityType type, String name, Map<Entity.EntityType, String> 
entities) {
+    String metalake = entities.get(Entity.EntityType.METALAKE);
+
+    // Metalake is the root: it is built from the METALAKE entry alone and name is not used
+    if (type == Entity.EntityType.METALAKE) {
+      return ofMetalake(metalake);
+    }
+
+    // Handle virtual namespace types early to avoid unnecessary work
+    if (SUPPORT_VIRTUAL_NAMESPACE_TYPES.contains(type)) {
+      Namespace namespace = createVirtualNamespace(type, metalake);
+      return NameIdentifier.of(namespace, name);
+    }
+
+    // Build the full name path by traversing the hierarchy from child to root
+    // Using LinkedList for efficient prepend operations
+    LinkedList<String> levels = new LinkedList<>();
+    levels.add(name);
+
+    Entity.EntityType currentType = type;
+    while (currentType != Entity.EntityType.METALAKE) {
+      Entity.EntityType parentType = parentEntityType(currentType);
+      String parentName = entities.get(parentType);
+
+      if (parentType == Entity.EntityType.METALAKE) {
+        levels.addFirst(metalake);
+        break;
+      } else if (parentName == null) {
+        throw new IllegalArgumentException(
+            "Cannot build NameIdentifier for type "
+                + type
+                + ": missing parent entity of type "
+                + parentType);
+      }
+
+      levels.addFirst(parentName);
+      currentType = parentType;
+    }
+
+    // Create NameIdentifier from the full path
+    return NameIdentifier.of(levels.toArray(new String[0]));
+  }
+
+  /**
+   * Build a NameIdentifier for every entry of the given map, using the map itself as the context.
+   *
+   * @param entities Map containing entity types and their names (e.g., METALAKE, CATALOG, SCHEMA,
+   *     etc.)
+   * @return Map of entity types to their NameIdentifiers, one entry per key of entities
+   */
+  public static Map<Entity.EntityType, NameIdentifier> buildNameIdentifierMap(
+      Map<Entity.EntityType, String> entities) {
+    Map<Entity.EntityType, NameIdentifier> nameIdentifierMap = 
Maps.newHashMap();
+
+    entities.forEach(
+        (type, name) -> {
+          NameIdentifier identifier = buildNameIdentifier(type, name, 
entities);
+          nameIdentifierMap.put(type, identifier);
+        });
+
+    return nameIdentifierMap;
+  }
+
   public static Entity.EntityType parentEntityType(Entity.EntityType type) {
     switch (type) {
       case COLUMN:
@@ -731,4 +859,47 @@ public class NameIdentifierUtil {
         throw new IllegalArgumentException("Metalake has no parent entity 
type");
     }
   }
+
+  /**
+   * Split a NameIdentifier into itself and all of its ancestors. For example, a Table
+   * NameIdentifier yields a map holding the Table, its parent Schema and Catalog, and the
+   * Metalake (always present, built from the metalake argument).
+   *
+   * @param metalake name of the metalake; stored under METALAKE via ofMetalake(metalake)
+   * @param entityType entity type of nameIdentifier; the walk up the hierarchy starts here
+   * @param nameIdentifier identifier of the entity to split
+   * @return A map containing the metadata object and all its parent objects, keyed by their types
+   */
+  public static Map<Entity.EntityType, NameIdentifier> splitNameIdentifier(
+      String metalake, Entity.EntityType entityType, NameIdentifier 
nameIdentifier) {
+    Map<Entity.EntityType, NameIdentifier> nameIdentifierMap = new HashMap<>();
+    nameIdentifierMap.put(Entity.EntityType.METALAKE, ofMetalake(metalake));
+    while (entityType != Entity.EntityType.METALAKE) {
+      nameIdentifierMap.put(entityType, nameIdentifier);
+      entityType = parentEntityType(entityType);
+      nameIdentifier = parentNameIdentifier(nameIdentifier, entityType);
+    }
+    return nameIdentifierMap;
+  }
+
+  private static Namespace createVirtualNamespace(Entity.EntityType type, 
String metalake) {
+    switch (type) { // each case delegates to the NamespaceUtil factory of that entity type
+      case USER:
+        return NamespaceUtil.ofUser(metalake);
+      case GROUP:
+        return NamespaceUtil.ofGroup(metalake);
+      case ROLE:
+        return NamespaceUtil.ofRole(metalake);
+      case TAG:
+        return NamespaceUtil.ofTag(metalake);
+      case POLICY:
+        return NamespaceUtil.ofPolicy(metalake);
+      case JOB:
+        return NamespaceUtil.ofJob(metalake);
+      case JOB_TEMPLATE:
+        return NamespaceUtil.ofJobTemplate(metalake);
+      default: // any other entity type has no factory mapped in this switch
+        throw new IllegalArgumentException("Unsupported virtual namespace 
type: " + type);
+    }
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/job/local/TestLocalJobExecutor.java 
b/core/src/test/java/org/apache/gravitino/job/local/TestLocalJobExecutor.java
index 4f4100a3e3..bfb3ec3a91 100644
--- 
a/core/src/test/java/org/apache/gravitino/job/local/TestLocalJobExecutor.java
+++ 
b/core/src/test/java/org/apache/gravitino/job/local/TestLocalJobExecutor.java
@@ -28,6 +28,7 @@ import java.nio.file.Files;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
 import org.apache.gravitino.connector.job.JobExecutor;
 import org.apache.gravitino.job.JobHandle;
 import org.apache.gravitino.job.JobManager;
@@ -41,7 +42,6 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 public class TestLocalJobExecutor {
diff --git 
a/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
 
b/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
index 10fbfc7909..bafe0b4b94 100644
--- 
a/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
+++ 
b/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
@@ -28,6 +28,7 @@ import java.io.File;
 import java.nio.file.Files;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
@@ -41,7 +42,6 @@ import org.apache.gravitino.stats.StatisticValue;
 import org.apache.gravitino.stats.StatisticValues;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
-import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
 
 public class TestLancePartitionStatisticStorage {
 
diff --git 
a/core/src/test/java/org/apache/gravitino/utils/TestNameIdentifierUtil.java 
b/core/src/test/java/org/apache/gravitino/utils/TestNameIdentifierUtil.java
index 27352df551..6a75a5c5b6 100644
--- a/core/src/test/java/org/apache/gravitino/utils/TestNameIdentifierUtil.java
+++ b/core/src/test/java/org/apache/gravitino/utils/TestNameIdentifierUtil.java
@@ -23,6 +23,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
+import java.util.Map;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
@@ -197,4 +199,152 @@ public class TestNameIdentifierUtil {
     Assertions.assertThrows(
         IllegalArgumentException.class, () -> NameIdentifierUtil.ofRole(null, 
roleName));
   }
+
+  @Test
+  void testBuildNameIdentifier() {
+    String metalake = "my_metalake";
+    String catalog = "my_catalog";
+    String schema = "my_schema";
+    String tableName = "users";
+    String columnName = "id";
+    String userName = "john_doe";
+
+    // Test 1: Build a METALAKE identifier
+    Map<Entity.EntityType, String> metalakeEntities = Maps.newHashMap();
+    metalakeEntities.put(Entity.EntityType.METALAKE, metalake);
+    NameIdentifier metalakeIdent =
+        NameIdentifierUtil.buildNameIdentifier(
+            Entity.EntityType.METALAKE, metalake, metalakeEntities);
+    assertEquals(NameIdentifier.of(metalake), metalakeIdent);
+    assertEquals(metalake, metalakeIdent.name());
+
+    // Test 2: Build a CATALOG identifier
+    Map<Entity.EntityType, String> catalogEntities = Maps.newHashMap();
+    catalogEntities.put(Entity.EntityType.METALAKE, metalake);
+    NameIdentifier catalogIdent =
+        NameIdentifierUtil.buildNameIdentifier(Entity.EntityType.CATALOG, 
catalog, catalogEntities);
+    assertEquals(NameIdentifier.of(metalake, catalog), catalogIdent);
+    assertEquals(catalog, catalogIdent.name());
+    assertEquals(1, catalogIdent.namespace().length());
+
+    // Test 3: Build a SCHEMA identifier
+    Map<Entity.EntityType, String> schemaEntities = Maps.newHashMap();
+    schemaEntities.put(Entity.EntityType.METALAKE, metalake);
+    schemaEntities.put(Entity.EntityType.CATALOG, catalog);
+    NameIdentifier schemaIdent =
+        NameIdentifierUtil.buildNameIdentifier(Entity.EntityType.SCHEMA, 
schema, schemaEntities);
+    assertEquals(NameIdentifier.of(metalake, catalog, schema), schemaIdent);
+    assertEquals(schema, schemaIdent.name());
+    assertEquals(2, schemaIdent.namespace().length());
+
+    // Test 4: Build a TABLE identifier
+    Map<Entity.EntityType, String> tableEntities = Maps.newHashMap();
+    tableEntities.put(Entity.EntityType.METALAKE, metalake);
+    tableEntities.put(Entity.EntityType.CATALOG, catalog);
+    tableEntities.put(Entity.EntityType.SCHEMA, schema);
+    NameIdentifier tableIdent =
+        NameIdentifierUtil.buildNameIdentifier(Entity.EntityType.TABLE, 
tableName, tableEntities);
+    assertEquals(NameIdentifier.of(metalake, catalog, schema, tableName), 
tableIdent);
+    assertEquals(tableName, tableIdent.name());
+    assertEquals(3, tableIdent.namespace().length());
+
+    // Test 5: Build a COLUMN identifier
+    Map<Entity.EntityType, String> columnEntities = Maps.newHashMap();
+    columnEntities.put(Entity.EntityType.METALAKE, metalake);
+    columnEntities.put(Entity.EntityType.CATALOG, catalog);
+    columnEntities.put(Entity.EntityType.SCHEMA, schema);
+    columnEntities.put(Entity.EntityType.TABLE, tableName);
+    NameIdentifier columnIdent =
+        NameIdentifierUtil.buildNameIdentifier(
+            Entity.EntityType.COLUMN, columnName, columnEntities);
+    assertEquals(NameIdentifier.of(metalake, catalog, schema, tableName, 
columnName), columnIdent);
+    assertEquals(columnName, columnIdent.name());
+    assertEquals(4, columnIdent.namespace().length());
+
+    // Test 6: Build a USER identifier (virtual namespace type)
+    Map<Entity.EntityType, String> userEntities = Maps.newHashMap();
+    userEntities.put(Entity.EntityType.METALAKE, metalake);
+    NameIdentifier userIdent =
+        NameIdentifierUtil.buildNameIdentifier(Entity.EntityType.USER, 
userName, userEntities);
+    NameIdentifier expectedUserIdent = NameIdentifierUtil.ofUser(metalake, 
userName);
+    assertEquals(expectedUserIdent, userIdent);
+    assertEquals(userName, userIdent.name());
+    // Virtual namespace has 3 levels: [metalake, SYSTEM_CATALOG_RESERVED_NAME, USER_SCHEMA_NAME]
+    assertEquals(3, userIdent.namespace().length());
+    assertEquals(metalake, userIdent.namespace().level(0));
+    assertEquals(Entity.SYSTEM_CATALOG_RESERVED_NAME, 
userIdent.namespace().level(1));
+    assertEquals(Entity.USER_SCHEMA_NAME, userIdent.namespace().level(2));
+
+    // Test 7: Build a TAG identifier (virtual namespace type)
+    String tagName = "sensitive";
+    Map<Entity.EntityType, String> tagEntities = Maps.newHashMap();
+    tagEntities.put(Entity.EntityType.METALAKE, metalake);
+    NameIdentifier tagIdent =
+        NameIdentifierUtil.buildNameIdentifier(Entity.EntityType.TAG, tagName, 
tagEntities);
+    NameIdentifier expectedTagIdent = NameIdentifierUtil.ofTag(metalake, 
tagName);
+    assertEquals(expectedTagIdent, tagIdent);
+    assertEquals(tagName, tagIdent.name());
+
+    // Test 8: Build a ROLE identifier (virtual namespace type)
+    String roleName = "admin";
+    Map<Entity.EntityType, String> roleEntities = Maps.newHashMap();
+    roleEntities.put(Entity.EntityType.METALAKE, metalake);
+    NameIdentifier roleIdent =
+        NameIdentifierUtil.buildNameIdentifier(Entity.EntityType.ROLE, 
roleName, roleEntities);
+    NameIdentifier expectedRoleIdent = NameIdentifierUtil.ofRole(metalake, 
roleName);
+    assertEquals(expectedRoleIdent, roleIdent);
+    assertEquals(roleName, roleIdent.name());
+
+    // Test 9: Error case - missing parent entity (CATALOG without METALAKE)
+    Map<Entity.EntityType, String> invalidEntities = Maps.newHashMap();
+    // Missing METALAKE entry - should throw an exception about null or empty 
identifiers
+    Throwable exception1 =
+        assertThrows(
+            Exception.class,
+            () ->
+                NameIdentifierUtil.buildNameIdentifier(
+                    Entity.EntityType.CATALOG, catalog, invalidEntities));
+    // The exception could be about null identifiers or missing parent, both 
are valid
+    assertTrue(
+        exception1.getMessage().contains("null")
+            || exception1.getMessage().contains("empty")
+            || exception1.getMessage().contains("missing parent entity of 
type"));
+
+    // Test 10: Error case - missing intermediate parent (TABLE without SCHEMA)
+    Map<Entity.EntityType, String> missingSchemaEntities = Maps.newHashMap();
+    missingSchemaEntities.put(Entity.EntityType.METALAKE, metalake);
+    missingSchemaEntities.put(Entity.EntityType.CATALOG, catalog);
+    // Missing SCHEMA
+    Throwable exception2 =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                NameIdentifierUtil.buildNameIdentifier(
+                    Entity.EntityType.TABLE, tableName, 
missingSchemaEntities));
+    assertTrue(exception2.getMessage().contains("missing parent entity of 
type"));
+    assertTrue(exception2.getMessage().contains("SCHEMA"));
+
+    // Test 11: Build a FILESET identifier
+    String filesetName = "my_fileset";
+    Map<Entity.EntityType, String> filesetEntities = Maps.newHashMap();
+    filesetEntities.put(Entity.EntityType.METALAKE, metalake);
+    filesetEntities.put(Entity.EntityType.CATALOG, catalog);
+    filesetEntities.put(Entity.EntityType.SCHEMA, schema);
+    NameIdentifier filesetIdent =
+        NameIdentifierUtil.buildNameIdentifier(
+            Entity.EntityType.FILESET, filesetName, filesetEntities);
+    assertEquals(NameIdentifier.of(metalake, catalog, schema, filesetName), 
filesetIdent);
+    assertEquals(filesetName, filesetIdent.name());
+
+    // Test 12: Build a TOPIC identifier
+    String topicName = "my_topic";
+    Map<Entity.EntityType, String> topicEntities = Maps.newHashMap();
+    topicEntities.put(Entity.EntityType.METALAKE, metalake);
+    topicEntities.put(Entity.EntityType.CATALOG, catalog);
+    topicEntities.put(Entity.EntityType.SCHEMA, schema);
+    NameIdentifier topicIdent =
+        NameIdentifierUtil.buildNameIdentifier(Entity.EntityType.TOPIC, 
topicName, topicEntities);
+    assertEquals(NameIdentifier.of(metalake, catalog, schema, topicName), 
topicIdent);
+    assertEquals(topicName, topicIdent.name());
+  }
 }
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
index 211bb15700..5818de1fb3 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
@@ -28,7 +28,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.logging.Level;
-import java.util.logging.Logger;
 import java.util.stream.Stream;
 import javax.servlet.http.HttpServletRequest;
 import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
@@ -97,7 +96,10 @@ public class IcebergRestTestUtil {
     if (DEBUG_SERVER_LOG_ENABLED) {
       resourceConfig.register(
           new LoggingFeature(
-              Logger.getLogger(LoggingFeature.DEFAULT_LOGGER_NAME),
+              // Use fully qualified name to avoid ambiguity. Jersey's 
LoggingFeature requires
+              // java.util.logging.Logger, not org.slf4j.Logger which is 
commonly used elsewhere
+              // in the Gravitino codebase.
+              
java.util.logging.Logger.getLogger(LoggingFeature.DEFAULT_LOGGER_NAME),
               Level.INFO,
               Verbosity.PAYLOAD_ANY,
               10000));
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
index 15d5f542ad..028747559b 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
@@ -20,7 +20,6 @@ package org.apache.gravitino.server.authorization;
 import java.lang.reflect.Array;
 import java.security.Principal;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -69,7 +68,7 @@ public class MetadataAuthzHelper {
         authorizationRequestContext,
         metalake -> {
           String metalakeName = metalake.name();
-          return splitMetadataNames(
+          return NameIdentifierUtil.splitNameIdentifier(
               metalakeName,
               Entity.EntityType.METALAKE,
               NameIdentifierUtil.ofMetalake(metalakeName));
@@ -93,7 +92,7 @@ public class MetadataAuthzHelper {
         GravitinoAuthorizerProvider.getInstance().getGravitinoAuthorizer(),
         new AuthorizationRequestContext(),
         metadataObject ->
-            splitMetadataNames(
+            NameIdentifierUtil.splitNameIdentifier(
                 metalake,
                 MetadataObjectUtil.toEntityType(metadataObject.type()),
                 MetadataObjectUtil.toEntityIdent(metalake, metadataObject)),
@@ -116,7 +115,7 @@ public class MetadataAuthzHelper {
         GravitinoAuthorizerProvider.getInstance().getGravitinoAuthorizer(),
         new AuthorizationRequestContext(),
         metadataObject ->
-            splitMetadataNames(
+            NameIdentifierUtil.splitNameIdentifier(
                 metalake,
                 MetadataObjectUtil.toEntityType(metadataObject.type()),
                 MetadataObjectUtil.toEntityIdent(metalake, metadataObject)),
@@ -156,7 +155,7 @@ public class MetadataAuthzHelper {
 
     String metalake = NameIdentifierUtil.getMetalake(identifier);
     Map<Entity.EntityType, NameIdentifier> nameIdentifierMap =
-        splitMetadataNames(metalake, entityType, identifier);
+        NameIdentifierUtil.splitNameIdentifier(metalake, entityType, 
identifier);
     AuthorizationExpressionEvaluator authorizationExpressionEvaluator =
         new AuthorizationExpressionEvaluator(expression);
     return authorizationExpressionEvaluator.evaluate(
@@ -191,7 +190,7 @@ public class MetadataAuthzHelper {
         authorizationRequestContext,
         (entity) -> {
           NameIdentifier nameIdentifier = toNameIdentifier.apply(entity);
-          return splitMetadataNames(metalake, entityType, nameIdentifier);
+          return NameIdentifierUtil.splitNameIdentifier(metalake, entityType, 
nameIdentifier);
         },
         (unused) -> null);
   }
@@ -231,7 +230,7 @@ public class MetadataAuthzHelper {
         authorizationRequestContext,
         (entity) -> {
           NameIdentifier nameIdentifier = toNameIdentifier.apply(entity);
-          return splitMetadataNames(metalake, entityType, nameIdentifier);
+          return NameIdentifierUtil.splitNameIdentifier(metalake, entityType, 
nameIdentifier);
         },
         (unused) -> null);
   }
@@ -295,28 +294,6 @@ public class MetadataAuthzHelper {
         .toArray(size -> (E[]) 
Array.newInstance(entities.getClass().getComponentType(), size));
   }
 
-  /**
-   * Extract the parent metadata from NameIdentifier. For example, when given 
a Table
-   * NameIdentifier, it returns a map containing the Table itself along with 
its parent Schema and
-   * Catalog.
-   *
-   * @param metalake metalake
-   * @param entityType metadata type
-   * @param nameIdentifier metadata name
-   * @return A map containing the metadata object and all its parent objects, 
keyed by their types
-   */
-  public static Map<Entity.EntityType, NameIdentifier> splitMetadataNames(
-      String metalake, Entity.EntityType entityType, NameIdentifier 
nameIdentifier) {
-    Map<Entity.EntityType, NameIdentifier> nameIdentifierMap = new HashMap<>();
-    nameIdentifierMap.put(Entity.EntityType.METALAKE, 
NameIdentifierUtil.ofMetalake(metalake));
-    while (entityType != Entity.EntityType.METALAKE) {
-      nameIdentifierMap.put(entityType, nameIdentifier);
-      entityType = NameIdentifierUtil.parentEntityType(entityType);
-      nameIdentifier = NameIdentifierUtil.parentNameIdentifier(nameIdentifier, 
entityType);
-    }
-    return nameIdentifierMap;
-  }
-
   private static boolean enableAuthorization() {
     Config config = GravitinoEnv.getInstance().config();
     return config != null && config.get(Configs.ENABLE_AUTHORIZATION);
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
index fa15a048ac..e974c19492 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
@@ -30,6 +30,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import javax.ws.rs.core.Response;
 import org.aopalliance.intercept.ConstructorInterceptor;
@@ -146,7 +147,7 @@ public class GravitinoInterceptionService implements 
InterceptionService {
         if (expressionAnnotation != null) {
           String expression = expressionAnnotation.expression();
           Object[] args = methodInvocation.getArguments();
-          String entityType = 
extractMetadataObjectTypeFromParameters(parameters, args);
+          Optional<String> entityType = 
extractMetadataObjectTypeFromParameters(parameters, args);
           Map<Entity.EntityType, NameIdentifier> metadataContext =
               extractNameIdentifierFromParameters(parameters, args);
 
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/ParameterUtil.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/ParameterUtil.java
index ddefaf01fb..70c0731d64 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/ParameterUtil.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/ParameterUtil.java
@@ -17,16 +17,17 @@
 
 package org.apache.gravitino.server.web.filter;
 
+import java.lang.annotation.Annotation;
 import java.lang.reflect.Parameter;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationFullName;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationObjectType;
@@ -65,149 +66,67 @@ public class ParameterUtil {
 
   public static Map<Entity.EntityType, NameIdentifier> 
extractNameIdentifierFromParameters(
       Parameter[] parameters, Object[] args) {
-    Map<Entity.EntityType, String> entities = new HashMap<>();
-    Map<Entity.EntityType, NameIdentifier> nameIdentifierMap = new HashMap<>();
-    // Extract AuthorizationMetadata
-    for (int i = 0; i < parameters.length; i++) {
-      Parameter parameter = parameters[i];
-      AuthorizationMetadata authorizeResource =
-          parameter.getAnnotation(AuthorizationMetadata.class);
-      if (authorizeResource == null) {
-        continue;
+    Map<Entity.EntityType, String> entities = 
extractEntitiesFromAnnotations(parameters, args);
+
+    // Build name identifiers from extracted entities
+    Map<Entity.EntityType, NameIdentifier> nameIdentifierMap =
+        NameIdentifierUtil.buildNameIdentifierMap(entities);
+
+    // Extract full name and object type for metadata objects
+    Optional<String> fullName =
+        extractAnnotatedValue(parameters, args, AuthorizationFullName.class);
+    Optional<String> metadataObjectType =
+        extractAnnotatedValue(parameters, args, AuthorizationObjectType.class);
+
+    // Handle fullName and metadataObjectType if present
+    if (fullName.isPresent() && metadataObjectType.isPresent()) {
+      String metalake = entities.get(Entity.EntityType.METALAKE);
+      if (metalake != null) {
+        MetadataObject.Type type =
+            
MetadataObject.Type.valueOf(metadataObjectType.get().toUpperCase(Locale.ROOT));
+        NameIdentifier nameIdentifier =
+            MetadataObjectUtil.toEntityIdent(metalake, 
MetadataObjects.parse(fullName.get(), type));
+        nameIdentifierMap.putAll(
+            NameIdentifierUtil.splitNameIdentifier(
+                metalake, MetadataObjectUtil.toEntityType(type), 
nameIdentifier));
       }
-      Entity.EntityType type = authorizeResource.type();
-      entities.put(type, String.valueOf(args[i]));
     }
 
-    String metalake = entities.get(Entity.EntityType.METALAKE);
-    String catalog = entities.get(Entity.EntityType.CATALOG);
-    String schema = entities.get(Entity.EntityType.SCHEMA);
-    String table = entities.get(Entity.EntityType.TABLE);
-    String topic = entities.get(Entity.EntityType.TOPIC);
-    String fileset = entities.get(Entity.EntityType.FILESET);
+    return nameIdentifierMap;
+  }
 
-    // Extract full name and types
-    String fullName = null;
-    String metadataObjectType = null;
+  private static Map<Entity.EntityType, String> extractEntitiesFromAnnotations(
+      Parameter[] parameters, Object[] args) {
+    Map<Entity.EntityType, String> entities = new HashMap<>();
     for (int i = 0; i < parameters.length; i++) {
-      Parameter parameter = parameters[i];
-      AuthorizationFullName authorizeFullName =
-          parameter.getAnnotation(AuthorizationFullName.class);
-      if (authorizeFullName != null) {
-        fullName = String.valueOf(args[i]);
-      }
-
-      AuthorizationObjectType objectType = 
parameter.getAnnotation(AuthorizationObjectType.class);
-      if (objectType != null) {
-        metadataObjectType = String.valueOf(args[i]);
+      AuthorizationMetadata annotation = 
parameters[i].getAnnotation(AuthorizationMetadata.class);
+      if (annotation != null) {
+        entities.put(annotation.type(), String.valueOf(args[i]));
       }
     }
+    return entities;
+  }
 
-    entities.forEach(
-        (type, metadata) -> {
-          switch (type) {
-            case CATALOG:
-              nameIdentifierMap.put(
-                  Entity.EntityType.CATALOG, 
NameIdentifierUtil.ofCatalog(metalake, catalog));
-              break;
-            case SCHEMA:
-              nameIdentifierMap.put(
-                  Entity.EntityType.SCHEMA, 
NameIdentifierUtil.ofSchema(metalake, catalog, schema));
-              break;
-            case TABLE:
-              nameIdentifierMap.put(
-                  Entity.EntityType.TABLE,
-                  NameIdentifierUtil.ofTable(metalake, catalog, schema, 
table));
-              break;
-            case TOPIC:
-              nameIdentifierMap.put(
-                  Entity.EntityType.TOPIC,
-                  NameIdentifierUtil.ofTopic(metalake, catalog, schema, 
topic));
-              break;
-            case FILESET:
-              nameIdentifierMap.put(
-                  Entity.EntityType.FILESET,
-                  NameIdentifierUtil.ofFileset(metalake, catalog, schema, 
fileset));
-              break;
-            case MODEL:
-              String model = entities.get(Entity.EntityType.MODEL);
-              nameIdentifierMap.put(
-                  Entity.EntityType.MODEL,
-                  NameIdentifierUtil.ofModel(metalake, catalog, schema, 
model));
-              break;
-            case METALAKE:
-              nameIdentifierMap.put(
-                  Entity.EntityType.METALAKE, 
NameIdentifierUtil.ofMetalake(metalake));
-              break;
-            case USER:
-              nameIdentifierMap.put(
-                  Entity.EntityType.USER,
-                  NameIdentifierUtil.ofUser(metadata, 
entities.get(Entity.EntityType.USER)));
-              break;
-            case GROUP:
-              nameIdentifierMap.put(
-                  Entity.EntityType.GROUP,
-                  NameIdentifierUtil.ofGroup(metalake, 
entities.get(Entity.EntityType.GROUP)));
-              break;
-            case ROLE:
-              nameIdentifierMap.put(
-                  Entity.EntityType.ROLE,
-                  NameIdentifierUtil.ofRole(metalake, 
entities.get(Entity.EntityType.ROLE)));
-              break;
-            case TAG:
-              nameIdentifierMap.put(
-                  Entity.EntityType.TAG,
-                  NameIdentifierUtil.ofTag(metalake, 
entities.get(Entity.EntityType.TAG)));
-              break;
-
-            case POLICY:
-              nameIdentifierMap.put(
-                  Entity.EntityType.POLICY,
-                  NameIdentifierUtil.ofPolicy(metalake, 
entities.get(Entity.EntityType.POLICY)));
-              break;
-
-            case JOB:
-              nameIdentifierMap.put(
-                  Entity.EntityType.JOB,
-                  NameIdentifierUtil.ofJob(metalake, 
entities.get(Entity.EntityType.JOB)));
-              break;
-
-            case JOB_TEMPLATE:
-              nameIdentifierMap.put(
-                  Entity.EntityType.JOB_TEMPLATE,
-                  NameIdentifierUtil.ofJobTemplate(
-                      metalake, entities.get(Entity.EntityType.JOB_TEMPLATE)));
-              break;
-
-            default:
-              break;
-          }
-        });
-
-    // Extract fullName and metadataObjectType
-    if (fullName != null && metadataObjectType != null && metalake != null) {
-      MetadataObject.Type type =
-          
MetadataObject.Type.valueOf(metadataObjectType.toUpperCase(Locale.ROOT));
-      NameIdentifier nameIdentifier =
-          MetadataObjectUtil.toEntityIdent(metalake, 
MetadataObjects.parse(fullName, type));
-      nameIdentifierMap.putAll(
-          MetadataAuthzHelper.splitMetadataNames(
-              metalake, MetadataObjectUtil.toEntityType(type), 
nameIdentifier));
+  private static Optional<String> extractAnnotatedValue(
+      Parameter[] parameters, Object[] args, Class<? extends Annotation> 
annotationClass) {
+    for (int i = 0; i < parameters.length; i++) {
+      if (parameters[i].getAnnotation(annotationClass) != null) {
+        return Optional.of(String.valueOf(args[i]));
+      }
     }
-
-    return nameIdentifierMap;
+    return Optional.empty();
   }
 
-  public static String extractMetadataObjectTypeFromParameters(
+  public static Optional<String> extractMetadataObjectTypeFromParameters(
       Parameter[] parameters, Object[] args) {
     for (int i = 0; i < parameters.length; i++) {
       Parameter parameter = parameters[i];
       AuthorizationObjectType objectType = 
parameter.getAnnotation(AuthorizationObjectType.class);
       if (objectType != null) {
-        return String.valueOf(args[i]).toUpperCase();
+        return Optional.of(String.valueOf(args[i]).toUpperCase(Locale.ROOT));
       }
     }
-    return null;
+    return Optional.empty();
   }
 
   public static void buildNameIdentifierForBatchAuthorization(
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociatePolicyAuthorizationExecutor.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociatePolicyAuthorizationExecutor.java
index cc8c055d99..0f170efb0f 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociatePolicyAuthorizationExecutor.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociatePolicyAuthorizationExecutor.java
@@ -49,7 +49,7 @@ public class AssociatePolicyAuthorizationExecutor extends 
CommonAuthorizerExecut
       Map<Entity.EntityType, NameIdentifier> metadataContext,
       AuthorizationExpressionEvaluator authorizationExpressionEvaluator,
       Map<String, Object> pathParams,
-      String entityType) {
+      Optional<String> entityType) {
     super(expression, metadataContext, authorizationExpressionEvaluator, 
pathParams, entityType);
     this.parameters = parameters;
     this.args = args;
@@ -100,7 +100,7 @@ public class AssociatePolicyAuthorizationExecutor extends 
CommonAuthorizerExecut
 
       boolean authorized =
           authorizationExpressionEvaluator.evaluate(
-              currentContext, pathParams, context, 
Optional.ofNullable(entityType));
+              currentContext, pathParams, context, entityType);
 
       if (!authorized) {
         return false; // Fail fast on first unauthorized policy
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociateTagAuthorizationExecutor.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociateTagAuthorizationExecutor.java
index 09694e96a7..af445fa82d 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociateTagAuthorizationExecutor.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AssociateTagAuthorizationExecutor.java
@@ -49,7 +49,7 @@ public class AssociateTagAuthorizationExecutor extends 
CommonAuthorizerExecutor
       Map<Entity.EntityType, NameIdentifier> metadataContext,
       AuthorizationExpressionEvaluator authorizationExpressionEvaluator,
       Map<String, Object> pathParams,
-      String entityType) {
+      Optional<String> entityType) {
     super(expression, metadataContext, authorizationExpressionEvaluator, 
pathParams, entityType);
     this.parameters = parameters;
     this.args = args;
@@ -100,7 +100,7 @@ public class AssociateTagAuthorizationExecutor extends 
CommonAuthorizerExecutor
 
       boolean authorized =
           authorizationExpressionEvaluator.evaluate(
-              currentContext, pathParams, context, 
Optional.ofNullable(entityType));
+              currentContext, pathParams, context, entityType);
 
       if (!authorized) {
         return false; // Fail fast on first unauthorized tag
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AuthorizeExecutorFactory.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AuthorizeExecutorFactory.java
index b6051b1c88..a32b2a93ab 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AuthorizeExecutorFactory.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/AuthorizeExecutorFactory.java
@@ -19,6 +19,7 @@ package org.apache.gravitino.server.web.filter.authorization;
 
 import java.lang.reflect.Parameter;
 import java.util.Map;
+import java.util.Optional;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.NameIdentifier;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationRequest;
@@ -32,7 +33,7 @@ public class AuthorizeExecutorFactory {
       Map<Entity.EntityType, NameIdentifier> metadataContext,
       AuthorizationExpressionEvaluator authorizationExpressionEvaluator,
       Map<String, Object> pathParams,
-      String entityType,
+      Optional<String> entityType,
       Parameter[] parameters,
       Object[] args) {
     return switch (requestType) {
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/CommonAuthorizerExecutor.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/CommonAuthorizerExecutor.java
index 5b78ae28ad..3ea84933cb 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/CommonAuthorizerExecutor.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/CommonAuthorizerExecutor.java
@@ -29,7 +29,7 @@ public class CommonAuthorizerExecutor implements 
AuthorizationExecutor {
   protected Map<Entity.EntityType, NameIdentifier> metadataContext;
   protected AuthorizationExpressionEvaluator authorizationExpressionEvaluator;
   protected Map<String, Object> pathParams;
-  protected String entityType;
+  protected Optional<String> entityType;
   protected String expression;
 
   public CommonAuthorizerExecutor(
@@ -37,7 +37,7 @@ public class CommonAuthorizerExecutor implements 
AuthorizationExecutor {
       Map<Entity.EntityType, NameIdentifier> metadataContext,
       AuthorizationExpressionEvaluator authorizationExpressionEvaluator,
       Map<String, Object> pathParams,
-      String entityType) {
+      Optional<String> entityType) {
     this.expression = expression;
     this.metadataContext = metadataContext;
     this.authorizationExpressionEvaluator = authorizationExpressionEvaluator;
@@ -50,9 +50,6 @@ public class CommonAuthorizerExecutor implements 
AuthorizationExecutor {
     AuthorizationRequestContext authorizationRequestContext = new 
AuthorizationRequestContext();
     authorizationRequestContext.setOriginalAuthorizationExpression(expression);
     return authorizationExpressionEvaluator.evaluate(
-        metadataContext,
-        pathParams,
-        new AuthorizationRequestContext(),
-        Optional.ofNullable(entityType));
+        metadataContext, pathParams, new AuthorizationRequestContext(), 
entityType);
   }
 }
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/RunJobAuthorizationExecutor.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/RunJobAuthorizationExecutor.java
index a4c5ea2039..05e8308696 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/RunJobAuthorizationExecutor.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/authorization/RunJobAuthorizationExecutor.java
@@ -35,7 +35,7 @@ public class RunJobAuthorizationExecutor implements 
AuthorizationExecutor {
   private final Map<Entity.EntityType, NameIdentifier> metadataContext;
   private final AuthorizationExpressionEvaluator 
authorizationExpressionEvaluator;
   private final Map<String, Object> pathParams;
-  private final String entityType;
+  private final Optional<String> entityType;
 
   public RunJobAuthorizationExecutor(
       Parameter[] parameters,
@@ -43,7 +43,7 @@ public class RunJobAuthorizationExecutor implements 
AuthorizationExecutor {
       Map<Entity.EntityType, NameIdentifier> metadataContext,
       AuthorizationExpressionEvaluator authorizationExpressionEvaluator,
       Map<String, Object> pathParams,
-      String entityType) {
+      Optional<String> entityType) {
     this.parameters = parameters;
     this.args = args;
     this.metadataContext = metadataContext;
@@ -73,6 +73,6 @@ public class RunJobAuthorizationExecutor implements 
AuthorizationExecutor {
     metadataContext.put(Entity.EntityType.JOB_TEMPLATE, 
jobTemplateNameIdentifier);
 
     return authorizationExpressionEvaluator.evaluate(
-        metadataContext, pathParams, context, Optional.ofNullable(entityType));
+        metadataContext, pathParams, context, entityType);
   }
 }
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/sql/SparkQueryRunner.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/sql/SparkQueryRunner.java
index b0961a0878..21a7c260e5 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/sql/SparkQueryRunner.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/sql/SparkQueryRunner.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.client.GravitinoAdminClient;
@@ -44,7 +45,6 @@ import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.Assertions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
 
 /** Run and check the correctness of the SparkSQLs */
 public class SparkQueryRunner {
diff --git 
a/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoQueryITBase.java
 
b/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoQueryITBase.java
index 2a54df2980..5580ffbd36 100644
--- 
a/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoQueryITBase.java
+++ 
b/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoQueryITBase.java
@@ -26,6 +26,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
+import org.apache.commons.io.FileUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
@@ -40,7 +41,6 @@ import org.apache.gravitino.rel.TableCatalog;
 import org.junit.jupiter.api.Assertions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
 
 public class TrinoQueryITBase {
   private static final Logger LOG = 
LoggerFactory.getLogger(TrinoQueryITBase.class);

Reply via email to