This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new ed093460cb [#7724] feat(authz): Support Metalake Authorization (#7726)
ed093460cb is described below

commit ed093460cb10ae3cf58dc55b7ab7b885941c6867
Author: yangyang zhong <[email protected]>
AuthorDate: Wed Jul 16 17:44:36 2025 +0800

    [#7724] feat(authz): Support Metalake Authorization (#7726)
    
    ### What changes were proposed in this pull request?
    
    Support metalake authorization
    
    ### Why are the changes needed?
    
    close #7724
    
    ### Does this PR introduce _any_ user-facing change?
    
    None
    
    ### How was this patch tested?
    
    
    org.apache.gravitino.client.integration.test.authorization.MetalakeAuthorizationIT
---
 .../authorization/BaseRestApiAuthorizationIT.java  |  10 +-
 .../test/authorization/CatalogAuthorizationIT.java |  16 ++
 .../authorization/MetalakeAuthorizationIT.java     | 172 +++++++++++++++++++++
 .../authorization/GravitinoAuthorizer.java         |  19 +++
 .../authorization/PassThroughAuthorizer.java       |   5 +
 .../annotations/AuthorizationExpression.java       |   9 +-
 .../AuthorizationExpressionConverter.java          |   2 +
 .../AuthorizationExpressionEvaluator.java          |   4 +-
 .../authorization/jcasbin/JcasbinAuthorizer.java   |  59 ++++---
 .../authorization/MockGravitinoAuthorizer.java     |   5 +
 .../jcasbin/TestJcasbinAuthorizer.java             |  24 +--
 .../web/filter/GravitinoInterceptionService.java   |  20 ++-
 .../server/web/rest/CatalogOperations.java         |  35 +++--
 .../server/web/rest/MetalakeOperations.java        |  47 +++++-
 .../filter/TestGravitinoInterceptionService.java   |   5 +
 .../server/web/rest/TestMetalakeOperations.java    |   3 +-
 16 files changed, 367 insertions(+), 68 deletions(-)

diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/BaseRestApiAuthorizationIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/BaseRestApiAuthorizationIT.java
index 0bc18b4eab..776bb23c5e 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/BaseRestApiAuthorizationIT.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/BaseRestApiAuthorizationIT.java
@@ -56,15 +56,21 @@ public class BaseRestApiAuthorizationIT extends BaseIT {
             Configs.AUTHORIZATION_IMPL.getKey(),
             JcasbinAuthorizer.class.getCanonicalName(),
             Configs.CACHE_ENABLED.getKey(),
-            "false"));
+            "false",
+            Configs.AUTHENTICATORS.getKey(),
+            "simple"));
+    putServiceAdmin();
     super.startIntegrationTest();
     client.createMetalake(METALAKE, "", new HashMap<>());
     GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
-    gravitinoMetalake.addUser(USER);
     gravitinoMetalake.addUser(NORMAL_USER);
     normalUserClient = 
GravitinoAdminClient.builder(serverUri).withSimpleAuth(NORMAL_USER).build();
   }
 
