This is an automated email from the ASF dual-hosted git repository.
liuxun pushed a commit to branch branch-metadata-authz
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-metadata-authz by this
push:
new 4ae2440237 [#7570] feat(authz): Support topic authorization (#7580)
4ae2440237 is described below
commit 4ae2440237fb11f7c803f574058a3e05490a761e
Author: Kyle Lin <[email protected]>
AuthorDate: Mon Jul 14 08:16:56 2025 +0800
[#7570] feat(authz): Support topic authorization (#7580)
### What changes were proposed in this pull request?
Support topic authorization.
### Why are the changes needed?
Fixes #7570
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
`org.apache.gravitino.client.integration.test.authorization.TopicAuthorizationIT`
---------
Co-authored-by: yangyang zhong <[email protected]>
Co-authored-by: [email protected] <[email protected]>
---
.../test/authorization/TopicAuthorizationIT.java | 240 +++++++++++++++++++++
.../server/authorization/MetadataFilterHelper.java | 21 ++
.../AuthorizationExpressionConverter.java | 15 +-
.../authorization/TestMetadataFilterHelper.java | 25 +++
.../web/filter/GravitinoInterceptionService.java | 4 +-
.../gravitino/server/web/rest/TopicOperations.java | 76 +++++--
.../server/web/rest/TestTopicOperations.java | 3 +-
.../TestTopicAuthorizationExpression.java | 186 ++++++++++++++++
8 files changed, 548 insertions(+), 22 deletions(-)
diff --git
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TopicAuthorizationIT.java
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TopicAuthorizationIT.java
new file mode 100644
index 0000000000..5d28f8a8bc
--- /dev/null
+++
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TopicAuthorizationIT.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client.integration.test.authorization;
+
+import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.container.KafkaContainer;
+import org.apache.gravitino.messaging.Topic;
+import org.apache.gravitino.messaging.TopicCatalog;
+import org.apache.gravitino.messaging.TopicChange;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+@Tag("gravitino-docker-test")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class TopicAuthorizationIT extends BaseRestApiAuthorizationIT {
+
+ private static final String CATALOG = "catalog";
+ private static final String SCHEMA = "default";
+ private static final ContainerSuite containerSuite =
ContainerSuite.getInstance();
+ private static String kafkaBootstrapServers;
+ private static String role = "role";
+
+ @BeforeAll
+ public void startIntegrationTest() throws Exception {
+ containerSuite.startKafkaContainer();
+ super.startIntegrationTest();
+ kafkaBootstrapServers =
+ String.format(
+ "%s:%d",
+ containerSuite.getKafkaContainer().getContainerIpAddress(),
+ KafkaContainer.DEFAULT_BROKER_PORT);
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("bootstrap.servers", kafkaBootstrapServers);
+ client
+ .loadMetalake(METALAKE)
+ .createCatalog(CATALOG, Catalog.Type.MESSAGING, "kafka", "comment",
properties);
+ // try to load the schema as normal user, expect failure
+ assertThrows(
+ "Can not access metadata {" + CATALOG + "." + SCHEMA + "}.",
+ RuntimeException.class,
+ () -> {
+ normalUserClient
+ .loadMetalake(METALAKE)
+ .loadCatalog(CATALOG)
+ .asSchemas()
+ .loadSchema(SCHEMA);
+ });
+ // grant tester privilege
+ List<SecurableObject> securableObjects = new ArrayList<>();
+ GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+ SecurableObject catalogObject =
+ SecurableObjects.ofCatalog(CATALOG,
ImmutableList.of(Privileges.UseCatalog.allow()));
+ securableObjects.add(catalogObject);
+ gravitinoMetalake.createRole(role, new HashMap<>(), securableObjects);
+ gravitinoMetalake.grantRolesToUser(ImmutableList.of(role), NORMAL_USER);
+ // normal user can load the catalog but not the schema
+ Catalog catalogLoadByNormalUser =
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG);
+ assertEquals(CATALOG, catalogLoadByNormalUser.name());
+ assertThrows(
+ "Can not access metadata {" + CATALOG + "." + SCHEMA + "}.",
+ RuntimeException.class,
+ () -> {
+ catalogLoadByNormalUser.asSchemas().loadSchema(SCHEMA);
+ });
+ }
+
+ @Test
+ @Order(1)
+ public void testCreateTopic() {
+ // owner can create topic
+ TopicCatalog topicCatalog =
client.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog();
+ topicCatalog.createTopic(NameIdentifier.of(SCHEMA, "topic1"), "test",
null, new HashMap<>());
+ // normal user cannot create topic
+ TopicCatalog topicCatalogNormalUser =
+
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog();
+ assertThrows(
+ "Can not access metadata {" + CATALOG + "." + SCHEMA + "}.",
+ RuntimeException.class,
+ () -> {
+ topicCatalogNormalUser.createTopic(
+ NameIdentifier.of(SCHEMA, "topic2"), "test2", null, new
HashMap<>());
+ });
+ // grant privileges
+ GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+ gravitinoMetalake.grantPrivilegesToRole(
+ role,
+ MetadataObjects.of(CATALOG, SCHEMA, MetadataObject.Type.SCHEMA),
+ ImmutableList.of(Privileges.UseSchema.allow(),
Privileges.CreateTopic.allow()));
+ // normal user can now create topic
+ topicCatalogNormalUser.createTopic(
+ NameIdentifier.of(SCHEMA, "topic2"), "test2", null, new HashMap<>());
+ topicCatalogNormalUser.createTopic(
+ NameIdentifier.of(SCHEMA, "topic3"), "test3", null, new HashMap<>());
+ }
+
+ @Test
+ @Order(2)
+ public void testListTopic() {
+ TopicCatalog topicCatalog =
client.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog();
+ NameIdentifier[] topicsList =
topicCatalog.listTopics(Namespace.of(SCHEMA));
+ assertArrayEquals(
+ new NameIdentifier[] {
+ NameIdentifier.of(SCHEMA, "topic1"),
+ NameIdentifier.of(SCHEMA, "topic2"),
+ NameIdentifier.of(SCHEMA, "topic3")
+ },
+ topicsList);
+ // normal user can only see topics they have privilege for
+ TopicCatalog topicCatalogNormalUser =
+
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog();
+ NameIdentifier[] topicsListNormalUser =
topicCatalogNormalUser.listTopics(Namespace.of(SCHEMA));
+ assertArrayEquals(
+ new NameIdentifier[] {
+ NameIdentifier.of(SCHEMA, "topic2"), NameIdentifier.of(SCHEMA,
"topic3")
+ },
+ topicsListNormalUser);
+ }
+
+ @Test
+ @Order(3)
+ public void testLoadTopic() {
+ TopicCatalog topicCatalogNormalUser =
+
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog();
+ // normal user can load topic2 and topic3, but not topic1
+ assertThrows(
+ String.format("Can not access metadata {%s.%s.%s}.", CATALOG, SCHEMA,
"topic1"),
+ RuntimeException.class,
+ () -> {
+ topicCatalogNormalUser.loadTopic(NameIdentifier.of(SCHEMA,
"topic1"));
+ });
+ Topic topic2 = topicCatalogNormalUser.loadTopic(NameIdentifier.of(SCHEMA,
"topic2"));
+ assertEquals("topic2", topic2.name());
+ Topic topic3 = topicCatalogNormalUser.loadTopic(NameIdentifier.of(SCHEMA,
"topic3"));
+ assertEquals("topic3", topic3.name());
+
+ // grant normal user privilege to use topic1
+ GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+ gravitinoMetalake.grantPrivilegesToRole(
+ role,
+ MetadataObjects.of(ImmutableList.of(CATALOG, SCHEMA, "topic1"),
MetadataObject.Type.TOPIC),
+ ImmutableList.of(Privileges.ConsumeTopic.allow()));
+ topicCatalogNormalUser.loadTopic(NameIdentifier.of(SCHEMA, "topic1"));
+ }
+
+ @Test
+ @Order(4)
+ public void testAlterTopic() {
+ GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+ TopicCatalog topicCatalogNormalUser =
+
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog();
+
+ // normal user cannot alter topic1 (no privilege)
+ assertThrows(
+ String.format("Can not access metadata {%s.%s.%s}.", CATALOG, SCHEMA,
"topic1"),
+ RuntimeException.class,
+ () -> {
+ topicCatalogNormalUser.alterTopic(
+ NameIdentifier.of(SCHEMA, "topic1"),
TopicChange.updateComment("new comment"));
+ });
+ // grant normal user owner privilege on topic1
+ gravitinoMetalake.setOwner(
+ MetadataObjects.of(ImmutableList.of(CATALOG, SCHEMA, "topic1"),
MetadataObject.Type.TOPIC),
+ NORMAL_USER,
+ Owner.Type.USER);
+ topicCatalogNormalUser.alterTopic(
+ NameIdentifier.of(SCHEMA, "topic1"), TopicChange.updateComment("new
comment"));
+ }
+
+ @Test
+ @Order(5)
+ public void testDropTopic() {
+ GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+ TopicCatalog topicCatalogNormalUser =
+
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog();
+ // reset owner
+ gravitinoMetalake.setOwner(
+ MetadataObjects.of(ImmutableList.of(CATALOG, SCHEMA, "topic1"),
MetadataObject.Type.TOPIC),
+ USER,
+ Owner.Type.USER);
+ // normal user cannot drop topic1
+ assertThrows(
+ String.format("Can not access metadata {%s.%s.%s}.", CATALOG, SCHEMA,
"topic1"),
+ RuntimeException.class,
+ () -> {
+ topicCatalogNormalUser.dropTopic(NameIdentifier.of(SCHEMA,
"topic1"));
+ });
+ // normal user can drop topic2 and topic3 (they created them)
+ topicCatalogNormalUser.dropTopic(NameIdentifier.of(SCHEMA, "topic2"));
+ topicCatalogNormalUser.dropTopic(NameIdentifier.of(SCHEMA, "topic3"));
+
+ // owner can drop topic1
+ TopicCatalog topicCatalog =
client.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog();
+ topicCatalog.dropTopic(NameIdentifier.of(SCHEMA, "topic1"));
+ // check topics are dropped
+ NameIdentifier[] topicsList =
topicCatalog.listTopics(Namespace.of(SCHEMA));
+ assertArrayEquals(new NameIdentifier[] {}, topicsList);
+ NameIdentifier[] topicsListNormalUser =
topicCatalogNormalUser.listTopics(Namespace.of(SCHEMA));
+ assertArrayEquals(new NameIdentifier[] {}, topicsListNormalUser);
+ }
+}
diff --git
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
index 13fed7c5a6..2655b98a05 100644
---
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
+++
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
@@ -21,7 +21,10 @@ import java.security.Principal;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.authorization.GravitinoAuthorizer;
import org.apache.gravitino.authorization.Privilege;
@@ -51,6 +54,9 @@ public class MetadataFilterHelper {
Entity.EntityType entityType,
String privilege,
NameIdentifier[] metadataList) {
+ if (!enableAuthorization()) {
+ return metadataList;
+ }
GravitinoAuthorizer gravitinoAuthorizer =
GravitinoAuthorizerProvider.getInstance().getGravitinoAuthorizer();
Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
@@ -81,6 +87,9 @@ public class MetadataFilterHelper {
NameIdentifier[] nameIdentifiers) {
AuthorizationExpressionEvaluator authorizationExpressionEvaluator =
new AuthorizationExpressionEvaluator(expression);
+ if (!enableAuthorization()) {
+ return nameIdentifiers;
+ }
return Arrays.stream(nameIdentifiers)
.filter(
metaDataName -> {
@@ -128,9 +137,21 @@ public class MetadataFilterHelper {
nameIdentifierMap.put(
Entity.EntityType.CATALOG,
NameIdentifierUtil.getCatalogIdentifier(nameIdentifier));
break;
+ case TOPIC:
+ nameIdentifierMap.put(Entity.EntityType.TOPIC, nameIdentifier);
+ nameIdentifierMap.put(
+ Entity.EntityType.SCHEMA,
NameIdentifierUtil.getSchemaIdentifier(nameIdentifier));
+ nameIdentifierMap.put(
+ Entity.EntityType.CATALOG,
NameIdentifierUtil.getCatalogIdentifier(nameIdentifier));
+ break;
default:
break;
}
return nameIdentifierMap;
}
+
+ private static boolean enableAuthorization() {
+ Config config = GravitinoEnv.getInstance().config();
+ return config != null && config.get(Configs.ENABLE_AUTHORIZATION);
+ }
}
diff --git
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
index f21ef43824..0ec8ef18a1 100644
---
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
+++
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
@@ -54,12 +54,12 @@ public class AuthorizationExpressionConverter {
* @return an OGNL expression used to call GravitinoAuthorizer
*/
public static String convertToOgnlExpression(String authorizationExpression)
{
- authorizationExpression = replaceAnyPrivilege(authorizationExpression);
- authorizationExpression = replaceAnyExpressions(authorizationExpression);
return EXPRESSION_CACHE.computeIfAbsent(
authorizationExpression,
(expression) -> {
- Matcher matcher = PATTERN.matcher(expression);
+ String replacedExpression =
replaceAnyPrivilege(authorizationExpression);
+ replacedExpression = replaceAnyExpressions(replacedExpression);
+ Matcher matcher = PATTERN.matcher(replacedExpression);
StringBuffer result = new StringBuffer();
while (matcher.find()) {
@@ -154,6 +154,15 @@ public class AuthorizationExpressionConverter {
"(ANY(CREATE_MODEL_VERSION, METALAKE, CATALOG, SCHEMA, MODEL))");
expression =
expression.replaceAll("ANY_CREATE_MODEL", "(ANY(CREATE_MODEL,
METALAKE, CATALOG, SCHEMA))");
+ expression =
+ expression.replaceAll(
+ "ANY_CREATE_TOPIC", "(ANY(CREATE_TOPIC, METALAKE, CATALOG, SCHEMA,
TOPIC))");
+ expression =
+ expression.replaceAll(
+ "ANY_PRODUCE_TOPIC", "(ANY(PRODUCE_TOPIC, METALAKE, CATALOG,
SCHEMA, TOPIC))");
+ expression =
+ expression.replaceAll(
+ "ANY_CONSUME_TOPIC", "(ANY(CONSUME_TOPIC, METALAKE, CATALOG,
SCHEMA, TOPIC))");
return expression;
}
}
diff --git
a/server-common/src/test/java/org/apache/gravitino/server/authorization/TestMetadataFilterHelper.java
b/server-common/src/test/java/org/apache/gravitino/server/authorization/TestMetadataFilterHelper.java
index 6bb38f2de7..2714845b3d 100644
---
a/server-common/src/test/java/org/apache/gravitino/server/authorization/TestMetadataFilterHelper.java
+++
b/server-common/src/test/java/org/apache/gravitino/server/authorization/TestMetadataFilterHelper.java
@@ -17,23 +17,48 @@
package org.apache.gravitino.server.authorization;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.UserPrincipal;
import org.apache.gravitino.authorization.Privilege;
import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.gravitino.utils.PrincipalUtils;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
/** Test of {@link MetadataFilterHelper} */
public class TestMetadataFilterHelper {
+ private static MockedStatic<GravitinoEnv> mockedStaticGravitinoEnv;
+
+ @BeforeAll
+ public static void setup() {
+ mockedStaticGravitinoEnv = mockStatic(GravitinoEnv.class);
+ GravitinoEnv gravitinoEnv = mock(GravitinoEnv.class);
+
mockedStaticGravitinoEnv.when(GravitinoEnv::getInstance).thenReturn(gravitinoEnv);
+ Config configMock = mock(Config.class);
+ when(gravitinoEnv.config()).thenReturn(configMock);
+ when(configMock.get(eq(Configs.ENABLE_AUTHORIZATION))).thenReturn(true);
+ }
+
+ @AfterAll
+ public static void stop() {
+ if (mockedStaticGravitinoEnv != null) {
+ mockedStaticGravitinoEnv.close();
+ }
+ }
+
@Test
public void testFilter() {
try (MockedStatic<PrincipalUtils> principalUtilsMocked =
mockStatic(PrincipalUtils.class);
diff --git
a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
index 9f799d2776..d39faf0ad8 100644
---
a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
+++
b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
@@ -43,6 +43,7 @@ import org.apache.gravitino.server.web.rest.CatalogOperations;
import org.apache.gravitino.server.web.rest.ModelOperations;
import org.apache.gravitino.server.web.rest.SchemaOperations;
import org.apache.gravitino.server.web.rest.TableOperations;
+import org.apache.gravitino.server.web.rest.TopicOperations;
import org.apache.gravitino.utils.NameIdentifierUtil;
import org.glassfish.hk2.api.Descriptor;
import org.glassfish.hk2.api.Filter;
@@ -62,7 +63,8 @@ public class GravitinoInterceptionService implements
InterceptionService {
CatalogOperations.class.getName(),
SchemaOperations.class.getName(),
TableOperations.class.getName(),
- ModelOperations.class.getName()));
+ ModelOperations.class.getName(),
+ TopicOperations.class.getName()));
}
@Override
diff --git
a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
index 471a194a4f..c3dd0e4c62 100644
---
a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
+++
b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
@@ -31,6 +31,8 @@ import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.catalog.TopicDispatcher;
@@ -44,6 +46,9 @@ import org.apache.gravitino.dto.util.DTOConverters;
import org.apache.gravitino.messaging.Topic;
import org.apache.gravitino.messaging.TopicChange;
import org.apache.gravitino.metrics.MetricNames;
+import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
+import
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
import org.apache.gravitino.server.web.Utils;
import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.gravitino.utils.NamespaceUtil;
@@ -54,6 +59,11 @@ import org.slf4j.LoggerFactory;
public class TopicOperations {
private static final Logger LOG =
LoggerFactory.getLogger(TopicOperations.class);
+ private static final String loadTopicsAuthorizationExpression =
+ "ANY(OWNER, METALAKE, CATALOG) || "
+ + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+ + "ANY_USE_CATALOG && ANY_USE_SCHEMA && (TOPIC::OWNER ||
ANY_CONSUME_TOPIC || ANY_PRODUCE_TOPIC)";
+
private final TopicDispatcher dispatcher;
@Context private HttpServletRequest httpRequest;
@@ -66,6 +76,7 @@ public class TopicOperations {
@GET
@Produces("application/vnd.gravitino.v1+json")
@Timed(name = "list-topic." + MetricNames.HTTP_PROCESS_DURATION, absolute =
true)
+ @ResponseMetered(name = "list-topic", absolute = true)
public Response listTopics(
@PathParam("metalake") String metalake,
@PathParam("catalog") String catalog,
@@ -78,13 +89,17 @@ public class TopicOperations {
LOG.info("Listing topics under schema: {}.{}.{}", metalake,
catalog, schema);
Namespace topicNS = NamespaceUtil.ofTopic(metalake, catalog,
schema);
NameIdentifier[] topics = dispatcher.listTopics(topicNS);
+ topics = topics == null ? new NameIdentifier[0] : topics;
+ topics =
+ MetadataFilterHelper.filterByExpression(
+ metalake, loadTopicsAuthorizationExpression,
Entity.EntityType.TOPIC, topics);
Response response = Utils.ok(new EntityListResponse(topics));
LOG.info(
"List {} topics under schema: {}.{}.{}", topics.length,
metalake, catalog, schema);
return response;
});
} catch (Exception e) {
- return ExceptionHandlers.handleFilesetException(OperationType.LIST, "",
schema, e);
+ return ExceptionHandlers.handleTopicException(OperationType.LIST, "",
schema, e);
}
}
@@ -92,10 +107,18 @@ public class TopicOperations {
@Produces("application/vnd.gravitino.v1+json")
@Timed(name = "create-topic." + MetricNames.HTTP_PROCESS_DURATION, absolute
= true)
@ResponseMetered(name = "create-topic", absolute = true)
+ @AuthorizationExpression(
+ expression =
+ "ANY(OWNER,METALAKE,CATALOG) || "
+ + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+ + "ANY_USE_CATALOG && ANY_USE_SCHEMA && ANY_CREATE_TOPIC",
+ accessMetadataType = MetadataObject.Type.SCHEMA)
public Response createTopic(
- @PathParam("metalake") String metalake,
- @PathParam("catalog") String catalog,
- @PathParam("schema") String schema,
+ @PathParam("metalake") @AuthorizationMetadata(type =
MetadataObject.Type.METALAKE)
+ String metalake,
+ @PathParam("catalog") @AuthorizationMetadata(type =
MetadataObject.Type.CATALOG)
+ String catalog,
+ @PathParam("schema") @AuthorizationMetadata(type =
MetadataObject.Type.SCHEMA) String schema,
TopicCreateRequest request) {
LOG.info("Received create topic request: {}.{}.{}", metalake, catalog,
schema);
try {
@@ -133,11 +156,16 @@ public class TopicOperations {
@Produces("application/vnd.gravitino.v1+json")
@Timed(name = "load-topic." + MetricNames.HTTP_PROCESS_DURATION, absolute =
true)
@ResponseMetered(name = "load-topic", absolute = true)
+ @AuthorizationExpression(
+ expression = loadTopicsAuthorizationExpression,
+ accessMetadataType = MetadataObject.Type.TOPIC)
public Response loadTopic(
- @PathParam("metalake") String metalake,
- @PathParam("catalog") String catalog,
- @PathParam("schema") String schema,
- @PathParam("topic") String topic) {
+ @PathParam("metalake") @AuthorizationMetadata(type =
MetadataObject.Type.METALAKE)
+ String metalake,
+ @PathParam("catalog") @AuthorizationMetadata(type =
MetadataObject.Type.CATALOG)
+ String catalog,
+ @PathParam("schema") @AuthorizationMetadata(type =
MetadataObject.Type.SCHEMA) String schema,
+ @PathParam("topic") @AuthorizationMetadata(type =
MetadataObject.Type.TOPIC) String topic) {
LOG.info(
"Received load topic request for topic: {}.{}.{}.{}", metalake,
catalog, schema, topic);
try {
@@ -161,11 +189,19 @@ public class TopicOperations {
@Produces("application/vnd.gravitino.v1+json")
@Timed(name = "alter-topic." + MetricNames.HTTP_PROCESS_DURATION, absolute =
true)
@ResponseMetered(name = "alter-topic", absolute = true)
+ @AuthorizationExpression(
+ expression =
+ "ANY(OWNER,METALAKE,CATALOG) || "
+ + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+ + "ANY_USE_CATALOG && ANY_USE_SCHEMA && (TOPIC::OWNER ||
ANY_PRODUCE_TOPIC)",
+ accessMetadataType = MetadataObject.Type.TOPIC)
public Response alterTopic(
- @PathParam("metalake") String metalake,
- @PathParam("catalog") String catalog,
- @PathParam("schema") String schema,
- @PathParam("topic") String topic,
+ @PathParam("metalake") @AuthorizationMetadata(type =
MetadataObject.Type.METALAKE)
+ String metalake,
+ @PathParam("catalog") @AuthorizationMetadata(type =
MetadataObject.Type.CATALOG)
+ String catalog,
+ @PathParam("schema") @AuthorizationMetadata(type =
MetadataObject.Type.SCHEMA) String schema,
+ @PathParam("topic") @AuthorizationMetadata(type =
MetadataObject.Type.TOPIC) String topic,
TopicUpdatesRequest request) {
LOG.info("Received alter topic request: {}.{}.{}.{}", metalake, catalog,
schema, topic);
try {
@@ -195,11 +231,19 @@ public class TopicOperations {
@Produces("application/vnd.gravitino.v1+json")
@Timed(name = "drop-topic." + MetricNames.HTTP_PROCESS_DURATION, absolute =
true)
@ResponseMetered(name = "drop-topic", absolute = true)
+ @AuthorizationExpression(
+ expression =
+ "ANY(OWNER,METALAKE,CATALOG) || "
+ + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+ + "ANY_USE_CATALOG && ANY_USE_SCHEMA && TOPIC::OWNER",
+ accessMetadataType = MetadataObject.Type.TOPIC)
public Response dropTopic(
- @PathParam("metalake") String metalake,
- @PathParam("catalog") String catalog,
- @PathParam("schema") String schema,
- @PathParam("topic") String topic) {
+ @PathParam("metalake") @AuthorizationMetadata(type =
MetadataObject.Type.METALAKE)
+ String metalake,
+ @PathParam("catalog") @AuthorizationMetadata(type =
MetadataObject.Type.CATALOG)
+ String catalog,
+ @PathParam("schema") @AuthorizationMetadata(type =
MetadataObject.Type.SCHEMA) String schema,
+ @PathParam("topic") @AuthorizationMetadata(type =
MetadataObject.Type.TOPIC) String topic) {
LOG.info("Received drop topic request: {}.{}.{}.{}", metalake, catalog,
schema, topic);
try {
return Utils.doAs(
diff --git
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java
index d5be6662f3..b838d62744 100644
---
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java
+++
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java
@@ -60,14 +60,13 @@ import org.apache.gravitino.messaging.TopicChange;
import org.apache.gravitino.rest.RESTUtils;
import org.glassfish.jersey.internal.inject.AbstractBinder;
import org.glassfish.jersey.server.ResourceConfig;
-import org.glassfish.jersey.test.JerseyTest;
import org.glassfish.jersey.test.TestProperties;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
-public class TestTopicOperations extends JerseyTest {
+public class TestTopicOperations extends BaseOperationsTest {
private static class MockServletRequestFactory extends
ServletRequestFactoryBase {
@Override
diff --git
a/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/TestTopicAuthorizationExpression.java
b/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/TestTopicAuthorizationExpression.java
new file mode 100644
index 0000000000..8e4b88aeb6
--- /dev/null
+++
b/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/TestTopicAuthorizationExpression.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.server.web.rest.authorization;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.google.common.collect.ImmutableSet;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import ognl.OgnlException;
+import org.apache.gravitino.dto.requests.TopicCreateRequest;
+import org.apache.gravitino.dto.requests.TopicUpdatesRequest;
+import
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
+import org.apache.gravitino.server.web.rest.TopicOperations;
+import org.junit.jupiter.api.Test;
+
+public class TestTopicAuthorizationExpression {
+
+ @Test
+ public void testCreateTopic() throws NoSuchMethodException, OgnlException {
+ Method method =
+ TopicOperations.class.getMethod(
+ "createTopic", String.class, String.class, String.class,
TopicCreateRequest.class);
+ AuthorizationExpression authorizationExpressionAnnotation =
+ method.getAnnotation(AuthorizationExpression.class);
+ String expression = authorizationExpressionAnnotation.expression();
+ MockAuthorizationExpressionEvaluator mockEvaluator =
+ new MockAuthorizationExpressionEvaluator(expression);
+ assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG")));
+ assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+ assertTrue(mockEvaluator.getResult(ImmutableSet.of("CATALOG::OWNER")));
+ assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER")));
+ assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER",
"CATALOG::USE_CATALOG")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_METALAKE")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::CREATE_CATALOG")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TOPIC")));
+ assertFalse(
+ mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TOPIC",
"CATALOG::USE_CATALOG")));
+ assertTrue(
+ mockEvaluator.getResult(
+ ImmutableSet.of("SCHEMA::CREATE_TOPIC", "SCHEMA::USE_SCHEMA",
"CATALOG::USE_CATALOG")));
+ }
+
+ @Test
+ public void testLoadTopics() throws OgnlException, NoSuchFieldException,
IllegalAccessException {
+ Field loadTopicsAuthorizationExpressionField =
+
TopicOperations.class.getDeclaredField("loadTopicsAuthorizationExpression");
+ loadTopicsAuthorizationExpressionField.setAccessible(true);
+ String loadTopicsAuthorizationExpression =
+ (String) loadTopicsAuthorizationExpressionField.get(null);
+ MockAuthorizationExpressionEvaluator mockEvaluator =
+ new
MockAuthorizationExpressionEvaluator(loadTopicsAuthorizationExpression);
+ assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG")));
+ assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_METALAKE")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::CREATE_CATALOG")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TOPIC")));
+ assertFalse(
+ mockEvaluator.getResult(
+ ImmutableSet.of("SCHEMA::CREATE_TOPIC",
"CATALOG::CREATE_CATALOG")));
+
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::PRODUCE_TOPIC")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::PRODUCE_TOPIC")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::PRODUCE_TOPIC")));
+ assertFalse(
+ mockEvaluator.getResult(ImmutableSet.of("SCHEMA::PRODUCE_TOPIC",
"CATALOG::USE_CATALOG")));
+ assertTrue(
+ mockEvaluator.getResult(
+ ImmutableSet.of(
+ "SCHEMA::PRODUCE_TOPIC", "SCHEMA::USE_SCHEMA",
"CATALOG::USE_CATALOG")));
+
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::CONSUME_TOPIC")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::CONSUME_TOPIC")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CONSUME_TOPIC")));
+ assertFalse(
+ mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CONSUME_TOPIC",
"CATALOG::USE_CATALOG")));
+ assertTrue(
+ mockEvaluator.getResult(
+ ImmutableSet.of(
+ "SCHEMA::CONSUME_TOPIC", "SCHEMA::USE_SCHEMA",
"CATALOG::USE_CATALOG")));
+
+ assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER")));
+ assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER",
"CATALOG::USE_CATALOG")));
+ assertTrue(
+ mockEvaluator.getResult(
+ ImmutableSet.of("SCHEMA::OWNER", "SCHEMA::USE_SCHEMA",
"CATALOG::USE_CATALOG")));
+ }
+
+ @Test
+ public void testAlterFileset() throws NoSuchMethodException, OgnlException {
+ Method method =
+ TopicOperations.class.getMethod(
+ "alterTopic",
+ String.class,
+ String.class,
+ String.class,
+ String.class,
+ TopicUpdatesRequest.class);
+ AuthorizationExpression authorizationExpressionAnnotation =
+ method.getAnnotation(AuthorizationExpression.class);
+ String expression = authorizationExpressionAnnotation.expression();
+ MockAuthorizationExpressionEvaluator mockEvaluator =
+ new MockAuthorizationExpressionEvaluator(expression);
+ assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG")));
+ assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_METALAKE")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::CREATE_CATALOG")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TOPIC")));
+ assertFalse(
+ mockEvaluator.getResult(
+ ImmutableSet.of("SCHEMA::CREATE_TOPIC",
"CATALOG::CREATE_CATALOG")));
+
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::PRODUCE_TOPIC")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::PRODUCE_TOPIC")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::PRODUCE_TOPIC")));
+ assertFalse(
+ mockEvaluator.getResult(ImmutableSet.of("SCHEMA::PRODUCE_TOPIC",
"CATALOG::USE_CATALOG")));
+ assertTrue(
+ mockEvaluator.getResult(
+ ImmutableSet.of(
+ "SCHEMA::PRODUCE_TOPIC", "SCHEMA::USE_SCHEMA",
"CATALOG::USE_CATALOG")));
+
+ assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER")));
+ assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER",
"CATALOG::USE_CATALOG")));
+ assertTrue(
+ mockEvaluator.getResult(
+ ImmutableSet.of("SCHEMA::OWNER", "SCHEMA::USE_SCHEMA",
"CATALOG::USE_CATALOG")));
+ }
+
+ @Test
+ public void testDropFileset() throws NoSuchMethodException, OgnlException {
+ Method method =
+ TopicOperations.class.getMethod(
+ "dropTopic", String.class, String.class, String.class,
String.class);
+ AuthorizationExpression authorizationExpressionAnnotation =
+ method.getAnnotation(AuthorizationExpression.class);
+ String expression = authorizationExpressionAnnotation.expression();
+ MockAuthorizationExpressionEvaluator mockEvaluator =
+ new MockAuthorizationExpressionEvaluator(expression);
+ assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG")));
+ assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_METALAKE")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::CREATE_CATALOG")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TOPIC")));
+ assertFalse(
+ mockEvaluator.getResult(
+ ImmutableSet.of("SCHEMA::CREATE_TOPIC",
"CATALOG::CREATE_CATALOG")));
+
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::PRODUCE_TOPIC")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::PRODUCE_TOPIC")));
+
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::PRODUCE_TOPIC")));
+ assertFalse(
+ mockEvaluator.getResult(ImmutableSet.of("SCHEMA::PRODUCE_TOPIC",
"CATALOG::USE_CATALOG")));
+ assertFalse(
+ mockEvaluator.getResult(
+ ImmutableSet.of(
+ "SCHEMA::PRODUCE_TOPIC", "SCHEMA::USE_SCHEMA",
"CATALOG::USE_CATALOG")));
+
+ assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER")));
+ assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER",
"CATALOG::USE_CATALOG")));
+ assertTrue(
+ mockEvaluator.getResult(
+ ImmutableSet.of("SCHEMA::OWNER", "SCHEMA::USE_SCHEMA",
"CATALOG::USE_CATALOG")));
+ }
+}