This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 17928301ce [#9220] feat(iceberg): Add authorization for
cross-namespace table renames (#9219)
17928301ce is described below
commit 17928301ce11e852be8add455979847164afe657
Author: Bharath Krishna <[email protected]>
AuthorDate: Sun Nov 23 08:54:14 2025 -0800
[#9220] feat(iceberg): Add authorization for cross-namespace table renames
(#9219)
### What changes were proposed in this pull request?
- Enhanced `IcebergMetadataAuthorizationMethodInterceptor` to validate
destination schema privileges for rename operations across different
namespaces
- Added dual authorization check requiring both source table ownership
and destination CREATE_TABLE privileges
### Why are the changes needed?
The current rename table operation only checks privileges for the source
table, not the destination table. This creates a security gap where
users can rename tables to schemas they shouldn't have access to.
Fix: #9220
### Does this PR introduce _any_ user-facing change?
Yes, rename table operations to different schemas now require
CREATE_TABLE privilege on the destination schema in addition to existing
source table ownership requirements.
### How was this patch tested?
- Added integration test covering three scenarios:
1. No privileges (should fail)
2. Source table ownership only (should fail)
3. Both source ownership and destination CREATE_TABLE privilege (should
succeed)
- Verified existing same-schema rename functionality remains unchanged
- All existing authorization tests continue to pass
---
.../apache/gravitino/hook/TableHookDispatcher.java | 1 +
.../dispatcher/IcebergTableHookDispatcher.java | 2 +
.../service/rest/IcebergTableRenameOperations.java | 2 +-
...BaseMetadataAuthorizationMethodInterceptor.java | 104 ++++++++--
...bergMetadataAuthorizationMethodInterceptor.java | 73 +++----
.../server/web/filter/LoadTableAuthzHandler.java | 124 +++++++++++
.../server/web/filter/RenameTableAuthzHandler.java | 184 ++++++++++++++++
.../test/IcebergTableAuthorizationIT.java | 231 ++++++++++++++++++++-
8 files changed, 651 insertions(+), 70 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java
b/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java
index 5afa9ac533..55e9ad40a2 100644
--- a/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java
@@ -75,6 +75,7 @@ public class TableHookDispatcher implements TableDispatcher {
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
// Check whether the current user exists or not
+ // TODO(bharos): Move this to PassThroughAuthorizer
AuthorizationUtils.checkCurrentUser(
ident.namespace().level(0), PrincipalUtils.getCurrentUserName());
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
index 388205ffd0..f96c38e6c2 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
@@ -116,6 +116,8 @@ public class IcebergTableHookDispatcher implements
IcebergTableOperationDispatch
@Override
public void renameTable(IcebergRequestContext context, RenameTableRequest
renameTableRequest) {
+ AuthorizationUtils.checkCurrentUser(metalake, context.userName());
+
dispatcher.renameTable(context, renameTableRequest);
NameIdentifier tableSource =
IcebergIdentifierUtils.toGravitinoTableIdentifier(
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableRenameOperations.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableRenameOperations.java
index 09766912ac..09c5f6e72a 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableRenameOperations.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableRenameOperations.java
@@ -70,7 +70,7 @@ public class IcebergTableRenameOperations {
expression =
"ANY(OWNER, METALAKE, CATALOG) || "
+ "SCHEMA_OWNER_WITH_USE_CATALOG || "
- + "ANY_USE_CATALOG && ANY_USE_SCHEMA && (TABLE::OWNER ||
ANY_MODIFY_TABLE)",
+ + "ANY_USE_CATALOG && ANY_USE_SCHEMA && (TABLE::OWNER ||
ANY_MODIFY_TABLE)",
accessMetadataType = MetadataObject.Type.TABLE)
public Response renameTable(
@AuthorizationMetadata(type = Entity.EntityType.CATALOG)
@PathParam("prefix") String prefix,
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/BaseMetadataAuthorizationMethodInterceptor.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/BaseMetadataAuthorizationMethodInterceptor.java
index d24e950391..7dc4931cf1 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/BaseMetadataAuthorizationMethodInterceptor.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/BaseMetadataAuthorizationMethodInterceptor.java
@@ -22,6 +22,7 @@ package org.apache.gravitino.server.web.filter;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Map;
+import java.util.Optional;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.gravitino.Entity;
@@ -31,6 +32,7 @@ import
org.apache.gravitino.authorization.AuthorizationRequestContext;
import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
import
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
import
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionEvaluator;
+import org.apache.gravitino.server.web.Utils;
import org.apache.gravitino.utils.PrincipalUtils;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.slf4j.Logger;
@@ -45,9 +47,57 @@ public abstract class
BaseMetadataAuthorizationMethodInterceptor implements Meth
private static final Logger LOG =
LoggerFactory.getLogger(BaseMetadataAuthorizationMethodInterceptor.class);
+ /**
+ * Handler for request-specific authorization processing that cannot be
handled by standard
+ * annotation-based expressions. Implementations can enrich identifiers,
validate requests, and/or
+ * perform custom authorization.
+ */
+ protected interface AuthorizationHandler {
+ /**
+ * Process the request for authorization purposes. This may include:
+ *
+ * <ul>
+ * <li>Extracting additional identifiers from request bodies
+ * <li>Validating request parameters
+ * <li>Performing custom authorization logic
+ * </ul>
+ *
+ * @param nameIdentifierMap Name identifier map (can be modified to add
identifiers)
+ * @throws ForbiddenException if authorization or validation fails
+ */
+ void process(Map<Entity.EntityType, NameIdentifier> nameIdentifierMap)
+ throws ForbiddenException;
+
+ /**
+ * Whether this handler has completed full authorization. Called after
{@link #process} to
+ * determine if standard expression-based authorization should be skipped.
+ *
+ * @return true if authorization is complete (skip standard check), false
to continue with
+ * standard expression-based authorization
+ */
+ boolean authorizationCompleted();
+ }
+
protected abstract Map<Entity.EntityType, NameIdentifier>
extractNameIdentifierFromParameters(
Parameter[] parameters, Object[] args);
+ /**
+ * Create an authorization handler for this request, if special handling is
needed beyond standard
+ * annotation-based authorization.
+ *
+ * <p>Override this method to provide custom handlers based on request
characteristics (e.g.,
+ * annotations, request types, parameters).
+ *
+ * @param parameters Method parameters
+ * @param args Method arguments
+ * @return Optional handler for custom authorization processing, or empty if
standard
+ * authorization is sufficient
+ */
+ protected Optional<AuthorizationHandler> createAuthorizationHandler(
+ Parameter[] parameters, Object[] args) {
+ return Optional.empty();
+ }
+
protected boolean isExceptionPropagate(Exception e) {
return false;
}
@@ -70,25 +120,43 @@ public abstract class
BaseMetadataAuthorizationMethodInterceptor implements Meth
if (expressionAnnotation != null) {
String expression = expressionAnnotation.expression();
Object[] args = methodInvocation.getArguments();
- Map<Entity.EntityType, NameIdentifier> metadataContext =
+ Map<Entity.EntityType, NameIdentifier> nameIdentifierMap =
extractNameIdentifierFromParameters(parameters, args);
- AuthorizationExpressionEvaluator authorizationExpressionEvaluator =
- new AuthorizationExpressionEvaluator(expression);
- boolean authorizeResult =
- authorizationExpressionEvaluator.evaluate(
- metadataContext, new AuthorizationRequestContext());
- if (!authorizeResult) {
- MetadataObject.Type type = expressionAnnotation.accessMetadataType();
- NameIdentifier accessMetadataName =
- metadataContext.get(Entity.EntityType.valueOf(type.name()));
- String currentUser = PrincipalUtils.getCurrentUserName();
- String methodName = method.getName();
- String notAuthzMessage =
- String.format(
- "User '%s' is not authorized to perform operation '%s' on
metadata '%s' with expression '%s'",
- currentUser, methodName, accessMetadataName, expression);
- LOG.info(notAuthzMessage);
- return IcebergExceptionMapper.toRESTResponse(new
ForbiddenException(notAuthzMessage));
+
+ // Process custom authorization if handler exists
+ Optional<AuthorizationHandler> handler =
createAuthorizationHandler(parameters, args);
+ boolean skipStandardCheck = false;
+
+ if (handler.isPresent()) {
+ AuthorizationHandler authzHandler = handler.get();
+ authzHandler.process(nameIdentifierMap);
+ skipStandardCheck = authzHandler.authorizationCompleted();
+ }
+
+ // Perform standard authorization check if custom handler didn't
complete it
+ if (!skipStandardCheck) {
+ Map<String, Object> pathParams =
Utils.extractPathParamsFromParameters(parameters, args);
+ AuthorizationExpressionEvaluator authorizationExpressionEvaluator =
+ new AuthorizationExpressionEvaluator(expression);
+ boolean authorizeResult =
+ authorizationExpressionEvaluator.evaluate(
+ nameIdentifierMap,
+ pathParams,
+ new AuthorizationRequestContext(),
+ Optional.empty());
+ if (!authorizeResult) {
+ MetadataObject.Type type =
expressionAnnotation.accessMetadataType();
+ NameIdentifier accessMetadataName =
+ nameIdentifierMap.get(Entity.EntityType.valueOf(type.name()));
+ String currentUser = PrincipalUtils.getCurrentUserName();
+ String methodName = method.getName();
+ String notAuthzMessage =
+ String.format(
+ "User '%s' is not authorized to perform operation '%s' on
metadata '%s' with expression '%s'",
+ currentUser, methodName, accessMetadataName, expression);
+ LOG.info(notAuthzMessage);
+ return IcebergExceptionMapper.toRESTResponse(new
ForbiddenException(notAuthzMessage));
+ }
}
}
} catch (Exception ex) {
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/IcebergMetadataAuthorizationMethodInterceptor.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/IcebergMetadataAuthorizationMethodInterceptor.java
index 4092e339b7..f5d9c997d2 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/IcebergMetadataAuthorizationMethodInterceptor.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/IcebergMetadataAuthorizationMethodInterceptor.java
@@ -22,6 +22,7 @@ package org.apache.gravitino.server.web.filter;
import java.lang.reflect.Parameter;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import org.apache.gravitino.Entity;
import org.apache.gravitino.Entity.EntityType;
import org.apache.gravitino.NameIdentifier;
@@ -29,12 +30,10 @@ import
org.apache.gravitino.iceberg.service.IcebergRESTUtils;
import
org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerContext;
import
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
import
org.apache.gravitino.server.authorization.annotations.IcebergAuthorizationMetadata;
+import
org.apache.gravitino.server.authorization.annotations.IcebergAuthorizationMetadata.RequestType;
import org.apache.gravitino.utils.NameIdentifierUtil;
-import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.rest.RESTUtil;
-import org.apache.iceberg.rest.requests.RenameTableRequest;
/**
* Through dynamic proxy, obtain the annotations on the method and parameter
list to perform
@@ -42,6 +41,7 @@ import org.apache.iceberg.rest.requests.RenameTableRequest;
*/
public class IcebergMetadataAuthorizationMethodInterceptor
extends BaseMetadataAuthorizationMethodInterceptor {
+
private final String metalakeName =
IcebergRESTServerContext.getInstance().metalakeName();
@Override
@@ -57,34 +57,6 @@ public class IcebergMetadataAuthorizationMethodInterceptor
Parameter parameter = parameters[i];
AuthorizationMetadata authorizeResource =
parameter.getAnnotation(AuthorizationMetadata.class);
- IcebergAuthorizationMetadata icebergAuthorizeResource =
- parameter.getAnnotation(IcebergAuthorizationMetadata.class);
- if (icebergAuthorizeResource != null) {
- switch (icebergAuthorizeResource.type()) {
- case RENAME_TABLE:
- RenameTableRequest renameTableRequest = (RenameTableRequest)
args[i];
- schema = renameTableRequest.source().namespace().level(0);
- nameIdentifierMap.put(
- Entity.EntityType.SCHEMA,
- NameIdentifierUtil.ofSchema(metalakeName, catalog, schema));
- nameIdentifierMap.put(
- Entity.EntityType.TABLE,
- NameIdentifierUtil.ofTable(
- metalakeName, catalog, schema,
renameTableRequest.source().name()));
- break;
- case LOAD_TABLE:
- String tableName = String.valueOf(args[i]);
- if (isMetadataTable(tableName, rawNamespace)) {
- throw new NoSuchTableException("Table %s not found", tableName);
- }
- nameIdentifierMap.put(
- Entity.EntityType.TABLE,
- NameIdentifierUtil.ofTable(metalakeName, catalog, schema,
tableName));
- break;
- default:
- break;
- }
- }
if (authorizeResource == null) {
continue;
}
@@ -113,21 +85,34 @@ public class IcebergMetadataAuthorizationMethodInterceptor
return nameIdentifierMap;
}
+ /**
+ * Creates an authorization handler for Iceberg-specific operations that
require custom logic
+ * beyond standard annotation-based authorization.
+ */
@Override
- protected boolean isExceptionPropagate(Exception e) {
- return e.getClass().getName().startsWith("org.apache.iceberg.exceptions");
- }
-
- // Check if the table is a metadata table, for metadata table, the namespace
is `catalog.db.table`
- private boolean isMetadataTable(String tableName, Namespace namespace) {
- MetadataTableType metadataTableType = MetadataTableType.from(tableName);
- if (metadataTableType == null) {
- return false;
+ protected Optional<AuthorizationHandler> createAuthorizationHandler(
+ Parameter[] parameters, Object[] args) {
+ for (Parameter parameter : parameters) {
+ IcebergAuthorizationMetadata icebergMetadata =
+ parameter.getAnnotation(IcebergAuthorizationMetadata.class);
+ if (icebergMetadata != null) {
+ // Create handler with parameters and args for processing
+ RequestType type = icebergMetadata.type();
+ switch (type) {
+ case LOAD_TABLE:
+ return Optional.of(new LoadTableAuthzHandler(parameters, args));
+ case RENAME_TABLE:
+ return Optional.of(new RenameTableAuthzHandler(parameters, args));
+ default:
+ break;
+ }
+ }
}
+ return Optional.empty();
+ }
- if (namespace.levels().length > 1) {
- return true;
- }
- return false;
+ @Override
+ protected boolean isExceptionPropagate(Exception e) {
+ return e.getClass().getName().startsWith("org.apache.iceberg.exceptions");
}
}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/LoadTableAuthzHandler.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/LoadTableAuthzHandler.java
new file mode 100644
index 0000000000..c4814b533e
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/LoadTableAuthzHandler.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.server.web.filter;
+
+import java.lang.reflect.Parameter;
+import java.util.Map;
+import org.apache.gravitino.Entity.EntityType;
+import org.apache.gravitino.NameIdentifier;
+import
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
+import
org.apache.gravitino.server.authorization.annotations.IcebergAuthorizationMetadata;
+import
org.apache.gravitino.server.authorization.annotations.IcebergAuthorizationMetadata.RequestType;
+import
org.apache.gravitino.server.web.filter.BaseMetadataAuthorizationMethodInterceptor.AuthorizationHandler;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.rest.RESTUtil;
+
+/**
+ * Handler for LOAD_TABLE operations. Validates that the requested table is
not a metadata table
+ * (e.g., table$snapshots) and extracts the table identifier for authorization
checks.
+ */
+public class LoadTableAuthzHandler implements AuthorizationHandler {
+ private final Parameter[] parameters;
+ private final Object[] args;
+
+ public LoadTableAuthzHandler(Parameter[] parameters, Object[] args) {
+ this.parameters = parameters;
+ this.args = args;
+ }
+
+ @Override
+ public void process(Map<EntityType, NameIdentifier> nameIdentifierMap) {
+ // Find the table name and namespace from parameters
+ String tableName = null;
+ Namespace namespace = null;
+
+ for (int i = 0; i < parameters.length; i++) {
+ Parameter parameter = parameters[i];
+
+ IcebergAuthorizationMetadata icebergMetadata =
+ parameter.getAnnotation(IcebergAuthorizationMetadata.class);
+ if (icebergMetadata != null && icebergMetadata.type() ==
RequestType.LOAD_TABLE) {
+ tableName = String.valueOf(args[i]);
+ }
+
+ AuthorizationMetadata authMetadata =
parameter.getAnnotation(AuthorizationMetadata.class);
+ if (authMetadata != null && authMetadata.type() == EntityType.SCHEMA) {
+ // Decode the raw Iceberg namespace parameter
+ namespace = RESTUtil.decodeNamespace(String.valueOf(args[i]));
+ }
+ }
+
+ if (tableName == null || namespace == null) {
+ throw new NoSuchTableException("Table not found - missing table name or
namespace");
+ }
+
+ // Validate that this is not a metadata table access
+ if (isMetadataTable(tableName, namespace)) {
+ throw new NoSuchTableException("Table %s not found", tableName);
+ }
+
+ NameIdentifier catalogId = nameIdentifierMap.get(EntityType.CATALOG);
+ NameIdentifier schemaId = nameIdentifierMap.get(EntityType.SCHEMA);
+
+ if (catalogId == null || schemaId == null) {
+ throw new NoSuchTableException("Missing catalog or schema context for
table authorization");
+ }
+
+ String metalakeName = catalogId.namespace().level(0);
+ String catalog = catalogId.name();
+ String schema = schemaId.name();
+
+ // Add table identifier to the map for authorization expression evaluation
+ nameIdentifierMap.put(
+ EntityType.TABLE, NameIdentifierUtil.ofTable(metalakeName, catalog,
schema, tableName));
+ }
+
+ @Override
+ public boolean authorizationCompleted() {
+ // This handler only enriches identifiers, doesn't perform full
authorization
+ return false;
+ }
+
+ /**
+ * Check if the table is a metadata table. Metadata tables have special
names like
+ * `table$snapshots`, `table$files`, etc., and are accessed with longer
namespace paths
+ * (catalog.db.table instead of catalog.db).
+ *
+ * @param tableName The table name to check
+ * @param namespace The Iceberg namespace of the table
+ * @return true if this is a metadata table access, false otherwise
+ */
+ private boolean isMetadataTable(String tableName, Namespace namespace) {
+ MetadataTableType metadataTableType = MetadataTableType.from(tableName);
+ if (metadataTableType == null) {
+ return false;
+ }
+
+ // Metadata tables have namespace length > 1 (e.g., catalog.db.table has 3
levels)
+ // Regular tables have namespace length = 1 (e.g., catalog.db has 2
levels, but we get "db")
+ if (namespace.levels().length > 1) {
+ return true;
+ }
+ return false;
+ }
+}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/RenameTableAuthzHandler.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/RenameTableAuthzHandler.java
new file mode 100644
index 0000000000..f839858b57
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/RenameTableAuthzHandler.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.server.web.filter;
+
+import java.lang.reflect.Parameter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.Entity.EntityType;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.authorization.AuthorizationRequestContext;
+import
org.apache.gravitino.server.authorization.annotations.IcebergAuthorizationMetadata;
+import
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionEvaluator;
+import
org.apache.gravitino.server.web.filter.BaseMetadataAuthorizationMethodInterceptor.AuthorizationHandler;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.iceberg.rest.requests.RenameTableRequest;
+
+/**
+ * Handler for RENAME_TABLE operations. Performs authorization checks for
cross-namespace renames
+ * which require stricter checks than same-namespace renames, following MySQL
privilege model.
+ */
+@SuppressWarnings("FormatStringAnnotation")
+public class RenameTableAuthzHandler implements AuthorizationHandler {
+ private final Parameter[] parameters;
+ private final Object[] args;
+ private boolean crossNamespaceRename = false;
+
+ public RenameTableAuthzHandler(Parameter[] parameters, Object[] args) {
+ this.parameters = parameters;
+ this.args = args;
+ }
+
+ @Override
+ public void process(Map<EntityType, NameIdentifier> nameIdentifierMap) {
+ RenameTableRequest renameTableRequest = null;
+ for (int i = 0; i < parameters.length; i++) {
+ IcebergAuthorizationMetadata metadata =
+ parameters[i].getAnnotation(IcebergAuthorizationMetadata.class);
+ if (metadata != null
+ && metadata.type() ==
IcebergAuthorizationMetadata.RequestType.RENAME_TABLE) {
+ renameTableRequest = (RenameTableRequest) args[i];
+ break;
+ }
+ }
+
+ if (renameTableRequest == null) {
+ throw new ForbiddenException("RenameTableRequest not found in
parameters");
+ }
+
+ // Extract metalake and catalog from nameIdentifierMap
+ NameIdentifier metalakeIdent = nameIdentifierMap.get(EntityType.METALAKE);
+ NameIdentifier catalogIdent = nameIdentifierMap.get(EntityType.CATALOG);
+
+ if (metalakeIdent == null || catalogIdent == null) {
+ throw new ForbiddenException("Missing metalake or catalog context for
authorization");
+ }
+
+ String metalakeName = metalakeIdent.name();
+ String catalog = catalogIdent.name();
+
+ // Extract source table information from the request and add to map
+ // The source table is NOT extracted via standard @AuthorizationMetadata
annotations
+ // because it's embedded in the RenameTableRequest body
+ String sourceSchema = renameTableRequest.source().namespace().level(0);
+ String sourceTable = renameTableRequest.source().name();
+
+ nameIdentifierMap.put(
+ EntityType.SCHEMA, NameIdentifierUtil.ofSchema(metalakeName, catalog,
sourceSchema));
+ nameIdentifierMap.put(
+ EntityType.TABLE,
+ NameIdentifierUtil.ofTable(metalakeName, catalog, sourceSchema,
sourceTable));
+
+ String destSchema = renameTableRequest.destination().namespace().level(0);
+ if (!sourceSchema.equals(destSchema)) {
+ // Cross-namespace rename - perform complete authorization here
+ crossNamespaceRename = true;
+ validateCrossNamespaceRename(catalog, metalakeName, sourceSchema,
sourceTable, destSchema);
+ }
+ }
+
+ @Override
+ public boolean authorizationCompleted() {
+ // Return true if we performed complete authorization (cross-namespace
case)
+ return crossNamespaceRename;
+ }
+
+ /**
+ * Validates authorization for cross-namespace renames following MySQL
privilege model: - Requires
+ * ownership on source table (equivalent to DROP privilege) - Requires
CREATE_TABLE privilege on
+ * destination schema
+ *
+ * @param catalog The catalog name
+ * @param metalakeName The metalake name
+ * @param sourceSchema The source schema name
+ * @param sourceTable The source table name
+ * @param destSchema The destination schema name
+ * @throws ForbiddenException if the user lacks required privileges
+ */
+ private void validateCrossNamespaceRename(
+ String catalog,
+ String metalakeName,
+ String sourceSchema,
+ String sourceTable,
+ String destSchema) {
+ String currentUser = PrincipalUtils.getCurrentUserName();
+ Map<EntityType, NameIdentifier> sourceContext = new HashMap<>();
+ sourceContext.put(EntityType.METALAKE,
NameIdentifierUtil.ofMetalake(metalakeName));
+ sourceContext.put(EntityType.CATALOG,
NameIdentifierUtil.ofCatalog(metalakeName, catalog));
+ sourceContext.put(
+ EntityType.SCHEMA, NameIdentifierUtil.ofSchema(metalakeName, catalog,
sourceSchema));
+ sourceContext.put(
+ EntityType.TABLE,
+ NameIdentifierUtil.ofTable(metalakeName, catalog, sourceSchema,
sourceTable));
+
+ String sourceExpression =
+ "ANY(OWNER, METALAKE, CATALOG) || "
+ + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+ + "ANY_USE_CATALOG && ANY_USE_SCHEMA && TABLE::OWNER";
+
+ AuthorizationExpressionEvaluator sourceEvaluator =
+ new AuthorizationExpressionEvaluator(sourceExpression);
+
+ boolean sourceAuthorized =
+ sourceEvaluator.evaluate(
+ sourceContext, new HashMap<>(), new AuthorizationRequestContext(),
Optional.empty());
+
+ if (!sourceAuthorized) {
+ String notAuthzMessage =
+ String.format(
+ "User '%s' is not authorized to drop/move table '%s' from schema
'%s'. "
+ + "Only the table owner can move a table to a different
schema.",
+ currentUser, sourceTable, sourceSchema);
+ throw new ForbiddenException(notAuthzMessage);
+ }
+
+ // Check CREATE_TABLE privilege on destination schema
+ // Note: The destination schema is NOT in the nameIdentifierMap, so the
standard expression
+ // never checked it. We need to perform a full authorization check here.
+ Map<EntityType, NameIdentifier> destContext = new HashMap<>();
+ destContext.put(EntityType.METALAKE,
NameIdentifierUtil.ofMetalake(metalakeName));
+ destContext.put(EntityType.CATALOG,
NameIdentifierUtil.ofCatalog(metalakeName, catalog));
+ destContext.put(
+ EntityType.SCHEMA, NameIdentifierUtil.ofSchema(metalakeName, catalog,
destSchema));
+
+ String destExpression =
+ "ANY(OWNER, METALAKE, CATALOG) || "
+ + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+ + "ANY_USE_CATALOG && ANY_USE_SCHEMA && ANY_CREATE_TABLE";
+
+ AuthorizationExpressionEvaluator destEvaluator =
+ new AuthorizationExpressionEvaluator(destExpression);
+
+ boolean destAuthorized =
+ destEvaluator.evaluate(
+ destContext, new HashMap<>(), new AuthorizationRequestContext(),
Optional.empty());
+
+ if (!destAuthorized) {
+ String notAuthzMessage =
+ String.format(
+ "User '%s' is not authorized to create table in destination
schema '%s'",
+ currentUser, destSchema);
+ throw new ForbiddenException(notAuthzMessage);
+ }
+ }
+}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergTableAuthorizationIT.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergTableAuthorizationIT.java
index d5a760f8bb..f0850db376 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergTableAuthorizationIT.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergTableAuthorizationIT.java
@@ -271,24 +271,221 @@ public class IcebergTableAuthorizationIT extends
IcebergAuthorizationIT {
}
@Test
- public void testRenameTable() {
- String tableName = "test_rename";
+ public void testRenameTableSameNamespace() {
+ String tableName = "test_rename_same_ns";
createTable(SCHEMA_NAME, tableName);
+
+ // No privileges - should fail
Assertions.assertThrowsExactly(
ForbiddenException.class,
() -> sql("ALTER TABLE %s RENAME TO %s", tableName, tableName +
"_renamed"));
- setTableOwner(tableName);
+ // Test with MODIFY_TABLE privilege
+ String modifyTableRole = grantModifyTableRole(tableName);
Assertions.assertDoesNotThrow(
() -> sql("ALTER TABLE %s RENAME TO %s", tableName, tableName +
"_renamed"));
+ // Verify ownership remains with original owner (SUPER_USER created the
table)
Optional<Owner> owner =
metalakeClientWithAllPrivilege.getOwner(
MetadataObjects.of(
Arrays.asList(GRAVITINO_CATALOG_NAME, SCHEMA_NAME, tableName +
"_renamed"),
MetadataObject.Type.TABLE));
Assertions.assertTrue(owner.isPresent());
+ Assertions.assertEquals(SUPER_USER, owner.get().name());
+
+ // Clean up for next test
+ revokeRole(modifyTableRole);
+ catalogClientWithAllPrivilege
+ .asTableCatalog()
+ .dropTable(NameIdentifier.of(SCHEMA_NAME, tableName + "_renamed"));
+
+ // Test with table ownership (without MODIFY_TABLE)
+ createTable(SCHEMA_NAME, tableName);
+ setTableOwner(tableName);
+ Assertions.assertDoesNotThrow(
+ () -> sql("ALTER TABLE %s RENAME TO %s", tableName, tableName +
"_renamed2"));
+
+ // Verify ownership is retained
+ owner =
+ metalakeClientWithAllPrivilege.getOwner(
+ MetadataObjects.of(
+ Arrays.asList(GRAVITINO_CATALOG_NAME, SCHEMA_NAME, tableName +
"_renamed2"),
+ MetadataObject.Type.TABLE));
+ Assertions.assertTrue(owner.isPresent());
+ Assertions.assertEquals(NORMAL_USER, owner.get().name());
+ }
+
+ @Test
+ public void testRenameTableToDifferentNamespace() {
+ String sourceSchema = SCHEMA_NAME;
+ String destSchema = SCHEMA_NAME + "_dest";
+ String tableName = "test_cross_ns_rename";
+
+ // Create destination schema
+ catalogClientWithAllPrivilege
+ .asSchemas()
+ .createSchema(destSchema, "dest schema", new HashMap<>());
+ grantUseSchemaRole(destSchema);
+
+ // Create table in source schema
+ createTable(sourceSchema, tableName);
+
+ // Test 1: No privileges - should fail
+ Assertions.assertThrowsExactly(
+ ForbiddenException.class,
+ () ->
+ sql(
+ "ALTER TABLE %s.%s RENAME TO %s.%s",
+ sourceSchema, tableName, destSchema, tableName + "_renamed1"));
+
+ // Test 2: Only MODIFY_TABLE on source (no CREATE_TABLE on dest) - should
fail
+ String modifyTableRole = grantModifyTableRole(tableName);
+ Assertions.assertThrowsExactly(
+ ForbiddenException.class,
+ () ->
+ sql(
+ "ALTER TABLE %s.%s RENAME TO %s.%s",
+ sourceSchema, tableName, destSchema, tableName + "_renamed2"));
+ revokeRole(modifyTableRole);
+
+ // Test 3: Only CREATE_TABLE on dest (no ownership on source) - should fail
+ String createTableRole = grantCreateTableRole(destSchema);
+ Assertions.assertThrowsExactly(
+ ForbiddenException.class,
+ () ->
+ sql(
+ "ALTER TABLE %s.%s RENAME TO %s.%s",
+ sourceSchema, tableName, destSchema, tableName + "_renamed3"));
+ revokeRole(createTableRole);
+
+ // Test 4: Table ownership + CREATE_TABLE on dest - should succeed
+ setTableOwner(tableName);
+ createTableRole = grantCreateTableRole(destSchema);
+ Assertions.assertDoesNotThrow(
+ () ->
+ sql(
+ "ALTER TABLE %s.%s RENAME TO %s.%s",
+ sourceSchema, tableName, destSchema, tableName + "_renamed4"));
+
+ // Verify table exists in destination schema
+ boolean existsInDest =
+ catalogClientWithAllPrivilege
+ .asTableCatalog()
+ .tableExists(NameIdentifier.of(destSchema, tableName +
"_renamed4"));
+ Assertions.assertTrue(existsInDest);
+
+ // Verify table no longer exists in source schema
+ boolean existsInSource =
+ catalogClientWithAllPrivilege
+ .asTableCatalog()
+ .tableExists(NameIdentifier.of(sourceSchema, tableName));
+ Assertions.assertFalse(existsInSource);
+
+ // Verify ownership is retained (NORMAL_USER was already the owner before
rename)
+ Optional<Owner> owner =
+ metalakeClientWithAllPrivilege.getOwner(
+ MetadataObjects.of(
+ Arrays.asList(GRAVITINO_CATALOG_NAME, destSchema, tableName +
"_renamed4"),
+ MetadataObject.Type.TABLE));
+ Assertions.assertTrue(owner.isPresent());
Assertions.assertEquals(NORMAL_USER, owner.get().name());
+
+ // Clean up destination schema
+ revokeRole(createTableRole);
+ clearTable(destSchema);
+ catalogClientWithAllPrivilege.asSchemas().dropSchema(destSchema, false);
+ }
+
+ @Test
+ public void testRenameTableToDifferentNamespaceWithOwnership() {
+ String sourceSchema = SCHEMA_NAME;
+ String destSchema = SCHEMA_NAME + "_dest2";
+ String tableName = "test_owner_rename";
+
+ // Create destination schema
+ catalogClientWithAllPrivilege
+ .asSchemas()
+ .createSchema(destSchema, "dest schema for ownership test", new
HashMap<>());
+ grantUseSchemaRole(destSchema);
+
+ // Test 1: Source schema owner + CREATE_TABLE on dest - should succeed
+ createTable(sourceSchema, tableName + "_1");
+ setSchemaOwner(NORMAL_USER); // NORMAL_USER owns source schema
+ String createTableRole = grantCreateTableRole(destSchema);
+
+ Assertions.assertDoesNotThrow(
+ () ->
+ sql(
+ "ALTER TABLE %s.%s RENAME TO %s.%s",
+ sourceSchema, tableName + "_1", destSchema, tableName +
"_1_renamed"));
+
+ revokeRole(createTableRole);
+ setSchemaOwner(SUPER_USER); // Reset source schema owner
+
+ // Test 2: Table owner + destination schema owner - should succeed
+ createTable(sourceSchema, tableName + "_2");
+ setTableOwner(tableName + "_2");
+ setSchemaOwner(destSchema, NORMAL_USER); // NORMAL_USER owns dest schema
+
+ Assertions.assertDoesNotThrow(
+ () ->
+ sql(
+ "ALTER TABLE %s.%s RENAME TO %s.%s",
+ sourceSchema, tableName + "_2", destSchema, tableName +
"_2_renamed"));
+
+ setSchemaOwner(destSchema, SUPER_USER); // Reset dest schema owner
+
+ // Test 3: Both source and dest schema owner - should succeed
+ createTable(sourceSchema, tableName + "_3");
+ setSchemaOwner(NORMAL_USER); // Source schema owner
+ setSchemaOwner(destSchema, NORMAL_USER); // Dest schema owner
+
+ Assertions.assertDoesNotThrow(
+ () ->
+ sql(
+ "ALTER TABLE %s.%s RENAME TO %s.%s",
+ sourceSchema, tableName + "_3", destSchema, tableName +
"_3_renamed"));
+
+ // Test 4: Only source schema owner (no dest privileges) - should fail
+ setSchemaOwner(destSchema, SUPER_USER); // Reset dest schema owner
+ createTable(sourceSchema, tableName + "_4");
+
+ Assertions.assertThrowsExactly(
+ ForbiddenException.class,
+ () ->
+ sql(
+ "ALTER TABLE %s.%s RENAME TO %s.%s",
+ sourceSchema, tableName + "_4", destSchema, tableName +
"_4_renamed"));
+
+ // Test 5: Only dest schema owner (no source table ownership) - should fail
+ setSchemaOwner(SUPER_USER); // Reset source schema owner
+ setSchemaOwner(destSchema, NORMAL_USER);
+
+ Assertions.assertThrowsExactly(
+ ForbiddenException.class,
+ () ->
+ sql(
+ "ALTER TABLE %s.%s RENAME TO %s.%s",
+ sourceSchema, tableName + "_4", destSchema, tableName +
"_4_renamed2"));
+
+ // Test 6: Catalog owner - should succeed
+ setSchemaOwner(destSchema, SUPER_USER); // Reset dest schema owner
+ createTable(sourceSchema, tableName + "_5");
+ setCatalogOwner(NORMAL_USER);
+
+ Assertions.assertDoesNotThrow(
+ () ->
+ sql(
+ "ALTER TABLE %s.%s RENAME TO %s.%s",
+ sourceSchema, tableName + "_5", destSchema, tableName +
"_5_renamed"));
+
+ setCatalogOwner(SUPER_USER); // Reset catalog owner
+
+ // Clean up
+ setSchemaOwner(SUPER_USER);
+ clearTable(destSchema);
+ catalogClientWithAllPrivilege.asSchemas().dropSchema(destSchema, false);
}
@Test
@@ -444,23 +641,43 @@ public class IcebergTableAuthorizationIT extends
IcebergAuthorizationIT {
}
private void setSchemaOwner(String userName) {
+ setSchemaOwner(SCHEMA_NAME, userName);
+ }
+
+ private void setSchemaOwner(String schemaName, String userName) {
MetadataObject schemaMetadataObject =
MetadataObjects.of(
- Arrays.asList(GRAVITINO_CATALOG_NAME, SCHEMA_NAME),
MetadataObject.Type.SCHEMA);
+ Arrays.asList(GRAVITINO_CATALOG_NAME, schemaName),
MetadataObject.Type.SCHEMA);
metalakeClientWithAllPrivilege.setOwner(schemaMetadataObject, userName,
Owner.Type.USER);
}
+ private void setCatalogOwner(String userName) {
+ MetadataObject catalogMetadataObject =
+ MetadataObjects.of(Arrays.asList(GRAVITINO_CATALOG_NAME),
MetadataObject.Type.CATALOG);
+ metalakeClientWithAllPrivilege.setOwner(catalogMetadataObject, userName,
Owner.Type.USER);
+ }
+
+ private void setMetalakeOwner(String userName) {
+ MetadataObject metalakeMetadataObject =
+ MetadataObjects.of(Arrays.asList(METALAKE_NAME),
MetadataObject.Type.METALAKE);
+ metalakeClientWithAllPrivilege.setOwner(metalakeMetadataObject, userName,
Owner.Type.USER);
+ }
+
private void clearTable() {
+ clearTable(SCHEMA_NAME);
+ }
+
+ private void clearTable(String schemaName) {
Arrays.stream(
-
catalogClientWithAllPrivilege.asTableCatalog().listTables(Namespace.of(SCHEMA_NAME)))
+
catalogClientWithAllPrivilege.asTableCatalog().listTables(Namespace.of(schemaName)))
.forEach(
table -> {
catalogClientWithAllPrivilege
.asTableCatalog()
- .dropTable(NameIdentifier.of(SCHEMA_NAME, table.name()));
+ .dropTable(NameIdentifier.of(schemaName, table.name()));
});
NameIdentifier[] nameIdentifiers =
-
catalogClientWithAllPrivilege.asTableCatalog().listTables(Namespace.of(SCHEMA_NAME));
+
catalogClientWithAllPrivilege.asTableCatalog().listTables(Namespace.of(schemaName));
Assertions.assertEquals(0, nameIdentifiers.length);
}
}