This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 0804634db [#4236] feat(core): Rework of post hook for dispatcher 
(#4449)
0804634db is described below

commit 0804634db88d895b1b11feff49c98d6a7d2fb711
Author: roryqi <[email protected]>
AuthorDate: Tue Aug 13 09:44:11 2024 +0800

    [#4236] feat(core): Rework of post hook for dispatcher (#4449)
    
    ### What changes were proposed in this pull request?
    
    Rework the post hook mechanism for the dispatchers.
    
    1. Implement dedicated hook dispatcher classes instead of a dynamic proxy.
    2. Optimize the lock logic.
    3. Add the securable object metalake back. Only the privileges of the
    metalake were meant to be removed; the securable object metalake itself
    should not have been removed.
    
    ### Why are the changes needed?
    
    Fix: #4236
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    
    Add new ITs.
---
 .../gravitino/authorization/SecurableObjects.java  |  27 +-
 .../authorization/TestSecurableObjects.java        |  13 +
 .../java/org/apache/gravitino/GravitinoEnv.java    |  62 ++---
 .../authorization/AuthorizationUtils.java          |   7 -
 .../gravitino/authorization/OwnerManager.java      |  63 +++--
 .../hook/AccessControlHookDispatcher.java          | 150 +++++++++++
 .../gravitino/hook/CatalogHookDispatcher.java      | 112 +++++++++
 .../gravitino/hook/DispatcherHookHelper.java       |  35 ---
 .../apache/gravitino/hook/DispatcherHookProxy.java |  44 ----
 .../org/apache/gravitino/hook/DispatcherHooks.java |  52 ----
 .../gravitino/hook/FilesetHookDispatcher.java      |  96 ++++++++
 .../gravitino/hook/MetalakeHookDispatcher.java     |  98 ++++++++
 .../gravitino/hook/SchemaHookDispatcher.java       |  92 +++++++
 .../apache/gravitino/hook/TableHookDispatcher.java | 111 +++++++++
 .../apache/gravitino/hook/TopicHookDispatcher.java |  93 +++++++
 .../apache/gravitino/hook/TestDispatcherHooks.java |  79 ------
 .../test/authorization/OwnerPostHookIT.java        | 274 +++++++++++++++++++++
 .../gravitino/server/web/rest/OwnerOperations.java |  13 +-
 18 files changed, 1124 insertions(+), 297 deletions(-)

diff --git 
a/api/src/main/java/org/apache/gravitino/authorization/SecurableObjects.java 
b/api/src/main/java/org/apache/gravitino/authorization/SecurableObjects.java
index 22e2b258b..c5ab814eb 100644
--- a/api/src/main/java/org/apache/gravitino/authorization/SecurableObjects.java
+++ b/api/src/main/java/org/apache/gravitino/authorization/SecurableObjects.java
@@ -34,7 +34,18 @@ public class SecurableObjects {
   private static final Splitter DOT_SPLITTER = Splitter.on('.');
 
   /**
-   * Create the catalog {@link SecurableObject} with the given catalog name.
+   * Create the metalake {@link SecurableObject} with the given metalake name 
and privileges.
+   *
+   * @param metalake The metalake name
+   * @param privileges The privileges of the metalake
+   * @return The created metalake {@link SecurableObject}
+   */
+  public static SecurableObject ofMetalake(String metalake, List<Privilege> 
privileges) {
+    return of(MetadataObject.Type.METALAKE, Lists.newArrayList(metalake), 
privileges);
+  }
+
+  /**
+   * Create the catalog {@link SecurableObject} with the given catalog name 
and privileges.
    *
    * @param catalog The catalog name
    * @param privileges The privileges of the catalog
@@ -45,8 +56,8 @@ public class SecurableObjects {
   }
 
   /**
-   * Create the schema {@link SecurableObject} with the given securable 
catalog object and schema
-   * name.
+   * Create the schema {@link SecurableObject} with the given securable 
catalog object, schema name
+   * and privileges.
    *
    * @param catalog The catalog securable object.
    * @param schema The schema name
@@ -60,7 +71,8 @@ public class SecurableObjects {
   }
 
   /**
-   * Create the table {@link SecurableObject} with the given securable schema 
object and table name.
+   * Create the table {@link SecurableObject} with the given securable schema 
object, table name and
+   * privileges.
    *
    * @param schema The schema securable object
    * @param table The table name
@@ -75,7 +87,8 @@ public class SecurableObjects {
   }
 
   /**
-   * Create the topic {@link SecurableObject} with the given securable schema 
object and topic name.
+   * Create the topic {@link SecurableObject} with the given securable schema 
object, topic name and
+   * privileges.
    *
    * @param schema The schema securable object
    * @param topic The topic name
@@ -90,8 +103,8 @@ public class SecurableObjects {
   }
 
   /**
-   * Create the table {@link SecurableObject} with the given securable schema 
object and fileset
-   * name.
+   * Create the fileset {@link SecurableObject} with the given securable schema 
object, fileset name
+   * and privileges.
    *
    * @param schema The schema securable object
    * @param fileset The fileset name
diff --git 
a/api/src/test/java/org/apache/gravitino/authorization/TestSecurableObjects.java
 
b/api/src/test/java/org/apache/gravitino/authorization/TestSecurableObjects.java
index 256636397..82374f676 100644
--- 
a/api/src/test/java/org/apache/gravitino/authorization/TestSecurableObjects.java
+++ 
b/api/src/test/java/org/apache/gravitino/authorization/TestSecurableObjects.java
@@ -27,6 +27,19 @@ public class TestSecurableObjects {
 
   @Test
   public void testSecurableObjects() {
+
+    SecurableObject metalake =
+        SecurableObjects.ofMetalake(
+            "metalake", Lists.newArrayList(Privileges.CreateCatalog.allow()));
+    Assertions.assertEquals("metalake", metalake.fullName());
+    Assertions.assertEquals(MetadataObject.Type.METALAKE, metalake.type());
+    SecurableObject anotherMetalake =
+        SecurableObjects.of(
+            MetadataObject.Type.METALAKE,
+            Lists.newArrayList("metalake"),
+            Lists.newArrayList(Privileges.CreateCatalog.allow()));
+    Assertions.assertEquals(metalake, anotherMetalake);
+
     SecurableObject catalog =
         SecurableObjects.ofCatalog("catalog", 
Lists.newArrayList(Privileges.UseCatalog.allow()));
     Assertions.assertEquals("catalog", catalog.fullName());
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java 
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index 8a0f19f99..6e97ed1eb 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -21,7 +21,6 @@ package org.apache.gravitino;
 import com.google.common.base.Preconditions;
 import org.apache.gravitino.authorization.AccessControlDispatcher;
 import org.apache.gravitino.authorization.AccessControlManager;
-import org.apache.gravitino.authorization.AuthorizationUtils;
 import org.apache.gravitino.authorization.OwnerManager;
 import org.apache.gravitino.auxiliary.AuxiliaryServiceManager;
 import org.apache.gravitino.catalog.CatalogDispatcher;
@@ -42,8 +41,13 @@ import org.apache.gravitino.catalog.TableOperationDispatcher;
 import org.apache.gravitino.catalog.TopicDispatcher;
 import org.apache.gravitino.catalog.TopicNormalizeDispatcher;
 import org.apache.gravitino.catalog.TopicOperationDispatcher;
-import org.apache.gravitino.hook.DispatcherHookHelper;
-import org.apache.gravitino.hook.DispatcherHooks;
+import org.apache.gravitino.hook.AccessControlHookDispatcher;
+import org.apache.gravitino.hook.CatalogHookDispatcher;
+import org.apache.gravitino.hook.FilesetHookDispatcher;
+import org.apache.gravitino.hook.MetalakeHookDispatcher;
+import org.apache.gravitino.hook.SchemaHookDispatcher;
+import org.apache.gravitino.hook.TableHookDispatcher;
+import org.apache.gravitino.hook.TopicHookDispatcher;
 import org.apache.gravitino.listener.CatalogEventDispatcher;
 import org.apache.gravitino.listener.EventBus;
 import org.apache.gravitino.listener.EventListenerManager;
@@ -348,28 +352,30 @@ public class GravitinoEnv {
 
     // Create and initialize metalake related modules
     MetalakeDispatcher metalakeManager = new MetalakeManager(entityStore, 
idGenerator);
+    MetalakeHookDispatcher metalakeHookDispatcher = new 
MetalakeHookDispatcher(metalakeManager);
     MetalakeNormalizeDispatcher metalakeNormalizeDispatcher =
-        new 
MetalakeNormalizeDispatcher(installDispatcherHooks(metalakeManager));
+        new MetalakeNormalizeDispatcher(metalakeHookDispatcher);
     this.metalakeDispatcher = new MetalakeEventDispatcher(eventBus, 
metalakeNormalizeDispatcher);
 
     // Create and initialize Catalog related modules
     this.catalogManager = new CatalogManager(config, entityStore, idGenerator);
+    CatalogHookDispatcher catalogHookDispatcher = new 
CatalogHookDispatcher(catalogManager);
     CatalogNormalizeDispatcher catalogNormalizeDispatcher =
-        new 
CatalogNormalizeDispatcher(installDispatcherHooks((CatalogDispatcher) 
catalogManager));
+        new CatalogNormalizeDispatcher(catalogHookDispatcher);
     this.catalogDispatcher = new CatalogEventDispatcher(eventBus, 
catalogNormalizeDispatcher);
 
     SchemaOperationDispatcher schemaOperationDispatcher =
         new SchemaOperationDispatcher(catalogManager, entityStore, 
idGenerator);
+    SchemaHookDispatcher schemaHookDispatcher = new 
SchemaHookDispatcher(schemaOperationDispatcher);
     SchemaNormalizeDispatcher schemaNormalizeDispatcher =
-        new SchemaNormalizeDispatcher(
-            installDispatcherHooks((SchemaDispatcher) 
schemaOperationDispatcher), catalogManager);
+        new SchemaNormalizeDispatcher(schemaHookDispatcher, catalogManager);
     this.schemaDispatcher = new SchemaEventDispatcher(eventBus, 
schemaNormalizeDispatcher);
 
     TableOperationDispatcher tableOperationDispatcher =
         new TableOperationDispatcher(catalogManager, entityStore, idGenerator);
+    TableHookDispatcher tableHookDispatcher = new 
TableHookDispatcher(tableOperationDispatcher);
     TableNormalizeDispatcher tableNormalizeDispatcher =
-        new TableNormalizeDispatcher(
-            installDispatcherHooks((TableDispatcher) 
tableOperationDispatcher), catalogManager);
+        new TableNormalizeDispatcher(tableHookDispatcher, catalogManager);
     this.tableDispatcher = new TableEventDispatcher(eventBus, 
tableNormalizeDispatcher);
 
     // TODO: We can install hooks when we need, we only supports ownership 
post hook,
@@ -382,24 +388,27 @@ public class GravitinoEnv {
 
     FilesetOperationDispatcher filesetOperationDispatcher =
         new FilesetOperationDispatcher(catalogManager, entityStore, 
idGenerator);
+    FilesetHookDispatcher filesetHookDispatcher =
+        new FilesetHookDispatcher(filesetOperationDispatcher);
     FilesetNormalizeDispatcher filesetNormalizeDispatcher =
-        new FilesetNormalizeDispatcher(
-            installDispatcherHooks((FilesetDispatcher) 
filesetOperationDispatcher), catalogManager);
+        new FilesetNormalizeDispatcher(filesetHookDispatcher, catalogManager);
     this.filesetDispatcher = new FilesetEventDispatcher(eventBus, 
filesetNormalizeDispatcher);
 
     TopicOperationDispatcher topicOperationDispatcher =
         new TopicOperationDispatcher(catalogManager, entityStore, idGenerator);
+    TopicHookDispatcher topicHookDispatcher = new 
TopicHookDispatcher(topicOperationDispatcher);
     TopicNormalizeDispatcher topicNormalizeDispatcher =
-        new TopicNormalizeDispatcher(
-            installDispatcherHooks((TopicDispatcher) 
topicOperationDispatcher), catalogManager);
+        new TopicNormalizeDispatcher(topicHookDispatcher, catalogManager);
     this.topicDispatcher = new TopicEventDispatcher(eventBus, 
topicNormalizeDispatcher);
 
     // Create and initialize access control related modules
     boolean enableAuthorization = config.get(Configs.ENABLE_AUTHORIZATION);
     if (enableAuthorization) {
-      this.accessControlDispatcher =
-          installDispatcherHooks(
-              (AccessControlDispatcher) new AccessControlManager(entityStore, 
idGenerator, config));
+      AccessControlHookDispatcher accessControlHookDispatcher =
+          new AccessControlHookDispatcher(
+              new AccessControlManager(entityStore, idGenerator, config));
+
+      this.accessControlDispatcher = accessControlHookDispatcher;
       this.ownerManager = new OwnerManager(entityStore);
     } else {
       this.accessControlDispatcher = null;
@@ -415,25 +424,4 @@ public class GravitinoEnv {
     // Tag manager
     this.tagManager = new TagManager(idGenerator, entityStore);
   }
-
-  // Provides a universal entrance to install dispatcher hooks. This method
-  // focuses the logic of installing hooks.
-  // We should reuse the ability of 
(Metalake|Schema|Table|Fileset|...)NormalizeDispatcher to avoid
-  // solving
-  // normalization names, this is useful for pre-hooks.
-  // so we can't install the hooks for the outside of
-  // (Metalake|Schema|Table|Fileset|...)NormalizeDispatcher.
-  private <T> T installDispatcherHooks(T manager) {
-    boolean enableAuthorization = config.get(Configs.ENABLE_AUTHORIZATION);
-    DispatcherHooks hooks = new DispatcherHooks();
-    if (enableAuthorization) {
-      AuthorizationUtils.prepareAuthorizationHooks(manager, hooks);
-    }
-
-    if (hooks.isEmpty()) {
-      return manager;
-    }
-
-    return DispatcherHookHelper.installHooks(manager, hooks);
-  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
index 875cd6fb6..5e16c5bcb 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
@@ -25,7 +25,6 @@ import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
-import org.apache.gravitino.hook.DispatcherHooks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -117,10 +116,4 @@ public class AuthorizationUtils {
         "Role namespace must have 3 levels, the input namespace is %s",
         namespace);
   }
-
-  // Install some post hooks used for owner. The owner will have the all 
privileges
-  // of securable objects, users, groups, roles.
-  public static <T> void prepareAuthorizationHooks(T manager, DispatcherHooks 
hooks) {
-    // TODO: Refactor the post hook by adding new dispatcher
-  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java 
b/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java
index f79164285..ec1b26438 100644
--- a/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java
+++ b/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java
@@ -40,7 +40,8 @@ import org.slf4j.LoggerFactory;
 
 /**
  * OwnerManager is used for manage the owner of metadata object. The user and 
group don't have an
- * owner
+ * owner. Because the post hooks call these methods, we shouldn't acquire the 
lock of the metadata
+ * object here; otherwise, it will cause a deadlock.
  */
 public class OwnerManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(OwnerManager.class);
@@ -71,45 +72,37 @@ public class OwnerManager {
       if (ownerType == Owner.Type.USER) {
         NameIdentifier ownerIdent = AuthorizationUtils.ofUser(metalake, 
ownerName);
         TreeLockUtils.doWithTreeLock(
-            objectIdent,
+            ownerIdent,
             LockType.READ,
-            () ->
-                TreeLockUtils.doWithTreeLock(
-                    ownerIdent,
-                    LockType.READ,
-                    () -> {
-                      store
-                          .relationOperations()
-                          .insertRelation(
-                              SupportsRelationOperations.Type.OWNER_REL,
-                              objectIdent,
-                              MetadataObjectUtil.toEntityType(metadataObject),
-                              ownerIdent,
-                              Entity.EntityType.USER,
-                              true);
-                      return null;
-                    }));
+            () -> {
+              store
+                  .relationOperations()
+                  .insertRelation(
+                      SupportsRelationOperations.Type.OWNER_REL,
+                      objectIdent,
+                      MetadataObjectUtil.toEntityType(metadataObject),
+                      ownerIdent,
+                      Entity.EntityType.USER,
+                      true);
+              return null;
+            });
       } else if (ownerType == Owner.Type.GROUP) {
         NameIdentifier ownerIdent = AuthorizationUtils.ofGroup(metalake, 
ownerName);
         TreeLockUtils.doWithTreeLock(
-            objectIdent,
+            ownerIdent,
             LockType.READ,
-            () ->
-                TreeLockUtils.doWithTreeLock(
-                    ownerIdent,
-                    LockType.READ,
-                    () -> {
-                      store
-                          .relationOperations()
-                          .insertRelation(
-                              SupportsRelationOperations.Type.OWNER_REL,
-                              objectIdent,
-                              MetadataObjectUtil.toEntityType(metadataObject),
-                              ownerIdent,
-                              Entity.EntityType.GROUP,
-                              true);
-                      return null;
-                    }));
+            () -> {
+              store
+                  .relationOperations()
+                  .insertRelation(
+                      SupportsRelationOperations.Type.OWNER_REL,
+                      objectIdent,
+                      MetadataObjectUtil.toEntityType(metadataObject),
+                      ownerIdent,
+                      Entity.EntityType.GROUP,
+                      true);
+              return null;
+            });
       }
     } catch (NoSuchEntityException nse) {
       LOG.warn(
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/AccessControlHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/AccessControlHookDispatcher.java
new file mode 100644
index 000000000..44dc491a7
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/hook/AccessControlHookDispatcher.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.authorization.AccessControlDispatcher;
+import org.apache.gravitino.authorization.AuthorizationUtils;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerManager;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.exceptions.GroupAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchGroupException;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.exceptions.NoSuchRoleException;
+import org.apache.gravitino.exceptions.NoSuchUserException;
+import org.apache.gravitino.exceptions.RoleAlreadyExistsException;
+import org.apache.gravitino.exceptions.UserAlreadyExistsException;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+/**
+ * {@code AccessControlHookDispatcher} is a decorator for {@link 
AccessControlDispatcher} that not
+ * only delegates access control operations to the underlying access control 
dispatcher but also
+ * executes some hook operations before or after the underlying operations.
+ */
+public class AccessControlHookDispatcher implements AccessControlDispatcher {
+  private final AccessControlDispatcher dispatcher;
+
+  public AccessControlHookDispatcher(AccessControlDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public User addUser(String metalake, String user)
+      throws UserAlreadyExistsException, NoSuchMetalakeException {
+    return dispatcher.addUser(metalake, user);
+  }
+
+  @Override
+  public boolean removeUser(String metalake, String user) throws 
NoSuchMetalakeException {
+    return dispatcher.removeUser(metalake, user);
+  }
+
+  @Override
+  public User getUser(String metalake, String user)
+      throws NoSuchUserException, NoSuchMetalakeException {
+    return dispatcher.getUser(metalake, user);
+  }
+
+  @Override
+  public Group addGroup(String metalake, String group)
+      throws GroupAlreadyExistsException, NoSuchMetalakeException {
+    return dispatcher.addGroup(metalake, group);
+  }
+
+  @Override
+  public boolean removeGroup(String metalake, String group) throws 
NoSuchMetalakeException {
+    return dispatcher.removeGroup(metalake, group);
+  }
+
+  @Override
+  public Group getGroup(String metalake, String group)
+      throws NoSuchGroupException, NoSuchMetalakeException {
+    return dispatcher.getGroup(metalake, group);
+  }
+
+  @Override
+  public User grantRolesToUser(String metalake, List<String> roles, String 
user)
+      throws NoSuchUserException, NoSuchRoleException, NoSuchMetalakeException 
{
+    return dispatcher.grantRolesToUser(metalake, roles, user);
+  }
+
+  @Override
+  public Group grantRolesToGroup(String metalake, List<String> roles, String 
group)
+      throws NoSuchGroupException, NoSuchRoleException, 
NoSuchMetalakeException {
+    return dispatcher.grantRolesToGroup(metalake, roles, group);
+  }
+
+  @Override
+  public Group revokeRolesFromGroup(String metalake, List<String> roles, 
String group)
+      throws NoSuchGroupException, NoSuchRoleException, 
NoSuchMetalakeException {
+    return dispatcher.revokeRolesFromGroup(metalake, roles, group);
+  }
+
+  @Override
+  public User revokeRolesFromUser(String metalake, List<String> roles, String 
user)
+      throws NoSuchUserException, NoSuchRoleException, NoSuchMetalakeException 
{
+    return dispatcher.revokeRolesFromUser(metalake, roles, user);
+  }
+
+  @Override
+  public boolean isServiceAdmin(String user) {
+    return dispatcher.isServiceAdmin(user);
+  }
+
+  @Override
+  public Role createRole(
+      String metalake,
+      String role,
+      Map<String, String> properties,
+      List<SecurableObject> securableObjects)
+      throws RoleAlreadyExistsException, NoSuchMetalakeException {
+    Role createdRole = dispatcher.createRole(metalake, role, properties, 
securableObjects);
+
+    // Set the creator as the owner of role.
+    OwnerManager ownerManager = GravitinoEnv.getInstance().ownerManager();
+    if (ownerManager != null) {
+      ownerManager.setOwner(
+          metalake,
+          NameIdentifierUtil.toMetadataObject(
+              AuthorizationUtils.ofRole(metalake, role), 
Entity.EntityType.ROLE),
+          PrincipalUtils.getCurrentUserName(),
+          Owner.Type.USER);
+    }
+    return createdRole;
+  }
+
+  @Override
+  public Role getRole(String metalake, String role)
+      throws NoSuchRoleException, NoSuchMetalakeException {
+    return dispatcher.getRole(metalake, role);
+  }
+
+  @Override
+  public boolean deleteRole(String metalake, String role) throws 
NoSuchMetalakeException {
+    return dispatcher.deleteRole(metalake, role);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/CatalogHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/CatalogHookDispatcher.java
new file mode 100644
index 000000000..4b6067de1
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/hook/CatalogHookDispatcher.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.CatalogChange;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerManager;
+import org.apache.gravitino.catalog.CatalogDispatcher;
+import org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+/**
+ * {@code CatalogHookDispatcher} is a decorator for {@link CatalogDispatcher} 
that not only
+ * delegates catalog operations to the underlying catalog dispatcher but also 
executes some hook
+ * operations before or after the underlying operations.
+ */
+public class CatalogHookDispatcher implements CatalogDispatcher {
+  private final CatalogDispatcher dispatcher;
+
+  public CatalogHookDispatcher(CatalogDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public NameIdentifier[] listCatalogs(Namespace namespace) throws 
NoSuchMetalakeException {
+    return dispatcher.listCatalogs(namespace);
+  }
+
+  @Override
+  public Catalog[] listCatalogsInfo(Namespace namespace) throws 
NoSuchMetalakeException {
+    return dispatcher.listCatalogsInfo(namespace);
+  }
+
+  @Override
+  public Catalog loadCatalog(NameIdentifier ident) throws 
NoSuchCatalogException {
+    return dispatcher.loadCatalog(ident);
+  }
+
+  @Override
+  public Catalog createCatalog(
+      NameIdentifier ident,
+      Catalog.Type type,
+      String provider,
+      String comment,
+      Map<String, String> properties)
+      throws NoSuchMetalakeException, CatalogAlreadyExistsException {
+    Catalog catalog = dispatcher.createCatalog(ident, type, provider, comment, 
properties);
+
+    // Set the creator as the owner of the catalog.
+    OwnerManager ownerManager = GravitinoEnv.getInstance().ownerManager();
+    if (ownerManager != null) {
+      ownerManager.setOwner(
+          ident.namespace().level(0),
+          NameIdentifierUtil.toMetadataObject(ident, 
Entity.EntityType.CATALOG),
+          PrincipalUtils.getCurrentUserName(),
+          Owner.Type.USER);
+    }
+    return catalog;
+  }
+
+  @Override
+  public Catalog alterCatalog(NameIdentifier ident, CatalogChange... changes)
+      throws NoSuchCatalogException, IllegalArgumentException {
+    return dispatcher.alterCatalog(ident, changes);
+  }
+
+  @Override
+  public boolean dropCatalog(NameIdentifier ident) {
+    return dispatcher.dropCatalog(ident);
+  }
+
+  @Override
+  public void testConnection(
+      NameIdentifier ident,
+      Catalog.Type type,
+      String provider,
+      String comment,
+      Map<String, String> properties)
+      throws Exception {
+    dispatcher.testConnection(ident, type, provider, comment, properties);
+  }
+
+  @Override
+  public boolean catalogExists(NameIdentifier ident) {
+    return dispatcher.catalogExists(ident);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/DispatcherHookHelper.java 
b/core/src/main/java/org/apache/gravitino/hook/DispatcherHookHelper.java
deleted file mode 100644
index c08d85d0b..000000000
--- a/core/src/main/java/org/apache/gravitino/hook/DispatcherHookHelper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.hook;
-
-import java.lang.reflect.Proxy;
-
-/** The class is a helper class of dispatcher hooks */
-public class DispatcherHookHelper {
-
-  private DispatcherHookHelper() {}
-
-  public static <T> T installHooks(T dispatcher, DispatcherHooks hooks) {
-    return (T)
-        Proxy.newProxyInstance(
-            dispatcher.getClass().getClassLoader(),
-            dispatcher.getClass().getInterfaces(),
-            new DispatcherHookProxy<T>(dispatcher, hooks));
-  }
-}
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/DispatcherHookProxy.java 
b/core/src/main/java/org/apache/gravitino/hook/DispatcherHookProxy.java
deleted file mode 100644
index 5fa36dab8..000000000
--- a/core/src/main/java/org/apache/gravitino/hook/DispatcherHookProxy.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.hook;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.util.List;
-import java.util.function.BiConsumer;
-
-class DispatcherHookProxy<T> implements InvocationHandler {
-  private final DispatcherHooks hooks;
-  private final T dispatcher;
-
-  DispatcherHookProxy(T dispatcher, DispatcherHooks hooks) {
-    this.hooks = hooks;
-    this.dispatcher = dispatcher;
-  }
-
-  @Override
-  public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
-    Object result = method.invoke(dispatcher, args);
-    List<BiConsumer> postHooks = hooks.getPostHooks(method.getName());
-    for (BiConsumer hook : postHooks) {
-      hook.accept(args, result);
-    }
-    return result;
-  }
-}
diff --git a/core/src/main/java/org/apache/gravitino/hook/DispatcherHooks.java 
b/core/src/main/java/org/apache/gravitino/hook/DispatcherHooks.java
deleted file mode 100644
index 205f27f26..000000000
--- a/core/src/main/java/org/apache/gravitino/hook/DispatcherHooks.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.hook;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.function.BiConsumer;
-
-/**
- * DispatcherHooks provide the ability to execute specific hook actions before or after calling
- * specific methods. Now we only support the post hook.
- */
-public class DispatcherHooks {
-
-  private final Map<String, List<BiConsumer>> postHookMap = Maps.newHashMap();
-
-  public void addPostHook(String method, BiConsumer hook) {
-    List<BiConsumer> postHooks = postHookMap.computeIfAbsent(method, key -> Lists.newArrayList());
-    postHooks.add(hook);
-  }
-
-  public boolean isEmpty() {
-    return postHookMap.isEmpty();
-  }
-
-  List<BiConsumer> getPostHooks(String method) {
-    List<BiConsumer> postHooks = postHookMap.get(method);
-    if (postHooks == null) {
-      return Collections.emptyList();
-    }
-    return postHooks;
-  }
-}
diff --git a/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java b/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
new file mode 100644
index 000000000..6e5a82eb2
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import java.util.Map;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerManager;
+import org.apache.gravitino.catalog.FilesetDispatcher;
+import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.file.FilesetChange;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+/**
+ * {@code FilesetHookDispatcher} is a decorator for {@link FilesetDispatcher} that not only
+ * delegates fileset operations to the underlying fileset dispatcher but also executes some hook
+ * operations before or after the underlying operations.
+ */
+public class FilesetHookDispatcher implements FilesetDispatcher {
+  private final FilesetDispatcher dispatcher;
+
+  public FilesetHookDispatcher(FilesetDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public NameIdentifier[] listFilesets(Namespace namespace) throws NoSuchSchemaException {
+    return dispatcher.listFilesets(namespace);
+  }
+
+  @Override
+  public Fileset loadFileset(NameIdentifier ident) throws NoSuchFilesetException {
+    return dispatcher.loadFileset(ident);
+  }
+
+  @Override
+  public Fileset createFileset(
+      NameIdentifier ident,
+      String comment,
+      Fileset.Type type,
+      String storageLocation,
+      Map<String, String> properties)
+      throws NoSuchSchemaException, FilesetAlreadyExistsException {
+    Fileset fileset = dispatcher.createFileset(ident, comment, type, storageLocation, properties);
+
+    // Set the creator as the owner of the fileset.
+    OwnerManager ownerManager = GravitinoEnv.getInstance().ownerManager();
+    if (ownerManager != null) {
+      ownerManager.setOwner(
+          ident.namespace().level(0),
+          NameIdentifierUtil.toMetadataObject(ident, Entity.EntityType.FILESET),
+          PrincipalUtils.getCurrentUserName(),
+          Owner.Type.USER);
+    }
+    return fileset;
+  }
+
+  @Override
+  public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes)
+      throws NoSuchFilesetException, IllegalArgumentException {
+    return dispatcher.alterFileset(ident, changes);
+  }
+
+  @Override
+  public boolean dropFileset(NameIdentifier ident) {
+    return dispatcher.dropFileset(ident);
+  }
+
+  @Override
+  public boolean filesetExists(NameIdentifier ident) {
+    return dispatcher.filesetExists(ident);
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/hook/MetalakeHookDispatcher.java b/core/src/main/java/org/apache/gravitino/hook/MetalakeHookDispatcher.java
new file mode 100644
index 000000000..3c242bd56
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/hook/MetalakeHookDispatcher.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import java.util.Map;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.Metalake;
+import org.apache.gravitino.MetalakeChange;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.authorization.AccessControlDispatcher;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerManager;
+import org.apache.gravitino.exceptions.MetalakeAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.metalake.MetalakeDispatcher;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+/**
+ * {@code MetalakeHookDispatcher} is a decorator for {@link MetalakeDispatcher} that not only
+ * delegates metalake operations to the underlying metalake dispatcher but also executes some hook
+ * operations before or after the underlying operations.
+ */
+public class MetalakeHookDispatcher implements MetalakeDispatcher {
+  private final MetalakeDispatcher dispatcher;
+
+  public MetalakeHookDispatcher(MetalakeDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public Metalake[] listMetalakes() {
+    return dispatcher.listMetalakes();
+  }
+
+  @Override
+  public Metalake loadMetalake(NameIdentifier ident) throws NoSuchMetalakeException {
+    return dispatcher.loadMetalake(ident);
+  }
+
+  @Override
+  public Metalake createMetalake(
+      NameIdentifier ident, String comment, Map<String, String> properties)
+      throws MetalakeAlreadyExistsException {
+    Metalake metalake = dispatcher.createMetalake(ident, comment, properties);
+
+    // Add the creator to the metalake
+    AccessControlDispatcher accessControlDispatcher =
+        GravitinoEnv.getInstance().accessControlDispatcher();
+    if (accessControlDispatcher != null) {
+      accessControlDispatcher.addUser(ident.name(), PrincipalUtils.getCurrentUserName());
+    }
+
+    // Set the creator as owner of the metalake.
+    OwnerManager ownerManager = GravitinoEnv.getInstance().ownerManager();
+    if (ownerManager != null) {
+      ownerManager.setOwner(
+          ident.name(),
+          NameIdentifierUtil.toMetadataObject(ident, Entity.EntityType.METALAKE),
+          PrincipalUtils.getCurrentUserName(),
+          Owner.Type.USER);
+    }
+    return metalake;
+  }
+
+  @Override
+  public Metalake alterMetalake(NameIdentifier ident, MetalakeChange... changes)
+      throws NoSuchMetalakeException, IllegalArgumentException {
+    return dispatcher.alterMetalake(ident, changes);
+  }
+
+  @Override
+  public boolean dropMetalake(NameIdentifier ident) {
+    return dispatcher.dropMetalake(ident);
+  }
+
+  @Override
+  public boolean metalakeExists(NameIdentifier ident) {
+    return dispatcher.metalakeExists(ident);
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/hook/SchemaHookDispatcher.java b/core/src/main/java/org/apache/gravitino/hook/SchemaHookDispatcher.java
new file mode 100644
index 000000000..d9bcf0417
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/hook/SchemaHookDispatcher.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import java.util.Map;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.SchemaChange;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerManager;
+import org.apache.gravitino.catalog.SchemaDispatcher;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NonEmptySchemaException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+/**
+ * {@code SchemaHookDispatcher} is a decorator for {@link SchemaDispatcher} that not only delegates
+ * schema operations to the underlying schema dispatcher but also executes some hook operations
+ * before or after the underlying operations.
+ */
+public class SchemaHookDispatcher implements SchemaDispatcher {
+  private final SchemaDispatcher dispatcher;
+
+  public SchemaHookDispatcher(SchemaDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
+    return dispatcher.listSchemas(namespace);
+  }
+
+  @Override
+  public Schema createSchema(NameIdentifier ident, String comment, Map<String, String> properties)
+      throws NoSuchCatalogException, SchemaAlreadyExistsException {
+    Schema schema = dispatcher.createSchema(ident, comment, properties);
+
+    // Set the creator as the owner of the schema.
+    OwnerManager ownerManager = GravitinoEnv.getInstance().ownerManager();
+    if (ownerManager != null) {
+      ownerManager.setOwner(
+          ident.namespace().level(0),
+          NameIdentifierUtil.toMetadataObject(ident, Entity.EntityType.SCHEMA),
+          PrincipalUtils.getCurrentUserName(),
+          Owner.Type.USER);
+    }
+    return schema;
+  }
+
+  @Override
+  public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
+    return dispatcher.loadSchema(ident);
+  }
+
+  @Override
+  public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
+      throws NoSuchSchemaException {
+    return dispatcher.alterSchema(ident, changes);
+  }
+
+  @Override
+  public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
+    return dispatcher.dropSchema(ident, cascade);
+  }
+
+  @Override
+  public boolean schemaExists(NameIdentifier ident) {
+    return dispatcher.schemaExists(ident);
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java b/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java
new file mode 100644
index 000000000..3a39f0a9d
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import java.util.Map;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerManager;
+import org.apache.gravitino.catalog.TableDispatcher;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+/**
+ * {@code TableHookDispatcher} is a decorator for {@link TableDispatcher} that not only delegates
+ * table operations to the underlying table dispatcher but also executes some hook operations before
+ * or after the underlying operations.
+ */
+public class TableHookDispatcher implements TableDispatcher {
+  private final TableDispatcher dispatcher;
+
+  public TableHookDispatcher(TableDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException {
+    return dispatcher.listTables(namespace);
+  }
+
+  @Override
+  public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
+    return dispatcher.loadTable(ident);
+  }
+
+  @Override
+  public Table createTable(
+      NameIdentifier ident,
+      Column[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitions,
+      Distribution distribution,
+      SortOrder[] sortOrders,
+      Index[] indexes)
+      throws NoSuchSchemaException, TableAlreadyExistsException {
+    Table table =
+        dispatcher.createTable(
+            ident, columns, comment, properties, partitions, distribution, sortOrders, indexes);
+
+    // Set the creator as the owner of the table.
+    OwnerManager ownerManager = GravitinoEnv.getInstance().ownerManager();
+    if (ownerManager != null) {
+      ownerManager.setOwner(
+          ident.namespace().level(0),
+          NameIdentifierUtil.toMetadataObject(ident, Entity.EntityType.TABLE),
+          PrincipalUtils.getCurrentUserName(),
+          Owner.Type.USER);
+    }
+    return table;
+  }
+
+  @Override
+  public Table alterTable(NameIdentifier ident, TableChange... changes)
+      throws NoSuchTableException, IllegalArgumentException {
+    return dispatcher.alterTable(ident, changes);
+  }
+
+  @Override
+  public boolean dropTable(NameIdentifier ident) {
+    return dispatcher.dropTable(ident);
+  }
+
+  @Override
+  public boolean purgeTable(NameIdentifier ident) throws UnsupportedOperationException {
+    return dispatcher.purgeTable(ident);
+  }
+
+  @Override
+  public boolean tableExists(NameIdentifier ident) {
+    return dispatcher.tableExists(ident);
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/hook/TopicHookDispatcher.java b/core/src/main/java/org/apache/gravitino/hook/TopicHookDispatcher.java
new file mode 100644
index 000000000..c36e58e6f
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/hook/TopicHookDispatcher.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import java.util.Map;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerManager;
+import org.apache.gravitino.catalog.TopicDispatcher;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTopicException;
+import org.apache.gravitino.exceptions.TopicAlreadyExistsException;
+import org.apache.gravitino.messaging.DataLayout;
+import org.apache.gravitino.messaging.Topic;
+import org.apache.gravitino.messaging.TopicChange;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+/**
+ * {@code TopicHookDispatcher} is a decorator for {@link TopicDispatcher} that not only delegates
+ * topic operations to the underlying topic dispatcher but also executes some hook operations before
+ * or after the underlying operations.
+ */
+public class TopicHookDispatcher implements TopicDispatcher {
+  private final TopicDispatcher dispatcher;
+
+  public TopicHookDispatcher(TopicDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public NameIdentifier[] listTopics(Namespace namespace) throws NoSuchSchemaException {
+    return dispatcher.listTopics(namespace);
+  }
+
+  @Override
+  public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
+    return dispatcher.loadTopic(ident);
+  }
+
+  @Override
+  public Topic createTopic(
+      NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, String> properties)
+      throws NoSuchSchemaException, TopicAlreadyExistsException {
+    Topic topic = dispatcher.createTopic(ident, comment, dataLayout, properties);
+
+    // Set the creator as the owner of the topic.
+    OwnerManager ownerManager = GravitinoEnv.getInstance().ownerManager();
+    if (ownerManager != null) {
+      ownerManager.setOwner(
+          ident.namespace().level(0),
+          NameIdentifierUtil.toMetadataObject(ident, Entity.EntityType.TOPIC),
+          PrincipalUtils.getCurrentUserName(),
+          Owner.Type.USER);
+    }
+    return topic;
+  }
+
+  @Override
+  public Topic alterTopic(NameIdentifier ident, TopicChange... changes)
+      throws NoSuchTopicException, IllegalArgumentException {
+    return dispatcher.alterTopic(ident, changes);
+  }
+
+  @Override
+  public boolean dropTopic(NameIdentifier ident) {
+    return dispatcher.dropTopic(ident);
+  }
+
+  @Override
+  public boolean topicExists(NameIdentifier ident) {
+    return dispatcher.topicExists(ident);
+  }
+}
diff --git a/core/src/test/java/org/apache/gravitino/hook/TestDispatcherHooks.java b/core/src/test/java/org/apache/gravitino/hook/TestDispatcherHooks.java
deleted file mode 100644
index 010382035..000000000
--- a/core/src/test/java/org/apache/gravitino/hook/TestDispatcherHooks.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.hook;
-
-import static org.apache.gravitino.Configs.SERVICE_ADMINS;
-
-import com.google.common.collect.Lists;
-import java.util.Collections;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.gravitino.Config;
-import org.apache.gravitino.EntityStore;
-import org.apache.gravitino.GravitinoEnv;
-import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.authorization.AccessControlDispatcher;
-import org.apache.gravitino.authorization.AccessControlManager;
-import org.apache.gravitino.metalake.MetalakeDispatcher;
-import org.apache.gravitino.metalake.MetalakeManager;
-import org.apache.gravitino.storage.IdGenerator;
-import org.apache.gravitino.storage.RandomIdGenerator;
-import org.apache.gravitino.storage.memory.TestMemoryEntityStore;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class TestDispatcherHooks {
-
-  @Test
-  public void testLifecycleHooks() throws IllegalAccessException {
-    Config config = new Config(false) {};
-    config.set(SERVICE_ADMINS, Lists.newArrayList("admin1", "admin2"));
-    EntityStore entityStore = new TestMemoryEntityStore.InMemoryEntityStore();
-    entityStore.initialize(config);
-    entityStore.setSerDe(null);
-    IdGenerator idGenerator = new RandomIdGenerator();
-    FieldUtils.writeField(GravitinoEnv.getInstance(), "entityStore", entityStore, true);
-
-    DispatcherHooks hooks = new DispatcherHooks();
-    AtomicBoolean result = new AtomicBoolean(true);
-    hooks.addPostHook(
-        "createMetalake",
-        (args, metalake) -> {
-          result.set(false);
-        });
-    MetalakeDispatcher metalakeDispatcher =
-        DispatcherHookHelper.installHooks(new MetalakeManager(entityStore, idGenerator), hooks);
-    Assertions.assertTrue(result.get());
-    metalakeDispatcher.createMetalake(NameIdentifier.of("test"), "", Collections.emptyMap());
-    Assertions.assertFalse(result.get());
-
-    hooks.addPostHook(
-        "addUser",
-        (args, user) -> {
-          result.set(false);
-        });
-    AccessControlDispatcher accessControlManager =
-        DispatcherHookHelper.installHooks(
-            new AccessControlManager(entityStore, idGenerator, config), hooks);
-    result.set(true);
-    Assertions.assertTrue(result.get());
-    accessControlManager.addUser("test", "test");
-    Assertions.assertFalse(result.get());
-  }
-}
diff --git a/integration-test/src/test/java/org/apache/gravitino/integration/test/authorization/OwnerPostHookIT.java b/integration-test/src/test/java/org/apache/gravitino/integration/test/authorization/OwnerPostHookIT.java
new file mode 100644
index 000000000..b7c6c1788
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/gravitino/integration/test/authorization/OwnerPostHookIT.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.integration.test.authorization;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.container.HiveContainer;
+import org.apache.gravitino.integration.test.container.KafkaContainer;
+import org.apache.gravitino.integration.test.util.AbstractIT;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.utils.RandomNameUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Tag("gravitino-docker-test")
+public class OwnerPostHookIT extends AbstractIT {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OwnerPostHookIT.class);
+
+  private static final ContainerSuite containerSuite = ContainerSuite.getInstance();
+  private static String hmsUri;
+  private static String kafkaBootstrapServers;
+
+  @BeforeAll
+  public static void startIntegrationTest() throws Exception {
+    Map<String, String> configs = Maps.newHashMap();
+    configs.put(Configs.ENABLE_AUTHORIZATION.getKey(), String.valueOf(true));
+    configs.put(Configs.SERVICE_ADMINS.getKey(), AuthConstants.ANONYMOUS_USER);
+    registerCustomConfigs(configs);
+    AbstractIT.startIntegrationTest();
+
+    containerSuite.startHiveContainer();
+    hmsUri =
+        String.format(
+            "thrift://%s:%d",
+            containerSuite.getHiveContainer().getContainerIpAddress(),
+            HiveContainer.HIVE_METASTORE_PORT);
+
+    containerSuite.startKafkaContainer();
+    kafkaBootstrapServers =
+        String.format(
+            "%s:%d",
+            containerSuite.getKafkaContainer().getContainerIpAddress(),
+            KafkaContainer.DEFAULT_BROKER_PORT);
+  }
+
+  @AfterAll
+  public static void tearDown() {
+    if (client != null) {
+      client.close();
+      client = null;
+    }
+
+    try {
+      closer.close();
+    } catch (Exception e) {
+      LOG.error("Exception in closing CloseableGroup", e);
+    }
+  }
+
+  @Test
+  public void testCreateFileset() {
+    String metalakeNameA = RandomNameUtils.genRandomName("metalakeA");
+    client.createMetalake(metalakeNameA, "metalake A comment", Collections.emptyMap());
+    GravitinoMetalake metalake = client.loadMetalake(metalakeNameA);
+    String catalogNameA = RandomNameUtils.genRandomName("catalogA");
+    Catalog catalog =
+        metalake.createCatalog(
+            catalogNameA, Catalog.Type.FILESET, "hadoop", "comment", Collections.emptyMap());
+    NameIdentifier fileIdent = NameIdentifier.of("schema_owner", "fileset_owner");
+    catalog.asSchemas().createSchema("schema_owner", "comment", Collections.emptyMap());
+    catalog
+        .asFilesetCatalog()
+        .createFileset(fileIdent, "comment", Fileset.Type.EXTERNAL, "tmp", Collections.emptyMap());
+
+    MetadataObject metalakeObject =
+        MetadataObjects.of(null, metalakeNameA, MetadataObject.Type.METALAKE);
+    Owner owner = metalake.getOwner(metalakeObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    MetadataObject catalogObject =
+        MetadataObjects.of(Lists.newArrayList(catalogNameA), MetadataObject.Type.CATALOG);
+    owner = metalake.getOwner(catalogObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    MetadataObject schemaObject =
+        MetadataObjects.of(
+            Lists.newArrayList(catalogNameA, "schema_owner"), MetadataObject.Type.SCHEMA);
+    owner = metalake.getOwner(schemaObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    MetadataObject filesetObject =
+        MetadataObjects.of(
+            Lists.newArrayList(catalogNameA, "schema_owner", "fileset_owner"),
+            MetadataObject.Type.FILESET);
+    owner = metalake.getOwner(filesetObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    // Clean up
+    catalog.asFilesetCatalog().dropFileset(fileIdent);
+    catalog.asSchemas().dropSchema("schema_owner", true);
+    metalake.dropCatalog(catalogNameA);
+    client.dropMetalake(metalakeNameA);
+  }
+
+  @Test
+  public void testCreateTopic() {
+    String metalakeNameB = RandomNameUtils.genRandomName("metalakeB");
+    GravitinoMetalake metalake =
+        client.createMetalake(metalakeNameB, "metalake B comment", Collections.emptyMap());
+    String catalogNameB = RandomNameUtils.genRandomName("catalogB");
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("bootstrap.servers", kafkaBootstrapServers);
+    Catalog catalogB =
+        metalake.createCatalog(
+            catalogNameB, Catalog.Type.MESSAGING, "kafka", "comment", properties);
+    NameIdentifier topicIdent = NameIdentifier.of("default", "topic_owner");
+    catalogB.asTopicCatalog().createTopic(topicIdent, "comment", null, Collections.emptyMap());
+
+    MetadataObject metalakeObject =
+        MetadataObjects.of(null, metalakeNameB, MetadataObject.Type.METALAKE);
+    Owner owner = metalake.getOwner(metalakeObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    MetadataObject catalogObject =
+        MetadataObjects.of(Lists.newArrayList(catalogNameB), MetadataObject.Type.CATALOG);
+    owner = metalake.getOwner(catalogObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    MetadataObject schemaObject =
+        MetadataObjects.of(Lists.newArrayList(catalogNameB, "default"), MetadataObject.Type.SCHEMA);
+    Assertions.assertFalse(metalake.getOwner(schemaObject).isPresent());
+
+    MetadataObject topicObject =
+        MetadataObjects.of(
+            Lists.newArrayList(catalogNameB, "default", "topic_owner"), MetadataObject.Type.TOPIC);
+    owner = metalake.getOwner(topicObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    // Clean up
+    catalogB.asTopicCatalog().dropTopic(topicIdent);
+    metalake.dropCatalog(catalogNameB);
+    client.dropMetalake(metalakeNameB);
+  }
+
+  @Test
+  public void testCreateRole() {
+    String metalakeNameC = RandomNameUtils.genRandomName("metalakeC");
+    GravitinoMetalake metalake =
+        client.createMetalake(metalakeNameC, "metalake C comment", Collections.emptyMap());
+    SecurableObject metalakeSecObject =
+        SecurableObjects.ofMetalake(
+            metalakeNameC, Lists.newArrayList(Privileges.CreateCatalog.allow()));
+    metalake.createRole(
+        "role_owner", Collections.emptyMap(), Lists.newArrayList(metalakeSecObject));
+
+    MetadataObject metalakeObject =
+        MetadataObjects.of(null, metalakeNameC, MetadataObject.Type.METALAKE);
+    Owner owner = metalake.getOwner(metalakeObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    MetadataObject roleObject = MetadataObjects.of(null, "role_owner", MetadataObject.Type.ROLE);
+    owner = metalake.getOwner(roleObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    // Clean up
+    metalake.deleteRole("role_owner");
+    client.dropMetalake(metalakeNameC);
+  }
+
+  @Test
+  public void testCreateTable() {
+    String metalakeNameD = RandomNameUtils.genRandomName("metalakeD");
+    GravitinoMetalake metalake =
+        client.createMetalake(metalakeNameD, "metalake D comment", Collections.emptyMap());
+    String catalogNameD = RandomNameUtils.genRandomName("catalogD");
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("metastore.uris", hmsUri);
+    Catalog catalog =
+        metalake.createCatalog(
+            catalogNameD, Catalog.Type.RELATIONAL, "hive", "catalog comment", properties);
+
+    NameIdentifier tableIdent = NameIdentifier.of("schema_owner", "table_owner");
+    catalog.asSchemas().createSchema("schema_owner", "comment", Collections.emptyMap());
+    catalog
+        .asTableCatalog()
+        .createTable(
+            tableIdent,
+            new Column[] {
+              Column.of("col1", Types.IntegerType.get()), Column.of("col2", Types.StringType.get())
+            },
+            "comment",
+            Collections.emptyMap());
+
+    MetadataObject metalakeObject =
+        MetadataObjects.of(null, metalakeNameD, MetadataObject.Type.METALAKE);
+    Owner owner = metalake.getOwner(metalakeObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    MetadataObject catalogObject =
+        MetadataObjects.of(Lists.newArrayList(catalogNameD), MetadataObject.Type.CATALOG);
+    owner = metalake.getOwner(catalogObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    MetadataObject schemaObject =
+        MetadataObjects.of(
+            Lists.newArrayList(catalogNameD, "schema_owner"), MetadataObject.Type.SCHEMA);
+    owner = metalake.getOwner(schemaObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    MetadataObject tableObject =
+        MetadataObjects.of(
+            Lists.newArrayList(catalogNameD, "schema_owner", "table_owner"),
+            MetadataObject.Type.TABLE);
+    owner = metalake.getOwner(tableObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    // Clean up
+    catalog.asTableCatalog().dropTable(tableIdent);
+    catalog.asSchemas().dropSchema("schema_owner", true);
+    metalake.dropCatalog(catalogNameD);
+    client.dropMetalake(metalakeNameD);
+  }
+}
diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/OwnerOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/OwnerOperations.java
index 517b08cd7..fb5e69198 100644
--- a/server/src/main/java/org/apache/gravitino/server/web/rest/OwnerOperations.java
+++ b/server/src/main/java/org/apache/gravitino/server/web/rest/OwnerOperations.java
@@ -33,15 +33,19 @@ import javax.ws.rs.core.Response;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.authorization.Owner;
 import org.apache.gravitino.authorization.OwnerManager;
 import org.apache.gravitino.dto.requests.OwnerSetRequest;
 import org.apache.gravitino.dto.responses.OwnerResponse;
 import org.apache.gravitino.dto.responses.SetResponse;
 import org.apache.gravitino.dto.util.DTOConverters;
+import org.apache.gravitino.lock.LockType;
+import org.apache.gravitino.lock.TreeLockUtils;
 import org.apache.gravitino.metrics.MetricNames;
 import org.apache.gravitino.server.authorization.NameBindings;
 import org.apache.gravitino.server.web.Utils;
+import org.apache.gravitino.utils.MetadataObjectUtil;
 
 @NameBindings.AccessControlInterfaces
 @Path("/metalakes/{metalake}/owners")
@@ -105,7 +109,14 @@ public class OwnerOperations {
       return Utils.doAs(
           httpRequest,
           () -> {
-            ownerManager.setOwner(metalake, object, request.getName(), request.getType());
+            NameIdentifier objectIdent = MetadataObjectUtil.toEntityIdent(metalake, object);
+            TreeLockUtils.doWithTreeLock(
+                objectIdent,
+                LockType.READ,
+                () -> {
+                  ownerManager.setOwner(metalake, object, request.getName(), request.getType());
+                  return null;
+                });
             return Utils.ok(new SetResponse(true));
           });
     } catch (Exception e) {

Reply via email to