This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 17928301ce [#9220] feat(iceberg): Add authorization for 
cross-namespace table renames (#9219)
17928301ce is described below

commit 17928301ce11e852be8add455979847164afe657
Author: Bharath Krishna <[email protected]>
AuthorDate: Sun Nov 23 08:54:14 2025 -0800

    [#9220] feat(iceberg): Add authorization for cross-namespace table renames 
(#9219)
    
    ### What changes were proposed in this pull request?
    
    - Enhanced `IcebergMetadataAuthorizationMethodInterceptor` to validate
    destination schema privileges for rename operations across different
    namespaces
    - Added dual authorization check requiring both source table ownership
    and destination CREATE_TABLE privileges
    
    ### Why are the changes needed?
    
    The current rename table operation only checks privileges for the source
    table, not the destination table. This creates a security gap where
    users can rename tables to schemas they shouldn't have access to.
    
    Fix: #9220
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, rename table operations to different schemas now require
    CREATE_TABLE privilege on the destination schema in addition to existing
    source table ownership requirements.
    
    
    ### How was this patch tested?
    - Added integration test covering three scenarios:
      1. No privileges (should fail)
      2. Source table ownership only (should fail)
    3. Both source ownership and destination CREATE_TABLE privilege (should
    succeed)
    - Verified existing same-schema rename functionality remains unchanged
    - All existing authorization tests continue to pass
---
 .../apache/gravitino/hook/TableHookDispatcher.java |   1 +
 .../dispatcher/IcebergTableHookDispatcher.java     |   2 +
 .../service/rest/IcebergTableRenameOperations.java |   2 +-
 ...BaseMetadataAuthorizationMethodInterceptor.java | 104 ++++++++--
 ...bergMetadataAuthorizationMethodInterceptor.java |  73 +++----
 .../server/web/filter/LoadTableAuthzHandler.java   | 124 +++++++++++
 .../server/web/filter/RenameTableAuthzHandler.java | 184 ++++++++++++++++
 .../test/IcebergTableAuthorizationIT.java          | 231 ++++++++++++++++++++-
 8 files changed, 651 insertions(+), 70 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java
index 5afa9ac533..55e9ad40a2 100644
--- a/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java
@@ -75,6 +75,7 @@ public class TableHookDispatcher implements TableDispatcher {
       Index[] indexes)
       throws NoSuchSchemaException, TableAlreadyExistsException {
     // Check whether the current user exists or not
+    // TODO(bharos): Move this to PassThroughAuthorizer
     AuthorizationUtils.checkCurrentUser(
         ident.namespace().level(0), PrincipalUtils.getCurrentUserName());
 
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
index 388205ffd0..f96c38e6c2 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
@@ -116,6 +116,8 @@ public class IcebergTableHookDispatcher implements 
IcebergTableOperationDispatch
 
   @Override
   public void renameTable(IcebergRequestContext context, RenameTableRequest 
renameTableRequest) {
+    AuthorizationUtils.checkCurrentUser(metalake, context.userName());
+
     dispatcher.renameTable(context, renameTableRequest);
     NameIdentifier tableSource =
         IcebergIdentifierUtils.toGravitinoTableIdentifier(
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableRenameOperations.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableRenameOperations.java
index 09766912ac..09c5f6e72a 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableRenameOperations.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableRenameOperations.java
@@ -70,7 +70,7 @@ public class IcebergTableRenameOperations {
       expression =
           "ANY(OWNER, METALAKE, CATALOG) || "
               + "SCHEMA_OWNER_WITH_USE_CATALOG || "
-              + "ANY_USE_CATALOG && ANY_USE_SCHEMA  && (TABLE::OWNER || 
ANY_MODIFY_TABLE)",
+              + "ANY_USE_CATALOG && ANY_USE_SCHEMA && (TABLE::OWNER || 
ANY_MODIFY_TABLE)",
       accessMetadataType = MetadataObject.Type.TABLE)
   public Response renameTable(
       @AuthorizationMetadata(type = Entity.EntityType.CATALOG) 
@PathParam("prefix") String prefix,
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/BaseMetadataAuthorizationMethodInterceptor.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/BaseMetadataAuthorizationMethodInterceptor.java
index d24e950391..7dc4931cf1 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/BaseMetadataAuthorizationMethodInterceptor.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/BaseMetadataAuthorizationMethodInterceptor.java
@@ -22,6 +22,7 @@ package org.apache.gravitino.server.web.filter;
 import java.lang.reflect.Method;
 import java.lang.reflect.Parameter;
 import java.util.Map;
+import java.util.Optional;
 import org.aopalliance.intercept.MethodInterceptor;
 import org.aopalliance.intercept.MethodInvocation;
 import org.apache.gravitino.Entity;
@@ -31,6 +32,7 @@ import 
org.apache.gravitino.authorization.AuthorizationRequestContext;
 import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
 import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionEvaluator;
+import org.apache.gravitino.server.web.Utils;
 import org.apache.gravitino.utils.PrincipalUtils;
 import org.apache.iceberg.exceptions.ForbiddenException;
 import org.slf4j.Logger;
@@ -45,9 +47,57 @@ public abstract class 
BaseMetadataAuthorizationMethodInterceptor implements Meth
   private static final Logger LOG =
       
LoggerFactory.getLogger(BaseMetadataAuthorizationMethodInterceptor.class);
 
+  /**
+   * Handler for request-specific authorization processing that cannot be 
handled by standard
+   * annotation-based expressions. Implementations can enrich identifiers, 
validate requests, and/or
+   * perform custom authorization.
+   */
+  protected interface AuthorizationHandler {
+    /**
+     * Process the request for authorization purposes. This may include:
+     *
+     * <ul>
+     *   <li>Extracting additional identifiers from request bodies
+     *   <li>Validating request parameters
+     *   <li>Performing custom authorization logic
+     * </ul>
+     *
+     * @param nameIdentifierMap Name identifier map (can be modified to add 
identifiers)
+     * @throws ForbiddenException if authorization or validation fails
+     */
+    void process(Map<Entity.EntityType, NameIdentifier> nameIdentifierMap)
+        throws ForbiddenException;
+
+    /**
+     * Whether this handler has completed full authorization. Called after 
{@link #process} to
+     * determine if standard expression-based authorization should be skipped.
+     *
+     * @return true if authorization is complete (skip standard check), false 
to continue with
+     *     standard expression-based authorization
+     */
+    boolean authorizationCompleted();
+  }
+
   protected abstract Map<Entity.EntityType, NameIdentifier> 
extractNameIdentifierFromParameters(
       Parameter[] parameters, Object[] args);
 
+  /**
+   * Create an authorization handler for this request, if special handling is 
needed beyond standard
+   * annotation-based authorization.
+   *
+   * <p>Override this method to provide custom handlers based on request 
characteristics (e.g.,
+   * annotations, request types, parameters).
+   *
+   * @param parameters Method parameters
+   * @param args Method arguments
+   * @return Optional handler for custom authorization processing, or empty if 
standard
+   *     authorization is sufficient
+   */
+  protected Optional<AuthorizationHandler> createAuthorizationHandler(
+      Parameter[] parameters, Object[] args) {
+    return Optional.empty();
+  }
+
   protected boolean isExceptionPropagate(Exception e) {
     return false;
   }
@@ -70,25 +120,43 @@ public abstract class 
BaseMetadataAuthorizationMethodInterceptor implements Meth
       if (expressionAnnotation != null) {
         String expression = expressionAnnotation.expression();
         Object[] args = methodInvocation.getArguments();
-        Map<Entity.EntityType, NameIdentifier> metadataContext =
+        Map<Entity.EntityType, NameIdentifier> nameIdentifierMap =
             extractNameIdentifierFromParameters(parameters, args);
-        AuthorizationExpressionEvaluator authorizationExpressionEvaluator =
-            new AuthorizationExpressionEvaluator(expression);
-        boolean authorizeResult =
-            authorizationExpressionEvaluator.evaluate(
-                metadataContext, new AuthorizationRequestContext());
-        if (!authorizeResult) {
-          MetadataObject.Type type = expressionAnnotation.accessMetadataType();
-          NameIdentifier accessMetadataName =
-              metadataContext.get(Entity.EntityType.valueOf(type.name()));
-          String currentUser = PrincipalUtils.getCurrentUserName();
-          String methodName = method.getName();
-          String notAuthzMessage =
-              String.format(
-                  "User '%s' is not authorized to perform operation '%s' on 
metadata '%s' with expression '%s'",
-                  currentUser, methodName, accessMetadataName, expression);
-          LOG.info(notAuthzMessage);
-          return IcebergExceptionMapper.toRESTResponse(new 
ForbiddenException(notAuthzMessage));
+
+        // Process custom authorization if handler exists
+        Optional<AuthorizationHandler> handler = 
createAuthorizationHandler(parameters, args);
+        boolean skipStandardCheck = false;
+
+        if (handler.isPresent()) {
+          AuthorizationHandler authzHandler = handler.get();
+          authzHandler.process(nameIdentifierMap);
+          skipStandardCheck = authzHandler.authorizationCompleted();
+        }
+
+        // Perform standard authorization check if custom handler didn't 
complete it
+        if (!skipStandardCheck) {
+          Map<String, Object> pathParams = 
Utils.extractPathParamsFromParameters(parameters, args);
+          AuthorizationExpressionEvaluator authorizationExpressionEvaluator =
+              new AuthorizationExpressionEvaluator(expression);
+          boolean authorizeResult =
+              authorizationExpressionEvaluator.evaluate(
+                  nameIdentifierMap,
+                  pathParams,
+                  new AuthorizationRequestContext(),
+                  Optional.empty());
+          if (!authorizeResult) {
+            MetadataObject.Type type = 
expressionAnnotation.accessMetadataType();
+            NameIdentifier accessMetadataName =
+                nameIdentifierMap.get(Entity.EntityType.valueOf(type.name()));
+            String currentUser = PrincipalUtils.getCurrentUserName();
+            String methodName = method.getName();
+            String notAuthzMessage =
+                String.format(
+                    "User '%s' is not authorized to perform operation '%s' on 
metadata '%s' with expression '%s'",
+                    currentUser, methodName, accessMetadataName, expression);
+            LOG.info(notAuthzMessage);
+            return IcebergExceptionMapper.toRESTResponse(new 
ForbiddenException(notAuthzMessage));
+          }
         }
       }
     } catch (Exception ex) {
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/IcebergMetadataAuthorizationMethodInterceptor.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/IcebergMetadataAuthorizationMethodInterceptor.java
index 4092e339b7..f5d9c997d2 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/IcebergMetadataAuthorizationMethodInterceptor.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/IcebergMetadataAuthorizationMethodInterceptor.java
@@ -22,6 +22,7 @@ package org.apache.gravitino.server.web.filter;
 import java.lang.reflect.Parameter;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.Entity.EntityType;
 import org.apache.gravitino.NameIdentifier;
@@ -29,12 +30,10 @@ import 
org.apache.gravitino.iceberg.service.IcebergRESTUtils;
 import 
org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerContext;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import 
org.apache.gravitino.server.authorization.annotations.IcebergAuthorizationMetadata;
+import 
org.apache.gravitino.server.authorization.annotations.IcebergAuthorizationMetadata.RequestType;
 import org.apache.gravitino.utils.NameIdentifierUtil;
-import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.rest.RESTUtil;
-import org.apache.iceberg.rest.requests.RenameTableRequest;
 
 /**
  * Through dynamic proxy, obtain the annotations on the method and parameter 
list to perform
@@ -42,6 +41,7 @@ import org.apache.iceberg.rest.requests.RenameTableRequest;
  */
 public class IcebergMetadataAuthorizationMethodInterceptor
     extends BaseMetadataAuthorizationMethodInterceptor {
+
   private final String metalakeName = 
IcebergRESTServerContext.getInstance().metalakeName();
 
   @Override
@@ -57,34 +57,6 @@ public class IcebergMetadataAuthorizationMethodInterceptor
       Parameter parameter = parameters[i];
       AuthorizationMetadata authorizeResource =
           parameter.getAnnotation(AuthorizationMetadata.class);
-      IcebergAuthorizationMetadata icebergAuthorizeResource =
-          parameter.getAnnotation(IcebergAuthorizationMetadata.class);
-      if (icebergAuthorizeResource != null) {
-        switch (icebergAuthorizeResource.type()) {
-          case RENAME_TABLE:
-            RenameTableRequest renameTableRequest = (RenameTableRequest) 
args[i];
-            schema = renameTableRequest.source().namespace().level(0);
-            nameIdentifierMap.put(
-                Entity.EntityType.SCHEMA,
-                NameIdentifierUtil.ofSchema(metalakeName, catalog, schema));
-            nameIdentifierMap.put(
-                Entity.EntityType.TABLE,
-                NameIdentifierUtil.ofTable(
-                    metalakeName, catalog, schema, 
renameTableRequest.source().name()));
-            break;
-          case LOAD_TABLE:
-            String tableName = String.valueOf(args[i]);
-            if (isMetadataTable(tableName, rawNamespace)) {
-              throw new NoSuchTableException("Table %s not found", tableName);
-            }
-            nameIdentifierMap.put(
-                Entity.EntityType.TABLE,
-                NameIdentifierUtil.ofTable(metalakeName, catalog, schema, 
tableName));
-            break;
-          default:
-            break;
-        }
-      }
       if (authorizeResource == null) {
         continue;
       }
@@ -113,21 +85,34 @@ public class IcebergMetadataAuthorizationMethodInterceptor
     return nameIdentifierMap;
   }
 
+  /**
+   * Creates an authorization handler for Iceberg-specific operations that 
require custom logic
+   * beyond standard annotation-based authorization.
+   */
   @Override
-  protected boolean isExceptionPropagate(Exception e) {
-    return e.getClass().getName().startsWith("org.apache.iceberg.exceptions");
-  }
-
-  // Check if the table is a metadata table, for metadata table, the namespace 
is `catalog.db.table`
-  private boolean isMetadataTable(String tableName, Namespace namespace) {
-    MetadataTableType metadataTableType = MetadataTableType.from(tableName);
-    if (metadataTableType == null) {
-      return false;
+  protected Optional<AuthorizationHandler> createAuthorizationHandler(
+      Parameter[] parameters, Object[] args) {
+    for (Parameter parameter : parameters) {
+      IcebergAuthorizationMetadata icebergMetadata =
+          parameter.getAnnotation(IcebergAuthorizationMetadata.class);
+      if (icebergMetadata != null) {
+        // Create handler with parameters and args for processing
+        RequestType type = icebergMetadata.type();
+        switch (type) {
+          case LOAD_TABLE:
+            return Optional.of(new LoadTableAuthzHandler(parameters, args));
+          case RENAME_TABLE:
+            return Optional.of(new RenameTableAuthzHandler(parameters, args));
+          default:
+            break;
+        }
+      }
     }
+    return Optional.empty();
+  }
 
-    if (namespace.levels().length > 1) {
-      return true;
-    }
-    return false;
+  @Override
+  protected boolean isExceptionPropagate(Exception e) {
+    return e.getClass().getName().startsWith("org.apache.iceberg.exceptions");
   }
 }
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/LoadTableAuthzHandler.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/LoadTableAuthzHandler.java
new file mode 100644
index 0000000000..c4814b533e
--- /dev/null
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/LoadTableAuthzHandler.java
@@ -0,0 +1,124 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.server.web.filter;
+
+import java.lang.reflect.Parameter;
+import java.util.Map;
+import org.apache.gravitino.Entity.EntityType;
+import org.apache.gravitino.NameIdentifier;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
+import 
org.apache.gravitino.server.authorization.annotations.IcebergAuthorizationMetadata;
+import 
org.apache.gravitino.server.authorization.annotations.IcebergAuthorizationMetadata.RequestType;
+import 
org.apache.gravitino.server.web.filter.BaseMetadataAuthorizationMethodInterceptor.AuthorizationHandler;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.rest.RESTUtil;
+
+/**
+ * Handler for LOAD_TABLE operations. Validates that the requested table is 
not a metadata table
+ * (e.g., table$snapshots) and extracts the table identifier for authorization 
checks.
+ */
+public class LoadTableAuthzHandler implements AuthorizationHandler {
+  private final Parameter[] parameters;
+  private final Object[] args;
+
+  public LoadTableAuthzHandler(Parameter[] parameters, Object[] args) {
+    this.parameters = parameters;
+    this.args = args;
+  }
+
+  @Override
+  public void process(Map<EntityType, NameIdentifier> nameIdentifierMap) {
+    // Find the table name and namespace from parameters
+    String tableName = null;
+    Namespace namespace = null;
+
+    for (int i = 0; i < parameters.length; i++) {
+      Parameter parameter = parameters[i];
+
+      IcebergAuthorizationMetadata icebergMetadata =
+          parameter.getAnnotation(IcebergAuthorizationMetadata.class);
+      if (icebergMetadata != null && icebergMetadata.type() == 
RequestType.LOAD_TABLE) {
+        tableName = String.valueOf(args[i]);
+      }
+
+      AuthorizationMetadata authMetadata = 
parameter.getAnnotation(AuthorizationMetadata.class);
+      if (authMetadata != null && authMetadata.type() == EntityType.SCHEMA) {
+        // Decode the raw Iceberg namespace parameter
+        namespace = RESTUtil.decodeNamespace(String.valueOf(args[i]));
+      }
+    }
+
+    if (tableName == null || namespace == null) {
+      throw new NoSuchTableException("Table not found - missing table name or 
namespace");
+    }
+
+    // Validate that this is not a metadata table access
+    if (isMetadataTable(tableName, namespace)) {
+      throw new NoSuchTableException("Table %s not found", tableName);
+    }
+
+    NameIdentifier catalogId = nameIdentifierMap.get(EntityType.CATALOG);
+    NameIdentifier schemaId = nameIdentifierMap.get(EntityType.SCHEMA);
+
+    if (catalogId == null || schemaId == null) {
+      throw new NoSuchTableException("Missing catalog or schema context for 
table authorization");
+    }
+
+    String metalakeName = catalogId.namespace().level(0);
+    String catalog = catalogId.name();
+    String schema = schemaId.name();
+
+    // Add table identifier to the map for authorization expression evaluation
+    nameIdentifierMap.put(
+        EntityType.TABLE, NameIdentifierUtil.ofTable(metalakeName, catalog, 
schema, tableName));
+  }
+
+  @Override
+  public boolean authorizationCompleted() {
+    // This handler only enriches identifiers, doesn't perform full 
authorization
+    return false;
+  }
+
+  /**
+   * Check if the table is a metadata table. Metadata tables have special 
names like
+   * `table$snapshots`, `table$files`, etc., and are accessed with longer 
namespace paths
+   * (catalog.db.table instead of catalog.db).
+   *
+   * @param tableName The table name to check
+   * @param namespace The Iceberg namespace of the table
+   * @return true if this is a metadata table access, false otherwise
+   */
+  private boolean isMetadataTable(String tableName, Namespace namespace) {
+    MetadataTableType metadataTableType = MetadataTableType.from(tableName);
+    if (metadataTableType == null) {
+      return false;
+    }
+
+    // Metadata tables have namespace length > 1 (e.g., catalog.db.table has 3 
levels)
+    // Regular tables have namespace length = 1 (e.g., catalog.db has 2 
levels, but we get "db")
+    if (namespace.levels().length > 1) {
+      return true;
+    }
+    return false;
+  }
+}
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/RenameTableAuthzHandler.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/RenameTableAuthzHandler.java
new file mode 100644
index 0000000000..f839858b57
--- /dev/null
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/RenameTableAuthzHandler.java
@@ -0,0 +1,184 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.server.web.filter;
+
+import java.lang.reflect.Parameter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.Entity.EntityType;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.authorization.AuthorizationRequestContext;
+import 
org.apache.gravitino.server.authorization.annotations.IcebergAuthorizationMetadata;
+import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionEvaluator;
+import 
org.apache.gravitino.server.web.filter.BaseMetadataAuthorizationMethodInterceptor.AuthorizationHandler;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.iceberg.rest.requests.RenameTableRequest;
+
+/**
+ * Handler for RENAME_TABLE operations. Performs authorization checks for 
cross-namespace renames
+ * which require stricter checks than same-namespace renames, following MySQL 
privilege model.
+ */
+@SuppressWarnings("FormatStringAnnotation")
+public class RenameTableAuthzHandler implements AuthorizationHandler {
+  private final Parameter[] parameters;
+  private final Object[] args;
+  private boolean crossNamespaceRename = false;
+
+  public RenameTableAuthzHandler(Parameter[] parameters, Object[] args) {
+    this.parameters = parameters;
+    this.args = args;
+  }
+
+  @Override
+  public void process(Map<EntityType, NameIdentifier> nameIdentifierMap) {
+    RenameTableRequest renameTableRequest = null;
+    for (int i = 0; i < parameters.length; i++) {
+      IcebergAuthorizationMetadata metadata =
+          parameters[i].getAnnotation(IcebergAuthorizationMetadata.class);
+      if (metadata != null
+          && metadata.type() == 
IcebergAuthorizationMetadata.RequestType.RENAME_TABLE) {
+        renameTableRequest = (RenameTableRequest) args[i];
+        break;
+      }
+    }
+
+    if (renameTableRequest == null) {
+      throw new ForbiddenException("RenameTableRequest not found in 
parameters");
+    }
+
+    // Extract metalake and catalog from nameIdentifierMap
+    NameIdentifier metalakeIdent = nameIdentifierMap.get(EntityType.METALAKE);
+    NameIdentifier catalogIdent = nameIdentifierMap.get(EntityType.CATALOG);
+
+    if (metalakeIdent == null || catalogIdent == null) {
+      throw new ForbiddenException("Missing metalake or catalog context for 
authorization");
+    }
+
+    String metalakeName = metalakeIdent.name();
+    String catalog = catalogIdent.name();
+
+    // Extract source table information from the request and add to map
+    // The source table is NOT extracted via standard @AuthorizationMetadata 
annotations
+    // because it's embedded in the RenameTableRequest body
+    String sourceSchema = renameTableRequest.source().namespace().level(0);
+    String sourceTable = renameTableRequest.source().name();
+
+    nameIdentifierMap.put(
+        EntityType.SCHEMA, NameIdentifierUtil.ofSchema(metalakeName, catalog, 
sourceSchema));
+    nameIdentifierMap.put(
+        EntityType.TABLE,
+        NameIdentifierUtil.ofTable(metalakeName, catalog, sourceSchema, 
sourceTable));
+
+    String destSchema = renameTableRequest.destination().namespace().level(0);
+    if (!sourceSchema.equals(destSchema)) {
+      // Cross-namespace rename - perform complete authorization here
+      crossNamespaceRename = true;
+      validateCrossNamespaceRename(catalog, metalakeName, sourceSchema, 
sourceTable, destSchema);
+    }
+  }
+
+  @Override
+  public boolean authorizationCompleted() {
+    // Return true if we performed complete authorization (cross-namespace 
case)
+    return crossNamespaceRename;
+  }
+
+  /**
+   * Validates authorization for cross-namespace renames following MySQL 
privilege model: - Requires
+   * ownership on source table (equivalent to DROP privilege) - Requires 
CREATE_TABLE privilege on
+   * destination schema
+   *
+   * @param catalog The catalog name
+   * @param metalakeName The metalake name
+   * @param sourceSchema The source schema name
+   * @param sourceTable The source table name
+   * @param destSchema The destination schema name
+   * @throws ForbiddenException if the user lacks required privileges
+   */
+  private void validateCrossNamespaceRename(
+      String catalog,
+      String metalakeName,
+      String sourceSchema,
+      String sourceTable,
+      String destSchema) {
+    String currentUser = PrincipalUtils.getCurrentUserName();
+    Map<EntityType, NameIdentifier> sourceContext = new HashMap<>();
+    sourceContext.put(EntityType.METALAKE, 
NameIdentifierUtil.ofMetalake(metalakeName));
+    sourceContext.put(EntityType.CATALOG, 
NameIdentifierUtil.ofCatalog(metalakeName, catalog));
+    sourceContext.put(
+        EntityType.SCHEMA, NameIdentifierUtil.ofSchema(metalakeName, catalog, 
sourceSchema));
+    sourceContext.put(
+        EntityType.TABLE,
+        NameIdentifierUtil.ofTable(metalakeName, catalog, sourceSchema, 
sourceTable));
+
+    String sourceExpression =
+        "ANY(OWNER, METALAKE, CATALOG) || "
+            + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+            + "ANY_USE_CATALOG && ANY_USE_SCHEMA && TABLE::OWNER";
+
+    AuthorizationExpressionEvaluator sourceEvaluator =
+        new AuthorizationExpressionEvaluator(sourceExpression);
+
+    boolean sourceAuthorized =
+        sourceEvaluator.evaluate(
+            sourceContext, new HashMap<>(), new AuthorizationRequestContext(), 
Optional.empty());
+
+    if (!sourceAuthorized) {
+      String notAuthzMessage =
+          String.format(
+              "User '%s' is not authorized to drop/move table '%s' from schema 
'%s'. "
+                  + "Only the table owner can move a table to a different 
schema.",
+              currentUser, sourceTable, sourceSchema);
+      throw new ForbiddenException(notAuthzMessage);
+    }
+
+    // Check CREATE_TABLE privilege on destination schema
+    // Note: The destination schema is NOT in the nameIdentifierMap, so the 
standard expression
+    // never checked it. We need to perform a full authorization check here.
+    Map<EntityType, NameIdentifier> destContext = new HashMap<>();
+    destContext.put(EntityType.METALAKE, 
NameIdentifierUtil.ofMetalake(metalakeName));
+    destContext.put(EntityType.CATALOG, 
NameIdentifierUtil.ofCatalog(metalakeName, catalog));
+    destContext.put(
+        EntityType.SCHEMA, NameIdentifierUtil.ofSchema(metalakeName, catalog, 
destSchema));
+
+    String destExpression =
+        "ANY(OWNER, METALAKE, CATALOG) || "
+            + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+            + "ANY_USE_CATALOG && ANY_USE_SCHEMA && ANY_CREATE_TABLE";
+
+    AuthorizationExpressionEvaluator destEvaluator =
+        new AuthorizationExpressionEvaluator(destExpression);
+
+    boolean destAuthorized =
+        destEvaluator.evaluate(
+            destContext, new HashMap<>(), new AuthorizationRequestContext(), 
Optional.empty());
+
+    if (!destAuthorized) {
+      String notAuthzMessage =
+          String.format(
+              "User '%s' is not authorized to create table in destination 
schema '%s'",
+              currentUser, destSchema);
+      throw new ForbiddenException(notAuthzMessage);
+    }
+  }
+}
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergTableAuthorizationIT.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergTableAuthorizationIT.java
index d5a760f8bb..f0850db376 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergTableAuthorizationIT.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergTableAuthorizationIT.java
@@ -271,24 +271,221 @@ public class IcebergTableAuthorizationIT extends 
IcebergAuthorizationIT {
   }
 
   @Test
-  public void testRenameTable() {
-    String tableName = "test_rename";
+  public void testRenameTableSameNamespace() {
+    String tableName = "test_rename_same_ns";
     createTable(SCHEMA_NAME, tableName);
+
+    // No privileges - should fail
     Assertions.assertThrowsExactly(
         ForbiddenException.class,
         () -> sql("ALTER TABLE %s RENAME TO %s", tableName, tableName + 
"_renamed"));
 
-    setTableOwner(tableName);
+    // Test with MODIFY_TABLE privilege
+    String modifyTableRole = grantModifyTableRole(tableName);
     Assertions.assertDoesNotThrow(
         () -> sql("ALTER TABLE %s RENAME TO %s", tableName, tableName + 
"_renamed"));
 
+    // Verify ownership remains with original owner (SUPER_USER created the 
table)
     Optional<Owner> owner =
         metalakeClientWithAllPrivilege.getOwner(
             MetadataObjects.of(
                 Arrays.asList(GRAVITINO_CATALOG_NAME, SCHEMA_NAME, tableName + 
"_renamed"),
                 MetadataObject.Type.TABLE));
     Assertions.assertTrue(owner.isPresent());
+    Assertions.assertEquals(SUPER_USER, owner.get().name());
+
+    // Clean up for next test
+    revokeRole(modifyTableRole);
+    catalogClientWithAllPrivilege
+        .asTableCatalog()
+        .dropTable(NameIdentifier.of(SCHEMA_NAME, tableName + "_renamed"));
+
+    // Test with table ownership (without MODIFY_TABLE)
+    createTable(SCHEMA_NAME, tableName);
+    setTableOwner(tableName);
+    Assertions.assertDoesNotThrow(
+        () -> sql("ALTER TABLE %s RENAME TO %s", tableName, tableName + 
"_renamed2"));
+
+    // Verify ownership is retained
+    owner =
+        metalakeClientWithAllPrivilege.getOwner(
+            MetadataObjects.of(
+                Arrays.asList(GRAVITINO_CATALOG_NAME, SCHEMA_NAME, tableName + 
"_renamed2"),
+                MetadataObject.Type.TABLE));
+    Assertions.assertTrue(owner.isPresent());
+    Assertions.assertEquals(NORMAL_USER, owner.get().name());
+  }
+
+  @Test
+  public void testRenameTableToDifferentNamespace() {
+    String sourceSchema = SCHEMA_NAME;
+    String destSchema = SCHEMA_NAME + "_dest";
+    String tableName = "test_cross_ns_rename";
+
+    // Create destination schema
+    catalogClientWithAllPrivilege
+        .asSchemas()
+        .createSchema(destSchema, "dest schema", new HashMap<>());
+    grantUseSchemaRole(destSchema);
+
+    // Create table in source schema
+    createTable(sourceSchema, tableName);
+
+    // Test 1: No privileges - should fail
+    Assertions.assertThrowsExactly(
+        ForbiddenException.class,
+        () ->
+            sql(
+                "ALTER TABLE %s.%s RENAME TO %s.%s",
+                sourceSchema, tableName, destSchema, tableName + "_renamed1"));
+
+    // Test 2: Only MODIFY_TABLE on source (no CREATE_TABLE on dest) - should 
fail
+    String modifyTableRole = grantModifyTableRole(tableName);
+    Assertions.assertThrowsExactly(
+        ForbiddenException.class,
+        () ->
+            sql(
+                "ALTER TABLE %s.%s RENAME TO %s.%s",
+                sourceSchema, tableName, destSchema, tableName + "_renamed2"));
+    revokeRole(modifyTableRole);
+
+    // Test 3: Only CREATE_TABLE on dest (no ownership on source) - should fail
+    String createTableRole = grantCreateTableRole(destSchema);
+    Assertions.assertThrowsExactly(
+        ForbiddenException.class,
+        () ->
+            sql(
+                "ALTER TABLE %s.%s RENAME TO %s.%s",
+                sourceSchema, tableName, destSchema, tableName + "_renamed3"));
+    revokeRole(createTableRole);
+
+    // Test 4: Table ownership + CREATE_TABLE on dest - should succeed
+    setTableOwner(tableName);
+    createTableRole = grantCreateTableRole(destSchema);
+    Assertions.assertDoesNotThrow(
+        () ->
+            sql(
+                "ALTER TABLE %s.%s RENAME TO %s.%s",
+                sourceSchema, tableName, destSchema, tableName + "_renamed4"));
+
+    // Verify table exists in destination schema
+    boolean existsInDest =
+        catalogClientWithAllPrivilege
+            .asTableCatalog()
+            .tableExists(NameIdentifier.of(destSchema, tableName + 
"_renamed4"));
+    Assertions.assertTrue(existsInDest);
+
+    // Verify table no longer exists in source schema
+    boolean existsInSource =
+        catalogClientWithAllPrivilege
+            .asTableCatalog()
+            .tableExists(NameIdentifier.of(sourceSchema, tableName));
+    Assertions.assertFalse(existsInSource);
+
+    // Verify ownership is retained (NORMAL_USER was already the owner before 
rename)
+    Optional<Owner> owner =
+        metalakeClientWithAllPrivilege.getOwner(
+            MetadataObjects.of(
+                Arrays.asList(GRAVITINO_CATALOG_NAME, destSchema, tableName + 
"_renamed4"),
+                MetadataObject.Type.TABLE));
+    Assertions.assertTrue(owner.isPresent());
     Assertions.assertEquals(NORMAL_USER, owner.get().name());
+
+    // Clean up destination schema
+    revokeRole(createTableRole);
+    clearTable(destSchema);
+    catalogClientWithAllPrivilege.asSchemas().dropSchema(destSchema, false);
+  }
+
+  @Test
+  public void testRenameTableToDifferentNamespaceWithOwnership() {
+    String sourceSchema = SCHEMA_NAME;
+    String destSchema = SCHEMA_NAME + "_dest2";
+    String tableName = "test_owner_rename";
+
+    // Create destination schema
+    catalogClientWithAllPrivilege
+        .asSchemas()
+        .createSchema(destSchema, "dest schema for ownership test", new 
HashMap<>());
+    grantUseSchemaRole(destSchema);
+
+    // Test 1: Source schema owner + CREATE_TABLE on dest - should succeed
+    createTable(sourceSchema, tableName + "_1");
+    setSchemaOwner(NORMAL_USER); // NORMAL_USER owns source schema
+    String createTableRole = grantCreateTableRole(destSchema);
+
+    Assertions.assertDoesNotThrow(
+        () ->
+            sql(
+                "ALTER TABLE %s.%s RENAME TO %s.%s",
+                sourceSchema, tableName + "_1", destSchema, tableName + 
"_1_renamed"));
+
+    revokeRole(createTableRole);
+    setSchemaOwner(SUPER_USER); // Reset source schema owner
+
+    // Test 2: Table owner + destination schema owner - should succeed
+    createTable(sourceSchema, tableName + "_2");
+    setTableOwner(tableName + "_2");
+    setSchemaOwner(destSchema, NORMAL_USER); // NORMAL_USER owns dest schema
+
+    Assertions.assertDoesNotThrow(
+        () ->
+            sql(
+                "ALTER TABLE %s.%s RENAME TO %s.%s",
+                sourceSchema, tableName + "_2", destSchema, tableName + 
"_2_renamed"));
+
+    setSchemaOwner(destSchema, SUPER_USER); // Reset dest schema owner
+
+    // Test 3: Both source and dest schema owner - should succeed
+    createTable(sourceSchema, tableName + "_3");
+    setSchemaOwner(NORMAL_USER); // Source schema owner
+    setSchemaOwner(destSchema, NORMAL_USER); // Dest schema owner
+
+    Assertions.assertDoesNotThrow(
+        () ->
+            sql(
+                "ALTER TABLE %s.%s RENAME TO %s.%s",
+                sourceSchema, tableName + "_3", destSchema, tableName + 
"_3_renamed"));
+
+    // Test 4: Only source schema owner (no dest privileges) - should fail
+    setSchemaOwner(destSchema, SUPER_USER); // Reset dest schema owner
+    createTable(sourceSchema, tableName + "_4");
+
+    Assertions.assertThrowsExactly(
+        ForbiddenException.class,
+        () ->
+            sql(
+                "ALTER TABLE %s.%s RENAME TO %s.%s",
+                sourceSchema, tableName + "_4", destSchema, tableName + 
"_4_renamed"));
+
+    // Test 5: Only dest schema owner (no source table ownership) - should fail
+    setSchemaOwner(SUPER_USER); // Reset source schema owner
+    setSchemaOwner(destSchema, NORMAL_USER);
+
+    Assertions.assertThrowsExactly(
+        ForbiddenException.class,
+        () ->
+            sql(
+                "ALTER TABLE %s.%s RENAME TO %s.%s",
+                sourceSchema, tableName + "_4", destSchema, tableName + 
"_4_renamed2"));
+
+    // Test 6: Catalog owner - should succeed
+    setSchemaOwner(destSchema, SUPER_USER); // Reset dest schema owner
+    createTable(sourceSchema, tableName + "_5");
+    setCatalogOwner(NORMAL_USER);
+
+    Assertions.assertDoesNotThrow(
+        () ->
+            sql(
+                "ALTER TABLE %s.%s RENAME TO %s.%s",
+                sourceSchema, tableName + "_5", destSchema, tableName + 
"_5_renamed"));
+
+    setCatalogOwner(SUPER_USER); // Reset catalog owner
+
+    // Clean up
+    setSchemaOwner(SUPER_USER);
+    clearTable(destSchema);
+    catalogClientWithAllPrivilege.asSchemas().dropSchema(destSchema, false);
   }
 
   @Test
@@ -444,23 +641,43 @@ public class IcebergTableAuthorizationIT extends 
IcebergAuthorizationIT {
   }
 
   private void setSchemaOwner(String userName) {
+    setSchemaOwner(SCHEMA_NAME, userName);
+  }
+
+  private void setSchemaOwner(String schemaName, String userName) {
     MetadataObject schemaMetadataObject =
         MetadataObjects.of(
-            Arrays.asList(GRAVITINO_CATALOG_NAME, SCHEMA_NAME), 
MetadataObject.Type.SCHEMA);
+            Arrays.asList(GRAVITINO_CATALOG_NAME, schemaName), 
MetadataObject.Type.SCHEMA);
     metalakeClientWithAllPrivilege.setOwner(schemaMetadataObject, userName, 
Owner.Type.USER);
   }
 
+  private void setCatalogOwner(String userName) {
+    MetadataObject catalogMetadataObject =
+        MetadataObjects.of(Arrays.asList(GRAVITINO_CATALOG_NAME), 
MetadataObject.Type.CATALOG);
+    metalakeClientWithAllPrivilege.setOwner(catalogMetadataObject, userName, 
Owner.Type.USER);
+  }
+
+  private void setMetalakeOwner(String userName) {
+    MetadataObject metalakeMetadataObject =
+        MetadataObjects.of(Arrays.asList(METALAKE_NAME), 
MetadataObject.Type.METALAKE);
+    metalakeClientWithAllPrivilege.setOwner(metalakeMetadataObject, userName, 
Owner.Type.USER);
+  }
+
   private void clearTable() {
+    clearTable(SCHEMA_NAME);
+  }
+
+  private void clearTable(String schemaName) {
     Arrays.stream(
-            
catalogClientWithAllPrivilege.asTableCatalog().listTables(Namespace.of(SCHEMA_NAME)))
+            
catalogClientWithAllPrivilege.asTableCatalog().listTables(Namespace.of(schemaName)))
         .forEach(
             table -> {
               catalogClientWithAllPrivilege
                   .asTableCatalog()
-                  .dropTable(NameIdentifier.of(SCHEMA_NAME, table.name()));
+                  .dropTable(NameIdentifier.of(schemaName, table.name()));
             });
     NameIdentifier[] nameIdentifiers =
-        
catalogClientWithAllPrivilege.asTableCatalog().listTables(Namespace.of(SCHEMA_NAME));
+        
catalogClientWithAllPrivilege.asTableCatalog().listTables(Namespace.of(schemaName));
     Assertions.assertEquals(0, nameIdentifiers.length);
   }
 }


Reply via email to