This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new cfbecfdf8 Add hook dispatcher
cfbecfdf8 is described below

commit cfbecfdf8fa94c5bc4ce4b728cae32ffe7ee3a92
Author: Rory <[email protected]>
AuthorDate: Wed Aug 7 11:44:15 2024 +0800

    Add hook dispatcher
---
 .../gravitino/authorization/SecurableObjects.java  |  27 +-
 .../authorization/TestSecurableObjects.java        |  13 +
 .../java/org/apache/gravitino/GravitinoEnv.java    |  62 ++---
 .../authorization/AuthorizationUtils.java          |   7 -
 .../gravitino/authorization/OwnerManager.java      |  63 +++--
 .../hook/AccessControlHookDispatcher.java          | 150 +++++++++++
 .../gravitino/hook/CatalogHookDispatcher.java      | 112 +++++++++
 .../gravitino/hook/DispatcherHookHelper.java       |  35 ---
 .../apache/gravitino/hook/DispatcherHookProxy.java |  44 ----
 .../org/apache/gravitino/hook/DispatcherHooks.java |  52 ----
 .../gravitino/hook/FilesetHookDispatcher.java      |  96 ++++++++
 .../gravitino/hook/MetalakeHookDispatcher.java     |  98 ++++++++
 .../gravitino/hook/SchemaHookDispatcher.java       |  92 +++++++
 .../apache/gravitino/hook/TableHookDispatcher.java | 111 +++++++++
 .../apache/gravitino/hook/TopicHookDispatcher.java |  93 +++++++
 .../apache/gravitino/hook/TestDispatcherHooks.java |  79 ------
 .../test/authorization/OwnerPostHookIT.java        | 274 +++++++++++++++++++++
 .../gravitino/server/web/rest/OwnerOperations.java |  13 +-
 18 files changed, 1124 insertions(+), 297 deletions(-)

