This is an automated email from the ASF dual-hosted git repository. jshao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit fc8e2eb5eb8988acdab3bb6191c0a9314d2de34e Author: Kyle Lin <[email protected]> AuthorDate: Mon Jul 14 08:16:56 2025 +0800 [#7570] feat(authz): Support topic authorization (#7580) ### What changes were proposed in this pull request? Support topic authorization. ### Why are the changes needed? Fixes #7570 ### Does this PR introduce any user-facing change? No. ### How was this patch tested? `org.apache.gravitino.client.integration.test.authorization.TopicAuthorizationIT` --------- Co-authored-by: yangyang zhong <[email protected]> Co-authored-by: [email protected] <[email protected]> --- .../test/authorization/TopicAuthorizationIT.java | 240 +++++++++++++++++++++ .../server/authorization/MetadataFilterHelper.java | 21 ++ .../AuthorizationExpressionConverter.java | 15 +- .../authorization/TestMetadataFilterHelper.java | 25 +++ .../web/filter/GravitinoInterceptionService.java | 4 +- .../gravitino/server/web/rest/TopicOperations.java | 76 +++++-- .../server/web/rest/TestTopicOperations.java | 3 +- .../TestTopicAuthorizationExpression.java | 186 ++++++++++++++++ 8 files changed, 548 insertions(+), 22 deletions(-) diff --git a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TopicAuthorizationIT.java b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TopicAuthorizationIT.java new file mode 100644 index 0000000000..5d28f8a8bc --- /dev/null +++ b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TopicAuthorizationIT.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.client.integration.test.authorization; + +import static org.junit.Assert.assertThrows; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.MetadataObject; +import org.apache.gravitino.MetadataObjects; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.authorization.Owner; +import org.apache.gravitino.authorization.Privileges; +import org.apache.gravitino.authorization.SecurableObject; +import org.apache.gravitino.authorization.SecurableObjects; +import org.apache.gravitino.client.GravitinoMetalake; +import org.apache.gravitino.integration.test.container.ContainerSuite; +import org.apache.gravitino.integration.test.container.KafkaContainer; +import org.apache.gravitino.messaging.Topic; +import org.apache.gravitino.messaging.TopicCatalog; +import org.apache.gravitino.messaging.TopicChange; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +@Tag("gravitino-docker-test") +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class TopicAuthorizationIT extends BaseRestApiAuthorizationIT { + + private static final String CATALOG = "catalog"; + private static final String SCHEMA = "default"; + private static final ContainerSuite containerSuite = ContainerSuite.getInstance(); + private static String kafkaBootstrapServers; + private static String role = "role"; + + @BeforeAll + public void startIntegrationTest() throws Exception { + containerSuite.startKafkaContainer(); + super.startIntegrationTest(); + kafkaBootstrapServers = + String.format( + "%s:%d", + containerSuite.getKafkaContainer().getContainerIpAddress(), + KafkaContainer.DEFAULT_BROKER_PORT); + Map<String, String> properties = Maps.newHashMap(); + properties.put("bootstrap.servers", kafkaBootstrapServers); + client + .loadMetalake(METALAKE) + .createCatalog(CATALOG, Catalog.Type.MESSAGING, "kafka", "comment", properties); + // try to load the schema as normal user, expect failure + assertThrows( + "Can not access metadata {" + CATALOG + "." + SCHEMA + "}.", + RuntimeException.class, + () -> { + normalUserClient + .loadMetalake(METALAKE) + .loadCatalog(CATALOG) + .asSchemas() + .loadSchema(SCHEMA); + }); + // grant tester privilege + List<SecurableObject> securableObjects = new ArrayList<>(); + GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE); + SecurableObject catalogObject = + SecurableObjects.ofCatalog(CATALOG, ImmutableList.of(Privileges.UseCatalog.allow())); + securableObjects.add(catalogObject); + gravitinoMetalake.createRole(role, new HashMap<>(), securableObjects); + gravitinoMetalake.grantRolesToUser(ImmutableList.of(role), NORMAL_USER); + // normal user can load the catalog but not the schema + Catalog catalogLoadByNormalUser = normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG); + assertEquals(CATALOG, catalogLoadByNormalUser.name()); + assertThrows( + "Can not access metadata {" + CATALOG + "." + SCHEMA + "}.", + RuntimeException.class, + () -> { + catalogLoadByNormalUser.asSchemas().loadSchema(SCHEMA); + }); + } + + @Test + @Order(1) + public void testCreateTopic() { + // owner can create topic + TopicCatalog topicCatalog = client.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog(); + topicCatalog.createTopic(NameIdentifier.of(SCHEMA, "topic1"), "test", null, new HashMap<>()); + // normal user cannot create topic + TopicCatalog topicCatalogNormalUser = + normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog(); + assertThrows( + "Can not access metadata {" + CATALOG + "." + SCHEMA + "}.", + RuntimeException.class, + () -> { + topicCatalogNormalUser.createTopic( + NameIdentifier.of(SCHEMA, "topic2"), "test2", null, new HashMap<>()); + }); + // grant privileges + GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE); + gravitinoMetalake.grantPrivilegesToRole( + role, + MetadataObjects.of(CATALOG, SCHEMA, MetadataObject.Type.SCHEMA), + ImmutableList.of(Privileges.UseSchema.allow(), Privileges.CreateTopic.allow())); + // normal user can now create topic + topicCatalogNormalUser.createTopic( + NameIdentifier.of(SCHEMA, "topic2"), "test2", null, new HashMap<>()); + topicCatalogNormalUser.createTopic( + NameIdentifier.of(SCHEMA, "topic3"), "test3", null, new HashMap<>()); + } + + @Test + @Order(2) + public void testListTopic() { + TopicCatalog topicCatalog = client.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog(); + NameIdentifier[] topicsList = topicCatalog.listTopics(Namespace.of(SCHEMA)); + assertArrayEquals( + new NameIdentifier[] { + NameIdentifier.of(SCHEMA, "topic1"), + NameIdentifier.of(SCHEMA, "topic2"), + NameIdentifier.of(SCHEMA, "topic3") + }, + topicsList); + // normal user can only see topics they have privilege for + TopicCatalog topicCatalogNormalUser = + normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog(); + NameIdentifier[] topicsListNormalUser = topicCatalogNormalUser.listTopics(Namespace.of(SCHEMA)); + assertArrayEquals( + new NameIdentifier[] { + NameIdentifier.of(SCHEMA, "topic2"), NameIdentifier.of(SCHEMA, "topic3") + }, + topicsListNormalUser); + } + + @Test + @Order(3) + public void testLoadTopic() { + TopicCatalog topicCatalogNormalUser = + normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog(); + // normal user can load topic2 and topic3, but not topic1 + assertThrows( + String.format("Can not access metadata {%s.%s.%s}.", CATALOG, SCHEMA, "topic1"), + RuntimeException.class, + () -> { + topicCatalogNormalUser.loadTopic(NameIdentifier.of(SCHEMA, "topic1")); + }); + Topic topic2 = topicCatalogNormalUser.loadTopic(NameIdentifier.of(SCHEMA, "topic2")); + assertEquals("topic2", topic2.name()); + Topic topic3 = topicCatalogNormalUser.loadTopic(NameIdentifier.of(SCHEMA, "topic3")); + assertEquals("topic3", topic3.name()); + + // grant normal user privilege to use topic1 + GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE); + gravitinoMetalake.grantPrivilegesToRole( + role, + MetadataObjects.of(ImmutableList.of(CATALOG, SCHEMA, "topic1"), MetadataObject.Type.TOPIC), + ImmutableList.of(Privileges.ConsumeTopic.allow())); + topicCatalogNormalUser.loadTopic(NameIdentifier.of(SCHEMA, "topic1")); + } + + @Test + @Order(4) + public void testAlterTopic() { + GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE); + TopicCatalog topicCatalogNormalUser = + normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog(); + + // normal user cannot alter topic1 (no privilege) + assertThrows( + String.format("Can not access metadata {%s.%s.%s}.", CATALOG, SCHEMA, "topic1"), + RuntimeException.class, + () -> { + topicCatalogNormalUser.alterTopic( + NameIdentifier.of(SCHEMA, "topic1"), TopicChange.updateComment("new comment")); + }); + // grant normal user owner privilege on topic1 + gravitinoMetalake.setOwner( + MetadataObjects.of(ImmutableList.of(CATALOG, SCHEMA, "topic1"), MetadataObject.Type.TOPIC), + NORMAL_USER, + Owner.Type.USER); + topicCatalogNormalUser.alterTopic( + NameIdentifier.of(SCHEMA, "topic1"), TopicChange.updateComment("new comment")); + } + + @Test + @Order(5) + public void testDropTopic() { + GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE); + TopicCatalog topicCatalogNormalUser = + normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog(); + // reset owner + gravitinoMetalake.setOwner( + MetadataObjects.of(ImmutableList.of(CATALOG, SCHEMA, "topic1"), MetadataObject.Type.TOPIC), + USER, + Owner.Type.USER); + // normal user cannot drop topic1 + assertThrows( + String.format("Can not access metadata {%s.%s.%s}.", CATALOG, SCHEMA, "topic1"), + RuntimeException.class, + () -> { + topicCatalogNormalUser.dropTopic(NameIdentifier.of(SCHEMA, "topic1")); + }); + // normal user can drop topic2 and topic3 (they created them) + topicCatalogNormalUser.dropTopic(NameIdentifier.of(SCHEMA, "topic2")); + topicCatalogNormalUser.dropTopic(NameIdentifier.of(SCHEMA, "topic3")); + + // owner can drop topic1 + TopicCatalog topicCatalog = client.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog(); + topicCatalog.dropTopic(NameIdentifier.of(SCHEMA, "topic1")); + // check topics are dropped + NameIdentifier[] topicsList = topicCatalog.listTopics(Namespace.of(SCHEMA)); + assertArrayEquals(new NameIdentifier[] {}, topicsList); + NameIdentifier[] topicsListNormalUser = topicCatalogNormalUser.listTopics(Namespace.of(SCHEMA)); + assertArrayEquals(new NameIdentifier[] {}, topicsListNormalUser); + } +} diff --git a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java index 13fed7c5a6..2655b98a05 100644 --- a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java +++ b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java @@ -21,7 +21,10 @@ import java.security.Principal; import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import org.apache.gravitino.Config; +import org.apache.gravitino.Configs; import org.apache.gravitino.Entity; +import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.authorization.GravitinoAuthorizer; import org.apache.gravitino.authorization.Privilege; @@ -51,6 +54,9 @@ public class MetadataFilterHelper { Entity.EntityType entityType, String privilege, NameIdentifier[] metadataList) { + if (!enableAuthorization()) { + return metadataList; + } GravitinoAuthorizer gravitinoAuthorizer = GravitinoAuthorizerProvider.getInstance().getGravitinoAuthorizer(); Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal(); @@ -81,6 +87,9 @@ public class MetadataFilterHelper { NameIdentifier[] nameIdentifiers) { AuthorizationExpressionEvaluator authorizationExpressionEvaluator = new AuthorizationExpressionEvaluator(expression); + if (!enableAuthorization()) { + return nameIdentifiers; + } return Arrays.stream(nameIdentifiers) .filter( metaDataName -> { @@ -128,9 +137,21 @@ public class MetadataFilterHelper { nameIdentifierMap.put( Entity.EntityType.CATALOG, NameIdentifierUtil.getCatalogIdentifier(nameIdentifier)); break; + case TOPIC: + nameIdentifierMap.put(Entity.EntityType.TOPIC, nameIdentifier); + nameIdentifierMap.put( + Entity.EntityType.SCHEMA, NameIdentifierUtil.getSchemaIdentifier(nameIdentifier)); + nameIdentifierMap.put( + Entity.EntityType.CATALOG, NameIdentifierUtil.getCatalogIdentifier(nameIdentifier)); + break; default: break; } return nameIdentifierMap; } + + private static boolean enableAuthorization() { + Config config = GravitinoEnv.getInstance().config(); + return config != null && config.get(Configs.ENABLE_AUTHORIZATION); + } } diff --git a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java index f21ef43824..0ec8ef18a1 100644 --- a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java +++ b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java @@ -54,12 +54,12 @@ public class AuthorizationExpressionConverter { * @return an OGNL expression used to call GravitinoAuthorizer */ public static String convertToOgnlExpression(String authorizationExpression) { - authorizationExpression = replaceAnyPrivilege(authorizationExpression); - authorizationExpression = replaceAnyExpressions(authorizationExpression); return EXPRESSION_CACHE.computeIfAbsent( authorizationExpression, (expression) -> { - Matcher matcher = PATTERN.matcher(expression); + String replacedExpression = replaceAnyPrivilege(authorizationExpression); + replacedExpression = replaceAnyExpressions(replacedExpression); + Matcher matcher = PATTERN.matcher(replacedExpression); StringBuffer result = new StringBuffer(); while (matcher.find()) { @@ -154,6 +154,15 @@ public class AuthorizationExpressionConverter { "(ANY(CREATE_MODEL_VERSION, METALAKE, CATALOG, SCHEMA, MODEL))"); expression = expression.replaceAll("ANY_CREATE_MODEL", "(ANY(CREATE_MODEL, METALAKE, CATALOG, SCHEMA))"); + expression = + expression.replaceAll( + "ANY_CREATE_TOPIC", "(ANY(CREATE_TOPIC, METALAKE, CATALOG, SCHEMA, TOPIC))"); + expression = + expression.replaceAll( + "ANY_PRODUCE_TOPIC", "(ANY(PRODUCE_TOPIC, METALAKE, CATALOG, SCHEMA, TOPIC))"); + expression = + expression.replaceAll( + "ANY_CONSUME_TOPIC", "(ANY(CONSUME_TOPIC, METALAKE, CATALOG, SCHEMA, TOPIC))"); return expression; } } diff --git a/server-common/src/test/java/org/apache/gravitino/server/authorization/TestMetadataFilterHelper.java b/server-common/src/test/java/org/apache/gravitino/server/authorization/TestMetadataFilterHelper.java index 6bb38f2de7..2714845b3d 100644 --- a/server-common/src/test/java/org/apache/gravitino/server/authorization/TestMetadataFilterHelper.java +++ b/server-common/src/test/java/org/apache/gravitino/server/authorization/TestMetadataFilterHelper.java @@ -17,23 +17,48 @@ package org.apache.gravitino.server.authorization; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.when; +import org.apache.gravitino.Config; +import org.apache.gravitino.Configs; import org.apache.gravitino.Entity; +import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.UserPrincipal; import org.apache.gravitino.authorization.Privilege; import org.apache.gravitino.utils.NameIdentifierUtil; import org.apache.gravitino.utils.PrincipalUtils; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.mockito.MockedStatic; /** Test of {@link MetadataFilterHelper} */ public class TestMetadataFilterHelper { + private static MockedStatic<GravitinoEnv> mockedStaticGravitinoEnv; + + @BeforeAll + public static void setup() { + mockedStaticGravitinoEnv = mockStatic(GravitinoEnv.class); + GravitinoEnv gravitinoEnv = mock(GravitinoEnv.class); + mockedStaticGravitinoEnv.when(GravitinoEnv::getInstance).thenReturn(gravitinoEnv); + Config configMock = mock(Config.class); + when(gravitinoEnv.config()).thenReturn(configMock); + when(configMock.get(eq(Configs.ENABLE_AUTHORIZATION))).thenReturn(true); + } + + @AfterAll + public static void stop() { + if (mockedStaticGravitinoEnv != null) { + mockedStaticGravitinoEnv.close(); + } + } + @Test public void testFilter() { try (MockedStatic<PrincipalUtils> principalUtilsMocked = mockStatic(PrincipalUtils.class); diff --git a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java index 9f799d2776..d39faf0ad8 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java +++ b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java @@ -43,6 +43,7 @@ import org.apache.gravitino.server.web.rest.CatalogOperations; import org.apache.gravitino.server.web.rest.ModelOperations; import org.apache.gravitino.server.web.rest.SchemaOperations; import org.apache.gravitino.server.web.rest.TableOperations; +import org.apache.gravitino.server.web.rest.TopicOperations; import org.apache.gravitino.utils.NameIdentifierUtil; import org.glassfish.hk2.api.Descriptor; import org.glassfish.hk2.api.Filter; @@ -62,7 +63,8 @@ public class GravitinoInterceptionService implements InterceptionService { CatalogOperations.class.getName(), SchemaOperations.class.getName(), TableOperations.class.getName(), - ModelOperations.class.getName())); + ModelOperations.class.getName(), + TopicOperations.class.getName())); } @Override diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java index 471a194a4f..c3dd0e4c62 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java +++ b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java @@ -31,6 +31,8 @@ import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.core.Context; import javax.ws.rs.core.Response; +import org.apache.gravitino.Entity; +import org.apache.gravitino.MetadataObject; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; import org.apache.gravitino.catalog.TopicDispatcher; @@ -44,6 +46,9 @@ import org.apache.gravitino.dto.util.DTOConverters; import org.apache.gravitino.messaging.Topic; import org.apache.gravitino.messaging.TopicChange; import org.apache.gravitino.metrics.MetricNames; +import org.apache.gravitino.server.authorization.MetadataFilterHelper; +import org.apache.gravitino.server.authorization.annotations.AuthorizationExpression; +import org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata; import org.apache.gravitino.server.web.Utils; import org.apache.gravitino.utils.NameIdentifierUtil; import org.apache.gravitino.utils.NamespaceUtil; @@ -54,6 +59,11 @@ import org.slf4j.LoggerFactory; public class TopicOperations { private static final Logger LOG = LoggerFactory.getLogger(TopicOperations.class); + private static final String loadTopicsAuthorizationExpression = + "ANY(OWNER, METALAKE, CATALOG) || " + + "SCHEMA_OWNER_WITH_USE_CATALOG || " + + "ANY_USE_CATALOG && ANY_USE_SCHEMA && (TOPIC::OWNER || ANY_CONSUME_TOPIC || ANY_PRODUCE_TOPIC)"; + private final TopicDispatcher dispatcher; @Context private HttpServletRequest httpRequest; @@ -66,6 +76,7 @@ public class TopicOperations { @GET @Produces("application/vnd.gravitino.v1+json") @Timed(name = "list-topic." + MetricNames.HTTP_PROCESS_DURATION, absolute = true) + @ResponseMetered(name = "list-topic", absolute = true) public Response listTopics( @PathParam("metalake") String metalake, @PathParam("catalog") String catalog, @@ -78,13 +89,17 @@ public class TopicOperations { LOG.info("Listing topics under schema: {}.{}.{}", metalake, catalog, schema); Namespace topicNS = NamespaceUtil.ofTopic(metalake, catalog, schema); NameIdentifier[] topics = dispatcher.listTopics(topicNS); + topics = topics == null ? new NameIdentifier[0] : topics; + topics = + MetadataFilterHelper.filterByExpression( + metalake, loadTopicsAuthorizationExpression, Entity.EntityType.TOPIC, topics); Response response = Utils.ok(new EntityListResponse(topics)); LOG.info( "List {} topics under schema: {}.{}.{}", topics.length, metalake, catalog, schema); return response; }); } catch (Exception e) { - return ExceptionHandlers.handleFilesetException(OperationType.LIST, "", schema, e); + return ExceptionHandlers.handleTopicException(OperationType.LIST, "", schema, e); } } @@ -92,10 +107,18 @@ public class TopicOperations { @Produces("application/vnd.gravitino.v1+json") @Timed(name = "create-topic." + MetricNames.HTTP_PROCESS_DURATION, absolute = true) @ResponseMetered(name = "create-topic", absolute = true) + @AuthorizationExpression( + expression = + "ANY(OWNER,METALAKE,CATALOG) || " + + "SCHEMA_OWNER_WITH_USE_CATALOG || " + + "ANY_USE_CATALOG && ANY_USE_SCHEMA && ANY_CREATE_TOPIC", + accessMetadataType = MetadataObject.Type.SCHEMA) public Response createTopic( - @PathParam("metalake") String metalake, - @PathParam("catalog") String catalog, - @PathParam("schema") String schema, + @PathParam("metalake") @AuthorizationMetadata(type = MetadataObject.Type.METALAKE) + String metalake, + @PathParam("catalog") @AuthorizationMetadata(type = MetadataObject.Type.CATALOG) + String catalog, + @PathParam("schema") @AuthorizationMetadata(type = MetadataObject.Type.SCHEMA) String schema, TopicCreateRequest request) { LOG.info("Received create topic request: {}.{}.{}", metalake, catalog, schema); try { @@ -133,11 +156,16 @@ public class TopicOperations { @Produces("application/vnd.gravitino.v1+json") @Timed(name = "load-topic." + MetricNames.HTTP_PROCESS_DURATION, absolute = true) @ResponseMetered(name = "load-topic", absolute = true) + @AuthorizationExpression( + expression = loadTopicsAuthorizationExpression, + accessMetadataType = MetadataObject.Type.TOPIC) public Response loadTopic( - @PathParam("metalake") String metalake, - @PathParam("catalog") String catalog, - @PathParam("schema") String schema, - @PathParam("topic") String topic) { + @PathParam("metalake") @AuthorizationMetadata(type = MetadataObject.Type.METALAKE) + String metalake, + @PathParam("catalog") @AuthorizationMetadata(type = MetadataObject.Type.CATALOG) + String catalog, + @PathParam("schema") @AuthorizationMetadata(type = MetadataObject.Type.SCHEMA) String schema, + @PathParam("topic") @AuthorizationMetadata(type = MetadataObject.Type.TOPIC) String topic) { LOG.info( "Received load topic request for topic: {}.{}.{}.{}", metalake, catalog, schema, topic); try { @@ -161,11 +189,19 @@ public class TopicOperations { @Produces("application/vnd.gravitino.v1+json") @Timed(name = "alter-topic." + MetricNames.HTTP_PROCESS_DURATION, absolute = true) @ResponseMetered(name = "alter-topic", absolute = true) + @AuthorizationExpression( + expression = + "ANY(OWNER,METALAKE,CATALOG) || " + + "SCHEMA_OWNER_WITH_USE_CATALOG || " + + "ANY_USE_CATALOG && ANY_USE_SCHEMA && (TOPIC::OWNER || ANY_PRODUCE_TOPIC)", + accessMetadataType = MetadataObject.Type.TOPIC) public Response alterTopic( - @PathParam("metalake") String metalake, - @PathParam("catalog") String catalog, - @PathParam("schema") String schema, - @PathParam("topic") String topic, + @PathParam("metalake") @AuthorizationMetadata(type = MetadataObject.Type.METALAKE) + String metalake, + @PathParam("catalog") @AuthorizationMetadata(type = MetadataObject.Type.CATALOG) + String catalog, + @PathParam("schema") @AuthorizationMetadata(type = MetadataObject.Type.SCHEMA) String schema, + @PathParam("topic") @AuthorizationMetadata(type = MetadataObject.Type.TOPIC) String topic, TopicUpdatesRequest request) { LOG.info("Received alter topic request: {}.{}.{}.{}", metalake, catalog, schema, topic); try { @@ -195,11 +231,19 @@ public class TopicOperations { @Produces("application/vnd.gravitino.v1+json") @Timed(name = "drop-topic." + MetricNames.HTTP_PROCESS_DURATION, absolute = true) @ResponseMetered(name = "drop-topic", absolute = true) + @AuthorizationExpression( + expression = + "ANY(OWNER,METALAKE,CATALOG) || " + + "SCHEMA_OWNER_WITH_USE_CATALOG || " + + "ANY_USE_CATALOG && ANY_USE_SCHEMA && TOPIC::OWNER", + accessMetadataType = MetadataObject.Type.TOPIC) public Response dropTopic( - @PathParam("metalake") String metalake, - @PathParam("catalog") String catalog, - @PathParam("schema") String schema, - @PathParam("topic") String topic) { + @PathParam("metalake") @AuthorizationMetadata(type = MetadataObject.Type.METALAKE) + String metalake, + @PathParam("catalog") @AuthorizationMetadata(type = MetadataObject.Type.CATALOG) + String catalog, + @PathParam("schema") @AuthorizationMetadata(type = MetadataObject.Type.SCHEMA) String schema, + @PathParam("topic") @AuthorizationMetadata(type = MetadataObject.Type.TOPIC) String topic) { LOG.info("Received drop topic request: {}.{}.{}.{}", metalake, catalog, schema, topic); try { return Utils.doAs( diff --git a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java index d5be6662f3..b838d62744 100644 --- a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java +++ b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java @@ -60,14 +60,13 @@ import org.apache.gravitino.messaging.TopicChange; import org.apache.gravitino.rest.RESTUtils; import org.glassfish.jersey.internal.inject.AbstractBinder; import org.glassfish.jersey.server.ResourceConfig; -import org.glassfish.jersey.test.JerseyTest; import org.glassfish.jersey.test.TestProperties; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.mockito.Mockito; -public class TestTopicOperations extends JerseyTest { +public class TestTopicOperations extends BaseOperationsTest { private static class MockServletRequestFactory extends ServletRequestFactoryBase { @Override diff --git a/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/TestTopicAuthorizationExpression.java b/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/TestTopicAuthorizationExpression.java new file mode 100644 index 0000000000..8e4b88aeb6 --- /dev/null +++ b/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/TestTopicAuthorizationExpression.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.server.web.rest.authorization; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.google.common.collect.ImmutableSet; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import ognl.OgnlException; +import org.apache.gravitino.dto.requests.TopicCreateRequest; +import org.apache.gravitino.dto.requests.TopicUpdatesRequest; +import org.apache.gravitino.server.authorization.annotations.AuthorizationExpression; +import org.apache.gravitino.server.web.rest.TopicOperations; +import org.junit.jupiter.api.Test; + +public class TestTopicAuthorizationExpression { + + @Test + public void testCreateTopic() throws NoSuchMethodException, OgnlException { + Method method = + TopicOperations.class.getMethod( + "createTopic", String.class, String.class, String.class, TopicCreateRequest.class); + AuthorizationExpression authorizationExpressionAnnotation = + method.getAnnotation(AuthorizationExpression.class); + String expression = authorizationExpressionAnnotation.expression(); + MockAuthorizationExpressionEvaluator mockEvaluator = + new MockAuthorizationExpressionEvaluator(expression); + assertFalse(mockEvaluator.getResult(ImmutableSet.of())); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG"))); + assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER"))); + assertTrue(mockEvaluator.getResult(ImmutableSet.of("CATALOG::OWNER"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER"))); + assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", "CATALOG::USE_CATALOG"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_METALAKE"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::CREATE_CATALOG"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TOPIC"))); + assertFalse( + mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TOPIC", "CATALOG::USE_CATALOG"))); + assertTrue( + mockEvaluator.getResult( + ImmutableSet.of("SCHEMA::CREATE_TOPIC", "SCHEMA::USE_SCHEMA", "CATALOG::USE_CATALOG"))); + } + + @Test + public void testLoadTopics() throws OgnlException, NoSuchFieldException, IllegalAccessException { + Field loadTopicsAuthorizationExpressionField = + TopicOperations.class.getDeclaredField("loadTopicsAuthorizationExpression"); + loadTopicsAuthorizationExpressionField.setAccessible(true); + String loadTopicsAuthorizationExpression = + (String) loadTopicsAuthorizationExpressionField.get(null); + MockAuthorizationExpressionEvaluator mockEvaluator = + new MockAuthorizationExpressionEvaluator(loadTopicsAuthorizationExpression); + assertFalse(mockEvaluator.getResult(ImmutableSet.of())); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG"))); + assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_METALAKE"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::CREATE_CATALOG"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TOPIC"))); + assertFalse( + mockEvaluator.getResult( + ImmutableSet.of("SCHEMA::CREATE_TOPIC", "CATALOG::CREATE_CATALOG"))); + + assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::PRODUCE_TOPIC"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::PRODUCE_TOPIC"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::PRODUCE_TOPIC"))); + assertFalse( + mockEvaluator.getResult(ImmutableSet.of("SCHEMA::PRODUCE_TOPIC", "CATALOG::USE_CATALOG"))); + assertTrue( + mockEvaluator.getResult( + ImmutableSet.of( + "SCHEMA::PRODUCE_TOPIC", "SCHEMA::USE_SCHEMA", "CATALOG::USE_CATALOG"))); + + assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::CONSUME_TOPIC"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::CONSUME_TOPIC"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CONSUME_TOPIC"))); + assertFalse( + mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CONSUME_TOPIC", "CATALOG::USE_CATALOG"))); + assertTrue( + mockEvaluator.getResult( + ImmutableSet.of( + "SCHEMA::CONSUME_TOPIC", "SCHEMA::USE_SCHEMA", "CATALOG::USE_CATALOG"))); + + assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER"))); + assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", "CATALOG::USE_CATALOG"))); + assertTrue( + mockEvaluator.getResult( + ImmutableSet.of("SCHEMA::OWNER", "SCHEMA::USE_SCHEMA", "CATALOG::USE_CATALOG"))); + } + + @Test + public void testAlterFileset() throws NoSuchMethodException, OgnlException { + Method method = + TopicOperations.class.getMethod( + "alterTopic", + String.class, + String.class, + String.class, + String.class, + TopicUpdatesRequest.class); + AuthorizationExpression authorizationExpressionAnnotation = + method.getAnnotation(AuthorizationExpression.class); + String expression = authorizationExpressionAnnotation.expression(); + MockAuthorizationExpressionEvaluator mockEvaluator = + new MockAuthorizationExpressionEvaluator(expression); + assertFalse(mockEvaluator.getResult(ImmutableSet.of())); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG"))); + assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_METALAKE"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::CREATE_CATALOG"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TOPIC"))); + assertFalse( + mockEvaluator.getResult( + ImmutableSet.of("SCHEMA::CREATE_TOPIC", "CATALOG::CREATE_CATALOG"))); + + assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::PRODUCE_TOPIC"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::PRODUCE_TOPIC"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::PRODUCE_TOPIC"))); + assertFalse( + mockEvaluator.getResult(ImmutableSet.of("SCHEMA::PRODUCE_TOPIC", "CATALOG::USE_CATALOG"))); + assertTrue( + mockEvaluator.getResult( + ImmutableSet.of( + "SCHEMA::PRODUCE_TOPIC", "SCHEMA::USE_SCHEMA", "CATALOG::USE_CATALOG"))); + + assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER"))); + assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", "CATALOG::USE_CATALOG"))); + assertTrue( + mockEvaluator.getResult( + ImmutableSet.of("SCHEMA::OWNER", "SCHEMA::USE_SCHEMA", "CATALOG::USE_CATALOG"))); + } + + @Test + public void testDropFileset() throws NoSuchMethodException, OgnlException { + Method method = + TopicOperations.class.getMethod( + "dropTopic", String.class, String.class, String.class, String.class); + AuthorizationExpression authorizationExpressionAnnotation = + method.getAnnotation(AuthorizationExpression.class); + String expression = authorizationExpressionAnnotation.expression(); + MockAuthorizationExpressionEvaluator mockEvaluator = + new MockAuthorizationExpressionEvaluator(expression); + assertFalse(mockEvaluator.getResult(ImmutableSet.of())); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG"))); + assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER"))); + + assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_METALAKE"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::CREATE_CATALOG"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TOPIC"))); + assertFalse( + mockEvaluator.getResult( + ImmutableSet.of("SCHEMA::CREATE_TOPIC", "CATALOG::CREATE_CATALOG"))); + + assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::PRODUCE_TOPIC"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::PRODUCE_TOPIC"))); + assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::PRODUCE_TOPIC"))); + assertFalse( + mockEvaluator.getResult(ImmutableSet.of("SCHEMA::PRODUCE_TOPIC", "CATALOG::USE_CATALOG"))); + assertFalse( + mockEvaluator.getResult( + ImmutableSet.of( + "SCHEMA::PRODUCE_TOPIC", "SCHEMA::USE_SCHEMA", "CATALOG::USE_CATALOG"))); + + assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER"))); + assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", "CATALOG::USE_CATALOG"))); + assertTrue( + mockEvaluator.getResult( + ImmutableSet.of("SCHEMA::OWNER", "SCHEMA::USE_SCHEMA", "CATALOG::USE_CATALOG"))); + } +}