+  protected void putServiceAdmin() {
+    customConfigs.put(Configs.SERVICE_ADMINS.getKey(), USER);
+  }
+
   @AfterAll
   @Override
   public void stopIntegrationTest() throws IOException, InterruptedException {
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/CatalogAuthorizationIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/CatalogAuthorizationIT.java
index 0fcf7569b4..8da9efa068 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/CatalogAuthorizationIT.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/CatalogAuthorizationIT.java
@@ -95,6 +95,22 @@ public class CatalogAuthorizationIT extends 
BaseRestApiAuthorizationIT {
     assertArrayEquals(new String[] {catalog1, catalog2}, catalogs);
   }
 
+  @Test
+  @Order(2)
+  public void testListCatalogInfo() {
+    Catalog[] catalogs = 
normalUserClient.loadMetalake(METALAKE).listCatalogsInfo();
+    assertCatalogEquals(new String[] {}, catalogs);
+    catalogs = client.loadMetalake(METALAKE).listCatalogsInfo();
+    assertCatalogEquals(new String[] {catalog1, catalog2}, catalogs);
+  }
+
+  private void assertCatalogEquals(String[] expectCatalogs, Catalog[] 
actualCatalogs) {
+    assertEquals(expectCatalogs.length, actualCatalogs.length);
+    for (int i = 0; i < expectCatalogs.length; i++) {
+      assertEquals(expectCatalogs[i], actualCatalogs[i].name());
+    }
+  }
+
   @Test
   @Order(3)
   public void testDeleteCatalog() {
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/MetalakeAuthorizationIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/MetalakeAuthorizationIT.java
new file mode 100644
index 0000000000..cf994b3491
--- /dev/null
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/MetalakeAuthorizationIT.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.client.integration.test.authorization;
+
+import static org.junit.Assert.assertThrows;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.MetalakeChange;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.dto.MetalakeDTO;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+@Tag("gravitino-docker-test")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class MetalakeAuthorizationIT extends BaseRestApiAuthorizationIT {
+
+  private static final String SERVICE_ADMIN = "serviceAdmin";
+
+  private static final String SERVICE_ADMIN_BUT_NOT_OWNER = "serviceAdmin2";
+
+  private GravitinoAdminClient serviceAdminClient;
+
+  private GravitinoAdminClient serviceAdminButNotOwnerClient;
+
+  @BeforeAll
+  @Override
+  public void startIntegrationTest() throws Exception {
+    super.startIntegrationTest();
+    client.loadMetalake(METALAKE).addUser(SERVICE_ADMIN);
+    client.loadMetalake(METALAKE).addUser(SERVICE_ADMIN_BUT_NOT_OWNER);
+    serviceAdminClient =
+        
GravitinoAdminClient.builder(serverUri).withSimpleAuth(SERVICE_ADMIN).build();
+    serviceAdminButNotOwnerClient =
+        
GravitinoAdminClient.builder(serverUri).withSimpleAuth(SERVICE_ADMIN_BUT_NOT_OWNER).build();
+  }
+
+  @Override
+  protected void putServiceAdmin() {
+    customConfigs.put(Configs.SERVICE_ADMINS.getKey(), String.join(",", USER, 
SERVICE_ADMIN));
+  }
+
+  @Test
+  @Order(1)
+  public void testCreateMetalake() {
+    assertThrows(
+        "Only service admins can create metalakes, current user can't create 
the metalake,  you should configure it in the server configuration first",
+        RuntimeException.class,
+        () -> {
+          normalUserClient.createMetalake("testMetalake2", "", new 
HashMap<>());
+        });
+    serviceAdminClient.createMetalake("testMetalake2", "", new HashMap<>());
+    serviceAdminClient.createMetalake("testMetalake3", "", new HashMap<>());
+  }
+
+  @Test
+  @Order(2)
+  public void testListMetalake() {
+    assertMetalakeEquals(
+        new String[] {METALAKE, "testMetalake2", "testMetalake3"},
+        serviceAdminClient.listMetalakes());
+    assertMetalakeEquals(new String[] {METALAKE}, 
normalUserClient.listMetalakes());
+    assertMetalakeEquals(new String[] {METALAKE}, 
serviceAdminButNotOwnerClient.listMetalakes());
+    
serviceAdminClient.loadMetalake("testMetalake2").addUser(SERVICE_ADMIN_BUT_NOT_OWNER);
+    assertMetalakeEquals(
+        new String[] {METALAKE, "testMetalake2"}, 
serviceAdminButNotOwnerClient.listMetalakes());
+    GravitinoAdminClient tempClient =
+        
GravitinoAdminClient.builder(serverUri).withSimpleAuth("tempUse").build();
+    assertMetalakeEquals(new String[] {}, tempClient.listMetalakes());
+  }
+
+  @Test
+  @Order(3)
+  public void testLoadMetalake() {
+    serviceAdminClient.loadMetalake(METALAKE);
+    serviceAdminClient.loadMetalake("testMetalake2");
+    serviceAdminClient.loadMetalake("testMetalake3");
+    normalUserClient.loadMetalake(METALAKE);
+    assertThrows(
+        "Current user access metadata {testMetalake2}",
+        RuntimeException.class,
+        () -> {
+          normalUserClient.loadMetalake("testMetalake2");
+        });
+    assertThrows(
+        "Current user access metadata {testMetalake3}",
+        RuntimeException.class,
+        () -> {
+          normalUserClient.loadMetalake("testMetalake3");
+        });
+    serviceAdminButNotOwnerClient.loadMetalake(METALAKE);
+    serviceAdminButNotOwnerClient.loadMetalake("testMetalake2");
+    assertThrows(
+        "Current user access metadata {testMetalake3}",
+        RuntimeException.class,
+        () -> {
+          serviceAdminButNotOwnerClient.loadMetalake("testMetalake3");
+        });
+  }
+
+  @Test
+  @Order(4)
+  public void testSetMetalake() {
+    assertThrows(
+        "Current user access metadata {testMetalake2}",
+        RuntimeException.class,
+        () -> {
+          serviceAdminButNotOwnerClient.alterMetalake(
+              "testMetalake2", MetalakeChange.setProperty("key1", "value1"));
+        });
+    assertThrows(
+        "Current user access metadata {testMetalake2}",
+        RuntimeException.class,
+        () -> {
+          normalUserClient.alterMetalake(
+              "testMetalake2", MetalakeChange.setProperty("key1", "value1"));
+        });
+    serviceAdminClient.alterMetalake("testMetalake2", 
MetalakeChange.setProperty("key1", "value1"));
+  }
+
+  @Test
+  @Order(5)
+  public void testDropMetalake() {
+    assertThrows(
+        "Current user access metadata {testMetalake2}",
+        RuntimeException.class,
+        () -> {
+          serviceAdminButNotOwnerClient.dropMetalake("testMetalake3", true);
+        });
+    assertThrows(
+        "Current user access metadata {testMetalake2}",
+        RuntimeException.class,
+        () -> {
+          normalUserClient.dropMetalake("testMetalake3", true);
+        });
+    serviceAdminClient.dropMetalake("testMetalake3", true);
+  }
+
+  private void assertMetalakeEquals(
+      String[] expectedMetalakes, GravitinoMetalake[] actualMetalakes) {
+    Assertions.assertEquals(expectedMetalakes.length, actualMetalakes.length);
+    Arrays.sort(expectedMetalakes);
+    Arrays.sort(actualMetalakes, Comparator.comparing(MetalakeDTO::name));
+    for (int i = 0; i < expectedMetalakes.length; i++) {
+      Assertions.assertEquals(expectedMetalakes[i], actualMetalakes[i].name());
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
 
b/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
index 37370e4c6b..7a8a11f339 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
@@ -60,6 +60,25 @@ public interface GravitinoAuthorizer extends Closeable {
    */
   boolean isOwner(Principal principal, String metalake, MetadataObject 
metadataObject);
 
+  /**
+   * Determine whether the user is the service admin.
+   *
+   * @return authorization result
+   */
+  default boolean isServiceAdmin() {
+    return true;
+  }
+
+  /**
+   * Determine whether the user is the metalake user
+   *
+   * @param metalake metalake
+   * @return authorization result
+   */
+  default boolean isMetalakeUser(String metalake) {
+    return true;
+  };
+
   /**
    * When the permissions of a role change, it is necessary to notify the 
GravitinoAuthorizer in
    * order to clear the cache.
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/PassThroughAuthorizer.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/PassThroughAuthorizer.java
index f785168a35..d98b04825d 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/PassThroughAuthorizer.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/PassThroughAuthorizer.java
@@ -48,6 +48,11 @@ public class PassThroughAuthorizer implements 
GravitinoAuthorizer {
     return true;
   }
 
+  @Override
+  public boolean isServiceAdmin() {
+    return true;
+  }
+
   @Override
   public void handleRolePrivilegeChange(Long roleId) {}
 
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/annotations/AuthorizationExpression.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/annotations/AuthorizationExpression.java
index 58be4fb206..3e22593313 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/annotations/AuthorizationExpression.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/annotations/AuthorizationExpression.java
@@ -43,5 +43,12 @@ public @interface AuthorizationExpression {
    *
    * @return accessMetadataType
    */
-  MetadataObject.Type accessMetadataType();
+  MetadataObject.Type accessMetadataType() default 
MetadataObject.Type.METALAKE;
+
+  /**
+   * Error message when authorization failed.
+   *
+   * @return error message
+   */
+  String errorMessage() default "";
 }
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
index 694dafbf6d..71054c6a34 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
@@ -127,6 +127,8 @@ public class AuthorizationExpressionConverter {
    * @return authorization expression
    */
   public static String replaceAnyPrivilege(String expression) {
+    expression = expression.replaceAll("SERVICE_ADMIN", 
"authorizer.isServiceAdmin()");
+    expression = expression.replaceAll("METALAKE_USER", 
"authorizer.isMetalakeUser(METALAKE_NAME)");
     expression = expression.replaceAll("ANY_USE_CATALOG", "(ANY(USE_CATALOG, 
METALAKE, CATALOG))");
     expression =
         expression.replaceAll("ANY_USE_SCHEMA", "(ANY(USE_SCHEMA, METALAKE, 
CATALOG, SCHEMA))");
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionEvaluator.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionEvaluator.java
index f9bb05c0b6..cca38ccd0e 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionEvaluator.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionEvaluator.java
@@ -19,6 +19,7 @@ package org.apache.gravitino.server.authorization.expression;
 
 import java.security.Principal;
 import java.util.Map;
+import java.util.Optional;
 import ognl.Ognl;
 import ognl.OgnlContext;
 import ognl.OgnlException;
@@ -67,7 +68,8 @@ public class AuthorizationExpressionEvaluator {
           ognlContext.put(metadataType.name(), metadataObject);
         });
     NameIdentifier nameIdentifier = 
metadataNames.get(Entity.EntityType.METALAKE);
-    ognlContext.put("METALAKE_NAME", nameIdentifier.name());
+    ognlContext.put(
+        "METALAKE_NAME", 
Optional.ofNullable(nameIdentifier).map(NameIdentifier::name).orElse(""));
     try {
       Object value = Ognl.getValue(ognlAuthorizationExpression, ognlContext);
       return (boolean) value;
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
index 0666593635..aea414ebaf 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
@@ -27,6 +27,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
@@ -37,13 +40,12 @@ import org.apache.gravitino.auth.AuthConstants;
 import org.apache.gravitino.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.authorization.Privilege;
 import org.apache.gravitino.authorization.SecurableObject;
-import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.UserEntity;
 import org.apache.gravitino.server.authorization.MetadataIdConverter;
-import org.apache.gravitino.storage.relational.service.UserMetaService;
 import org.apache.gravitino.utils.MetadataObjectUtil;
 import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
 import org.casbin.jcasbin.main.Enforcer;
 import org.casbin.jcasbin.model.Model;
 import org.slf4j.Logger;
@@ -57,6 +59,8 @@ public class JcasbinAuthorizer implements GravitinoAuthorizer 
{
   /** Jcasbin enforcer is used for metadata authorization. */
   private Enforcer enforcer;
 
+  private final Set<String> serviceAdmins = ConcurrentHashMap.newKeySet();
+
   /**
    * loadedRoles is used to cache roles that have loaded permissions. When the 
permissions of a role
    * are updated, they should be removed from it.
@@ -72,6 +76,10 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
       Model model = new Model();
       model.loadModelFromText(modelData);
       enforcer = new Enforcer(model, new GravitinoAdapter());
+      Config config = GravitinoEnv.getInstance().config();
+      if (config != null) {
+        serviceAdmins.addAll(config.get(Configs.SERVICE_ADMINS));
+      }
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -91,6 +99,28 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
     return authorizeInternal(principal, metalake, metadataObject, 
AuthConstants.OWNER);
   }
 
+  @Override
+  public boolean isServiceAdmin() {
+    return GravitinoEnv.getInstance()
+        .accessControlDispatcher()
+        .isServiceAdmin(PrincipalUtils.getCurrentUserName());
+  }
+
+  @Override
+  public boolean isMetalakeUser(String metalake) {
+    String currentUserName = PrincipalUtils.getCurrentUserName();
+    if (StringUtils.isBlank(currentUserName)) {
+      return false;
+    }
+    try {
+      return 
GravitinoEnv.getInstance().accessControlDispatcher().getUser(metalake, 
currentUserName)
+          != null;
+    } catch (Exception e) {
+      LOG.warn("Can not get user {} in metalake {}", currentUserName, 
metalake, e);
+      return false;
+    }
+  }
+
   @Override
   public void handleRolePrivilegeChange(Long roleId) {
     loadedRoles.remove(roleId);
@@ -121,21 +151,6 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
     return loadPrivilegeAndAuthorize(username, metalake, metadataObject, 
privilege);
   }
 
-  private static Long getMetalakeId(String metalake) {
-    try {
-      EntityStore entityStore = GravitinoEnv.getInstance().entityStore();
-      BaseMetalake metalakeEntity =
-          entityStore.get(
-              NameIdentifierUtil.ofMetalake(metalake),
-              Entity.EntityType.METALAKE,
-              BaseMetalake.class);
-      Long metalakeId = metalakeEntity.id();
-      return metalakeId;
-    } catch (Exception e) {
-      throw new RuntimeException("Can not get metalake id by entity store", e);
-    }
-  }
-
   private boolean authorizeByJcasbin(
       Long userId, MetadataObject metadataObject, Long metadataId, String 
privilege) {
     return enforcer.enforce(
@@ -147,10 +162,16 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
 
   private boolean loadPrivilegeAndAuthorize(
       String username, String metalake, MetadataObject metadataObject, String 
privilege) {
-    Long metalakeId = getMetalakeId(metalake);
-    Long userId = 
UserMetaService.getInstance().getUserIdByMetalakeIdAndName(metalakeId, 
username);
     Long metadataId;
+    Long userId;
     try {
+      EntityStore entityStore = GravitinoEnv.getInstance().entityStore();
+      UserEntity userEntity =
+          entityStore.get(
+              NameIdentifierUtil.ofUser(metalake, username),
+              Entity.EntityType.USER,
+              UserEntity.class);
+      userId = userEntity.id();
       metadataId = MetadataIdConverter.getID(metadataObject, metalake);
     } catch (Exception e) {
       LOG.debug("Can not get entity id", e);
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/MockGravitinoAuthorizer.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/MockGravitinoAuthorizer.java
index 4360b02f70..8bde1bb209 100644
--- 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/MockGravitinoAuthorizer.java
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/MockGravitinoAuthorizer.java
@@ -64,6 +64,11 @@ public class MockGravitinoAuthorizer implements 
GravitinoAuthorizer {
         && Objects.equals("metalakeWithOwner", metadataObject.name());
   }
 
+  @Override
+  public boolean isServiceAdmin() {
+    return false;
+  }
+
   @Override
   public void handleRolePrivilegeChange(Long roleId) {}
 
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
index 4c6ab83845..b1fc7bdb98 100644
--- 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
@@ -52,8 +52,6 @@ import org.apache.gravitino.meta.SchemaVersion;
 import org.apache.gravitino.meta.UserEntity;
 import org.apache.gravitino.server.authorization.MetadataIdConverter;
 import org.apache.gravitino.storage.relational.po.SecurableObjectPO;
-import org.apache.gravitino.storage.relational.service.MetalakeMetaService;
-import org.apache.gravitino.storage.relational.service.UserMetaService;
 import org.apache.gravitino.storage.relational.utils.POConverters;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
@@ -80,8 +78,6 @@ public class TestJcasbinAuthorizer {
 
   private static final String METALAKE = "testMetalake";
 
-  private static UserMetaService mockUserMetaService = 
mock(UserMetaService.class);
-
   private static EntityStore entityStore = mock(EntityStore.class);
 
   private static GravitinoEnv gravitinoEnv = mock(GravitinoEnv.class);
@@ -91,12 +87,8 @@ public class TestJcasbinAuthorizer {
 
   private static MockedStatic<PrincipalUtils> principalUtilsMockedStatic;
 
-  private static MockedStatic<UserMetaService> userMetaServiceMockedStatic;
-
   private static MockedStatic<GravitinoEnv> gravitinoEnvMockedStatic;
 
-  private static MockedStatic<MetalakeMetaService> 
metalakeMetaServiceMockedStatic;
-
   private static MockedStatic<MetadataIdConverter> 
metadataIdConverterMockedStatic;
 
   private static JcasbinAuthorizer jcasbinAuthorizer;
@@ -107,15 +99,10 @@ public class TestJcasbinAuthorizer {
   public static void setup() throws IOException {
     jcasbinAuthorizer = new JcasbinAuthorizer();
     jcasbinAuthorizer.initialize();
-    when(mockUserMetaService.getUserIdByMetalakeIdAndName(USER_METALAKE_ID, 
USERNAME))
-        .thenReturn(USER_ID);
     principalUtilsMockedStatic = mockStatic(PrincipalUtils.class);
-    userMetaServiceMockedStatic = mockStatic(UserMetaService.class);
-    metalakeMetaServiceMockedStatic = mockStatic(MetalakeMetaService.class);
     metadataIdConverterMockedStatic = mockStatic(MetadataIdConverter.class);
     gravitinoEnvMockedStatic = mockStatic(GravitinoEnv.class);
     
gravitinoEnvMockedStatic.when(GravitinoEnv::getInstance).thenReturn(gravitinoEnv);
-    
userMetaServiceMockedStatic.when(UserMetaService::getInstance).thenReturn(mockUserMetaService);
     principalUtilsMockedStatic
         .when(PrincipalUtils::getCurrentPrincipal)
         .thenReturn(new UserPrincipal(USERNAME));
@@ -124,6 +111,11 @@ public class TestJcasbinAuthorizer {
         .thenReturn(CATALOG_ID);
     when(gravitinoEnv.entityStore()).thenReturn(entityStore);
     
when(entityStore.relationOperations()).thenReturn(supportsRelationOperations);
+    when(entityStore.get(
+            eq(NameIdentifierUtil.ofUser(METALAKE, USERNAME)),
+            eq(Entity.EntityType.USER),
+            eq(UserEntity.class)))
+        .thenReturn(getUserEntity());
     BaseMetalake baseMetalake =
         BaseMetalake.builder()
             .withId(USER_METALAKE_ID)
@@ -143,12 +135,6 @@ public class TestJcasbinAuthorizer {
     if (principalUtilsMockedStatic != null) {
       principalUtilsMockedStatic.close();
     }
-    if (userMetaServiceMockedStatic != null) {
-      userMetaServiceMockedStatic.close();
-    }
-    if (metalakeMetaServiceMockedStatic != null) {
-      metalakeMetaServiceMockedStatic.close();
-    }
     if (metadataIdConverterMockedStatic != null) {
       metadataIdConverterMockedStatic.close();
     }
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
index 7d26113aaf..c185500119 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
@@ -28,9 +28,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import javax.ws.rs.core.Response;
 import org.aopalliance.intercept.ConstructorInterceptor;
 import org.aopalliance.intercept.MethodInterceptor;
 import org.aopalliance.intercept.MethodInvocation;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
@@ -41,6 +43,7 @@ import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpress
 import org.apache.gravitino.server.web.Utils;
 import org.apache.gravitino.server.web.rest.CatalogOperations;
 import org.apache.gravitino.server.web.rest.FilesetOperations;
+import org.apache.gravitino.server.web.rest.MetalakeOperations;
 import org.apache.gravitino.server.web.rest.ModelOperations;
 import org.apache.gravitino.server.web.rest.SchemaOperations;
 import org.apache.gravitino.server.web.rest.TableOperations;
@@ -61,6 +64,7 @@ public class GravitinoInterceptionService implements 
InterceptionService {
   public Filter getDescriptorFilter() {
     return new ClassListFilter(
         ImmutableSet.of(
+            MetalakeOperations.class.getName(),
             CatalogOperations.class.getName(),
             SchemaOperations.class.getName(),
             TableOperations.class.getName(),
@@ -112,9 +116,8 @@ public class GravitinoInterceptionService implements 
InterceptionService {
             MetadataObject.Type type = 
expressionAnnotation.accessMetadataType();
             NameIdentifier accessMetadataName =
                 metadataContext.get(Entity.EntityType.valueOf(type.name()));
-            return Utils.forbidden(
-                String.format("Can not access metadata {%s}.", 
accessMetadataName.name()),
-                new ForbiddenException("Can not access metadata {%s}.", 
accessMetadataName));
+            String errorMessage = expressionAnnotation.errorMessage();
+            return buildNoAuthResponse(errorMessage, accessMetadataName);
           }
         }
         return methodInvocation.proceed();
@@ -123,6 +126,17 @@ public class GravitinoInterceptionService implements 
InterceptionService {
       }
     }
 
+    private Response buildNoAuthResponse(String errorMessage, NameIdentifier 
accessMetadataName) {
+      if (StringUtils.isNotBlank(errorMessage)) {
+        return Utils.forbidden(
+            errorMessage,
+            new ForbiddenException("Can not access metadata, cause by: %s", 
errorMessage));
+      }
+      return Utils.forbidden(
+          String.format("Can not access metadata {%s}.", 
accessMetadataName.name()),
+          new ForbiddenException("Can not access metadata {%s}.", 
accessMetadataName));
+    }
+
     private Map<Entity.EntityType, NameIdentifier> 
extractNameIdentifierFromParameters(
         Parameter[] parameters, Object[] args) {
       Map<Entity.EntityType, String> metadatas = new HashMap<>();
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/CatalogOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/CatalogOperations.java
index 0b8e3900c5..c2a36cf0ca 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/CatalogOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/CatalogOperations.java
@@ -103,23 +103,24 @@ public class CatalogOperations {
             // Lock the root and the metalake with WRITE lock to ensure the 
consistency of the list.
             if (verbose) {
               Catalog[] catalogs = 
catalogDispatcher.listCatalogsInfo(catalogNS);
-              Arrays.stream(catalogs)
-                  .filter(
-                      catalog -> {
-                        NameIdentifier[] nameIdentifiers =
-                            new NameIdentifier[] {
-                              NameIdentifierUtil.ofCatalog(metalake, 
catalog.name())
-                            };
-                        return MetadataFilterHelper.filterByExpression(
-                                    metalake,
-                                    loadCatalogAuthorizationExpression,
-                                    Entity.EntityType.CATALOG,
-                                    nameIdentifiers)
-                                .length
-                            > 0;
-                      })
-                  .collect(Collectors.toList())
-                  .toArray(new Catalog[0]);
+              catalogs =
+                  Arrays.stream(catalogs)
+                      .filter(
+                          catalog -> {
+                            NameIdentifier[] nameIdentifiers =
+                                new NameIdentifier[] {
+                                  NameIdentifierUtil.ofCatalog(metalake, 
catalog.name())
+                                };
+                            return MetadataFilterHelper.filterByExpression(
+                                        metalake,
+                                        loadCatalogAuthorizationExpression,
+                                        Entity.EntityType.CATALOG,
+                                        nameIdentifiers)
+                                    .length
+                                > 0;
+                          })
+                      .collect(Collectors.toList())
+                      .toArray(new Catalog[0]);
               Response response = Utils.ok(new 
CatalogListResponse(DTOConverters.toDTOs(catalogs)));
               LOG.info("List {} catalogs info under metalake: {}", 
catalogs.length, metalake);
               return response;
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/MetalakeOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/MetalakeOperations.java
index 827c7a0b9d..e4f2568d10 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/MetalakeOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/MetalakeOperations.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.server.web.rest;
 import com.codahale.metrics.annotation.ResponseMetered;
 import com.codahale.metrics.annotation.Timed;
 import java.util.Arrays;
+import java.util.stream.Collectors;
 import javax.inject.Inject;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
@@ -37,6 +38,8 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.Metalake;
 import org.apache.gravitino.MetalakeChange;
 import org.apache.gravitino.NameIdentifier;
@@ -53,6 +56,9 @@ import org.apache.gravitino.dto.responses.MetalakeResponse;
 import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.metalake.MetalakeDispatcher;
 import org.apache.gravitino.metrics.MetricNames;
+import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import org.apache.gravitino.server.web.Utils;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.slf4j.Logger;
@@ -85,6 +91,22 @@ public class MetalakeOperations {
           httpRequest,
           () -> {
             Metalake[] metalakes = metalakeDispatcher.listMetalakes();
+            metalakes =
+                Arrays.stream(metalakes)
+                    .filter(
+                        metalake -> {
+                          NameIdentifier[] nameIdentifiers =
+                              new NameIdentifier[] 
{NameIdentifierUtil.ofMetalake(metalake.name())};
+                          return MetadataFilterHelper.filterByExpression(
+                                      metalake.name(),
+                                      "METALAKE_USER",
+                                      Entity.EntityType.METALAKE,
+                                      nameIdentifiers)
+                                  .length
+                              > 0;
+                        })
+                    .collect(Collectors.toList())
+                    .toArray(new Metalake[0]);
             MetalakeDTO[] metalakeDTOs =
                 
Arrays.stream(metalakes).map(DTOConverters::toDTO).toArray(MetalakeDTO[]::new);
             Response response = Utils.ok(new 
MetalakeListResponse(metalakeDTOs));
@@ -102,6 +124,11 @@ public class MetalakeOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "create-metalake." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "create-metalake", absolute = true)
+  @AuthorizationExpression(
+      expression = "SERVICE_ADMIN",
+      errorMessage =
+          "Only service admins can create metalakes, current user can't create 
the metalake,"
+              + "  you should configure it in the server configuration first")
   public Response createMetalake(MetalakeCreateRequest request) {
     LOG.info("Received create metalake request for {}", request.getName());
     try {
@@ -128,7 +155,10 @@ public class MetalakeOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "load-metalake." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "load-metalake", absolute = true)
-  public Response loadMetalake(@PathParam("name") String metalakeName) {
+  @AuthorizationExpression(expression = "METALAKE_USER")
+  public Response loadMetalake(
+      @PathParam("name") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
+          String metalakeName) {
     LOG.info("Received load metalake request for metalake: {}", metalakeName);
     try {
       return Utils.doAs(
@@ -151,7 +181,11 @@ public class MetalakeOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "set-metalake." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "set-metalake", absolute = true)
-  public Response setMetalake(@PathParam("name") String metalakeName, 
MetalakeSetRequest request) {
+  @AuthorizationExpression(expression = "METALAKE::OWNER")
+  public Response setMetalake(
+      @PathParam("name") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
+          String metalakeName,
+      MetalakeSetRequest request) {
     LOG.info("Received set request for metalake: {}", metalakeName);
     try {
       return Utils.doAs(
@@ -182,8 +216,11 @@ public class MetalakeOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "alter-metalake." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "alter-metalake", absolute = true)
+  @AuthorizationExpression(expression = "METALAKE::OWNER")
   public Response alterMetalake(
-      @PathParam("name") String metalakeName, MetalakeUpdatesRequest 
updatesRequest) {
+      @PathParam("name") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
+          String metalakeName,
+      MetalakeUpdatesRequest updatesRequest) {
     LOG.info("Received alter metalake request for metalake: {}", metalakeName);
     try {
       return Utils.doAs(
@@ -212,8 +249,10 @@ public class MetalakeOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "drop-metalake." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "drop-metalake", absolute = true)
+  @AuthorizationExpression(expression = "METALAKE::OWNER")
   public Response dropMetalake(
-      @PathParam("name") String metalakeName,
+      @PathParam("name") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
+          String metalakeName,
       @DefaultValue("false") @QueryParam("force") boolean force) {
     LOG.info("Received drop metalake request for metalake: {}", metalakeName);
     try {
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/filter/TestGravitinoInterceptionService.java
 
b/server/src/test/java/org/apache/gravitino/server/web/filter/TestGravitinoInterceptionService.java
index 7e803cfa26..73f7c69559 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/filter/TestGravitinoInterceptionService.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/filter/TestGravitinoInterceptionService.java
@@ -113,6 +113,11 @@ public class TestGravitinoInterceptionService {
       return false;
     }
 
+    @Override
+    public boolean isServiceAdmin() {
+      return false;
+    }
+
     @Override
     public void handleRolePrivilegeChange(Long roleId) {}
 
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestMetalakeOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestMetalakeOperations.java
index 9b265e016d..a3b59dca42 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestMetalakeOperations.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestMetalakeOperations.java
@@ -69,14 +69,13 @@ import 
org.apache.gravitino.server.web.mapper.JsonParseExceptionMapper;
 import org.apache.gravitino.server.web.mapper.JsonProcessingExceptionMapper;
 import org.glassfish.hk2.utilities.binding.AbstractBinder;
 import org.glassfish.jersey.server.ResourceConfig;
-import org.glassfish.jersey.test.JerseyTest;
 import org.glassfish.jersey.test.TestProperties;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
-public class TestMetalakeOperations extends JerseyTest {
+public class TestMetalakeOperations extends BaseOperationsTest {
 
   private static class MockServletRequestFactory extends 
ServletRequestFactoryBase {
     @Override


Reply via email to