This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ead18cc49 [#7556][followup] feat: Get the credential with the correct privileges  (#9218)
7ead18cc49 is described below

commit 7ead18cc4908e6ec80cd1b7a3f223889cb0b9960
Author: roryqi <[email protected]>
AuthorDate: Sun Nov 23 18:22:57 2025 +0800

    [#7556][followup] feat: Get the credential with the correct privileges  (#9218)
    
    ### What changes were proposed in this pull request?
    
    Get the credential with the correct privileges
    
    ### Why are the changes needed?
    
    Fix: #7556
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Add S3 IT
---
 .../iceberg/service/CatalogWrapperForREST.java     |  35 ++-
 .../dispatcher/IcebergTableOperationExecutor.java  |  32 +-
 .../service/rest/IcebergNamespaceOperations.java   |   4 +-
 .../service/rest/IcebergTableOperations.java       |  17 +-
 .../integration/test/IcebergAuthorizationIT.java   |  58 ++--
 .../test/IcebergRESTS3TokenAuthorizationIT.java    | 322 +++++++++++++++++++++
 .../service/rest/TestIcebergTableOperations.java   |   5 +
 ...aFilterHelper.java => MetadataAuthzHelper.java} |  27 +-
 .../AuthorizationExpressionConstants.java          |   6 +
 .../AuthorizationExpressionConverter.java          |   4 +-
 ...terHelper.java => TestMetadataAuthzHelper.java} |  12 +-
 .../gravitino/server/web/filter/ParameterUtil.java |   4 +-
 .../server/web/rest/CatalogOperations.java         |   6 +-
 .../server/web/rest/FilesetOperations.java         |   4 +-
 .../web/rest/MetadataObjectRoleOperations.java     |   4 +-
 .../web/rest/MetadataObjectTagOperations.java      |   6 +-
 .../server/web/rest/MetalakeOperations.java        |   4 +-
 .../gravitino/server/web/rest/ModelOperations.java |   8 +-
 .../gravitino/server/web/rest/RoleOperations.java  |   4 +-
 .../server/web/rest/SchemaOperations.java          |   4 +-
 .../gravitino/server/web/rest/TableOperations.java |   4 +-
 .../gravitino/server/web/rest/TagOperations.java   |   6 +-
 .../gravitino/server/web/rest/TopicOperations.java |   4 +-
 .../gravitino/server/web/rest/UserOperations.java  |   6 +-
 24 files changed, 505 insertions(+), 81 deletions(-)

diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
index 2cac2c7b33..c62ad9f9c9 100644
--- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
@@ -32,6 +32,7 @@ import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
 import org.apache.gravitino.credential.CatalogCredentialManager;
 import org.apache.gravitino.credential.Credential;
 import org.apache.gravitino.credential.CredentialConstants;
+import org.apache.gravitino.credential.CredentialPrivilege;
 import org.apache.gravitino.credential.CredentialPropertyUtils;
 import org.apache.gravitino.credential.PathBasedCredentialContext;
 import org.apache.gravitino.iceberg.common.IcebergConfig;
@@ -89,15 +90,18 @@ public class CatalogWrapperForREST extends IcebergCatalogWrapper {
     LoadTableResponse loadTableResponse = super.createTable(namespace, 
request);
     if (requestCredential) {
       return injectCredentialConfig(
-          TableIdentifier.of(namespace, request.name()), loadTableResponse);
+          TableIdentifier.of(namespace, request.name()),
+          loadTableResponse,
+          CredentialPrivilege.WRITE);
     }
     return loadTableResponse;
   }
 