diff --git 
a/api/src/main/java/org/apache/gravitino/authorization/SecurableObjects.java 
b/api/src/main/java/org/apache/gravitino/authorization/SecurableObjects.java
index 22e2b258b..c5ab814eb 100644
--- a/api/src/main/java/org/apache/gravitino/authorization/SecurableObjects.java
+++ b/api/src/main/java/org/apache/gravitino/authorization/SecurableObjects.java
@@ -34,7 +34,18 @@ public class SecurableObjects {
   private static final Splitter DOT_SPLITTER = Splitter.on('.');
 
   /**
-   * Create the catalog {@link SecurableObject} with the given catalog name.
+   * Create the metalake {@link SecurableObject} with the given metalake name 
and privileges.
+   *
+   * @param metalake The metalake name
+   * @param privileges The privileges of the metalake
+   * @return The created metalake {@link SecurableObject}
+   */
+  public static SecurableObject ofMetalake(String metalake, List<Privilege> 
privileges) {
+    return of(MetadataObject.Type.METALAKE, Lists.newArrayList(metalake), 
privileges);
+  }
+
+  /**
+   * Create the catalog {@link SecurableObject} with the given catalog name 
and privileges.
    *
    * @param catalog The catalog name
    * @param privileges The privileges of the catalog
@@ -45,8 +56,8 @@ public class SecurableObjects {
   }
 
   /**
-   * Create the schema {@link SecurableObject} with the given securable 
catalog object and schema
-   * name.
+   * Create the schema {@link SecurableObject} with the given securable 
catalog object, schema name
+   * and privileges.
    *
    * @param catalog The catalog securable object.
    * @param schema The schema name
@@ -60,7 +71,8 @@ public class SecurableObjects {
   }
 
   /**
-   * Create the table {@link SecurableObject} with the given securable schema 
object and table name.
+   * Create the table {@link SecurableObject} with the given securable schema 
object, table name and
+   * privileges.
    *
    * @param schema The schema securable object
    * @param table The table name
@@ -75,7 +87,8 @@ public class SecurableObjects {
   }
 
   /**
-   * Create the topic {@link SecurableObject} with the given securable schema 
object and topic name.
+   * Create the topic {@link SecurableObject} with the given securable schema 
object, topic name and
+   * privileges.
    *
    * @param schema The schema securable object
    * @param topic The topic name
@@ -90,8 +103,8 @@ public class SecurableObjects {
   }
 
   /**
-   * Create the table {@link SecurableObject} with the given securable schema 
object and fileset
-   * name.
+   * Create the fileset {@link SecurableObject} with the given securable schema 
object, fileset name
+   * and privileges.
    *
    * @param schema The schema securable object
    * @param fileset The fileset name
diff --git 
a/api/src/test/java/org/apache/gravitino/authorization/TestSecurableObjects.java
 
b/api/src/test/java/org/apache/gravitino/authorization/TestSecurableObjects.java
index 256636397..82374f676 100644
--- 
a/api/src/test/java/org/apache/gravitino/authorization/TestSecurableObjects.java
+++ 
b/api/src/test/java/org/apache/gravitino/authorization/TestSecurableObjects.java
@@ -27,6 +27,19 @@ public class TestSecurableObjects {
 
   @Test
   public void testSecurableObjects() {
+
+    SecurableObject metalake =
+        SecurableObjects.ofMetalake(
+            "metalake", Lists.newArrayList(Privileges.CreateCatalog.allow()));
+    Assertions.assertEquals("metalake", metalake.fullName());
+    Assertions.assertEquals(MetadataObject.Type.METALAKE, metalake.type());
+    SecurableObject anotherMetalake =
+        SecurableObjects.of(
+            MetadataObject.Type.METALAKE,
+            Lists.newArrayList("metalake"),
+            Lists.newArrayList(Privileges.CreateCatalog.allow()));
+    Assertions.assertEquals(metalake, anotherMetalake);
+
     SecurableObject catalog =
         SecurableObjects.ofCatalog("catalog", 
Lists.newArrayList(Privileges.UseCatalog.allow()));
     Assertions.assertEquals("catalog", catalog.fullName());
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java 
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index 8a0f19f99..6e97ed1eb 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -21,7 +21,6 @@ package org.apache.gravitino;
 import com.google.common.base.Preconditions;
 import org.apache.gravitino.authorization.AccessControlDispatcher;
 import org.apache.gravitino.authorization.AccessControlManager;
-import org.apache.gravitino.authorization.AuthorizationUtils;
 import org.apache.gravitino.authorization.OwnerManager;
 import org.apache.gravitino.auxiliary.AuxiliaryServiceManager;
 import org.apache.gravitino.catalog.CatalogDispatcher;
@@ -42,8 +41,13 @@ import org.apache.gravitino.catalog.TableOperationDispatcher;
 import org.apache.gravitino.catalog.TopicDispatcher;
 import org.apache.gravitino.catalog.TopicNormalizeDispatcher;
 import org.apache.gravitino.catalog.TopicOperationDispatcher;
-import org.apache.gravitino.hook.DispatcherHookHelper;
-import org.apache.gravitino.hook.DispatcherHooks;
+import org.apache.gravitino.hook.AccessControlHookDispatcher;
+import org.apache.gravitino.hook.CatalogHookDispatcher;
+import org.apache.gravitino.hook.FilesetHookDispatcher;
+import org.apache.gravitino.hook.MetalakeHookDispatcher;
+import org.apache.gravitino.hook.SchemaHookDispatcher;
+import org.apache.gravitino.hook.TableHookDispatcher;
+import org.apache.gravitino.hook.TopicHookDispatcher;
 import org.apache.gravitino.listener.CatalogEventDispatcher;
 import org.apache.gravitino.listener.EventBus;
 import org.apache.gravitino.listener.EventListenerManager;
@@ -348,28 +352,30 @@ public class GravitinoEnv {
 
     // Create and initialize metalake related modules
     MetalakeDispatcher metalakeManager = new MetalakeManager(entityStore, 
idGenerator);
+    MetalakeHookDispatcher metalakeHookDispatcher = new 
MetalakeHookDispatcher(metalakeManager);
     MetalakeNormalizeDispatcher metalakeNormalizeDispatcher =
-        new 
MetalakeNormalizeDispatcher(installDispatcherHooks(metalakeManager));
+        new MetalakeNormalizeDispatcher(metalakeHookDispatcher);
     this.metalakeDispatcher = new MetalakeEventDispatcher(eventBus, 
metalakeNormalizeDispatcher);
 
     // Create and initialize Catalog related modules
     this.catalogManager = new CatalogManager(config, entityStore, idGenerator);
+    CatalogHookDispatcher catalogHookDispatcher = new 
CatalogHookDispatcher(catalogManager);
     CatalogNormalizeDispatcher catalogNormalizeDispatcher =
-        new 
CatalogNormalizeDispatcher(installDispatcherHooks((CatalogDispatcher) 
catalogManager));
+        new CatalogNormalizeDispatcher(catalogHookDispatcher);
     this.catalogDispatcher = new CatalogEventDispatcher(eventBus, 
catalogNormalizeDispatcher);
 
     SchemaOperationDispatcher schemaOperationDispatcher =
         new SchemaOperationDispatcher(catalogManager, entityStore, 
idGenerator);
+    SchemaHookDispatcher schemaHookDispatcher = new 
SchemaHookDispatcher(schemaOperationDispatcher);
     SchemaNormalizeDispatcher schemaNormalizeDispatcher =
-        new SchemaNormalizeDispatcher(
-            installDispatcherHooks((SchemaDispatcher) 
schemaOperationDispatcher), catalogManager);
+        new SchemaNormalizeDispatcher(schemaHookDispatcher, catalogManager);
     this.schemaDispatcher = new SchemaEventDispatcher(eventBus, 
schemaNormalizeDispatcher);
 
     TableOperationDispatcher tableOperationDispatcher =
         new TableOperationDispatcher(catalogManager, entityStore, idGenerator);
+    TableHookDispatcher tableHookDispatcher = new 
TableHookDispatcher(tableOperationDispatcher);
     TableNormalizeDispatcher tableNormalizeDispatcher =
-        new TableNormalizeDispatcher(
-            installDispatcherHooks((TableDispatcher) 
tableOperationDispatcher), catalogManager);
+        new TableNormalizeDispatcher(tableHookDispatcher, catalogManager);
     this.tableDispatcher = new TableEventDispatcher(eventBus, 
tableNormalizeDispatcher);
 
     // TODO: We can install hooks when we need, we only supports ownership 
post hook,
@@ -382,24 +388,27 @@ public class GravitinoEnv {
 
     FilesetOperationDispatcher filesetOperationDispatcher =
         new FilesetOperationDispatcher(catalogManager, entityStore, 
idGenerator);
+    FilesetHookDispatcher filesetHookDispatcher =
+        new FilesetHookDispatcher(filesetOperationDispatcher);
     FilesetNormalizeDispatcher filesetNormalizeDispatcher =
-        new FilesetNormalizeDispatcher(
-            installDispatcherHooks((FilesetDispatcher) 
filesetOperationDispatcher), catalogManager);
+        new FilesetNormalizeDispatcher(filesetHookDispatcher, catalogManager);
     this.filesetDispatcher = new FilesetEventDispatcher(eventBus, 
filesetNormalizeDispatcher);
 
     TopicOperationDispatcher topicOperationDispatcher =
         new TopicOperationDispatcher(catalogManager, entityStore, idGenerator);
+    TopicHookDispatcher topicHookDispatcher = new 
TopicHookDispatcher(topicOperationDispatcher);
     TopicNormalizeDispatcher topicNormalizeDispatcher =
-        new TopicNormalizeDispatcher(
-            installDispatcherHooks((TopicDispatcher) 
topicOperationDispatcher), catalogManager);
+        new TopicNormalizeDispatcher(topicHookDispatcher, catalogManager);
     this.topicDispatcher = new TopicEventDispatcher(eventBus, 
topicNormalizeDispatcher);
 
     // Create and initialize access control related modules
     boolean enableAuthorization = config.get(Configs.ENABLE_AUTHORIZATION);
     if (enableAuthorization) {
-      this.accessControlDispatcher =
-          installDispatcherHooks(
-              (AccessControlDispatcher) new AccessControlManager(entityStore, 
idGenerator, config));
+      AccessControlHookDispatcher accessControlHookDispatcher =
+          new AccessControlHookDispatcher(
+              new AccessControlManager(entityStore, idGenerator, config));
+
+      this.accessControlDispatcher = accessControlHookDispatcher;
       this.ownerManager = new OwnerManager(entityStore);
     } else {
       this.accessControlDispatcher = null;
@@ -415,25 +424,4 @@ public class GravitinoEnv {
     // Tag manager
     this.tagManager = new TagManager(idGenerator, entityStore);
   }
-
-  // Provides a universal entrance to install dispatcher hooks. This method
-  // focuses the logic of installing hooks.
-  // We should reuse the ability of 
(Metalake|Schema|Table|Fileset|...)NormalizeDispatcher to avoid
-  // solving
-  // normalization names, this is useful for pre-hooks.
-  // so we can't install the hooks for the outside of
-  // (Metalake|Schema|Table|Fileset|...)NormalizeDispatcher.
-  private <T> T installDispatcherHooks(T manager) {
-    boolean enableAuthorization = config.get(Configs.ENABLE_AUTHORIZATION);
-    DispatcherHooks hooks = new DispatcherHooks();
-    if (enableAuthorization) {
-      AuthorizationUtils.prepareAuthorizationHooks(manager, hooks);
-    }
-
-    if (hooks.isEmpty()) {
-      return manager;
-    }
-
-    return DispatcherHookHelper.installHooks(manager, hooks);
-  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
index 875cd6fb6..5e16c5bcb 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
@@ -25,7 +25,6 @@ import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
-import org.apache.gravitino.hook.DispatcherHooks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -117,10 +116,4 @@ public class AuthorizationUtils {
         "Role namespace must have 3 levels, the input namespace is %s",
         namespace);
   }
-
-  // Install some post hooks used for owner. The owner will have the all 
privileges
-  // of securable objects, users, groups, roles.
-  public static <T> void prepareAuthorizationHooks(T manager, DispatcherHooks 
hooks) {
-    // TODO: Refactor the post hook by adding new dispatcher
-  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java 
b/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java
index f79164285..ec1b26438 100644
--- a/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java
+++ b/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java
@@ -40,7 +40,8 @@ import org.slf4j.LoggerFactory;
 
 /**
  * OwnerManager is used for manage the owner of metadata object. The user and 
group don't have an
- * owner
+ * owner. Because the post hooks call these methods, we shouldn't take the 
lock of the metadata
+ * object here; otherwise, it will cause a deadlock.
  */
 public class OwnerManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(OwnerManager.class);
@@ -71,45 +72,37 @@ public class OwnerManager {
       if (ownerType == Owner.Type.USER) {
         NameIdentifier ownerIdent = AuthorizationUtils.ofUser(metalake, 
ownerName);
         TreeLockUtils.doWithTreeLock(
-            objectIdent,
+            ownerIdent,
             LockType.READ,
-            () ->
-                TreeLockUtils.doWithTreeLock(
-                    ownerIdent,
-                    LockType.READ,
-                    () -> {
-                      store
-                          .relationOperations()
-                          .insertRelation(
-                              SupportsRelationOperations.Type.OWNER_REL,
-                              objectIdent,
-                              MetadataObjectUtil.toEntityType(metadataObject),
-                              ownerIdent,
-                              Entity.EntityType.USER,
-                              true);
-                      return null;
-                    }));
+            () -> {
+              store
+                  .relationOperations()
+                  .insertRelation(
+                      SupportsRelationOperations.Type.OWNER_REL,
+                      objectIdent,
+                      MetadataObjectUtil.toEntityType(metadataObject),
+                      ownerIdent,
+                      Entity.EntityType.USER,
+                      true);
+              return null;
+            });
       } else if (ownerType == Owner.Type.GROUP) {
         NameIdentifier ownerIdent = AuthorizationUtils.ofGroup(metalake, 
ownerName);
         TreeLockUtils.doWithTreeLock(
-            objectIdent,
+            ownerIdent,
             LockType.READ,
-            () ->
-                TreeLockUtils.doWithTreeLock(
-                    ownerIdent,
-                    LockType.READ,
-                    () -> {
-                      store
-                          .relationOperations()
-                          .insertRelation(
-                              SupportsRelationOperations.Type.OWNER_REL,
-                              objectIdent,
-                              MetadataObjectUtil.toEntityType(metadataObject),
-                              ownerIdent,
-                              Entity.EntityType.GROUP,
-                              true);
-                      return null;
-                    }));
+            () -> {
+              store
+                  .relationOperations()
+                  .insertRelation(
+                      SupportsRelationOperations.Type.OWNER_REL,
+                      objectIdent,
+                      MetadataObjectUtil.toEntityType(metadataObject),
+                      ownerIdent,
+                      Entity.EntityType.GROUP,
+                      true);
+              return null;
+            });
       }
     } catch (NoSuchEntityException nse) {
       LOG.warn(
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/AccessControlHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/AccessControlHookDispatcher.java
new file mode 100644
index 000000000..44dc491a7
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/hook/AccessControlHookDispatcher.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.authorization.AccessControlDispatcher;
+import org.apache.gravitino.authorization.AuthorizationUtils;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerManager;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.exceptions.GroupAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchGroupException;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.exceptions.NoSuchRoleException;
+import org.apache.gravitino.exceptions.NoSuchUserException;
+import org.apache.gravitino.exceptions.RoleAlreadyExistsException;
+import org.apache.gravitino.exceptions.UserAlreadyExistsException;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+/**
+ * {@code AccessControlHookDispatcher} is a decorator for {@link 
AccessControlDispatcher} that not
+ * only delegates access control operations to the underlying access control 
dispatcher but also
+ * executes some hook operations before or after the underlying operations.
+ */
+public class AccessControlHookDispatcher implements AccessControlDispatcher {
+  private final AccessControlDispatcher dispatcher;
+
+  public AccessControlHookDispatcher(AccessControlDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public User addUser(String metalake, String user)
+      throws UserAlreadyExistsException, NoSuchMetalakeException {
+    return dispatcher.addUser(metalake, user);
+  }
+
+  @Override
+  public boolean removeUser(String metalake, String user) throws 
NoSuchMetalakeException {
+    return dispatcher.removeUser(metalake, user);
+  }
+
+  @Override
+  public User getUser(String metalake, String user)
+      throws NoSuchUserException, NoSuchMetalakeException {
+    return dispatcher.getUser(metalake, user);
+  }
+
+  @Override
+  public Group addGroup(String metalake, String group)
+      throws GroupAlreadyExistsException, NoSuchMetalakeException {
+    return dispatcher.addGroup(metalake, group);
+  }
+
+  @Override
+  public boolean removeGroup(String metalake, String group) throws 
NoSuchMetalakeException {
+    return dispatcher.removeGroup(metalake, group);
+  }
+
+  @Override
+  public Group getGroup(String metalake, String group)
+      throws NoSuchGroupException, NoSuchMetalakeException {
+    return dispatcher.getGroup(metalake, group);
+  }
+
+  @Override
+  public User grantRolesToUser(String metalake, List<String> roles, String 
user)
+      throws NoSuchUserException, NoSuchRoleException, NoSuchMetalakeException 
{
+    return dispatcher.grantRolesToUser(metalake, roles, user);
+  }
+
+  @Override
+  public Group grantRolesToGroup(String metalake, List<String> roles, String 
group)
+      throws NoSuchGroupException, NoSuchRoleException, 
NoSuchMetalakeException {
+    return dispatcher.grantRolesToGroup(metalake, roles, group);
+  }
+
+  @Override
+  public Group revokeRolesFromGroup(String metalake, List<String> roles, 
String group)
+      throws NoSuchGroupException, NoSuchRoleException, 
NoSuchMetalakeException {
+    return dispatcher.revokeRolesFromGroup(metalake, roles, group);
+  }
+
+  @Override
+  public User revokeRolesFromUser(String metalake, List<String> roles, String 
user)
+      throws NoSuchUserException, NoSuchRoleException, NoSuchMetalakeException 
{
+    return dispatcher.revokeRolesFromUser(metalake, roles, user);
+  }
+
+  @Override
+  public boolean isServiceAdmin(String user) {
+    return dispatcher.isServiceAdmin(user);
+  }
+
+  @Override
+  public Role createRole(
+      String metalake,
+      String role,
+      Map<String, String> properties,
+      List<SecurableObject> securableObjects)
+      throws RoleAlreadyExistsException, NoSuchMetalakeException {
+    Role createdRole = dispatcher.createRole(metalake, role, properties, 
securableObjects);
+
+    // Set the creator as the owner of the role.
+    OwnerManager ownerManager = GravitinoEnv.getInstance().ownerManager();
+    if (ownerManager != null) {
+      ownerManager.setOwner(
+          metalake,
+          NameIdentifierUtil.toMetadataObject(
+              AuthorizationUtils.ofRole(metalake, role), 
Entity.EntityType.ROLE),
+          PrincipalUtils.getCurrentUserName(),
+          Owner.Type.USER);
+    }
+    return createdRole;
+  }
+
+  @Override
+  public Role getRole(String metalake, String role)
+      throws NoSuchRoleException, NoSuchMetalakeException {
+    return dispatcher.getRole(metalake, role);
+  }
+
+  @Override
+  public boolean deleteRole(String metalake, String role) throws 
NoSuchMetalakeException {
+    return dispatcher.deleteRole(metalake, role);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/CatalogHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/CatalogHookDispatcher.java
new file mode 100644
index 000000000..4b6067de1
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/hook/CatalogHookDispatcher.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.CatalogChange;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerManager;
+import org.apache.gravitino.catalog.CatalogDispatcher;
+import org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+/**
+ * {@code CatalogHookDispatcher} is a decorator for {@link CatalogDispatcher} 
that not only
+ * delegates catalog operations to the underlying catalog dispatcher but also 
executes some hook
+ * operations before or after the underlying operations.
+ */
+public class CatalogHookDispatcher implements CatalogDispatcher {
+  private final CatalogDispatcher dispatcher;
+
+  public CatalogHookDispatcher(CatalogDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public NameIdentifier[] listCatalogs(Namespace namespace) throws 
NoSuchMetalakeException {
+    return dispatcher.listCatalogs(namespace);
+  }
+
+  @Override
+  public Catalog[] listCatalogsInfo(Namespace namespace) throws 
NoSuchMetalakeException {
+    return dispatcher.listCatalogsInfo(namespace);
+  }
+
+  @Override
+  public Catalog loadCatalog(NameIdentifier ident) throws 
NoSuchCatalogException {
+    return dispatcher.loadCatalog(ident);
+  }
+
+  @Override
+  public Catalog createCatalog(
+      NameIdentifier ident,
+      Catalog.Type type,
+      String provider,
+      String comment,
+      Map<String, String> properties)
+      throws NoSuchMetalakeException, CatalogAlreadyExistsException {
+    Catalog catalog = dispatcher.createCatalog(ident, type, provider, comment, 
properties);
+
+    // Set the creator as the owner of the catalog.
+    OwnerManager ownerManager = GravitinoEnv.getInstance().ownerManager();
+    if (ownerManager != null) {
+      ownerManager.setOwner(
+          ident.namespace().level(0),
+          NameIdentifierUtil.toMetadataObject(ident, 
Entity.EntityType.CATALOG),
+          PrincipalUtils.getCurrentUserName(),
+          Owner.Type.USER);
+    }
+    return catalog;
+  }
+
+  @Override
+  public Catalog alterCatalog(NameIdentifier ident, CatalogChange... changes)
+      throws NoSuchCatalogException, IllegalArgumentException {
+    return dispatcher.alterCatalog(ident, changes);
+  }
+
+  @Override
+  public boolean dropCatalog(NameIdentifier ident) {
+    return dispatcher.dropCatalog(ident);
+  }
+
+  @Override
+  public void testConnection(
+      NameIdentifier ident,
+      Catalog.Type type,
+      String provider,
+      String comment,
+      Map<String, String> properties)
+      throws Exception {
+    dispatcher.testConnection(ident, type, provider, comment, properties);
+  }
+
+  @Override
+  public boolean catalogExists(NameIdentifier ident) {
+    return dispatcher.catalogExists(ident);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/DispatcherHookHelper.java 
b/core/src/main/java/org/apache/gravitino/hook/DispatcherHookHelper.java
deleted file mode 100644
index c08d85d0b..000000000
--- a/core/src/main/java/org/apache/gravitino/hook/DispatcherHookHelper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.hook;
-
-import java.lang.reflect.Proxy;
-
-/** The class is a helper class of dispatcher hooks */
-public class DispatcherHookHelper {
-
-  private DispatcherHookHelper() {}
-
-  public static <T> T installHooks(T dispatcher, DispatcherHooks hooks) {
-    return (T)
-        Proxy.newProxyInstance(
-            dispatcher.getClass().getClassLoader(),
-            dispatcher.getClass().getInterfaces(),
-            new DispatcherHookProxy<T>(dispatcher, hooks));
-  }
-}
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/DispatcherHookProxy.java 
b/core/src/main/java/org/apache/gravitino/hook/DispatcherHookProxy.java
deleted file mode 100644
index 5fa36dab8..000000000
--- a/core/src/main/java/org/apache/gravitino/hook/DispatcherHookProxy.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.hook;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.util.List;
-import java.util.function.BiConsumer;
-
-class DispatcherHookProxy<T> implements InvocationHandler {
-  private final DispatcherHooks hooks;
-  private final T dispatcher;
-
-  DispatcherHookProxy(T dispatcher, DispatcherHooks hooks) {
-    this.hooks = hooks;
-    this.dispatcher = dispatcher;
-  }
-
-  @Override
-  public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
-    Object result = method.invoke(dispatcher, args);
-    List<BiConsumer> postHooks = hooks.getPostHooks(method.getName());
-    for (BiConsumer hook : postHooks) {
-      hook.accept(args, result);
-    }
-    return result;
-  }
-}
diff --git a/core/src/main/java/org/apache/gravitino/hook/DispatcherHooks.java 
b/core/src/main/java/org/apache/gravitino/hook/DispatcherHooks.java
deleted file mode 100644
index 205f27f26..000000000
--- a/core/src/main/java/org/apache/gravitino/hook/DispatcherHooks.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.hook;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.function.BiConsumer;
-
-/**
- * DispatcherHooks provide the ability to execute specific hook actions before 
or after calling
- * specific methods. Now we only support the post hook.
- */
-public class DispatcherHooks {
-
-  private final Map<String, List<BiConsumer>> postHookMap = Maps.newHashMap();
-
-  public void addPostHook(String method, BiConsumer hook) {
-    List<BiConsumer> postHooks = postHookMap.computeIfAbsent(method, key -> 
Lists.newArrayList());
-    postHooks.add(hook);
-  }
-
-  public boolean isEmpty() {
-    return postHookMap.isEmpty();
-  }
-
-  List<BiConsumer> getPostHooks(String method) {
-    List<BiConsumer> postHooks = postHookMap.get(method);
-    if (postHooks == null) {
-      return Collections.emptyList();
-    }
-    return postHooks;
-  }
-}
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
new file mode 100644
index 000000000..6e5a82eb2
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import java.util.Map;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerManager;
+import org.apache.gravitino.catalog.FilesetDispatcher;
+import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.file.FilesetChange;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+/**
+ * {@code FilesetHookDispatcher} is a decorator for {@link FilesetDispatcher} that delegates every
+ * fileset operation to the underlying fileset dispatcher and runs a hook around selected
+ * operations.
+ *
+ * <p>The only hook implemented here is a post-create hook: once {@code createFileset} has returned
+ * from the underlying dispatcher, the current user (as reported by {@code PrincipalUtils}) is
+ * registered as the owner of the new fileset, provided that {@code GravitinoEnv} exposes a
+ * non-null {@link OwnerManager}. The remaining methods ({@code listFilesets}, {@code loadFileset},
+ * {@code alterFileset}, {@code dropFileset} and {@code filesetExists}) forward their arguments
+ * unchanged and return the delegate's result.
+ *
+ * <p>The owner assignment is not guarded by a try/catch, so an exception thrown by {@code
+ * setOwner} reaches the caller even though the underlying create call has already completed.
+ */
+public class FilesetHookDispatcher implements FilesetDispatcher {
+  private final FilesetDispatcher dispatcher;
+
+  public FilesetHookDispatcher(FilesetDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public NameIdentifier[] listFilesets(Namespace namespace) throws 
NoSuchSchemaException {
+    return dispatcher.listFilesets(namespace);
+  }
+
+  @Override
+  public Fileset loadFileset(NameIdentifier ident) throws 
NoSuchFilesetException {
+    return dispatcher.loadFileset(ident);
+  }
+
+  @Override
+  public Fileset createFileset(
+      NameIdentifier ident,
+      String comment,
+      Fileset.Type type,
+      String storageLocation,
+      Map<String, String> properties)
+      throws NoSuchSchemaException, FilesetAlreadyExistsException {
+    Fileset fileset = dispatcher.createFileset(ident, comment, type, 
storageLocation, properties);
+
+    /*
+     * Post-create hook: set the creator as the owner of the fileset. The step is skipped when
+     * GravitinoEnv returns no OwnerManager. The first argument is level 0 of the identifier's
+     * namespace (presumably the metalake name -- TODO confirm against OwnerManager#setOwner).
+     */
+    OwnerManager ownerManager = GravitinoEnv.getInstance().ownerManager();
+    if (ownerManager != null) {
+      ownerManager.setOwner(
+          ident.namespace().level(0),
+          NameIdentifierUtil.toMetadataObject(ident, 
Entity.EntityType.FILESET),
+          PrincipalUtils.getCurrentUserName(),
+          Owner.Type.USER);
+    }
+    return fileset;
+  }
+
+  @Override
+  public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes)
+      throws NoSuchFilesetException, IllegalArgumentException {
+    return dispatcher.alterFileset(ident, changes);
+  }
+
+  @Override
+  public boolean dropFileset(NameIdentifier ident) {
+    return dispatcher.dropFileset(ident);
+  }
+
+  @Override
+  public boolean filesetExists(NameIdentifier ident) {
+    return dispatcher.filesetExists(ident);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/MetalakeHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/MetalakeHookDispatcher.java
new file mode 100644
index 000000000..3c242bd56
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/hook/MetalakeHookDispatcher.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import java.util.Map;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.Metalake;
+import org.apache.gravitino.MetalakeChange;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.authorization.AccessControlDispatcher;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerManager;
+import org.apache.gravitino.exceptions.MetalakeAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.metalake.MetalakeDispatcher;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+/**
+ * {@code MetalakeHookDispatcher} is a decorator for {@link MetalakeDispatcher} that delegates
+ * every metalake operation to the underlying metalake dispatcher and runs hooks around selected
+ * operations.
+ *
+ * <p>The only hooks implemented here run after {@code createMetalake} has returned from the
+ * underlying dispatcher, in this order:
+ *
+ * <ol>
+ *   <li>if {@code GravitinoEnv} exposes a non-null {@link AccessControlDispatcher}, the current
+ *       user is added to the new metalake through {@code addUser};
+ *   <li>if {@code GravitinoEnv} exposes a non-null {@link OwnerManager}, the current user is set
+ *       as the owner of the new metalake.
+ * </ol>
+ *
+ * <p>The remaining methods ({@code listMetalakes}, {@code loadMetalake}, {@code alterMetalake},
+ * {@code dropMetalake} and {@code metalakeExists}) forward their arguments unchanged and return
+ * the delegate's result. Neither hook is guarded by a try/catch, so an exception thrown by {@code
+ * addUser} or {@code setOwner} reaches the caller even though the underlying create call has
+ * already completed.
+ */
+public class MetalakeHookDispatcher implements MetalakeDispatcher {
+  private final MetalakeDispatcher dispatcher;
+
+  public MetalakeHookDispatcher(MetalakeDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public Metalake[] listMetalakes() {
+    return dispatcher.listMetalakes();
+  }
+
+  @Override
+  public Metalake loadMetalake(NameIdentifier ident) throws 
NoSuchMetalakeException {
+    return dispatcher.loadMetalake(ident);
+  }
+
+  @Override
+  public Metalake createMetalake(
+      NameIdentifier ident, String comment, Map<String, String> properties)
+      throws MetalakeAlreadyExistsException {
+    Metalake metalake = dispatcher.createMetalake(ident, comment, properties);
+
+    /*
+     * Post-create hook, step 1: add the creator to the metalake as a user. This runs before the
+     * ownership step below and is skipped when no AccessControlDispatcher is available.
+     */
+    AccessControlDispatcher accessControlDispatcher =
+        GravitinoEnv.getInstance().accessControlDispatcher();
+    if (accessControlDispatcher != null) {
+      accessControlDispatcher.addUser(ident.name(), 
PrincipalUtils.getCurrentUserName());
+    }
+
+    /*
+     * Post-create hook, step 2: set the creator as owner of the metalake. The metalake's own name
+     * is passed as the first argument; the step is skipped when no OwnerManager is available.
+     */
+    OwnerManager ownerManager = GravitinoEnv.getInstance().ownerManager();
+    if (ownerManager != null) {
+      ownerManager.setOwner(
+          ident.name(),
+          NameIdentifierUtil.toMetadataObject(ident, 
Entity.EntityType.METALAKE),
+          PrincipalUtils.getCurrentUserName(),
+          Owner.Type.USER);
+    }
+    return metalake;
+  }
+
+  @Override
+  public Metalake alterMetalake(NameIdentifier ident, MetalakeChange... 
changes)
+      throws NoSuchMetalakeException, IllegalArgumentException {
+    return dispatcher.alterMetalake(ident, changes);
+  }
+
+  @Override
+  public boolean dropMetalake(NameIdentifier ident) {
+    return dispatcher.dropMetalake(ident);
+  }
+
+  @Override
+  public boolean metalakeExists(NameIdentifier ident) {
+    return dispatcher.metalakeExists(ident);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/SchemaHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/SchemaHookDispatcher.java
new file mode 100644
index 000000000..d9bcf0417
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/hook/SchemaHookDispatcher.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import java.util.Map;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.SchemaChange;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerManager;
+import org.apache.gravitino.catalog.SchemaDispatcher;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NonEmptySchemaException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+/**
+ * {@code SchemaHookDispatcher} is a decorator for {@link SchemaDispatcher} that delegates every
+ * schema operation to the underlying schema dispatcher and runs a hook around selected
+ * operations.
+ *
+ * <p>The only hook implemented here is a post-create hook: once {@code createSchema} has returned
+ * from the underlying dispatcher, the current user (as reported by {@code PrincipalUtils}) is
+ * registered as the owner of the new schema, provided that {@code GravitinoEnv} exposes a non-null
+ * {@link OwnerManager}. The remaining methods ({@code listSchemas}, {@code loadSchema}, {@code
+ * alterSchema}, {@code dropSchema} and {@code schemaExists}) forward their arguments unchanged and
+ * return the delegate's result.
+ *
+ * <p>The owner assignment is not guarded by a try/catch, so an exception thrown by {@code
+ * setOwner} reaches the caller even though the underlying create call has already completed.
+ */
+public class SchemaHookDispatcher implements SchemaDispatcher {
+  private final SchemaDispatcher dispatcher;
+
+  public SchemaHookDispatcher(SchemaDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public NameIdentifier[] listSchemas(Namespace namespace) throws 
NoSuchCatalogException {
+    return dispatcher.listSchemas(namespace);
+  }
+
+  @Override
+  public Schema createSchema(NameIdentifier ident, String comment, Map<String, 
String> properties)
+      throws NoSuchCatalogException, SchemaAlreadyExistsException {
+    Schema schema = dispatcher.createSchema(ident, comment, properties);
+
+    /*
+     * Post-create hook: set the creator as the owner of the schema. The step is skipped when
+     * GravitinoEnv returns no OwnerManager. The first argument is level 0 of the identifier's
+     * namespace (presumably the metalake name -- TODO confirm against OwnerManager#setOwner).
+     */
+    OwnerManager ownerManager = GravitinoEnv.getInstance().ownerManager();
+    if (ownerManager != null) {
+      ownerManager.setOwner(
+          ident.namespace().level(0),
+          NameIdentifierUtil.toMetadataObject(ident, Entity.EntityType.SCHEMA),
+          PrincipalUtils.getCurrentUserName(),
+          Owner.Type.USER);
+    }
+    return schema;
+  }
+
+  @Override
+  public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
+    return dispatcher.loadSchema(ident);
+  }
+
+  @Override
+  public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
+      throws NoSuchSchemaException {
+    return dispatcher.alterSchema(ident, changes);
+  }
+
+  @Override
+  public boolean dropSchema(NameIdentifier ident, boolean cascade) throws 
NonEmptySchemaException {
+    return dispatcher.dropSchema(ident, cascade);
+  }
+
+  @Override
+  public boolean schemaExists(NameIdentifier ident) {
+    return dispatcher.schemaExists(ident);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java
new file mode 100644
index 000000000..3a39f0a9d
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import java.util.Map;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerManager;
+import org.apache.gravitino.catalog.TableDispatcher;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+/**
+ * {@code TableHookDispatcher} is a decorator for {@link TableDispatcher} that delegates every
+ * table operation to the underlying table dispatcher and runs a hook around selected operations.
+ *
+ * <p>The only hook implemented here is a post-create hook: once {@code createTable} has returned
+ * from the underlying dispatcher, the current user (as reported by {@code PrincipalUtils}) is
+ * registered as the owner of the new table, provided that {@code GravitinoEnv} exposes a non-null
+ * {@link OwnerManager}. The remaining methods ({@code listTables}, {@code loadTable}, {@code
+ * alterTable}, {@code dropTable}, {@code purgeTable} and {@code tableExists}) forward their
+ * arguments unchanged and return the delegate's result.
+ *
+ * <p>The owner assignment is not guarded by a try/catch, so an exception thrown by {@code
+ * setOwner} reaches the caller even though the underlying create call has already completed.
+ */
+public class TableHookDispatcher implements TableDispatcher {
+  private final TableDispatcher dispatcher;
+
+  public TableHookDispatcher(TableDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public NameIdentifier[] listTables(Namespace namespace) throws 
NoSuchSchemaException {
+    return dispatcher.listTables(namespace);
+  }
+
+  @Override
+  public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
+    return dispatcher.loadTable(ident);
+  }
+
+  @Override
+  public Table createTable(
+      NameIdentifier ident,
+      Column[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitions,
+      Distribution distribution,
+      SortOrder[] sortOrders,
+      Index[] indexes)
+      throws NoSuchSchemaException, TableAlreadyExistsException {
+    Table table =
+        dispatcher.createTable(
+            ident, columns, comment, properties, partitions, distribution, 
sortOrders, indexes);
+
+    /*
+     * Post-create hook: set the creator as the owner of the table. The step is skipped when
+     * GravitinoEnv returns no OwnerManager. The first argument is level 0 of the identifier's
+     * namespace (presumably the metalake name -- TODO confirm against OwnerManager#setOwner).
+     */
+    OwnerManager ownerManager = GravitinoEnv.getInstance().ownerManager();
+    if (ownerManager != null) {
+      ownerManager.setOwner(
+          ident.namespace().level(0),
+          NameIdentifierUtil.toMetadataObject(ident, Entity.EntityType.TABLE),
+          PrincipalUtils.getCurrentUserName(),
+          Owner.Type.USER);
+    }
+    return table;
+  }
+
+  @Override
+  public Table alterTable(NameIdentifier ident, TableChange... changes)
+      throws NoSuchTableException, IllegalArgumentException {
+    return dispatcher.alterTable(ident, changes);
+  }
+
+  @Override
+  public boolean dropTable(NameIdentifier ident) {
+    return dispatcher.dropTable(ident);
+  }
+
+  @Override
+  public boolean purgeTable(NameIdentifier ident) throws 
UnsupportedOperationException {
+    return dispatcher.purgeTable(ident);
+  }
+
+  @Override
+  public boolean tableExists(NameIdentifier ident) {
+    return dispatcher.tableExists(ident);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/TopicHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/TopicHookDispatcher.java
new file mode 100644
index 000000000..c36e58e6f
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/hook/TopicHookDispatcher.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import java.util.Map;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerManager;
+import org.apache.gravitino.catalog.TopicDispatcher;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTopicException;
+import org.apache.gravitino.exceptions.TopicAlreadyExistsException;
+import org.apache.gravitino.messaging.DataLayout;
+import org.apache.gravitino.messaging.Topic;
+import org.apache.gravitino.messaging.TopicChange;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+/**
+ * {@code TopicHookDispatcher} is a decorator for {@link TopicDispatcher} that delegates every
+ * topic operation to the underlying topic dispatcher and runs a hook around selected operations.
+ *
+ * <p>The only hook implemented here is a post-create hook: once {@code createTopic} has returned
+ * from the underlying dispatcher, the current user (as reported by {@code PrincipalUtils}) is
+ * registered as the owner of the new topic, provided that {@code GravitinoEnv} exposes a non-null
+ * {@link OwnerManager}. The remaining methods ({@code listTopics}, {@code loadTopic}, {@code
+ * alterTopic}, {@code dropTopic} and {@code topicExists}) forward their arguments unchanged and
+ * return the delegate's result.
+ *
+ * <p>The owner assignment is not guarded by a try/catch, so an exception thrown by {@code
+ * setOwner} reaches the caller even though the underlying create call has already completed.
+ */
+public class TopicHookDispatcher implements TopicDispatcher {
+  private final TopicDispatcher dispatcher;
+
+  public TopicHookDispatcher(TopicDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public NameIdentifier[] listTopics(Namespace namespace) throws 
NoSuchSchemaException {
+    return dispatcher.listTopics(namespace);
+  }
+
+  @Override
+  public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
+    return dispatcher.loadTopic(ident);
+  }
+
+  @Override
+  public Topic createTopic(
+      NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, 
String> properties)
+      throws NoSuchSchemaException, TopicAlreadyExistsException {
+    Topic topic = dispatcher.createTopic(ident, comment, dataLayout, 
properties);
+
+    /*
+     * Post-create hook: set the creator as the owner of the topic. The step is skipped when
+     * GravitinoEnv returns no OwnerManager. The first argument is level 0 of the identifier's
+     * namespace (presumably the metalake name -- TODO confirm against OwnerManager#setOwner).
+     */
+    OwnerManager ownerManager = GravitinoEnv.getInstance().ownerManager();
+    if (ownerManager != null) {
+      ownerManager.setOwner(
+          ident.namespace().level(0),
+          NameIdentifierUtil.toMetadataObject(ident, Entity.EntityType.TOPIC),
+          PrincipalUtils.getCurrentUserName(),
+          Owner.Type.USER);
+    }
+    return topic;
+  }
+
+  @Override
+  public Topic alterTopic(NameIdentifier ident, TopicChange... changes)
+      throws NoSuchTopicException, IllegalArgumentException {
+    return dispatcher.alterTopic(ident, changes);
+  }
+
+  @Override
+  public boolean dropTopic(NameIdentifier ident) {
+    return dispatcher.dropTopic(ident);
+  }
+
+  @Override
+  public boolean topicExists(NameIdentifier ident) {
+    return dispatcher.topicExists(ident);
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/hook/TestDispatcherHooks.java 
b/core/src/test/java/org/apache/gravitino/hook/TestDispatcherHooks.java
deleted file mode 100644
index 010382035..000000000
--- a/core/src/test/java/org/apache/gravitino/hook/TestDispatcherHooks.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.hook;
-
-import static org.apache.gravitino.Configs.SERVICE_ADMINS;
-
-import com.google.common.collect.Lists;
-import java.util.Collections;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.gravitino.Config;
-import org.apache.gravitino.EntityStore;
-import org.apache.gravitino.GravitinoEnv;
-import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.authorization.AccessControlDispatcher;
-import org.apache.gravitino.authorization.AccessControlManager;
-import org.apache.gravitino.metalake.MetalakeDispatcher;
-import org.apache.gravitino.metalake.MetalakeManager;
-import org.apache.gravitino.storage.IdGenerator;
-import org.apache.gravitino.storage.RandomIdGenerator;
-import org.apache.gravitino.storage.memory.TestMemoryEntityStore;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class TestDispatcherHooks {
-
-  @Test
-  public void testLifecycleHooks() throws IllegalAccessException {
-    Config config = new Config(false) {};
-    config.set(SERVICE_ADMINS, Lists.newArrayList("admin1", "admin2"));
-    EntityStore entityStore = new TestMemoryEntityStore.InMemoryEntityStore();
-    entityStore.initialize(config);
-    entityStore.setSerDe(null);
-    IdGenerator idGenerator = new RandomIdGenerator();
-    FieldUtils.writeField(GravitinoEnv.getInstance(), "entityStore", 
entityStore, true);
-
-    DispatcherHooks hooks = new DispatcherHooks();
-    AtomicBoolean result = new AtomicBoolean(true);
-    hooks.addPostHook(
-        "createMetalake",
-        (args, metalake) -> {
-          result.set(false);
-        });
-    MetalakeDispatcher metalakeDispatcher =
-        DispatcherHookHelper.installHooks(new MetalakeManager(entityStore, 
idGenerator), hooks);
-    Assertions.assertTrue(result.get());
-    metalakeDispatcher.createMetalake(NameIdentifier.of("test"), "", 
Collections.emptyMap());
-    Assertions.assertFalse(result.get());
-
-    hooks.addPostHook(
-        "addUser",
-        (args, user) -> {
-          result.set(false);
-        });
-    AccessControlDispatcher accessControlManager =
-        DispatcherHookHelper.installHooks(
-            new AccessControlManager(entityStore, idGenerator, config), hooks);
-    result.set(true);
-    Assertions.assertTrue(result.get());
-    accessControlManager.addUser("test", "test");
-    Assertions.assertFalse(result.get());
-  }
-}
diff --git 
a/integration-test/src/test/java/org/apache/gravitino/integration/test/authorization/OwnerPostHookIT.java
 
b/integration-test/src/test/java/org/apache/gravitino/integration/test/authorization/OwnerPostHookIT.java
new file mode 100644
index 000000000..b7c6c1788
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/gravitino/integration/test/authorization/OwnerPostHookIT.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.integration.test.authorization;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.container.HiveContainer;
+import org.apache.gravitino.integration.test.container.KafkaContainer;
+import org.apache.gravitino.integration.test.util.AbstractIT;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.utils.RandomNameUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Tag("gravitino-docker-test")
+public class OwnerPostHookIT extends AbstractIT {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OwnerPostHookIT.class);
+
+  private static final ContainerSuite containerSuite = ContainerSuite.getInstance();
+  private static String hmsUri;
+  private static String kafkaBootstrapServers;
+
+  @BeforeAll
+  public static void startIntegrationTest() throws Exception {
+    Map<String, String> configs = Maps.newHashMap();
+    configs.put(Configs.ENABLE_AUTHORIZATION.getKey(), String.valueOf(true));
+    configs.put(Configs.SERVICE_ADMINS.getKey(), AuthConstants.ANONYMOUS_USER);
+    registerCustomConfigs(configs);
+    AbstractIT.startIntegrationTest();
+
+    containerSuite.startHiveContainer();
+    hmsUri =
+        String.format(
+            "thrift://%s:%d",
+            containerSuite.getHiveContainer().getContainerIpAddress(),
+            HiveContainer.HIVE_METASTORE_PORT);
+
+    containerSuite.startKafkaContainer();
+    kafkaBootstrapServers =
+        String.format(
+            "%s:%d",
+            containerSuite.getKafkaContainer().getContainerIpAddress(),
+            KafkaContainer.DEFAULT_BROKER_PORT);
+  }
+
+  @AfterAll
+  public static void tearDown() {
+    if (client != null) {
+      client.close();
+      client = null;
+    }
+
+    try {
+      closer.close();
+    } catch (Exception e) {
+      LOG.error("Exception in closing CloseableGroup", e);
+    }
+  }
+
+  @Test
+  public void testCreateFileset() {
+    String metalakeNameA = RandomNameUtils.genRandomName("metalakeA");
+    client.createMetalake(metalakeNameA, "metalake A comment", Collections.emptyMap());
+    GravitinoMetalake metalake = client.loadMetalake(metalakeNameA);
+    String catalogNameA = RandomNameUtils.genRandomName("catalogA");
+    Catalog catalog =
+        metalake.createCatalog(
+            catalogNameA, Catalog.Type.FILESET, "hadoop", "comment", Collections.emptyMap());
+    NameIdentifier fileIdent = NameIdentifier.of("schema_owner", "fileset_owner");
+    catalog.asSchemas().createSchema("schema_owner", "comment", Collections.emptyMap());
+    catalog
+        .asFilesetCatalog()
+        .createFileset(fileIdent, "comment", Fileset.Type.EXTERNAL, "tmp", Collections.emptyMap());
+
+    MetadataObject metalakeObject =
+        MetadataObjects.of(null, metalakeNameA, MetadataObject.Type.METALAKE);
+    Owner owner = metalake.getOwner(metalakeObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    MetadataObject catalogObject =
+        MetadataObjects.of(Lists.newArrayList(catalogNameA), MetadataObject.Type.CATALOG);
+    owner = metalake.getOwner(catalogObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    MetadataObject schemaObject =
+        MetadataObjects.of(
+            Lists.newArrayList(catalogNameA, "schema_owner"), MetadataObject.Type.SCHEMA);
+    owner = metalake.getOwner(schemaObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    MetadataObject filesetObject =
+        MetadataObjects.of(
+            Lists.newArrayList(catalogNameA, "schema_owner", "fileset_owner"),
+            MetadataObject.Type.FILESET);
+    owner = metalake.getOwner(filesetObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    // Clean up
+    catalog.asFilesetCatalog().dropFileset(fileIdent);
+    catalog.asSchemas().dropSchema("schema_owner", true);
+    metalake.dropCatalog(catalogNameA);
+    client.dropMetalake(metalakeNameA);
+  }
+
+  @Test
+  public void testCreateTopic() {
+    String metalakeNameB = RandomNameUtils.genRandomName("metalakeB");
+    GravitinoMetalake metalake =
+        client.createMetalake(metalakeNameB, "metalake B comment", Collections.emptyMap());
+    String catalogNameB = RandomNameUtils.genRandomName("catalogB");
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("bootstrap.servers", kafkaBootstrapServers);
+    Catalog catalogB =
+        metalake.createCatalog(
+            catalogNameB, Catalog.Type.MESSAGING, "kafka", "comment", properties);
+    NameIdentifier topicIdent = NameIdentifier.of("default", "topic_owner");
+    catalogB.asTopicCatalog().createTopic(topicIdent, "comment", null, Collections.emptyMap());
+
+    MetadataObject metalakeObject =
+        MetadataObjects.of(null, metalakeNameB, MetadataObject.Type.METALAKE);
+    Owner owner = metalake.getOwner(metalakeObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    MetadataObject catalogObject =
+        MetadataObjects.of(Lists.newArrayList(catalogNameB), MetadataObject.Type.CATALOG);
+    owner = metalake.getOwner(catalogObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    MetadataObject schemaObject =
+        MetadataObjects.of(Lists.newArrayList(catalogNameB, "default"), MetadataObject.Type.SCHEMA);
+    Assertions.assertFalse(metalake.getOwner(schemaObject).isPresent());
+
+    MetadataObject topicObject =
+        MetadataObjects.of(
+            Lists.newArrayList(catalogNameB, "default", "topic_owner"), MetadataObject.Type.TOPIC);
+    owner = metalake.getOwner(topicObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    // Clean up
+    catalogB.asTopicCatalog().dropTopic(topicIdent);
+    metalake.dropCatalog(catalogNameB);
+    client.dropMetalake(metalakeNameB);
+  }
+
+  @Test
+  public void testCreateRole() {
+    String metalakeNameC = RandomNameUtils.genRandomName("metalakeC");
+    GravitinoMetalake metalake =
+        client.createMetalake(metalakeNameC, "metalake C comment", Collections.emptyMap());
+    SecurableObject metalakeSecObject =
+        SecurableObjects.ofMetalake(
+            metalakeNameC, Lists.newArrayList(Privileges.CreateCatalog.allow()));
+    metalake.createRole(
+        "role_owner", Collections.emptyMap(), Lists.newArrayList(metalakeSecObject));
+
+    MetadataObject metalakeObject =
+        MetadataObjects.of(null, metalakeNameC, MetadataObject.Type.METALAKE);
+    Owner owner = metalake.getOwner(metalakeObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    MetadataObject roleObject = MetadataObjects.of(null, "role_owner", MetadataObject.Type.ROLE);
+    owner = metalake.getOwner(roleObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    // Clean up
+    metalake.deleteRole("role_owner");
+    client.dropMetalake(metalakeNameC);
+  }
+
+  @Test
+  public void testCreateTable() {
+    String metalakeNameD = RandomNameUtils.genRandomName("metalakeD");
+    GravitinoMetalake metalake =
+        client.createMetalake(metalakeNameD, "metalake D comment", Collections.emptyMap());
+    String catalogNameD = RandomNameUtils.genRandomName("catalogD");
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("metastore.uris", hmsUri);
+    Catalog catalog =
+        metalake.createCatalog(
+            catalogNameD, Catalog.Type.RELATIONAL, "hive", "catalog comment", properties);
+
+    NameIdentifier tableIdent = NameIdentifier.of("schema_owner", "table_owner");
+    catalog.asSchemas().createSchema("schema_owner", "comment", Collections.emptyMap());
+    catalog
+        .asTableCatalog()
+        .createTable(
+            tableIdent,
+            new Column[] {
+              Column.of("col1", Types.IntegerType.get()), Column.of("col2", Types.StringType.get())
+            },
+            "comment",
+            Collections.emptyMap());
+
+    MetadataObject metalakeObject =
+        MetadataObjects.of(null, metalakeNameD, MetadataObject.Type.METALAKE);
+    Owner owner = metalake.getOwner(metalakeObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    MetadataObject catalogObject =
+        MetadataObjects.of(Lists.newArrayList(catalogNameD), MetadataObject.Type.CATALOG);
+    owner = metalake.getOwner(catalogObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    MetadataObject schemaObject =
+        MetadataObjects.of(
+            Lists.newArrayList(catalogNameD, "schema_owner"), MetadataObject.Type.SCHEMA);
+    owner = metalake.getOwner(schemaObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    MetadataObject tableObject =
+        MetadataObjects.of(
+            Lists.newArrayList(catalogNameD, "schema_owner", "table_owner"),
+            MetadataObject.Type.TABLE);
+    owner = metalake.getOwner(tableObject).get();
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, owner.name());
+    Assertions.assertEquals(Owner.Type.USER, owner.type());
+
+    // Clean up
+    catalog.asTableCatalog().dropTable(tableIdent);
+    catalog.asSchemas().dropSchema("schema_owner", true);
+    metalake.dropCatalog(catalogNameD);
+    client.dropMetalake(metalakeNameD);
+  }
+}
diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/OwnerOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/OwnerOperations.java
index 517b08cd7..fb5e69198 100644
--- a/server/src/main/java/org/apache/gravitino/server/web/rest/OwnerOperations.java
+++ b/server/src/main/java/org/apache/gravitino/server/web/rest/OwnerOperations.java
@@ -33,15 +33,19 @@ import javax.ws.rs.core.Response;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.authorization.Owner;
 import org.apache.gravitino.authorization.OwnerManager;
 import org.apache.gravitino.dto.requests.OwnerSetRequest;
 import org.apache.gravitino.dto.responses.OwnerResponse;
 import org.apache.gravitino.dto.responses.SetResponse;
 import org.apache.gravitino.dto.util.DTOConverters;
+import org.apache.gravitino.lock.LockType;
+import org.apache.gravitino.lock.TreeLockUtils;
 import org.apache.gravitino.metrics.MetricNames;
 import org.apache.gravitino.server.authorization.NameBindings;
 import org.apache.gravitino.server.web.Utils;
+import org.apache.gravitino.utils.MetadataObjectUtil;
 
 @NameBindings.AccessControlInterfaces
 @Path("/metalakes/{metalake}/owners")
@@ -105,7 +109,14 @@ public class OwnerOperations {
       return Utils.doAs(
           httpRequest,
           () -> {
-            ownerManager.setOwner(metalake, object, request.getName(), request.getType());
+            NameIdentifier objectIdent = MetadataObjectUtil.toEntityIdent(metalake, object);
+            TreeLockUtils.doWithTreeLock(
+                objectIdent,
+                LockType.READ,
+                () -> {
+                  ownerManager.setOwner(metalake, object, request.getName(), request.getType());
+                  return null;
+                });
             return Utils.ok(new SetResponse(true));
           });
     } catch (Exception e) {

Reply via email to