This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new c84693dc4c [#9210] feat(iceberg) : Add authorization for Iceberg 
Namespace operations (#9211)
c84693dc4c is described below

commit c84693dc4c2adb7bd7a8a3bf87728414ef712654
Author: Bharath Krishna <[email protected]>
AuthorDate: Fri Nov 21 21:50:47 2025 -0800

    [#9210] feat(iceberg) : Add authorization for Iceberg Namespace operations 
(#9211)
    
    ### What changes were proposed in this pull request?
    - Added @AuthorizationExpression annotations to all namespace REST
    endpoints in IcebergNamespaceOperations
    - Added comprehensive tests via IcebergNamespaceAuthorizationIT
    - Fixed table ownership - registerTable() now sets proper owner via
    unified IcebergOwnershipUtils
    
    ### Why are the changes needed?
    
    - Iceberg namespace operations were completely unprotected - any user
    could create/modify/delete namespaces without authorization checks,
    creating a security gap in production deployments.
    
    Fix: #9210
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes - Users now need proper privileges (USE_CATALOG, CREATE_SCHEMA,
    schema ownership) to perform namespace operations if authorization is
    turned ON. Unauthorized operations return 403 Forbidden.
    
    ### How was this patch tested?
    
    - IcebergNamespaceAuthorizationIT tests all authorization scenarios
    - TestIcebergOwnershipUtils validates ownership logic
    - Verified proper ForbiddenException responses for unauthorized
    operations
    - Existing authorization tests continue to pass
---
 .../dispatcher/IcebergNamespaceHookDispatcher.java |  71 ++--
 .../service/dispatcher/IcebergOwnershipUtils.java  |  90 +++++
 .../dispatcher/IcebergTableHookDispatcher.java     |  27 +-
 .../service/rest/IcebergNamespaceOperations.java   | 103 +++++-
 .../filter/IcebergRESTAuthInterceptionService.java |   5 +-
 .../test/IcebergNamespaceAuthorizationIT.java      | 392 ++++++++++++++++++++-
 .../dispatcher/TestIcebergOwnershipUtils.java      | 108 ++++++
 7 files changed, 734 insertions(+), 62 deletions(-)

diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergNamespaceHookDispatcher.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergNamespaceHookDispatcher.java
index 2eb60fc321..b474b9d16f 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergNamespaceHookDispatcher.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergNamespaceHookDispatcher.java
@@ -18,18 +18,19 @@
  */
 package org.apache.gravitino.iceberg.service.dispatcher;
 
+import java.io.IOException;
 import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
-import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.authorization.AuthorizationUtils;
-import org.apache.gravitino.authorization.Owner;
-import org.apache.gravitino.authorization.OwnerDispatcher;
 import org.apache.gravitino.catalog.SchemaDispatcher;
+import org.apache.gravitino.catalog.TableDispatcher;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.iceberg.common.utils.IcebergIdentifierUtils;
 import 
org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerContext;
 import org.apache.gravitino.listener.api.event.IcebergRequestContext;
-import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
 import org.apache.iceberg.rest.requests.RegisterTableRequest;
 import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
@@ -62,7 +63,12 @@ public class IcebergNamespaceHookDispatcher implements 
IcebergNamespaceOperation
     CreateNamespaceResponse response = dispatcher.createNamespace(context, 
createRequest);
 
     importSchema(context.catalogName(), createRequest.namespace());
-    setSchemaOwner(context.catalogName(), createRequest.namespace(), 
context.userName());
+    IcebergOwnershipUtils.setSchemaOwner(
+        metalake,
+        context.catalogName(),
+        createRequest.namespace(),
+        context.userName(),
+        GravitinoEnv.getInstance().ownerDispatcher());
 
     return response;
   }
@@ -78,6 +84,22 @@ public class IcebergNamespaceHookDispatcher implements 
IcebergNamespaceOperation
   @Override
   public void dropNamespace(IcebergRequestContext context, Namespace 
namespace) {
     dispatcher.dropNamespace(context, namespace);
+
+    // Clean up the schema from Gravitino's entity store after successful drop
+    EntityStore store = GravitinoEnv.getInstance().entityStore();
+    try {
+      if (store != null) {
+        // Delete the entity for the dropped namespace (schema).
+        store.delete(
+            IcebergIdentifierUtils.toGravitinoSchemaIdentifier(
+                metalake, context.catalogName(), namespace),
+            Entity.EntityType.SCHEMA);
+      }
+    } catch (NoSuchEntityException ignore) {
+      // Ignore if the schema entity does not exist.
+    } catch (IOException ioe) {
+      throw new RuntimeException("io exception when deleting schema entity", 
ioe);
+    }
   }
 
   @Override
@@ -101,7 +123,30 @@ public class IcebergNamespaceHookDispatcher implements 
IcebergNamespaceOperation
       IcebergRequestContext context,
       Namespace namespace,
       RegisterTableRequest registerTableRequest) {
-    return dispatcher.registerTable(context, namespace, registerTableRequest);
+    LoadTableResponse response = dispatcher.registerTable(context, namespace, 
registerTableRequest);
+
+    // Import the registered table into Gravitino's catalog so it exists as a 
metadata object
+    importTable(context.catalogName(), namespace, registerTableRequest.name());
+
+    // Set the owner of the registered table to the current user
+    IcebergOwnershipUtils.setTableOwner(
+        metalake,
+        context.catalogName(),
+        namespace,
+        registerTableRequest.name(),
+        context.userName(),
+        GravitinoEnv.getInstance().ownerDispatcher());
+
+    return response;
+  }
+
+  private void importTable(String catalogName, Namespace namespace, String 
tableName) {
+    TableDispatcher tableDispatcher = 
GravitinoEnv.getInstance().tableDispatcher();
+    if (tableDispatcher != null) {
+      tableDispatcher.loadTable(
+          IcebergIdentifierUtils.toGravitinoTableIdentifier(
+              metalake, catalogName, TableIdentifier.of(namespace, 
tableName)));
+    }
   }
 
   private void importSchema(String catalogName, Namespace namespace) {
@@ -111,18 +156,4 @@ public class IcebergNamespaceHookDispatcher implements 
IcebergNamespaceOperation
           IcebergIdentifierUtils.toGravitinoSchemaIdentifier(metalake, 
catalogName, namespace));
     }
   }