-  public LoadTableResponse loadTable(TableIdentifier identifier, boolean 
requestCredential) {
+  public LoadTableResponse loadTable(
+      TableIdentifier identifier, boolean requestCredential, 
CredentialPrivilege privilege) {
     LoadTableResponse loadTableResponse = super.loadTable(identifier);
     if (requestCredential) {
-      return injectCredentialConfig(identifier, loadTableResponse);
+      return injectCredentialConfig(identifier, loadTableResponse, privilege);
     }
     return loadTableResponse;
   }
@@ -109,10 +113,11 @@ public class CatalogWrapperForREST extends IcebergCatalogWrapper {
    * @return A {@link 
org.apache.iceberg.rest.responses.LoadCredentialsResponse} object containing
    *     the credentials.
    */
-  public LoadCredentialsResponse getTableCredentials(TableIdentifier 
identifier) {
+  public LoadCredentialsResponse getTableCredentials(
+      TableIdentifier identifier, CredentialPrivilege privilege) {
     try {
       LoadTableResponse loadTableResponse = super.loadTable(identifier);
-      Credential credential = getCredential(loadTableResponse);
+      Credential credential = getCredential(loadTableResponse, privilege);
       org.apache.iceberg.rest.credentials.Credential icebergCredential =
           new org.apache.iceberg.rest.credentials.Credential() {
             @Override
@@ -155,8 +160,10 @@ public class CatalogWrapperForREST extends IcebergCatalogWrapper {
   }
 
   private LoadTableResponse injectCredentialConfig(
-      TableIdentifier tableIdentifier, LoadTableResponse loadTableResponse) {
-    final Credential credential = getCredential(loadTableResponse);
+      TableIdentifier tableIdentifier,
+      LoadTableResponse loadTableResponse,
+      CredentialPrivilege privilege) {
+    final Credential credential = getCredential(loadTableResponse, privilege);
 
     LOG.info(
         "Generate credential: {} for Iceberg table: {}",
@@ -172,7 +179,8 @@ public class CatalogWrapperForREST extends IcebergCatalogWrapper {
         .build();
   }
 
-  private Credential getCredential(LoadTableResponse loadTableResponse) {
+  private Credential getCredential(
+      LoadTableResponse loadTableResponse, CredentialPrivilege privilege) {
     TableMetadata tableMetadata = loadTableResponse.tableMetadata();
     String[] path =
         Stream.of(
@@ -183,8 +191,15 @@ public class CatalogWrapperForREST extends IcebergCatalogWrapper {
             .toArray(String[]::new);
 
     PathBasedCredentialContext context =
-        new PathBasedCredentialContext(
-            PrincipalUtils.getCurrentUserName(), ImmutableSet.copyOf(path), 
Collections.emptySet());
+        privilege == CredentialPrivilege.WRITE
+            ? new PathBasedCredentialContext(
+                PrincipalUtils.getCurrentUserName(),
+                ImmutableSet.copyOf(path),
+                Collections.emptySet())
+            : new PathBasedCredentialContext(
+                PrincipalUtils.getCurrentUserName(),
+                Collections.emptySet(),
+                ImmutableSet.copyOf(path));
     Credential credential = catalogCredentialManager.getCredential(context);
     if (credential == null) {
       throw new ServiceUnavailableException("Couldn't generate credential, 
%s", context);
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java
index bebc9c1afa..1c34f259ff 100644
--- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java
@@ -21,10 +21,17 @@ package org.apache.gravitino.iceberg.service.dispatcher;
 
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.auth.AuthConstants;
 import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.credential.CredentialPrivilege;
+import org.apache.gravitino.iceberg.common.utils.IcebergIdentifierUtils;
 import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
+import 
org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerContext;
 import org.apache.gravitino.listener.api.event.IcebergRequestContext;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
+import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
@@ -107,9 +114,14 @@ public class IcebergTableOperationExecutor implements IcebergTableOperationDispa
   @Override
   public LoadTableResponse loadTable(
       IcebergRequestContext context, TableIdentifier tableIdentifier) {
+    CredentialPrivilege privilege = CredentialPrivilege.READ;
+    if (context.requestCredentialVending()) {
+      privilege = getCredentialPrivilege(context, tableIdentifier);
+    }
+
     return icebergCatalogWrapperManager
         .getCatalogWrapper(context.catalogName())
-        .loadTable(tableIdentifier, context.requestCredentialVending());
+        .loadTable(tableIdentifier, context.requestCredentialVending(), 
privilege);
   }
 
   @Override
@@ -136,8 +148,24 @@ public class IcebergTableOperationExecutor implements IcebergTableOperationDispa
   @Override
   public LoadCredentialsResponse getTableCredentials(
       IcebergRequestContext context, TableIdentifier tableIdentifier) {
+    CredentialPrivilege privilege = getCredentialPrivilege(context, 
tableIdentifier);
     return icebergCatalogWrapperManager
         .getCatalogWrapper(context.catalogName())
-        .getTableCredentials(tableIdentifier);
+        .getTableCredentials(tableIdentifier, privilege);
+  }
+
+  private static CredentialPrivilege getCredentialPrivilege(
+      IcebergRequestContext context, TableIdentifier tableIdentifier) {
+    String metalake = IcebergRESTServerContext.getInstance().metalakeName();
+    NameIdentifier identifier =
+        IcebergIdentifierUtils.toGravitinoTableIdentifier(
+            metalake, context.catalogName(), tableIdentifier);
+    boolean writable =
+        MetadataAuthzHelper.checkAccess(
+            identifier,
+            Entity.EntityType.TABLE,
+            
AuthorizationExpressionConstants.filterModifyTableAuthorizationExpression);
+
+    return writable ? CredentialPrivilege.WRITE : CredentialPrivilege.READ;
   }
 }
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java
index 1472d9a7f2..b9b526bb64 100644
--- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java
@@ -51,7 +51,7 @@ import org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerConte
 import 
org.apache.gravitino.iceberg.service.dispatcher.IcebergNamespaceOperationDispatcher;
 import org.apache.gravitino.listener.api.event.IcebergRequestContext;
 import org.apache.gravitino.metrics.MetricNames;
-import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants;
@@ -341,7 +341,7 @@ public class IcebergNamespaceOperations {
   private ListNamespacesResponse filterListNamespacesResponse(
       ListNamespacesResponse listNamespacesResponse, String metalake, String 
catalogName) {
     NameIdentifier[] idents =
-        MetadataFilterHelper.filterByExpression(
+        MetadataAuthzHelper.filterByExpression(
             metalake,
             
AuthorizationExpressionConstants.filterSchemaAuthorizationExpression,
             Entity.EntityType.SCHEMA,
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
index 98746b5e22..cc0db74147 100644
--- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
@@ -55,7 +55,7 @@ import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDisp
 import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
 import org.apache.gravitino.listener.api.event.IcebergRequestContext;
 import org.apache.gravitino.metrics.MetricNames;
-import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import 
org.apache.gravitino.server.authorization.annotations.IcebergAuthorizationMetadata;
@@ -405,10 +405,17 @@ public class IcebergTableOperations {
   @Produces(MediaType.APPLICATION_JSON)
   @Timed(name = "get-table-credentials." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "get-table-credentials", absolute = true)
+  @AuthorizationExpression(
+      expression =
+          "ANY(OWNER, METALAKE, CATALOG) || "
+              + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+              + "ANY_USE_CATALOG && ANY_USE_SCHEMA  && (TABLE::OWNER || 
ANY_SELECT_TABLE || ANY_MODIFY_TABLE)",
+      accessMetadataType = MetadataObject.Type.TABLE)
   public Response getTableCredentials(
-      @PathParam("prefix") String prefix,
-      @Encoded() @PathParam("namespace") String namespace,
-      @PathParam("table") String table) {
+      @AuthorizationMetadata(type = Entity.EntityType.CATALOG) 
@PathParam("prefix") String prefix,
+      @AuthorizationMetadata(type = EntityType.SCHEMA) @Encoded() 
@PathParam("namespace")
+          String namespace,
+      @AuthorizationMetadata(type = EntityType.TABLE) @PathParam("table") 
String table) {
     String catalogName = IcebergRESTUtils.getCatalogName(prefix);
     Namespace icebergNS = RESTUtil.decodeNamespace(namespace);
     LOG.info(
@@ -471,7 +478,7 @@ public class IcebergTableOperations {
   private ListTablesResponse filterListTablesResponse(
       ListTablesResponse listTablesResponse, String metalake, String 
catalogName) {
     NameIdentifier[] idents =
-        MetadataFilterHelper.filterByExpression(
+        MetadataAuthzHelper.filterByExpression(
             metalake,
             
AuthorizationExpressionConstants.filterTableAuthorizationExpression,
             Entity.EntityType.TABLE,
diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergAuthorizationIT.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergAuthorizationIT.java
index 561c8211e1..0c28bb0b7e 100644
--- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergAuthorizationIT.java
+++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergAuthorizationIT.java
@@ -20,6 +20,7 @@
 package org.apache.gravitino.iceberg.integration.test;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
 import com.google.errorprone.annotations.FormatMethod;
 import java.io.IOException;
 import java.util.Arrays;
@@ -117,29 +118,37 @@ public class IcebergAuthorizationIT extends BaseIT {
       metalakeClientWithAllPrivilege.addUser(NORMAL_USER);
     }
 
+    Map<String, String> basicProps =
+        ImmutableMap.of(
+            IcebergConstants.URI,
+            getPGUri(),
+            IcebergConstants.CATALOG_BACKEND,
+            "jdbc",
+            IcebergConstants.GRAVITINO_JDBC_DRIVER,
+            "org.postgresql.Driver",
+            IcebergConstants.GRAVITINO_JDBC_USER,
+            getPGUser(),
+            IcebergConstants.GRAVITINO_JDBC_PASSWORD,
+            getPGPassword(),
+            "gravitino.bypass.jdbc.schema-version",
+            "v1",
+            IcebergConstants.ICEBERG_JDBC_INITIALIZE,
+            "true",
+            IcebergConstants.WAREHOUSE,
+            "file:///tmp/");
+
+    Map<String, String> customProps = getCustomProperties();
+    Map<String, String> catalogProps = Maps.newHashMap();
+    catalogProps.putAll(basicProps);
+    catalogProps.putAll(customProps);
+
     catalogClientWithAllPrivilege =
         metalakeClientWithAllPrivilege.createCatalog(
             GRAVITINO_CATALOG_NAME,
             Catalog.Type.RELATIONAL,
             "lakehouse-iceberg",
             "comment",
-            ImmutableMap.of(
-                IcebergConstants.URI,
-                getPGUri(),
-                IcebergConstants.CATALOG_BACKEND,
-                "jdbc",
-                IcebergConstants.GRAVITINO_JDBC_DRIVER,
-                "org.postgresql.Driver",
-                IcebergConstants.GRAVITINO_JDBC_USER,
-                getPGUser(),
-                IcebergConstants.GRAVITINO_JDBC_PASSWORD,
-                getPGPassword(),
-                "gravitino.bypass.jdbc.schema-version",
-                "v1",
-                IcebergConstants.ICEBERG_JDBC_INITIALIZE,
-                "true",
-                IcebergConstants.WAREHOUSE,
-                "file:///tmp/"));
+            catalogProps);
   }
 
   private void startGravitinoServerWithIcebergREST() throws Exception {
@@ -175,6 +184,14 @@ public class IcebergAuthorizationIT extends BaseIT {
     super.startIntegrationTest();
   }
 
+  protected Map<String, String> getCustomProperties() {
+    return Maps.newHashMap();
+  }
+
+  protected boolean supportsCredentialVending() {
+    return false;
+  }
+
   void revokeUserRoles() {
     List<String> roles = 
metalakeClientWithAllPrivilege.getUser(NORMAL_USER).roles();
     if (roles.size() > 0) {
@@ -196,11 +213,12 @@ public class IcebergAuthorizationIT extends BaseIT {
 
   void createTable(String schemaName, String tableName) {
     Column col1 = Column.of("col_1", Types.IntegerType.get(), "col_1_comment");
+    Column col2 = Column.of("col_2", Types.IntegerType.get(), "col_2_comment");
     catalogClientWithAllPrivilege
         .asTableCatalog()
         .createTable(
             NameIdentifier.of(schemaName, tableName),
-            new Column[] {col1},
+            new Column[] {col1, col2},
             "table_comment",
             new HashMap<>());
     boolean exists =
@@ -228,6 +246,10 @@ public class IcebergAuthorizationIT extends BaseIT {
             .set("spark.sql.catalog.rest.rest.auth.basic.password", "mock")
             // drop Iceberg table purge may hang in spark local mode
             .set("spark.locality.wait.node", "0");
+    if (supportsCredentialVending()) {
+      sparkConf.set(
+          "spark.sql.catalog.rest.header.X-Iceberg-Access-Delegation", 
"vended-credentials");
+    }
 
     sparkSession = 
SparkSession.builder().master("local[1]").config(sparkConf).getOrCreate();
   }
diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTS3TokenAuthorizationIT.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTS3TokenAuthorizationIT.java
new file mode 100644
index 0000000000..1f3edd453c
--- /dev/null
+++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTS3TokenAuthorizationIT.java
@@ -0,0 +1,322 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.iceberg.integration.test;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.credential.CredentialConstants;
+import org.apache.gravitino.credential.S3TokenCredential;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.DownloaderUtils;
+import org.apache.gravitino.integration.test.util.ITUtils;
+import org.apache.gravitino.storage.S3Properties;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.spark.SparkException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import org.junit.platform.commons.util.StringUtils;
+
+@SuppressWarnings("FormatStringAnnotation")
+@EnabledIfEnvironmentVariable(named = "GRAVITINO_TEST_CLOUD_IT", matches = 
"true")
+public class IcebergRESTS3TokenAuthorizationIT extends IcebergAuthorizationIT {
+
+  private static final String SCHEMA_NAME = "schema";
+  private String s3Warehouse;
+  private String accessKey;
+  private String secretKey;
+  private String region;
+  private String roleArn;
+  private String externalId;
+
+  @BeforeAll
+  public void startIntegrationTest() throws Exception {
+    this.s3Warehouse =
+        String.format(
+            "s3://%s/test1", 
System.getenv().getOrDefault("GRAVITINO_S3_BUCKET", "{BUCKET_NAME}"));
+    this.accessKey = System.getenv().getOrDefault("GRAVITINO_S3_ACCESS_KEY", 
"{ACCESS_KEY}");
+    this.secretKey = System.getenv().getOrDefault("GRAVITINO_S3_SECRET_KEY", 
"{SECRET_KEY}");
+    this.region = System.getenv().getOrDefault("GRAVITINO_S3_REGION", 
"ap-southeast-2");
+    this.roleArn = System.getenv().getOrDefault("GRAVITINO_S3_ROLE_ARN", 
"{ROLE_ARN}");
+    this.externalId = System.getenv().getOrDefault("GRAVITINO_S3_EXTERNAL_ID", 
"");
+
+    super.startIntegrationTest();
+
+    catalogClientWithAllPrivilege.asSchemas().createSchema(SCHEMA_NAME, 
"test", new HashMap<>());
+
+    if (ITUtils.isEmbedded()) {
+      return;
+    }
+    try {
+      downloadIcebergAwsBundleJar();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    copyS3BundleJar();
+  }
+
+  @BeforeEach
+  void revokePrivilege() {
+    revokeUserRoles();
+    resetMetalakeAndCatalogOwner();
+    MetadataObject schemaObject =
+        MetadataObjects.of(
+            Arrays.asList(GRAVITINO_CATALOG_NAME, SCHEMA_NAME), 
MetadataObject.Type.SCHEMA);
+    metalakeClientWithAllPrivilege.setOwner(schemaObject, SUPER_USER, 
Owner.Type.USER);
+    clearTable();
+    // Grant user the privilege to use the catalog and schema
+    grantUseSchemaRole(SCHEMA_NAME);
+    sql("USE %s;", SPARK_CATALOG_NAME);
+    sql("USE %s;", SCHEMA_NAME);
+  }
+
+  @Override
+  public Map<String, String> getCustomProperties() {
+    HashMap<String, String> m = new HashMap<>();
+    m.putAll(getS3Config());
+    return m;
+  }
+
+  @Override
+  protected boolean supportsCredentialVending() {
+    return true;
+  }
+
+  @Test
+  void testIcebergOwnerS3Token() {
+    String tableName = "test_owner_s3";
+    grantCreateTableRole(SCHEMA_NAME);
+    sql("CREATE TABLE %s(a int, b int) PARTITIONED BY (a)", tableName);
+    sql("INSERT INTO %s VALUES (1,1),(2,2)", tableName);
+    List<Object[]> rows = sql("SELECT * FROM %s", tableName);
+    Assertions.assertEquals(2, rows.size());
+
+    rows = sql("SELECT * FROM %s.%s.%s", SPARK_CATALOG_NAME, SCHEMA_NAME, 
tableName);
+    Assertions.assertEquals(2, rows.size());
+
+    rows = sql("SELECT * FROM %s.%s", SCHEMA_NAME, tableName);
+    Assertions.assertEquals(2, rows.size());
+
+    rows = sql("SELECT * FROM %s.%s.%s.partitions", SPARK_CATALOG_NAME, 
SCHEMA_NAME, tableName);
+    Assertions.assertEquals(2, rows.size());
+
+    rows = sql("SELECT *,_file FROM %s", tableName);
+    Assertions.assertEquals(2, rows.size());
+  }
+
+  @Test
+  void testIcebergSelectTableS3Token() {
+    String tableName = "test_select_s3";
+    createTable(SCHEMA_NAME, tableName);
+
+    // No privileges
+    Assertions.assertThrows(
+        ForbiddenException.class, () -> sql("INSERT INTO %s VALUES 
(1,1),(2,2)", tableName));
+    Assertions.assertThrows(
+        ForbiddenException.class,
+        () -> sql("SELECT * FROM %s.%s.%s", SPARK_CATALOG_NAME, SCHEMA_NAME, 
tableName));
+
+    grantSelectTableRole(tableName);
+    Assertions.assertThrows(
+        SparkException.class, () -> sql("INSERT INTO %s VALUES (1,1),(2,2)", 
tableName));
+    List<Object[]> rows = sql("SELECT * FROM %s", tableName);
+    Assertions.assertEquals(0, rows.size());
+
+    rows = sql("SELECT * FROM %s.%s.%s", SPARK_CATALOG_NAME, SCHEMA_NAME, 
tableName);
+    Assertions.assertEquals(0, rows.size());
+
+    rows = sql("SELECT * FROM %s.%s", SCHEMA_NAME, tableName);
+    Assertions.assertEquals(0, rows.size());
+
+    rows = sql("SELECT * FROM %s.%s.%s.partitions", SPARK_CATALOG_NAME, 
SCHEMA_NAME, tableName);
+    Assertions.assertEquals(0, rows.size());
+
+    rows = sql("SELECT *,_file FROM %s", tableName);
+    Assertions.assertEquals(0, rows.size());
+  }
+
+  @Test
+  void testIcebergModifyTableS3Token() {
+    String tableName = "test_modify_s3";
+    createTable(SCHEMA_NAME, tableName);
+
+    // No privileges
+    Assertions.assertThrows(
+        ForbiddenException.class, () -> sql("INSERT INTO %s VALUES 
(1,1),(2,2)", tableName));
+    Assertions.assertThrows(
+        ForbiddenException.class,
+        () -> sql("SELECT * FROM %s.%s.%s", SPARK_CATALOG_NAME, SCHEMA_NAME, 
tableName));
+
+    grantModifyTableRole(tableName);
+
+    sql("INSERT INTO %s VALUES (1,1),(2,2)", tableName);
+    List<Object[]> rows = sql("SELECT * FROM %s", tableName);
+    Assertions.assertEquals(2, rows.size());
+
+    rows = sql("SELECT * FROM %s.%s.%s", SPARK_CATALOG_NAME, SCHEMA_NAME, 
tableName);
+    Assertions.assertEquals(2, rows.size());
+
+    rows = sql("SELECT * FROM %s.%s", SCHEMA_NAME, tableName);
+    Assertions.assertEquals(2, rows.size());
+
+    rows = sql("SELECT * FROM %s.%s.%s.partitions", SPARK_CATALOG_NAME, 
SCHEMA_NAME, tableName);
+    Assertions.assertEquals(1, rows.size());
+
+    rows = sql("SELECT *,_file FROM %s", tableName);
+    Assertions.assertEquals(2, rows.size());
+  }
+
+  private void grantUseSchemaRole(String schema) {
+    String roleName = "useSchema_" + UUID.randomUUID();
+    List<SecurableObject> securableObjects = new ArrayList<>();
+    SecurableObject catalogObject =
+        SecurableObjects.ofCatalog(
+            GRAVITINO_CATALOG_NAME, 
ImmutableList.of(Privileges.UseCatalog.allow()));
+    securableObjects.add(catalogObject);
+    SecurableObject schemaObject =
+        SecurableObjects.ofSchema(
+            catalogObject, schema, 
ImmutableList.of(Privileges.UseSchema.allow()));
+    securableObjects.add(schemaObject);
+    metalakeClientWithAllPrivilege.createRole(roleName, new HashMap<>(), 
securableObjects);
+
+    
metalakeClientWithAllPrivilege.grantRolesToUser(ImmutableList.of(roleName), 
NORMAL_USER);
+  }
+
+  private String grantCreateTableRole(String schema) {
+    String roleName = "createTable_" + UUID.randomUUID();
+    List<SecurableObject> securableObjects = new ArrayList<>();
+    SecurableObject catalogObject =
+        SecurableObjects.ofCatalog(
+            GRAVITINO_CATALOG_NAME, 
ImmutableList.of(Privileges.UseCatalog.allow()));
+    securableObjects.add(catalogObject);
+    SecurableObject schemaObject =
+        SecurableObjects.ofSchema(
+            catalogObject, schema, 
ImmutableList.of(Privileges.CreateTable.allow()));
+    securableObjects.add(schemaObject);
+    metalakeClientWithAllPrivilege.createRole(roleName, new HashMap<>(), 
securableObjects);
+    
metalakeClientWithAllPrivilege.grantRolesToUser(ImmutableList.of(roleName), 
NORMAL_USER);
+    return roleName;
+  }
+
+  private String grantSelectTableRole(String tableName) {
+    String roleName = "selectTable_" + UUID.randomUUID();
+    List<SecurableObject> securableObjects = new ArrayList<>();
+    SecurableObject catalogObject =
+        SecurableObjects.ofCatalog(
+            GRAVITINO_CATALOG_NAME, 
ImmutableList.of(Privileges.UseCatalog.allow()));
+    securableObjects.add(catalogObject);
+    SecurableObject schemaObject =
+        SecurableObjects.ofSchema(
+            catalogObject, SCHEMA_NAME, 
ImmutableList.of(Privileges.UseSchema.allow()));
+    SecurableObject tableObject =
+        SecurableObjects.ofTable(
+            schemaObject, tableName, 
ImmutableList.of(Privileges.SelectTable.allow()));
+    securableObjects.add(tableObject);
+    metalakeClientWithAllPrivilege.createRole(roleName, new HashMap<>(), 
securableObjects);
+    
metalakeClientWithAllPrivilege.grantRolesToUser(ImmutableList.of(roleName), 
NORMAL_USER);
+    return roleName;
+  }
+
+  private String grantModifyTableRole(String tableName) {
+    String roleName = "modifyTable_" + UUID.randomUUID();
+    List<SecurableObject> securableObjects = new ArrayList<>();
+    SecurableObject catalogObject =
+        SecurableObjects.ofCatalog(
+            GRAVITINO_CATALOG_NAME, 
ImmutableList.of(Privileges.UseCatalog.allow()));
+    securableObjects.add(catalogObject);
+    SecurableObject schemaObject =
+        SecurableObjects.ofSchema(
+            catalogObject, SCHEMA_NAME, 
ImmutableList.of(Privileges.UseSchema.allow()));
+    SecurableObject tableObject =
+        SecurableObjects.ofTable(
+            schemaObject, tableName, 
ImmutableList.of(Privileges.ModifyTable.allow()));
+    securableObjects.add(tableObject);
+    metalakeClientWithAllPrivilege.createRole(roleName, new HashMap<>(), 
securableObjects);
+    
metalakeClientWithAllPrivilege.grantRolesToUser(ImmutableList.of(roleName), 
NORMAL_USER);
+    return roleName;
+  }
+
+  private void clearTable() {
+    Arrays.stream(
+            
catalogClientWithAllPrivilege.asTableCatalog().listTables(Namespace.of(SCHEMA_NAME)))
+        .forEach(
+            table -> {
+              catalogClientWithAllPrivilege
+                  .asTableCatalog()
+                  .dropTable(NameIdentifier.of(SCHEMA_NAME, table.name()));
+            });
+    NameIdentifier[] nameIdentifiers =
+        
catalogClientWithAllPrivilege.asTableCatalog().listTables(Namespace.of(SCHEMA_NAME));
+    Assertions.assertEquals(0, nameIdentifiers.length);
+  }
+
+  private void downloadIcebergAwsBundleJar() throws IOException {
+    String icebergBundleJarUri =
+        String.format(
+            "https://repo1.maven.org/maven2/org/apache/iceberg/"
+                + "iceberg-aws-bundle/%s/iceberg-aws-bundle-%s.jar",
+            ITUtils.icebergVersion(), ITUtils.icebergVersion());
+    String gravitinoHome = System.getenv("GRAVITINO_HOME");
+    String targetDir = String.format("%s/iceberg-rest-server/libs/", 
gravitinoHome);
+    DownloaderUtils.downloadFile(icebergBundleJarUri, targetDir);
+  }
+
+  private void copyS3BundleJar() {
+    String gravitinoHome = System.getenv("GRAVITINO_HOME");
+    String targetDir = String.format("%s/iceberg-rest-server/libs/", 
gravitinoHome);
+    BaseIT.copyBundleJarsToDirectory("aws", targetDir);
+  }
+
+  private Map<String, String> getS3Config() {
+    Map configMap = new HashMap<String, String>();
+
+    configMap.put(
+        CredentialConstants.CREDENTIAL_PROVIDERS, 
S3TokenCredential.S3_TOKEN_CREDENTIAL_TYPE);
+    configMap.put(S3Properties.GRAVITINO_S3_REGION, region);
+    configMap.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, accessKey);
+    configMap.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, secretKey);
+    configMap.put(S3Properties.GRAVITINO_S3_ROLE_ARN, roleArn);
+    if (StringUtils.isNotBlank(externalId)) {
+      configMap.put(S3Properties.GRAVITINO_S3_EXTERNAL_ID, externalId);
+    }
+
+    configMap.put(IcebergConstants.IO_IMPL, 
"org.apache.iceberg.aws.s3.S3FileIO");
+    configMap.put(IcebergConstants.WAREHOUSE, s3Warehouse);
+
+    return configMap;
+  }
+}
diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
index 0705fc66fd..94af167579 100644
--- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
+++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
@@ -54,6 +54,8 @@ import org.apache.gravitino.listener.api.event.IcebergTableExistsPreEvent;
 import org.apache.gravitino.listener.api.event.IcebergUpdateTableEvent;
 import org.apache.gravitino.listener.api.event.IcebergUpdateTableFailureEvent;
 import org.apache.gravitino.listener.api.event.IcebergUpdateTablePreEvent;
+import org.apache.gravitino.server.ServerConfig;
+import org.apache.gravitino.server.authorization.GravitinoAuthorizerProvider;
 import org.apache.iceberg.MetadataUpdate;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableMetadata;
@@ -111,6 +113,9 @@ public class TestIcebergTableOperations extends 
IcebergNamespaceTestBase {
           }
         });
 
+    GravitinoAuthorizerProvider provider = 
GravitinoAuthorizerProvider.getInstance();
+    provider.initialize(new ServerConfig());
+
     return resourceConfig;
   }
 
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
similarity index 93%
rename from 
server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
rename to 
server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
index 3d1707b3e7..661f4749c2 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
@@ -48,12 +48,12 @@ import org.slf4j.LoggerFactory;
  * on expressions or metadata types, and calls {@link GravitinoAuthorizer} for 
authorization,
  * returning only the metadata that the user has permission to access.
  */
-public class MetadataFilterHelper {
+public class MetadataAuthzHelper {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(MetadataFilterHelper.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(MetadataAuthzHelper.class);
   private static volatile Executor executor = null;
 
-  private MetadataFilterHelper() {}
+  private MetadataAuthzHelper() {}
 
   /**
    * Call {@link GravitinoAuthorizer} to filter the metadata list
@@ -139,6 +139,25 @@ public class MetadataFilterHelper {
         .toArray(NameIdentifier[]::new);
   }
 
+  /**
+   * Evaluates an authorization expression for a single metadata object by calling {@link
+   * AuthorizationExpressionEvaluator}.
+   *
+   * <p>The metalake is taken from the identifier itself, and the identifier is passed through
+   * {@code spiltMetadataNames} to build the entity-type-to-identifier map handed to the
+   * evaluator. A fresh {@link AuthorizationRequestContext} is created for every call.
+   *
+   * @param identifier the name identifier of the metadata object to check
+   * @param entityType the entity type of {@code identifier}
+   * @param expression the authorization expression to evaluate
+   * @return the evaluator's result for the expression (presumably {@code true} when access is
+   *     allowed; confirm against {@link AuthorizationExpressionEvaluator})
+   */
+  public static boolean checkAccess(
+      NameIdentifier identifier, Entity.EntityType entityType, String expression) {
+    Map<Entity.EntityType, NameIdentifier> identsByType =
+        spiltMetadataNames(NameIdentifierUtil.getMetalake(identifier), entityType, identifier);
+    return new AuthorizationExpressionEvaluator(expression)
+        .evaluate(identsByType, new AuthorizationRequestContext());
+  }
+
   /**
    * Call {@link AuthorizationExpressionEvaluator} to filter the metadata list
    *
@@ -307,7 +326,7 @@ public class MetadataFilterHelper {
 
   private static void checkExecutor() {
     if (executor == null) {
-      synchronized (MetadataFilterHelper.class) {
+      synchronized (MetadataAuthzHelper.class) {
         if (executor == null) {
           executor =
               Executors.newFixedThreadPool(
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConstants.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConstants.java
index b6370435ab..55851ea1f8 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConstants.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConstants.java
@@ -74,6 +74,12 @@ public class AuthorizationExpressionConstants {
                   ANY_MODIFY_TABLE
                   """;
 
+  /**
+   * Authorization expression that joins {@code ANY(OWNER, METALAKE, CATALOG, SCHEMA, TABLE)} and
+   * {@code ANY_MODIFY_TABLE} with {@code ||}.
+   *
+   * <p>NOTE(review): judging by the name, this is intended for checking whether the caller may
+   * modify a table; confirm against the call sites.
+   */
+  public static final String filterModifyTableAuthorizationExpression =
+      """
+                  ANY(OWNER, METALAKE, CATALOG, SCHEMA, TABLE) ||
+                  ANY_MODIFY_TABLE
+                  """;
+
   public static final String filterTopicsAuthorizationExpression =
       """
               ANY(OWNER, METALAKE, CATALOG, SCHEMA, TOPIC) ||
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
index 4804532e42..6e4616b535 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
@@ -29,7 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.gravitino.auth.AuthConstants;
-import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
 
 /**
  * Convert the authorization expression into an executable expression, such as 
OGNL expression, etc.
@@ -73,7 +73,7 @@ public class AuthorizationExpressionConverter {
    * extras such as list projection and selection and lambda expressions. You 
use the same
    * expression for both getting and setting the value of a property.
    *
-   * @param authorizationExpression authorization expression from {@link 
MetadataFilterHelper}
+   * @param authorizationExpression authorization expression from {@link 
MetadataAuthzHelper}
    * @return an OGNL expression used to call GravitinoAuthorizer
    */
   public static String convertToOgnlExpression(String authorizationExpression) 
{
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/TestMetadataFilterHelper.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/TestMetadataAuthzHelper.java
similarity index 95%
rename from 
server-common/src/test/java/org/apache/gravitino/server/authorization/TestMetadataFilterHelper.java
rename to 
server-common/src/test/java/org/apache/gravitino/server/authorization/TestMetadataAuthzHelper.java
index 7a55f30e2f..26581b4a0b 100644
--- 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/TestMetadataFilterHelper.java
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/TestMetadataAuthzHelper.java
@@ -40,8 +40,8 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.mockito.MockedStatic;
 
-/** Test of {@link MetadataFilterHelper} */
-public class TestMetadataFilterHelper {
+/** Test of {@link MetadataAuthzHelper} */
+public class TestMetadataAuthzHelper {
 
   private static MockedStatic<GravitinoEnv> mockedStaticGravitinoEnv;
 
@@ -83,7 +83,7 @@ public class TestMetadataFilterHelper {
       nameIdentifiers[2] =
           NameIdentifierUtil.ofSchema("testMetalake", "testCatalog2", 
"testSchema");
       NameIdentifier[] filtered =
-          MetadataFilterHelper.filterByPrivilege(
+          MetadataAuthzHelper.filterByPrivilege(
               "testMetalake",
               Entity.EntityType.SCHEMA,
               Privilege.Name.USE_SCHEMA.name(),
@@ -115,7 +115,7 @@ public class TestMetadataFilterHelper {
       nameIdentifiers[2] =
           NameIdentifierUtil.ofSchema("testMetalake", "testCatalog2", 
"testSchema");
       NameIdentifier[] filtered =
-          MetadataFilterHelper.filterByExpression(
+          MetadataAuthzHelper.filterByExpression(
               "testMetalake",
               "CATALOG::USE_CATALOG && SCHEMA::USE_SCHEMA",
               Entity.EntityType.SCHEMA,
@@ -123,7 +123,7 @@ public class TestMetadataFilterHelper {
       Assertions.assertEquals(1, filtered.length);
       Assertions.assertEquals("testMetalake.testCatalog.testSchema", 
filtered[0].toString());
       NameIdentifier[] filtered2 =
-          MetadataFilterHelper.filterByExpression(
+          MetadataAuthzHelper.filterByExpression(
               "testMetalake", "CATALOG::USE_CATALOG", 
Entity.EntityType.SCHEMA, nameIdentifiers);
       Assertions.assertEquals(2, filtered2.length);
       Assertions.assertEquals("testMetalake.testCatalog.testSchema", 
filtered2[0].toString());
@@ -134,7 +134,7 @@ public class TestMetadataFilterHelper {
   private static void makeCompletableFutureUseCurrentThread() {
     try {
       Executor currentThread = Runnable::run;
-      Class<MetadataFilterHelper> jcasbinAuthorizerClass = 
MetadataFilterHelper.class;
+      Class<MetadataAuthzHelper> jcasbinAuthorizerClass = 
MetadataAuthzHelper.class;
       Field field = jcasbinAuthorizerClass.getDeclaredField("executor");
       field.setAccessible(true);
       field.set(null, currentThread);
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/ParameterUtil.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/ParameterUtil.java
index a1b898b34c..8658374d36 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/ParameterUtil.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/ParameterUtil.java
@@ -26,7 +26,7 @@ import org.apache.gravitino.Entity;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationFullName;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationObjectType;
@@ -171,7 +171,7 @@ public class ParameterUtil {
       NameIdentifier nameIdentifier =
           MetadataObjectUtil.toEntityIdent(metalake, 
MetadataObjects.parse(fullName, type));
       nameIdentifierMap.putAll(
-          MetadataFilterHelper.spiltMetadataNames(
+          MetadataAuthzHelper.spiltMetadataNames(
               metalake, MetadataObjectUtil.toEntityType(type), 
nameIdentifier));
     }
 
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/CatalogOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/CatalogOperations.java
index a12fc99ba5..11b15809b2 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/CatalogOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/CatalogOperations.java
@@ -54,7 +54,7 @@ import org.apache.gravitino.dto.responses.DropResponse;
 import org.apache.gravitino.dto.responses.EntityListResponse;
 import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.metrics.MetricNames;
-import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants;
@@ -100,7 +100,7 @@ public class CatalogOperations {
             if (verbose) {
               Catalog[] catalogs = 
catalogDispatcher.listCatalogsInfo(catalogNS);
               catalogs =
-                  MetadataFilterHelper.filterByExpression(
+                  MetadataAuthzHelper.filterByExpression(
                       metalake,
                       
AuthorizationExpressionConstants.loadCatalogAuthorizationExpression,
                       Entity.EntityType.CATALOG,
@@ -113,7 +113,7 @@ public class CatalogOperations {
             } else {
               NameIdentifier[] idents = 
catalogDispatcher.listCatalogs(catalogNS);
               idents =
-                  MetadataFilterHelper.filterByExpression(
+                  MetadataAuthzHelper.filterByExpression(
                       metalake,
                       
AuthorizationExpressionConstants.loadCatalogAuthorizationExpression,
                       Entity.EntityType.CATALOG,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/FilesetOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/FilesetOperations.java
index 3a5a0c2e07..6b654433c1 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/FilesetOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/FilesetOperations.java
@@ -61,7 +61,7 @@ import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetChange;
 import org.apache.gravitino.metrics.MetricNames;
 import org.apache.gravitino.rest.RESTUtils;
-import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants;
@@ -106,7 +106,7 @@ public class FilesetOperations {
             Namespace filesetNS = NamespaceUtil.ofFileset(metalake, catalog, 
schema);
             NameIdentifier[] idents = dispatcher.listFilesets(filesetNS);
             idents =
-                MetadataFilterHelper.filterByExpression(
+                MetadataAuthzHelper.filterByExpression(
                     metalake,
                     
AuthorizationExpressionConstants.filterFilesetAuthorizationExpression,
                     Entity.EntityType.FILESET,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectRoleOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectRoleOperations.java
index 5773215db6..10c08857f6 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectRoleOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectRoleOperations.java
@@ -35,7 +35,7 @@ import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.authorization.AccessControlDispatcher;
 import org.apache.gravitino.dto.responses.NameListResponse;
 import org.apache.gravitino.metrics.MetricNames;
-import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
 import org.apache.gravitino.server.authorization.NameBindings;
 import org.apache.gravitino.server.web.Utils;
 import org.apache.gravitino.utils.NameIdentifierUtil;
@@ -76,7 +76,7 @@ public class MetadataObjectRoleOperations {
           () -> {
             String[] names = 
accessControlDispatcher.listRoleNamesByObject(metalake, object);
             names =
-                MetadataFilterHelper.filterByExpression(
+                MetadataAuthzHelper.filterByExpression(
                     metalake,
                     LIST_ROLE_PRIVILEGE,
                     Entity.EntityType.ROLE,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectTagOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectTagOperations.java
index 8438c1f583..18e268754c 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectTagOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectTagOperations.java
@@ -51,7 +51,7 @@ import org.apache.gravitino.dto.tag.TagDTO;
 import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.exceptions.NoSuchTagException;
 import org.apache.gravitino.metrics.MetricNames;
-import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationFullName;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
@@ -213,7 +213,7 @@ public class MetadataObjectTagOperations {
                   metalake);
               TagDTO[] tagDTOS = tags.toArray(new TagDTO[0]);
               tagDTOS =
-                  MetadataFilterHelper.filterByExpression(
+                  MetadataAuthzHelper.filterByExpression(
                       metalake,
                       
AuthorizationExpressionConstants.loadTagAuthorizationExpression,
                       Entity.EntityType.TAG,
@@ -225,7 +225,7 @@ public class MetadataObjectTagOperations {
               // We have used Set to avoid duplicate tag names
               String[] tagNames = 
tags.stream().map(TagDTO::name).toArray(String[]::new);
               tagNames =
-                  MetadataFilterHelper.filterByExpression(
+                  MetadataAuthzHelper.filterByExpression(
                       metalake,
                       
AuthorizationExpressionConstants.loadTagAuthorizationExpression,
                       Entity.EntityType.TAG,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/MetalakeOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/MetalakeOperations.java
index 54aa24dccb..0d0895b11b 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/MetalakeOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/MetalakeOperations.java
@@ -54,7 +54,7 @@ import org.apache.gravitino.dto.responses.MetalakeResponse;
 import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.metalake.MetalakeDispatcher;
 import org.apache.gravitino.metrics.MetricNames;
-import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import org.apache.gravitino.server.web.Utils;
@@ -95,7 +95,7 @@ public class MetalakeOperations {
                         metalake -> {
                           NameIdentifier[] nameIdentifiers =
                               new NameIdentifier[] 
{NameIdentifierUtil.ofMetalake(metalake.name())};
-                          return MetadataFilterHelper.filterByExpression(
+                          return MetadataAuthzHelper.filterByExpression(
                                       metalake.name(),
                                       "METALAKE_USER",
                                       Entity.EntityType.METALAKE,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/ModelOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/ModelOperations.java
index b94a7780cf..b3b21b1854 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/ModelOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/ModelOperations.java
@@ -63,7 +63,7 @@ import org.apache.gravitino.model.Model;
 import org.apache.gravitino.model.ModelChange;
 import org.apache.gravitino.model.ModelVersion;
 import org.apache.gravitino.model.ModelVersionChange;
-import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants;
@@ -109,7 +109,7 @@ public class ModelOperations {
             NameIdentifier[] modelIds = modelDispatcher.listModels(modelNs);
             modelIds = modelIds == null ? new NameIdentifier[0] : modelIds;
             modelIds =
-                MetadataFilterHelper.filterByExpression(
+                MetadataAuthzHelper.filterByExpression(
                     metalake,
                     
AuthorizationExpressionConstants.filterModelAuthorizationExpression,
                     Entity.EntityType.MODEL,
@@ -275,7 +275,7 @@ public class ModelOperations {
                                   NameIdentifierUtil.ofModelVersion(
                                       metalake, catalog, schema, model, 
modelVersion.version())
                                 };
-                            return MetadataFilterHelper.filterByExpression(
+                            return MetadataAuthzHelper.filterByExpression(
                                         metalake,
                                         AuthorizationExpressionConstants
                                             .loadModelAuthorizationExpression,
@@ -300,7 +300,7 @@ public class ModelOperations {
                                   NameIdentifierUtil.ofModelVersion(
                                       metalake, catalog, schema, model, 
modelVersion)
                                 };
-                            return MetadataFilterHelper.filterByExpression(
+                            return MetadataAuthzHelper.filterByExpression(
                                         metalake,
                                         AuthorizationExpressionConstants
                                             .loadModelAuthorizationExpression,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/RoleOperations.java 
b/server/src/main/java/org/apache/gravitino/server/web/rest/RoleOperations.java
index dae825806c..4257819156 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/RoleOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/RoleOperations.java
@@ -54,7 +54,7 @@ import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.exceptions.IllegalMetadataObjectException;
 import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
 import org.apache.gravitino.metrics.MetricNames;
-import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
 import org.apache.gravitino.server.authorization.NameBindings;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
@@ -94,7 +94,7 @@ public class RoleOperations {
                         role -> {
                           NameIdentifier[] nameIdentifiers =
                               new NameIdentifier[] 
{NameIdentifierUtil.ofRole(metalake, role)};
-                          return MetadataFilterHelper.filterByExpression(
+                          return MetadataAuthzHelper.filterByExpression(
                                       metalake,
                                       AuthorizationExpressionConstants
                                           .loadRoleAuthorizationExpression,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/SchemaOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/SchemaOperations.java
index 9877b07365..61015099f9 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/SchemaOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/SchemaOperations.java
@@ -50,7 +50,7 @@ import org.apache.gravitino.dto.responses.EntityListResponse;
 import org.apache.gravitino.dto.responses.SchemaResponse;
 import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.metrics.MetricNames;
-import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants;
@@ -96,7 +96,7 @@ public class SchemaOperations {
             Namespace schemaNS = NamespaceUtil.ofSchema(metalake, catalog);
             NameIdentifier[] idents = dispatcher.listSchemas(schemaNS);
             idents =
-                MetadataFilterHelper.filterByExpression(
+                MetadataAuthzHelper.filterByExpression(
                     metalake,
                     
AuthorizationExpressionConstants.filterSchemaAuthorizationExpression,
                     Entity.EntityType.SCHEMA,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
index 31ef89544e..803d097e19 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
@@ -51,7 +51,7 @@ import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.metrics.MetricNames;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableChange;
-import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants;
@@ -95,7 +95,7 @@ public class TableOperations {
             Namespace tableNS = NamespaceUtil.ofTable(metalake, catalog, 
schema);
             NameIdentifier[] idents = dispatcher.listTables(tableNS);
             idents =
-                MetadataFilterHelper.filterByExpression(
+                MetadataAuthzHelper.filterByExpression(
                     metalake,
                     
AuthorizationExpressionConstants.filterTableAuthorizationExpression,
                     Entity.EntityType.TABLE,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TagOperations.java 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TagOperations.java
index d4fb945589..0813ad4862 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TagOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TagOperations.java
@@ -54,7 +54,7 @@ import org.apache.gravitino.dto.tag.MetadataObjectDTO;
 import org.apache.gravitino.dto.tag.TagDTO;
 import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.metrics.MetricNames;
-import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationFullName;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
@@ -109,7 +109,7 @@ public class TagOperations {
                         .toArray(TagDTO[]::new);
               }
               tagDTOs =
-                  MetadataFilterHelper.filterByExpression(
+                  MetadataAuthzHelper.filterByExpression(
                       metalake,
                       
AuthorizationExpressionConstants.loadTagAuthorizationExpression,
                       Entity.EntityType.TAG,
@@ -122,7 +122,7 @@ public class TagOperations {
               String[] tagNames = tagDispatcher.listTags(metalake);
               tagNames = tagNames == null ? new String[0] : tagNames;
               tagNames =
-                  MetadataFilterHelper.filterByExpression(
+                  MetadataAuthzHelper.filterByExpression(
                       metalake,
                       
AuthorizationExpressionConstants.loadTagAuthorizationExpression,
                       Entity.EntityType.TAG,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
index 79f0b270f7..b4483042f1 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
@@ -46,7 +46,7 @@ import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.messaging.Topic;
 import org.apache.gravitino.messaging.TopicChange;
 import org.apache.gravitino.metrics.MetricNames;
-import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants;
@@ -91,7 +91,7 @@ public class TopicOperations {
             NameIdentifier[] topics = dispatcher.listTopics(topicNS);
             topics = topics == null ? new NameIdentifier[0] : topics;
             topics =
-                MetadataFilterHelper.filterByExpression(
+                MetadataAuthzHelper.filterByExpression(
                     metalake,
                     
AuthorizationExpressionConstants.filterTopicsAuthorizationExpression,
                     Entity.EntityType.TOPIC,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/UserOperations.java 
b/server/src/main/java/org/apache/gravitino/server/web/rest/UserOperations.java
index 24f10e1cb1..f35c4e5f49 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/UserOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/UserOperations.java
@@ -46,7 +46,7 @@ import org.apache.gravitino.dto.responses.UserListResponse;
 import org.apache.gravitino.dto.responses.UserResponse;
 import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.metrics.MetricNames;
-import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import org.apache.gravitino.server.authorization.MetadataAuthzHelper;
 import org.apache.gravitino.server.authorization.NameBindings;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
@@ -113,7 +113,7 @@ public class UserOperations {
             if (verbose) {
               User[] users = accessControlManager.listUsers(metalake);
               users =
-                  MetadataFilterHelper.filterByExpression(
+                  MetadataAuthzHelper.filterByExpression(
                       metalake,
                       LOAD_USER_PRIVILEGE,
                       Entity.EntityType.USER,
@@ -124,7 +124,7 @@ public class UserOperations {
             } else {
               String[] users = accessControlManager.listUserNames(metalake);
               users =
-                  MetadataFilterHelper.filterByExpression(
+                  MetadataAuthzHelper.filterByExpression(
                       metalake,
                       LOAD_USER_PRIVILEGE,
                       Entity.EntityType.USER,

Reply via email to