-
-  private void setSchemaOwner(String catalogName, Namespace namespace, String 
user) {
-    NameIdentifier schemaIdentifier =
-        IcebergIdentifierUtils.toGravitinoSchemaIdentifier(metalake, 
catalogName, namespace);
-    // Set the creator as the owner of the namespace
-    OwnerDispatcher ownerManager = 
GravitinoEnv.getInstance().ownerDispatcher();
-    if (ownerManager != null) {
-      ownerManager.setOwner(
-          metalake,
-          NameIdentifierUtil.toMetadataObject(schemaIdentifier, 
Entity.EntityType.SCHEMA),
-          user,
-          Owner.Type.USER);
-    }
-  }
 }
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergOwnershipUtils.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergOwnershipUtils.java
new file mode 100644
index 0000000000..b17cdf25cd
--- /dev/null
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergOwnershipUtils.java
@@ -0,0 +1,90 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.gravitino.iceberg.service.dispatcher;
+
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.iceberg.common.utils.IcebergIdentifierUtils;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/**
+ * Utility class for managing ownership of Iceberg resources (schemas and 
tables) in Gravitino.
+ * Provides centralized methods for setting ownership during resource creation 
operations.
+ */
+public class IcebergOwnershipUtils {
+
+  /**
+   * Sets the owner of a schema (namespace) to the specified user using the 
provided owner
+   * dispatcher.
+   *
+   * @param metalake the metalake name
+   * @param catalogName the catalog name
+   * @param namespace the Iceberg namespace
+   * @param user the user to set as owner
+   * @param ownerDispatcher the owner dispatcher to use
+   */
+  public static void setSchemaOwner(
+      String metalake,
+      String catalogName,
+      Namespace namespace,
+      String user,
+      OwnerDispatcher ownerDispatcher) {
+    if (ownerDispatcher != null) {
+      ownerDispatcher.setOwner(
+          metalake,
+          NameIdentifierUtil.toMetadataObject(
+              IcebergIdentifierUtils.toGravitinoSchemaIdentifier(metalake, 
catalogName, namespace),
+              Entity.EntityType.SCHEMA),
+          user,
+          Owner.Type.USER);
+    }
+  }
+
+  /**
+   * Sets the owner of a table to the specified user using the provided owner 
dispatcher.
+   *
+   * @param metalake the metalake name
+   * @param catalogName the catalog name
+   * @param namespace the Iceberg namespace containing the table
+   * @param tableName the table name
+   * @param user the user to set as owner
+   * @param ownerDispatcher the owner dispatcher to use
+   */
+  public static void setTableOwner(
+      String metalake,
+      String catalogName,
+      Namespace namespace,
+      String tableName,
+      String user,
+      OwnerDispatcher ownerDispatcher) {
+    if (ownerDispatcher != null) {
+      ownerDispatcher.setOwner(
+          metalake,
+          NameIdentifierUtil.toMetadataObject(
+              IcebergIdentifierUtils.toGravitinoTableIdentifier(
+                  metalake, catalogName, TableIdentifier.of(namespace, 
tableName)),
+              Entity.EntityType.TABLE),
+          user,
+          Owner.Type.USER);
+    }
+  }
+}
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
index eae2a19591..388205ffd0 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
@@ -25,8 +25,6 @@ import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.authorization.AuthorizationUtils;
-import org.apache.gravitino.authorization.Owner;
-import org.apache.gravitino.authorization.OwnerDispatcher;
 import org.apache.gravitino.catalog.TableDispatcher;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.iceberg.common.utils.IcebergIdentifierUtils;
@@ -34,7 +32,6 @@ import 
org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerConte
 import org.apache.gravitino.listener.api.event.IcebergRequestContext;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.TableEntity;
-import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -62,7 +59,13 @@ public class IcebergTableHookDispatcher implements 
IcebergTableOperationDispatch
 
     LoadTableResponse response = dispatcher.createTable(context, namespace, 
createTableRequest);
     importTable(context.catalogName(), namespace, createTableRequest.name());
-    setTableOwner(context.catalogName(), namespace, createTableRequest.name(), 
context.userName());
+    IcebergOwnershipUtils.setTableOwner(
+        metalake,
+        context.catalogName(),
+        namespace,
+        createTableRequest.name(),
+        context.userName(),
+        GravitinoEnv.getInstance().ownerDispatcher());
 
     return response;
   }
@@ -164,20 +167,4 @@ public class IcebergTableHookDispatcher implements 
IcebergTableOperationDispatch
               metalake, catalogName, TableIdentifier.of(namespace, 
tableName)));
     }
   }
-
-  private void setTableOwner(
-      String catalogName, Namespace namespace, String tableName, String user) {
-    // Set the creator as the owner of the table.
-    OwnerDispatcher ownerManager = 
GravitinoEnv.getInstance().ownerDispatcher();
-    if (ownerManager != null) {
-      ownerManager.setOwner(
-          metalake,
-          NameIdentifierUtil.toMetadataObject(
-              IcebergIdentifierUtils.toGravitinoTableIdentifier(
-                  metalake, catalogName, TableIdentifier.of(namespace, 
tableName)),
-              Entity.EntityType.TABLE),
-          user,
-          Owner.Type.USER);
-    }
-  }
 }
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java
index 9742e8049b..1472d9a7f2 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java
@@ -23,6 +23,8 @@ import com.codahale.metrics.annotation.Timed;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.List;
 import javax.inject.Inject;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
@@ -39,12 +41,20 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
 import org.apache.gravitino.iceberg.service.IcebergObjectMapper;
 import org.apache.gravitino.iceberg.service.IcebergRESTUtils;
+import 
org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerContext;
 import 
org.apache.gravitino.iceberg.service.dispatcher.IcebergNamespaceOperationDispatcher;
 import org.apache.gravitino.listener.api.event.IcebergRequestContext;
 import org.apache.gravitino.metrics.MetricNames;
+import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
+import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants;
 import org.apache.gravitino.server.web.Utils;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.rest.RESTUtil;
@@ -82,9 +92,12 @@ public class IcebergNamespaceOperations {
   @Produces(MediaType.APPLICATION_JSON)
   @Timed(name = "list-namespace." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "list-namespace", absolute = true)
+  @AuthorizationExpression(
+      expression = 
AuthorizationExpressionConstants.loadCatalogAuthorizationExpression,
+      accessMetadataType = MetadataObject.Type.CATALOG)
   public Response listNamespaces(
       @DefaultValue("") @Encoded() @QueryParam("parent") String parent,
-      @PathParam("prefix") String prefix) {
+      @AuthorizationMetadata(type = Entity.EntityType.CATALOG) 
@PathParam("prefix") String prefix) {
     String catalogName = IcebergRESTUtils.getCatalogName(prefix);
     Namespace parentNamespace =
         parent.isEmpty() ? Namespace.empty() : 
RESTUtil.decodeNamespace(parent);
@@ -98,6 +111,12 @@ public class IcebergNamespaceOperations {
                 new IcebergRequestContext(httpServletRequest(), catalogName);
             ListNamespacesResponse response =
                 namespaceOperationDispatcher.listNamespaces(context, 
parentNamespace);
+
+            IcebergRESTServerContext authContext = 
IcebergRESTServerContext.getInstance();
+            if (authContext.isAuthorizationEnabled()) {
+              response =
+                  filterListNamespacesResponse(response, 
authContext.metalakeName(), catalogName);
+            }
             return IcebergRESTUtils.ok(response);
           });
     } catch (Exception e) {
@@ -110,8 +129,14 @@ public class IcebergNamespaceOperations {
   @Produces(MediaType.APPLICATION_JSON)
   @Timed(name = "load-namespace." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "load-namespace", absolute = true)
+  @AuthorizationExpression(
+      expression =
+          "ANY(OWNER, METALAKE, CATALOG) || ANY_USE_CATALOG && (SCHEMA::OWNER 
|| ANY_USE_SCHEMA || ANY_CREATE_SCHEMA)",
+      accessMetadataType = MetadataObject.Type.SCHEMA)
   public Response loadNamespace(
-      @PathParam("prefix") String prefix, @Encoded() @PathParam("namespace") 
String namespace) {
+      @AuthorizationMetadata(type = Entity.EntityType.CATALOG) 
@PathParam("prefix") String prefix,
+      @AuthorizationMetadata(type = Entity.EntityType.SCHEMA) @Encoded() 
@PathParam("namespace")
+          String namespace) {
     String catalogName = IcebergRESTUtils.getCatalogName(prefix);
     Namespace icebergNS = RESTUtil.decodeNamespace(namespace);
     LOG.info("Load Iceberg namespace, catalog: {}, namespace: {}", 
catalogName, icebergNS);
@@ -135,8 +160,14 @@ public class IcebergNamespaceOperations {
   @Produces(MediaType.APPLICATION_JSON)
   @Timed(name = "namespace-exists." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "namespace-exists", absolute = true)
+  @AuthorizationExpression(
+      expression =
+          "ANY(OWNER, METALAKE, CATALOG) || ANY_USE_CATALOG && (SCHEMA::OWNER 
|| ANY_USE_SCHEMA || ANY_CREATE_SCHEMA)",
+      accessMetadataType = MetadataObject.Type.SCHEMA)
   public Response namespaceExists(
-      @PathParam("prefix") String prefix, @Encoded() @PathParam("namespace") 
String namespace) {
+      @AuthorizationMetadata(type = Entity.EntityType.CATALOG) 
@PathParam("prefix") String prefix,
+      @AuthorizationMetadata(type = Entity.EntityType.SCHEMA) @Encoded() 
@PathParam("namespace")
+          String namespace) {
     String catalogName = IcebergRESTUtils.getCatalogName(prefix);
     Namespace icebergNS = RESTUtil.decodeNamespace(namespace);
     LOG.info("Check Iceberg namespace exists, catalog: {}, namespace: {}", 
catalogName, icebergNS);
@@ -163,8 +194,13 @@ public class IcebergNamespaceOperations {
   @Produces(MediaType.APPLICATION_JSON)
   @Timed(name = "drop-namespace." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "drop-namespace", absolute = true)
+  @AuthorizationExpression(
+      expression = "ANY(OWNER, METALAKE, CATALOG) || 
SCHEMA_OWNER_WITH_USE_CATALOG",
+      accessMetadataType = MetadataObject.Type.SCHEMA)
   public Response dropNamespace(
-      @PathParam("prefix") String prefix, @Encoded() @PathParam("namespace") 
String namespace) {
+      @AuthorizationMetadata(type = Entity.EntityType.CATALOG) 
@PathParam("prefix") String prefix,
+      @AuthorizationMetadata(type = Entity.EntityType.SCHEMA) @Encoded() 
@PathParam("namespace")
+          String namespace) {
     // todo check if table exists in namespace after table ops is added
     String catalogName = IcebergRESTUtils.getCatalogName(prefix);
     Namespace icebergNS = RESTUtil.decodeNamespace(namespace);
@@ -187,8 +223,12 @@ public class IcebergNamespaceOperations {
   @Produces(MediaType.APPLICATION_JSON)
   @Timed(name = "create-namespace." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "create-namespace", absolute = true)
+  @AuthorizationExpression(
+      expression = "ANY(OWNER, METALAKE, CATALOG) || ANY_USE_CATALOG && 
ANY_CREATE_SCHEMA",
+      accessMetadataType = MetadataObject.Type.CATALOG)
   public Response createNamespace(
-      @PathParam("prefix") String prefix, CreateNamespaceRequest 
createNamespaceRequest) {
+      @AuthorizationMetadata(type = Entity.EntityType.CATALOG) 
@PathParam("prefix") String prefix,
+      CreateNamespaceRequest createNamespaceRequest) {
     String catalogName = IcebergRESTUtils.getCatalogName(prefix);
     LOG.info(
         "Create Iceberg namespace, catalog: {}, createNamespaceRequest: {}",
@@ -214,9 +254,13 @@ public class IcebergNamespaceOperations {
   @Produces(MediaType.APPLICATION_JSON)
   @Timed(name = "update-namespace." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "update-namespace", absolute = true)
+  @AuthorizationExpression(
+      expression = "ANY(OWNER, METALAKE, CATALOG) || 
SCHEMA_OWNER_WITH_USE_CATALOG",
+      accessMetadataType = MetadataObject.Type.SCHEMA)
   public Response updateNamespace(
-      @PathParam("prefix") String prefix,
-      @Encoded() @PathParam("namespace") String namespace,
+      @AuthorizationMetadata(type = Entity.EntityType.CATALOG) 
@PathParam("prefix") String prefix,
+      @AuthorizationMetadata(type = Entity.EntityType.SCHEMA) @Encoded() 
@PathParam("namespace")
+          String namespace,
       UpdateNamespacePropertiesRequest updateNamespacePropertiesRequest) {
     String catalogName = IcebergRESTUtils.getCatalogName(prefix);
     Namespace icebergNS = RESTUtil.decodeNamespace(namespace);
@@ -246,9 +290,16 @@ public class IcebergNamespaceOperations {
   @Produces(MediaType.APPLICATION_JSON)
   @Timed(name = "register-table." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "register-table", absolute = true)
+  @AuthorizationExpression(
+      expression =
+          "ANY(OWNER, METALAKE, CATALOG) || "
+              + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+              + "ANY_USE_CATALOG && ANY_USE_SCHEMA && ANY_CREATE_TABLE",
+      accessMetadataType = MetadataObject.Type.SCHEMA)
   public Response registerTable(
-      @PathParam("prefix") String prefix,
-      @Encoded() @PathParam("namespace") String namespace,
+      @AuthorizationMetadata(type = Entity.EntityType.CATALOG) 
@PathParam("prefix") String prefix,
+      @AuthorizationMetadata(type = Entity.EntityType.SCHEMA) @Encoded() 
@PathParam("namespace")
+          String namespace,
       RegisterTableRequest registerTableRequest) {
     String catalogName = IcebergRESTUtils.getCatalogName(prefix);
     Namespace icebergNS = RESTUtil.decodeNamespace(namespace);
@@ -273,6 +324,40 @@ public class IcebergNamespaceOperations {
     }
   }
 
+  private NameIdentifier[] toNameIdentifiers(
+      ListNamespacesResponse listNamespacesResponse, String metalake, String 
catalogName) {
+    List<Namespace> namespaces = listNamespacesResponse.namespaces();
+    NameIdentifier[] nameIdentifiers = new NameIdentifier[namespaces.size()];
+    for (int i = 0; i < namespaces.size(); i++) {
+      Namespace namespace = namespaces.get(i);
+      // Convert Iceberg Namespace to Gravitino NameIdentifier
+      // Namespace should have at least one level for schema name
+      String schemaName = namespace.isEmpty() ? "" : namespace.level(0);
+      nameIdentifiers[i] = NameIdentifier.of(metalake, catalogName, 
schemaName);
+    }
+    return nameIdentifiers;
+  }
+
+  private ListNamespacesResponse filterListNamespacesResponse(
+      ListNamespacesResponse listNamespacesResponse, String metalake, String 
catalogName) {
+    NameIdentifier[] idents =
+        MetadataFilterHelper.filterByExpression(
+            metalake,
+            
AuthorizationExpressionConstants.filterSchemaAuthorizationExpression,
+            Entity.EntityType.SCHEMA,
+            toNameIdentifiers(listNamespacesResponse, metalake, catalogName));
+    List<Namespace> filteredNamespaces = new ArrayList<>();
+    for (NameIdentifier ident : idents) {
+      // Convert back from NameIdentifier to Iceberg Namespace
+      if (ident.hasNamespace() && ident.namespace().levels().length >= 2) {
+        // Schema name is the last level in the namespace
+        String schemaName = ident.name();
+        filteredNamespaces.add(Namespace.of(schemaName));
+      }
+    }
+    return ListNamespacesResponse.builder().addAll(filteredNamespaces).build();
+  }
+
   // HTTP request is null in Jersey test, override with a mock request when 
testing.
   @VisibleForTesting
   HttpServletRequest httpServletRequest() {
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/IcebergRESTAuthInterceptionService.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/IcebergRESTAuthInterceptionService.java
index 1906d36cb5..e1f4f6005b 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/IcebergRESTAuthInterceptionService.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/IcebergRESTAuthInterceptionService.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet;
 import java.lang.reflect.Method;
 import java.util.List;
 import org.aopalliance.intercept.MethodInterceptor;
+import org.apache.gravitino.iceberg.service.rest.IcebergNamespaceOperations;
 import org.apache.gravitino.iceberg.service.rest.IcebergTableOperations;
 import org.apache.gravitino.iceberg.service.rest.IcebergTableRenameOperations;
 import org.glassfish.hk2.api.Filter;
@@ -38,7 +39,9 @@ public class IcebergRESTAuthInterceptionService extends 
BaseInterceptionService
   public Filter getDescriptorFilter() {
     return new ClassListFilter(
         ImmutableSet.of(
-            IcebergTableOperations.class.getName(), 
IcebergTableRenameOperations.class.getName()));
+            IcebergTableOperations.class.getName(),
+            IcebergTableRenameOperations.class.getName(),
+            IcebergNamespaceOperations.class.getName()));
   }
 
   @Override
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergNamespaceAuthorizationIT.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergNamespaceAuthorizationIT.java
index e8bc0dfbba..2265ae50dc 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergNamespaceAuthorizationIT.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergNamespaceAuthorizationIT.java
@@ -28,6 +28,7 @@ import java.util.Optional;
 import java.util.UUID;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.SchemaChange;
 import org.apache.gravitino.authorization.Owner;
 import org.apache.gravitino.authorization.Privileges;
 import org.apache.gravitino.authorization.SecurableObject;
@@ -38,6 +39,13 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
+/**
+ * Integration tests for Iceberg namespace (schema) authorization 
functionality.
+ *
+ * <p>These tests verify that the authorization system correctly controls 
access to schema
+ * operations including creation, listing, modification, and deletion. Tests 
cover both
+ * ownership-based and privilege-based authorization models.
+ */
 @Tag("gravitino-docker-test")
 public class IcebergNamespaceAuthorizationIT extends IcebergAuthorizationIT {
 
@@ -57,35 +65,395 @@ public class IcebergNamespaceAuthorizationIT extends 
IcebergAuthorizationIT {
   @Test
   void testCreateSchemaWithOwner() {
     String namespace = "ns_owner";
+
+    // Catalog ownership enables schema creation
+    metalakeClientWithAllPrivilege.setOwner(
+        MetadataObjects.of(Arrays.asList(GRAVITINO_CATALOG_NAME), 
MetadataObject.Type.CATALOG),
+        NORMAL_USER,
+        Owner.Type.USER);
+
     sql("CREATE DATABASE %s", namespace);
-    Optional<Owner> owner =
+
+    // Verify schema owner is automatically set to the creator
+    Optional<Owner> schemaOwner =
         metalakeClientWithAllPrivilege.getOwner(
             MetadataObjects.of(
                 Arrays.asList(GRAVITINO_CATALOG_NAME, namespace), 
MetadataObject.Type.SCHEMA));
-    Assertions.assertTrue(owner.isPresent());
-    Assertions.assertEquals(NORMAL_USER, owner.get().name());
+    Assertions.assertTrue(schemaOwner.isPresent());
+    Assertions.assertEquals(NORMAL_USER, schemaOwner.get().name());
 
-    // test that the owner has privileges to create a table on the schema
-    String tableName = "table_owner";
+    // Test table creation within the owned schema
+    String tableName = "test_table";
     sql("USE %s", namespace);
-    sql("CREATE TABLE %s(a int)", tableName);
+    sql("CREATE TABLE %s(id int, name string) USING iceberg", tableName);
     sql("DESC TABLE %s", tableName);
-    owner =
+
+    // Verify table ownership is inherited
+    Optional<Owner> tableOwner =
         metalakeClientWithAllPrivilege.getOwner(
             MetadataObjects.of(
                 Arrays.asList(GRAVITINO_CATALOG_NAME, namespace, tableName),
                 MetadataObject.Type.TABLE));
-    Assertions.assertTrue(owner.isPresent());
-    Assertions.assertEquals(NORMAL_USER, owner.get().name());
+    Assertions.assertTrue(tableOwner.isPresent());
+    Assertions.assertEquals(NORMAL_USER, tableOwner.get().name());
+  }
+
+  @Test
+  void testCreateNamespaceAuthorization() {
+    String namespace = "ns_unauthorized";
+
+    // Should fail without proper authorization
+    Assertions.assertThrowsExactly(
+        org.apache.iceberg.exceptions.ForbiddenException.class,
+        () -> sql("CREATE DATABASE %s", namespace));
+
+    // Grant CREATE_SCHEMA and USE_CATALOG privileges and verify creation 
succeeds
+    grantUseCatalogRole(GRAVITINO_CATALOG_NAME);
+    grantCreateSchemaRole(GRAVITINO_CATALOG_NAME);
+
+    Assertions.assertDoesNotThrow(() -> sql("CREATE DATABASE %s", namespace));
+    Assertions.assertDoesNotThrow(() -> sql("USE %s", namespace));
+
+    // Verify schema owner is automatically set to the creator
+    Optional<Owner> schemaOwner =
+        metalakeClientWithAllPrivilege.getOwner(
+            MetadataObjects.of(
+                Arrays.asList(GRAVITINO_CATALOG_NAME, namespace), 
MetadataObject.Type.SCHEMA));
+    Assertions.assertTrue(schemaOwner.isPresent());
+    Assertions.assertEquals(NORMAL_USER, schemaOwner.get().name());
+  }
+
+  @Test
+  void testListNamespaces() {
+    String namespace1 = "ns_list_hidden1";
+    String namespace2 = "ns_list_hidden2";
+
+    // Create schemas as admin
+    catalogClientWithAllPrivilege.asSchemas().createSchema(namespace1, "test", 
new HashMap<>());
+    catalogClientWithAllPrivilege.asSchemas().createSchema(namespace2, "test", 
new HashMap<>());
+
+    // Normal user should not see unauthorized schemas
+    List<Object[]> result = sql("SHOW DATABASES");
+    List<String> visibleSchemas = extractSchemaNames(result);
+
+    Assertions.assertFalse(
+        visibleSchemas.contains(namespace1),
+        "Should not see " + namespace1 + " without USE_SCHEMA privilege");
+    Assertions.assertFalse(
+        visibleSchemas.contains(namespace2),
+        "Should not see " + namespace2 + " without USE_SCHEMA privilege");
+
+    // Without any roles, even SHOW DATABASES should fail
+    revokeUserRoles();
+    resetMetalakeAndCatalogOwner();
+    Assertions.assertThrowsExactly(
+        org.apache.iceberg.exceptions.ForbiddenException.class, () -> 
sql("SHOW DATABASES"));
+  }
+
+  @Test
+  void testListNamespacesWithFiltering() {
+    String visibleSchema = "ns_visible";
+    String hiddenSchema = "ns_hidden";
+
+    // Create test schemas
+    catalogClientWithAllPrivilege
+        .asSchemas()
+        .createSchema(visibleSchema, "visible schema", new HashMap<>());
+    catalogClientWithAllPrivilege
+        .asSchemas()
+        .createSchema(hiddenSchema, "hidden schema", new HashMap<>());
+
+    // Grant access to only one schema
+    grantUseCatalogRole(GRAVITINO_CATALOG_NAME);
+    grantUseSchemaRole(visibleSchema);
+
+    List<Object[]> result = sql("SHOW DATABASES");
+    List<String> visibleSchemas = extractSchemaNames(result);
+
+    Assertions.assertTrue(
+        visibleSchemas.contains(visibleSchema), "Should see authorized schema: 
" + visibleSchema);
+    Assertions.assertFalse(
+        visibleSchemas.contains(hiddenSchema),
+        "Should not see unauthorized schema: " + hiddenSchema);
+  }
+
+  @Test
+  void testUseNamespace() {
+    String namespace = "ns_access_test";
+
+    catalogClientWithAllPrivilege
+        .asSchemas()
+        .createSchema(namespace, "test schema", new HashMap<>());
+
+    // Access should fail without proper privileges
+    Assertions.assertThrowsExactly(
+        org.apache.iceberg.exceptions.ForbiddenException.class, () -> sql("USE 
%s", namespace));
+
+    // Grant schema access and verify success
+    grantUseCatalogRole(GRAVITINO_CATALOG_NAME);
+    grantUseSchemaRole(namespace);
+    Assertions.assertDoesNotThrow(() -> sql("USE %s", namespace));
+  }
+
+  @Test
+  void testUpdateNamespace() {
+    String namespace = "ns_modification";
+
+    catalogClientWithAllPrivilege
+        .asSchemas()
+        .createSchema(namespace, "test schema", new HashMap<>());
+    grantUseCatalogRole(GRAVITINO_CATALOG_NAME);
+
+    // Non-owners cannot modify schemas even with USE_CATALOG privilege
+    Assertions.assertThrowsExactly(
+        org.apache.iceberg.exceptions.ForbiddenException.class,
+        () -> sql("ALTER DATABASE %s SET DBPROPERTIES ('key'='value')", 
namespace));
+
+    // Schema ownership enables modification
+    metalakeClientWithAllPrivilege.setOwner(
+        MetadataObjects.of(
+            Arrays.asList(GRAVITINO_CATALOG_NAME, namespace), 
MetadataObject.Type.SCHEMA),
+        NORMAL_USER,
+        Owner.Type.USER);
+
+    grantUseSchemaRole(namespace);
+
+    Assertions.assertDoesNotThrow(
+        () ->
+            catalogClientWithAllPrivilege
+                .asSchemas()
+                .alterSchema(namespace, 
SchemaChange.setProperty("modified-by", "owner")));
+  }
+
+  @Test
+  void testDropNamespace() {
+    String namespace = "ns_deletion";
+
+    catalogClientWithAllPrivilege
+        .asSchemas()
+        .createSchema(namespace, "test schema", new HashMap<>());
+
+    // Set different owner initially
+    metalakeClientWithAllPrivilege.setOwner(
+        MetadataObjects.of(
+            Arrays.asList(GRAVITINO_CATALOG_NAME, namespace), 
MetadataObject.Type.SCHEMA),
+        SUPER_USER,
+        Owner.Type.USER);
+
+    // Non-owner cannot delete schema
+    Assertions.assertThrowsExactly(
+        org.apache.iceberg.exceptions.ForbiddenException.class,
+        () -> sql("DROP DATABASE %s", namespace));
+
+    // Transfer ownership to normal user
+    metalakeClientWithAllPrivilege.setOwner(
+        MetadataObjects.of(
+            Arrays.asList(GRAVITINO_CATALOG_NAME, namespace), 
MetadataObject.Type.SCHEMA),
+        NORMAL_USER,
+        Owner.Type.USER);
+
+    grantUseCatalogRole(GRAVITINO_CATALOG_NAME);
+    grantUseSchemaRole(namespace);
+
+    // Owner can delete schema
+    Assertions.assertDoesNotThrow(() -> sql("DROP DATABASE %s", namespace));
+
+    boolean exists = 
catalogClientWithAllPrivilege.asSchemas().schemaExists(namespace);
+    Assertions.assertFalse(exists, "Schema should be deleted");
+  }
+
+  @Test
+  void testNamespaceExistenceCheck() {
+    String namespace = "ns_existence";
+
+    catalogClientWithAllPrivilege
+        .asSchemas()
+        .createSchema(namespace, "test schema", new HashMap<>());
+
+    // Schema access requires proper authorization
+    Assertions.assertThrowsExactly(
+        org.apache.iceberg.exceptions.ForbiddenException.class, () -> sql("USE 
%s", namespace));
+
+    grantUseCatalogRole(GRAVITINO_CATALOG_NAME);
+    grantUseSchemaRole(namespace);
+    Assertions.assertDoesNotThrow(() -> sql("USE %s", namespace));
+    Assertions.assertDoesNotThrow(() -> sql("SHOW DATABASES"));
+  }
+
+  @Test
+  void testRegisterTable() {
+    String sourceNamespace = "ns_register_source";
+    String destNamespace = "ns_register_dst";
+    String sourceTable = "source_table";
+    String registeredTable = "registered_table";
+
+    // Create both schemas as admin
+    catalogClientWithAllPrivilege
+        .asSchemas()
+        .createSchema(sourceNamespace, "source schema for registration test", 
new HashMap<>());
+    catalogClientWithAllPrivilege
+        .asSchemas()
+        .createSchema(destNamespace, "destination schema for registration 
test", new HashMap<>());
+
+    // Grant source schema ownership to create source table
+    metalakeClientWithAllPrivilege.setOwner(
+        MetadataObjects.of(
+            Arrays.asList(GRAVITINO_CATALOG_NAME, sourceNamespace), 
MetadataObject.Type.SCHEMA),
+        NORMAL_USER,
+        Owner.Type.USER);
+    grantUseCatalogRole(GRAVITINO_CATALOG_NAME);
+    grantUseSchemaRole(sourceNamespace);
+
+    // Create source table and extract metadata location
+    sql("USE %s", sourceNamespace);
+    sql("CREATE TABLE %s(id bigint, data string) USING iceberg", sourceTable);
+    sql("INSERT INTO %s VALUES (1, 'test data')", sourceTable);
+
+    List<Object[]> metadataResults =
+        sql("SELECT file FROM %s.%s.metadata_log_entries", sourceNamespace, 
sourceTable);
+    String metadataLocation = (String) 
metadataResults.get(metadataResults.size() - 1)[0];
+
+    // Drop the original table to simulate external registration scenario
+    sql("DROP TABLE %s", sourceTable);
+
+    // Reset to limited privileges for testing registration authorization
+    revokeUserRoles();
+    resetMetalakeAndCatalogOwner();
+    grantUseCatalogRole(GRAVITINO_CATALOG_NAME);
+
+    // Registration should fail without destination schema ownership
+    Assertions.assertThrowsExactly(
+        org.apache.iceberg.exceptions.ForbiddenException.class,
+        () ->
+            sql(
+                "CALL rest.system.register_table(table => '%s.%s', 
metadata_file=> '%s')",
+                destNamespace, registeredTable, metadataLocation));
+
+    // Grant destination schema ownership and verify registration succeeds
+    metalakeClientWithAllPrivilege.setOwner(
+        MetadataObjects.of(
+            Arrays.asList(GRAVITINO_CATALOG_NAME, destNamespace), 
MetadataObject.Type.SCHEMA),
+        NORMAL_USER,
+        Owner.Type.USER);
+    grantUseSchemaRole(destNamespace);
+
+    Assertions.assertDoesNotThrow(
+        () ->
+            sql(
+                "CALL rest.system.register_table(table => '%s.%s', 
metadata_file=> '%s')",
+                destNamespace, registeredTable, metadataLocation));
+
+    // Verify table ownership and accessibility
+    Optional<Owner> tableOwner =
+        metalakeClientWithAllPrivilege.getOwner(
+            MetadataObjects.of(
+                Arrays.asList(GRAVITINO_CATALOG_NAME, destNamespace, 
registeredTable),
+                MetadataObject.Type.TABLE));
+    Assertions.assertTrue(tableOwner.isPresent());
+    Assertions.assertEquals(NORMAL_USER, tableOwner.get().name());
+
+    sql("USE %s", destNamespace);
+    Assertions.assertDoesNotThrow(() -> sql("DESC TABLE %s", registeredTable));
+  }
+
+  @Test
+  void testComplexAuthorizations() {
+    String namespace = "ns_complex_auth";
+
+    catalogClientWithAllPrivilege
+        .asSchemas()
+        .createSchema(namespace, "test schema", new HashMap<>());
+
+    revokeUserRoles();
+    resetMetalakeAndCatalogOwner();
+
+    // Verify operations fail without authorization
+    Assertions.assertThrowsExactly(
+        org.apache.iceberg.exceptions.ForbiddenException.class,
+        () -> sql("ALTER DATABASE %s SET DBPROPERTIES ('key1'='value1')", 
namespace));
+
+    // USE_CATALOG alone is insufficient for schema modification
+    grantUseCatalogRole(GRAVITINO_CATALOG_NAME);
+    Assertions.assertThrowsExactly(
+        org.apache.iceberg.exceptions.ForbiddenException.class,
+        () -> sql("ALTER DATABASE %s SET DBPROPERTIES ('key2'='value2')", 
namespace));
+
+    // Schema ownership with USE_CATALOG enables modification
+    metalakeClientWithAllPrivilege.setOwner(
+        MetadataObjects.of(
+            Arrays.asList(GRAVITINO_CATALOG_NAME, namespace), 
MetadataObject.Type.SCHEMA),
+        NORMAL_USER,
+        Owner.Type.USER);
+
+    grantUseSchemaRole(namespace);
+    Assertions.assertDoesNotThrow(() -> sql("USE %s", namespace));
+  }
+
+  @Test
+  void testAuthorizationException() {
+    // Authorization is checked before existence - security best practice
+    String nonExistentSchema = "non_existent_schema";
+    Assertions.assertThrowsExactly(
+        org.apache.iceberg.exceptions.ForbiddenException.class,
+        () -> sql("USE %s", nonExistentSchema));
+
+    // Special characters in schema names should work with authorization
+    String specialSchema = "schema_with_123_special";
+    catalogClientWithAllPrivilege
+        .asSchemas()
+        .createSchema(specialSchema, "special characters test", new 
HashMap<>());
+
+    grantUseCatalogRole(GRAVITINO_CATALOG_NAME);
+    grantUseSchemaRole(specialSchema);
+    Assertions.assertDoesNotThrow(() -> sql("USE %s", specialSchema));
   }
 
+  /** Grants USE_CATALOG privilege to the normal user for the specified 
catalog. */
   private void grantUseCatalogRole(String catalogName) {
     String roleName = "useCatalog_" + UUID.randomUUID();
-    List<SecurableObject> securableObjects = new ArrayList<>();
     SecurableObject catalogObject =
         SecurableObjects.ofCatalog(catalogName, 
ImmutableList.of(Privileges.UseCatalog.allow()));
-    securableObjects.add(catalogObject);
-    metalakeClientWithAllPrivilege.createRole(roleName, new HashMap<>(), 
securableObjects);
+
+    metalakeClientWithAllPrivilege.createRole(
+        roleName, new HashMap<>(), ImmutableList.of(catalogObject));
+    
metalakeClientWithAllPrivilege.grantRolesToUser(ImmutableList.of(roleName), 
NORMAL_USER);
+  }
+
+  /** Grants USE_SCHEMA privilege to the normal user for the specified schema. 
*/
+  private void grantUseSchemaRole(String schemaName) {
+    String roleName = "useSchema_" + UUID.randomUUID();
+    SecurableObject catalogObject =
+        SecurableObjects.ofCatalog(GRAVITINO_CATALOG_NAME, ImmutableList.of());
+    SecurableObject schemaObject =
+        SecurableObjects.ofSchema(
+            catalogObject, schemaName, 
ImmutableList.of(Privileges.UseSchema.allow()));
+
+    metalakeClientWithAllPrivilege.createRole(
+        roleName, new HashMap<>(), ImmutableList.of(schemaObject));
     
metalakeClientWithAllPrivilege.grantRolesToUser(ImmutableList.of(roleName), 
NORMAL_USER);
   }
+
+  /** Grants CREATE_SCHEMA privilege to the normal user for the specified 
catalog. */
+  private void grantCreateSchemaRole(String catalogName) {
+    String roleName = "createSchema_" + UUID.randomUUID();
+    SecurableObject catalogObject =
+        SecurableObjects.ofCatalog(catalogName, 
ImmutableList.of(Privileges.CreateSchema.allow()));
+
+    metalakeClientWithAllPrivilege.createRole(
+        roleName, new HashMap<>(), ImmutableList.of(catalogObject));
+    
metalakeClientWithAllPrivilege.grantRolesToUser(ImmutableList.of(roleName), 
NORMAL_USER);
+  }
+
+  /** Extracts schema names from SHOW DATABASES result, handling various 
result formats. */
+  private List<String> extractSchemaNames(List<Object[]> result) {
+    List<String> schemaNames = new ArrayList<>();
+    for (Object[] row : result) {
+      if (row.length > 1 && row[1] != null) {
+        schemaNames.add((String) row[1]);
+      } else if (row.length == 1 && row[0] != null) {
+        schemaNames.add((String) row[0]);
+      }
+    }
+    return schemaNames;
+  }
 }
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergOwnershipUtils.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergOwnershipUtils.java
new file mode 100644
index 0000000000..92a5e9f9bc
--- /dev/null
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergOwnershipUtils.java
@@ -0,0 +1,108 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.iceberg.service.dispatcher;
+
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.iceberg.common.utils.IcebergIdentifierUtils;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class TestIcebergOwnershipUtils {
+
+  private static final String METALAKE = "test_metalake";
+  private static final String CATALOG = "test_catalog";
+  private static final String USER = "test_user";
+  private static final String SCHEMA_NAME = "test_schema";
+  private static final String TABLE_NAME = "test_table";
+
+  private OwnerDispatcher mockOwnerDispatcher;
+
+  @BeforeEach
+  public void setUp() {
+    mockOwnerDispatcher = mock(OwnerDispatcher.class);
+  }
+
+  @Test
+  public void testSetSchemaOwner() {
+    Namespace namespace = Namespace.of(SCHEMA_NAME);
+    NameIdentifier expectedSchemaIdentifier =
+        IcebergIdentifierUtils.toGravitinoSchemaIdentifier(METALAKE, CATALOG, 
namespace);
+    MetadataObject expectedMetadataObject =
+        NameIdentifierUtil.toMetadataObject(expectedSchemaIdentifier, 
Entity.EntityType.SCHEMA);
+
+    IcebergOwnershipUtils.setSchemaOwner(METALAKE, CATALOG, namespace, USER, 
mockOwnerDispatcher);
+
+    verify(mockOwnerDispatcher, times(1))
+        .setOwner(eq(METALAKE), eq(expectedMetadataObject), eq(USER), 
eq(Owner.Type.USER));
+  }
+
+  @Test
+  public void testSetTableOwner() {
+    Namespace namespace = Namespace.of(SCHEMA_NAME);
+    TableIdentifier tableIdentifier = TableIdentifier.of(namespace, 
TABLE_NAME);
+    NameIdentifier expectedTableIdentifier =
+        IcebergIdentifierUtils.toGravitinoTableIdentifier(METALAKE, CATALOG, 
tableIdentifier);
+    MetadataObject expectedMetadataObject =
+        NameIdentifierUtil.toMetadataObject(expectedTableIdentifier, 
Entity.EntityType.TABLE);
+
+    IcebergOwnershipUtils.setTableOwner(
+        METALAKE, CATALOG, namespace, TABLE_NAME, USER, mockOwnerDispatcher);
+
+    verify(mockOwnerDispatcher, times(1))
+        .setOwner(eq(METALAKE), eq(expectedMetadataObject), eq(USER), 
eq(Owner.Type.USER));
+  }
+
+  @Test
+  public void testSetSchemaOwnerWithNullOwnerDispatcher() {
+    Namespace namespace = Namespace.of(SCHEMA_NAME);
+
+    try {
+      IcebergOwnershipUtils.setSchemaOwner(METALAKE, CATALOG, namespace, USER, 
null);
+    } catch (Exception e) {
+      fail("setSchemaOwner should handle null dispatcher gracefully, but 
threw: " + e.getMessage());
+    }
+  }
+
+  @Test
+  public void testSetTableOwnerWithNullOwnerDispatcher() {
+    Namespace namespace = Namespace.of(SCHEMA_NAME);
+
+    try {
+      IcebergOwnershipUtils.setTableOwner(METALAKE, CATALOG, namespace, 
TABLE_NAME, USER, null);
+    } catch (Exception e) {
+      fail("setTableOwner should handle null dispatcher gracefully, but threw: 
" + e.getMessage());
+    }
+  }
+}

Reply via email to