This is an automated email from the ASF dual-hosted git repository.

mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d12f2dc03 [#10713] improvement(core,iceberg): Fix event ordering by 
standardizing dispatcher wiring (#10834)
0d12f2dc03 is described below

commit 0d12f2dc0338924cac20a172d1e5e6be09ebeb48
Author: Mehakmeet Singh <[email protected]>
AuthorDate: Thu May 7 11:53:31 2026 +0530

    [#10713] improvement(core,iceberg): Fix event ordering by standardizing 
dispatcher wiring (#10834)
    
    ### **What changes were proposed in this pull request?**
    Standardized dispatcher wiring so that all object types follow the
    Hook(Event(Normalize(Manager))) pattern, matching the existing
    Policy/AccessControl wiring. This ensures CREATE events are published
    before SET_OWNER events for all object types.
    
    GravitinoEnv: Flipped wiring for Metalake, Catalog, Schema, Table,
    Fileset, Topic, Model, Tag, Job dispatchers.
    RESTService: Flipped wiring for Iceberg Table, View, Namespace
    dispatchers.
    HookDispatchers: Ensured setOwner failures are gracefully handled via
    try-catch-log so that ownership failure does not cause creation failure.
    
    ### **Why are the changes needed?**
    Currently, most object types wrap HookDispatcher inside EventDispatcher
    (Event(Hook(Manager))). Since HookDispatcher auto-sets the creator as
    owner after creation, the SET_OWNER event is published before the CREATE
    event. This results in non-causal event ordering — external consumers
    see SET_OWNER for an object that hasn't been "created" yet from the
    event stream's perspective.
    
    Policy already uses the correct pattern where HookDispatcher wraps
    EventDispatcher. This PR standardizes all object types to match.
    
    **Fix**: #10713
    
    ### **Does this PR introduce any user-facing change?**
    No. This is an internal event ordering fix. The API behavior is
    unchanged. Events are now published in causal order (CREATE before
    SET_OWNER).
    
    ### **How was this patch tested?**
    Added setOwner-failure unit tests for all affected HookDispatchers to
    verify that creation succeeds even when setOwner throws:
    
    Core (10 tests): TestMetalakeHookDispatcher, TestSchemaHookDispatcher,
    TestTableHookDispatcher, TestFilesetHookDispatcher,
    TestTopicHookDispatcher, TestModelHookDispatcher, TestTagHookDispatcher,
    TestPolicyHookDispatcher, TestAccessControlHookDispatcher,
    TestJobHookDispatcher
    
    Iceberg (3 tests): TestIcebergTableHookDispatcher,
    TestIcebergViewHookDispatcher, TestIcebergNamespaceHookDispatcher
    
    Tested in Gravitino dev environment. All core and iceberg tests pass:
    
    
    `./gradlew :core:test :iceberg:iceberg-rest-server:test -PskipITs`
---
 .../java/org/apache/gravitino/GravitinoEnv.java    |  81 ++++-----
 .../gravitino/catalog/CapabilityHelpers.java       |  11 ++
 .../hook/AccessControlHookDispatcher.java          |   8 +-
 .../gravitino/hook/FilesetHookDispatcher.java      |  13 +-
 .../gravitino/hook/FunctionHookDispatcher.java     |  16 +-
 .../gravitino/hook/MetalakeHookDispatcher.java     |   2 +-
 .../apache/gravitino/hook/ModelHookDispatcher.java |  22 ++-
 .../gravitino/hook/SchemaHookDispatcher.java       |  12 +-
 .../apache/gravitino/hook/TableHookDispatcher.java |  13 +-
 .../apache/gravitino/hook/TagHookDispatcher.java   |   2 +-
 .../apache/gravitino/hook/TopicHookDispatcher.java |  13 +-
 .../hook/TestAccessControlHookDispatcher.java      |  80 +++++++++
 .../gravitino/hook/TestFilesetHookDispatcher.java  | 111 ++++++++++++-
 .../gravitino/hook/TestFunctionHookDispatcher.java | 117 +++++++++++++
 .../gravitino/hook/TestJobHookDispatcher.java      | 106 ++++++++++++
 .../gravitino/hook/TestMetalakeHookDispatcher.java | 133 +++++++++++++++
 .../gravitino/hook/TestModelHookDispatcher.java    | 175 ++++++++++++++++++++
 .../gravitino/hook/TestPolicyHookDispatcher.java   |  81 +++++++++
 .../gravitino/hook/TestSchemaHookDispatcher.java   | 139 ++++++++++++++++
 .../gravitino/hook/TestTableHookDispatcher.java    | 123 +++++++++++++-
 .../gravitino/hook/TestTagHookDispatcher.java      |  80 +++++++++
 .../gravitino/hook/TestTopicHookDispatcher.java    | 103 +++++++++++-
 .../org/apache/gravitino/iceberg/RESTService.java  |  37 +++--
 .../dispatcher/IcebergNamespaceHookDispatcher.java |  10 +-
 .../dispatcher/IcebergTableHookDispatcher.java     |   4 +
 .../dispatcher/IcebergViewHookDispatcher.java      |   2 +-
 .../TestIcebergNamespaceHookDispatcher.java        | 183 +++++++++++++++++++++
 .../dispatcher/TestIcebergTableHookDispatcher.java |  45 +++++
 .../dispatcher/TestIcebergViewHookDispatcher.java  |  37 +++++
 29 files changed, 1676 insertions(+), 83 deletions(-)

diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java 
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index f3b2e40ec7..f26b630765 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -550,39 +550,43 @@ public class GravitinoEnv {
     this.lockManager = new LockManager(config);
 
     // Create and initialize metalake related modules, the operation chain is:
-    // MetalakeEventDispatcher -> MetalakeNormalizeDispatcher -> 
MetalakeHookDispatcher ->
+    // MetalakeHookDispatcher -> MetalakeEventDispatcher -> 
MetalakeNormalizeDispatcher ->
     // MetalakeManager
     this.metalakeManager = new MetalakeManager(entityStore, idGenerator);
-    MetalakeHookDispatcher metalakeHookDispatcher = new 
MetalakeHookDispatcher(metalakeManager);
     MetalakeNormalizeDispatcher metalakeNormalizeDispatcher =
-        new MetalakeNormalizeDispatcher(metalakeHookDispatcher);
-    this.metalakeDispatcher = new MetalakeEventDispatcher(eventBus, 
metalakeNormalizeDispatcher);
+        new MetalakeNormalizeDispatcher(metalakeManager);
+    MetalakeEventDispatcher metalakeEventDispatcher =
+        new MetalakeEventDispatcher(eventBus, metalakeNormalizeDispatcher);
+    this.metalakeDispatcher = new 
MetalakeHookDispatcher(metalakeEventDispatcher);
 
     // Create and initialize Catalog related modules, the operation chain is:
-    // CatalogEventDispatcher -> CatalogNormalizeDispatcher -> 
CatalogHookDispatcher ->
+    // CatalogHookDispatcher -> CatalogEventDispatcher -> 
CatalogNormalizeDispatcher ->
     // CatalogManager
     this.catalogManager = new CatalogManager(config, entityStore, idGenerator);
-    CatalogHookDispatcher catalogHookDispatcher = new 
CatalogHookDispatcher(catalogManager);
     CatalogNormalizeDispatcher catalogNormalizeDispatcher =
-        new CatalogNormalizeDispatcher(catalogHookDispatcher);
-    this.catalogDispatcher = new CatalogEventDispatcher(eventBus, 
catalogNormalizeDispatcher);
+        new CatalogNormalizeDispatcher(catalogManager);
+    CatalogEventDispatcher catalogEventDispatcher =
+        new CatalogEventDispatcher(eventBus, catalogNormalizeDispatcher);
+    this.catalogDispatcher = new CatalogHookDispatcher(catalogEventDispatcher);
 
     this.credentialOperationDispatcher =
         new CredentialOperationDispatcher(catalogManager, entityStore, 
idGenerator);
 
     SchemaOperationDispatcher schemaOperationDispatcher =
         new SchemaOperationDispatcher(catalogManager, entityStore, 
idGenerator);
-    SchemaHookDispatcher schemaHookDispatcher = new 
SchemaHookDispatcher(schemaOperationDispatcher);
     SchemaNormalizeDispatcher schemaNormalizeDispatcher =
-        new SchemaNormalizeDispatcher(schemaHookDispatcher, catalogManager);
-    this.schemaDispatcher = new SchemaEventDispatcher(eventBus, 
schemaNormalizeDispatcher);
+        new SchemaNormalizeDispatcher(schemaOperationDispatcher, 
catalogManager);
+    SchemaEventDispatcher schemaEventDispatcher =
+        new SchemaEventDispatcher(eventBus, schemaNormalizeDispatcher);
+    this.schemaDispatcher = new SchemaHookDispatcher(schemaEventDispatcher);
 
     TableOperationDispatcher tableOperationDispatcher =
         new TableOperationDispatcher(catalogManager, entityStore, idGenerator);
-    TableHookDispatcher tableHookDispatcher = new 
TableHookDispatcher(tableOperationDispatcher);
     TableNormalizeDispatcher tableNormalizeDispatcher =
-        new TableNormalizeDispatcher(tableHookDispatcher, catalogManager);
-    this.tableDispatcher = new TableEventDispatcher(eventBus, 
tableNormalizeDispatcher);
+        new TableNormalizeDispatcher(tableOperationDispatcher, catalogManager);
+    TableEventDispatcher tableEventDispatcher =
+        new TableEventDispatcher(eventBus, tableNormalizeDispatcher);
+    this.tableDispatcher = new TableHookDispatcher(tableEventDispatcher);
 
     // TODO: We can install hooks when we need, we only supports ownership 
post hook,
     //  partition doesn't have ownership, so we don't need it now.
@@ -594,37 +598,39 @@ public class GravitinoEnv {
 
     FilesetOperationDispatcher filesetOperationDispatcher =
         new FilesetOperationDispatcher(catalogManager, entityStore, 
idGenerator);
-    FilesetHookDispatcher filesetHookDispatcher =
-        new FilesetHookDispatcher(filesetOperationDispatcher);
     FilesetNormalizeDispatcher filesetNormalizeDispatcher =
-        new FilesetNormalizeDispatcher(filesetHookDispatcher, catalogManager);
-    this.filesetDispatcher = new FilesetEventDispatcher(eventBus, 
filesetNormalizeDispatcher);
+        new FilesetNormalizeDispatcher(filesetOperationDispatcher, 
catalogManager);
+    FilesetEventDispatcher filesetEventDispatcher =
+        new FilesetEventDispatcher(eventBus, filesetNormalizeDispatcher);
+    this.filesetDispatcher = new FilesetHookDispatcher(filesetEventDispatcher);
 
     TopicOperationDispatcher topicOperationDispatcher =
         new TopicOperationDispatcher(catalogManager, entityStore, idGenerator);
-    TopicHookDispatcher topicHookDispatcher = new 
TopicHookDispatcher(topicOperationDispatcher);
     TopicNormalizeDispatcher topicNormalizeDispatcher =
-        new TopicNormalizeDispatcher(topicHookDispatcher, catalogManager);
-    this.topicDispatcher = new TopicEventDispatcher(eventBus, 
topicNormalizeDispatcher);
+        new TopicNormalizeDispatcher(topicOperationDispatcher, catalogManager);
+    TopicEventDispatcher topicEventDispatcher =
+        new TopicEventDispatcher(eventBus, topicNormalizeDispatcher);
+    this.topicDispatcher = new TopicHookDispatcher(topicEventDispatcher);
 
     ModelOperationDispatcher modelOperationDispatcher =
         new ModelOperationDispatcher(catalogManager, entityStore, idGenerator);
-    ModelHookDispatcher modelHookDispatcher = new 
ModelHookDispatcher(modelOperationDispatcher);
     ModelNormalizeDispatcher modelNormalizeDispatcher =
-        new ModelNormalizeDispatcher(modelHookDispatcher, catalogManager);
-    this.modelDispatcher = new ModelEventDispatcher(eventBus, 
modelNormalizeDispatcher);
+        new ModelNormalizeDispatcher(modelOperationDispatcher, catalogManager);
+    ModelEventDispatcher modelEventDispatcher =
+        new ModelEventDispatcher(eventBus, modelNormalizeDispatcher);
+    this.modelDispatcher = new ModelHookDispatcher(modelEventDispatcher);
 
-    // The operation chain is:
-    // FunctionEventDispatcher -> FunctionNormalizeDispatcher -> 
FunctionHookDispatcher ->
+    // Create and initialize Function related modules, the operation chain is:
+    // FunctionHookDispatcher -> FunctionEventDispatcher -> 
FunctionNormalizeDispatcher ->
     // FunctionOperationDispatcher
     FunctionOperationDispatcher functionOperationDispatcher =
         new FunctionOperationDispatcher(
             catalogManager, schemaOperationDispatcher, entityStore, 
idGenerator);
-    FunctionHookDispatcher functionHookDispatcher =
-        new FunctionHookDispatcher(functionOperationDispatcher);
     FunctionNormalizeDispatcher functionNormalizeDispatcher =
-        new FunctionNormalizeDispatcher(functionHookDispatcher, 
catalogManager);
-    this.functionDispatcher = new FunctionEventDispatcher(eventBus, 
functionNormalizeDispatcher);
+        new FunctionNormalizeDispatcher(functionOperationDispatcher, 
catalogManager);
+    FunctionEventDispatcher functionEventDispatcher =
+        new FunctionEventDispatcher(eventBus, functionNormalizeDispatcher);
+    this.functionDispatcher = new 
FunctionHookDispatcher(functionEventDispatcher);
 
     // TODO: Add ViewHookDispatcher and ViewEventDispatcher when needed for 
view-specific hooks
     //  and event handling.
@@ -641,10 +647,9 @@ public class GravitinoEnv {
     if (enableAuthorization) {
       AccessControlManager accessControlManager =
           new AccessControlManager(entityStore, idGenerator, config);
-      AccessControlHookDispatcher accessControlHookDispatcher =
-          new AccessControlHookDispatcher(accessControlManager);
-      this.accessControlDispatcher =
-          new AccessControlEventDispatcher(eventBus, 
accessControlHookDispatcher);
+      AccessControlEventDispatcher accessControlEventDispatcher =
+          new AccessControlEventDispatcher(eventBus, accessControlManager);
+      this.accessControlDispatcher = new 
AccessControlHookDispatcher(accessControlEventDispatcher);
       OwnerDispatcher ownerManager = new OwnerManager(entityStore);
       this.ownerDispatcher = new OwnerEventManager(eventBus, ownerManager);
       this.futureGrantManager = new FutureGrantManager(entityStore, 
ownerManager);
@@ -659,8 +664,8 @@ public class GravitinoEnv {
 
     // Create and initialize Tag related modules
     TagManager tagManager = new TagManager(idGenerator, entityStore);
-    TagHookDispatcher tagHookDispatcher = new TagHookDispatcher(tagManager);
-    this.tagDispatcher = new TagEventDispatcher(eventBus, tagHookDispatcher);
+    TagEventDispatcher tagEventDispatcher = new TagEventDispatcher(eventBus, 
tagManager);
+    this.tagDispatcher = new TagHookDispatcher(tagEventDispatcher);
 
     PolicyEventDispatcher policyEventDispatcher =
         new PolicyEventDispatcher(eventBus, new PolicyManager(idGenerator, 
entityStore));
@@ -669,8 +674,8 @@ public class GravitinoEnv {
     JobManager jobManager = new JobManager(config, entityStore, idGenerator);
     JobTemplateValidationDispatcher validationDispatcher =
         new JobTemplateValidationDispatcher(jobManager);
-    this.jobOperationDispatcher =
-        new JobEventDispatcher(eventBus, new 
JobHookDispatcher(validationDispatcher));
+    JobEventDispatcher jobEventDispatcher = new JobEventDispatcher(eventBus, 
validationDispatcher);
+    this.jobOperationDispatcher = new JobHookDispatcher(jobEventDispatcher);
 
     // Register built-in job template event listener to automatically register 
templates
     // when metalakes are created
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java 
b/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java
index a3c7d53157..d619eff2ed 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java
@@ -109,6 +109,17 @@ public class CapabilityHelpers {
     return NameIdentifier.of(namespace, name);
   }
 
+  /**
+   * Convenience overload that loads the catalog capability for {@code ident} 
and applies it to the
+   * identifier. Use this from call sites (e.g. HookDispatchers) that need a 
normalized identifier
+   * but do not already hold a {@link Capability} instance.
+   */
+  public static NameIdentifier applyCapabilities(
+      NameIdentifier ident, Capability.Scope scope, CatalogManager 
catalogManager) {
+    Capability capability = getCapability(ident, catalogManager);
+    return applyCapabilities(ident, scope, capability);
+  }
+
   public static NameIdentifier[] applyCaseSensitive(
       NameIdentifier[] idents, Capability.Scope scope, Capability 
capabilities) {
     return Arrays.stream(idents)
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/AccessControlHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/AccessControlHookDispatcher.java
index 5cce42c4d0..d90601ad8e 100644
--- 
a/core/src/main/java/org/apache/gravitino/hook/AccessControlHookDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/hook/AccessControlHookDispatcher.java
@@ -21,7 +21,6 @@ package org.apache.gravitino.hook;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.MetadataObject;
@@ -47,14 +46,17 @@ import 
org.apache.gravitino.exceptions.UserAlreadyExistsException;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * {@code AccessControlHookDispatcher} is a decorator for {@link 
AccessControlDispatcher} that not
  * only delegates access control operations to the underlying access control 
dispatcher but also
  * executes some hook operations before or after the underlying operations.
  */
-@Slf4j
 public class AccessControlHookDispatcher implements AccessControlDispatcher {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AccessControlHookDispatcher.class);
   private final AccessControlDispatcher dispatcher;
 
   public AccessControlHookDispatcher(AccessControlDispatcher dispatcher) {
@@ -186,7 +188,7 @@ public class AccessControlHookDispatcher implements 
AccessControlDispatcher {
     try {
       oldRole = getRole(metalake, role);
     } catch (NoSuchRoleException e) {
-      log.debug(e.getMessage());
+      LOG.debug(e.getMessage());
     }
     boolean resultOfDeleteRole = dispatcher.deleteRole(metalake, role);
     if (resultOfDeleteRole && oldRole != null) {
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
index 800c6a66a2..22103b6253 100644
--- a/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
@@ -28,7 +28,9 @@ import org.apache.gravitino.Namespace;
 import org.apache.gravitino.authorization.AuthorizationUtils;
 import org.apache.gravitino.authorization.Owner;
 import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.catalog.CapabilityHelpers;
 import org.apache.gravitino.catalog.FilesetDispatcher;
+import org.apache.gravitino.connector.capability.Capability;
 import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
 import org.apache.gravitino.exceptions.NoSuchLocationNameException;
@@ -82,9 +84,16 @@ public class FilesetHookDispatcher implements 
FilesetDispatcher {
     // Set the creator as the owner of the fileset.
     OwnerDispatcher ownerManager = 
GravitinoEnv.getInstance().ownerDispatcher();
     if (ownerManager != null) {
+      // The inner NormalizeDispatcher case-folds the fileset name (and its 
schema namespace)
+      // based on catalog capabilities, so the entity is stored under the 
normalized identifier.
+      // Apply the same normalization here so the owner is attached to the 
same identifier the
+      // manager sees.
+      NameIdentifier normalizedIdent =
+          CapabilityHelpers.applyCapabilities(
+              ident, Capability.Scope.FILESET, 
GravitinoEnv.getInstance().catalogManager());
       ownerManager.setOwner(
-          ident.namespace().level(0),
-          NameIdentifierUtil.toMetadataObject(ident, 
Entity.EntityType.FILESET),
+          normalizedIdent.namespace().level(0),
+          NameIdentifierUtil.toMetadataObject(normalizedIdent, 
Entity.EntityType.FILESET),
           PrincipalUtils.getCurrentUserName(),
           Owner.Type.USER);
     }
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/FunctionHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/FunctionHookDispatcher.java
index 6898c45198..dc1758102b 100644
--- a/core/src/main/java/org/apache/gravitino/hook/FunctionHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/FunctionHookDispatcher.java
@@ -25,7 +25,9 @@ import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.authorization.Owner;
 import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.catalog.CapabilityHelpers;
 import org.apache.gravitino.catalog.FunctionDispatcher;
+import org.apache.gravitino.connector.capability.Capability;
 import org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchFunctionException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
@@ -42,7 +44,6 @@ import org.apache.gravitino.utils.PrincipalUtils;
  * operations before or after the underlying operations.
  */
 public class FunctionHookDispatcher implements FunctionDispatcher {
-
   private final FunctionDispatcher dispatcher;
 
   public FunctionHookDispatcher(FunctionDispatcher dispatcher) {
@@ -80,12 +81,19 @@ public class FunctionHookDispatcher implements 
FunctionDispatcher {
     Function function =
         dispatcher.registerFunction(ident, comment, functionType, 
deterministic, definitions);
 
-    // Set the creator as owner of the function.
+    // Set the creator as the owner of the function.
     OwnerDispatcher ownerManager = 
GravitinoEnv.getInstance().ownerDispatcher();
     if (ownerManager != null) {
+      // The inner NormalizeDispatcher case-folds the function name (and its 
schema namespace)
+      // based on catalog capabilities, so the entity is stored under the 
normalized identifier.
+      // Apply the same normalization here so the owner is attached to the 
same identifier the
+      // manager sees.
+      NameIdentifier normalizedIdent =
+          CapabilityHelpers.applyCapabilities(
+              ident, Capability.Scope.FUNCTION, 
GravitinoEnv.getInstance().catalogManager());
       ownerManager.setOwner(
-          ident.namespace().level(0),
-          NameIdentifierUtil.toMetadataObject(ident, 
Entity.EntityType.FUNCTION),
+          normalizedIdent.namespace().level(0),
+          NameIdentifierUtil.toMetadataObject(normalizedIdent, 
Entity.EntityType.FUNCTION),
           PrincipalUtils.getCurrentUserName(),
           Owner.Type.USER);
     }
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/MetalakeHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/MetalakeHookDispatcher.java
index 407c8fe27b..310368d67f 100644
--- a/core/src/main/java/org/apache/gravitino/hook/MetalakeHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/MetalakeHookDispatcher.java
@@ -64,7 +64,7 @@ public class MetalakeHookDispatcher implements 
MetalakeDispatcher {
       throws MetalakeAlreadyExistsException {
     Metalake metalake = dispatcher.createMetalake(ident, comment, properties);
 
-    // Add the creator to the metalake
+    // Add the creator to the metalake.
     AccessControlDispatcher accessControlDispatcher =
         GravitinoEnv.getInstance().accessControlDispatcher();
     if (accessControlDispatcher != null) {
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/ModelHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/ModelHookDispatcher.java
index 97837f8103..85c8af2ce3 100644
--- a/core/src/main/java/org/apache/gravitino/hook/ModelHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/ModelHookDispatcher.java
@@ -26,7 +26,9 @@ import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.authorization.Owner;
 import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.catalog.CapabilityHelpers;
 import org.apache.gravitino.catalog.ModelDispatcher;
+import org.apache.gravitino.connector.capability.Capability;
 import org.apache.gravitino.exceptions.ModelAlreadyExistsException;
 import 
org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException;
 import org.apache.gravitino.exceptions.NoSuchModelException;
@@ -71,9 +73,15 @@ public class ModelHookDispatcher implements ModelDispatcher {
     // Set the creator as owner of the model.
     OwnerDispatcher ownerManager = 
GravitinoEnv.getInstance().ownerDispatcher();
     if (ownerManager != null) {
+      // The inner NormalizeDispatcher case-folds the model name based on 
catalog capabilities,
+      // so the entity is stored under the normalized identifier. Apply the 
same normalization
+      // here so the owner is attached to the same identifier the manager sees.
+      NameIdentifier normalizedIdent =
+          CapabilityHelpers.applyCapabilities(
+              ident, Capability.Scope.MODEL, 
GravitinoEnv.getInstance().catalogManager());
       ownerManager.setOwner(
-          ident.namespace().level(0),
-          NameIdentifierUtil.toMetadataObject(ident, Entity.EntityType.MODEL),
+          normalizedIdent.namespace().level(0),
+          NameIdentifierUtil.toMetadataObject(normalizedIdent, 
Entity.EntityType.MODEL),
           PrincipalUtils.getCurrentUserName(),
           Owner.Type.USER);
     }
@@ -159,9 +167,15 @@ public class ModelHookDispatcher implements 
ModelDispatcher {
     // Set the creator as owner of the model.
     OwnerDispatcher ownerManager = 
GravitinoEnv.getInstance().ownerDispatcher();
     if (ownerManager != null) {
+      // The inner NormalizeDispatcher case-folds the model name based on 
catalog capabilities,
+      // so the entity is stored under the normalized identifier. Apply the 
same normalization
+      // here so the owner is attached to the same identifier the manager sees.
+      NameIdentifier normalizedIdent =
+          CapabilityHelpers.applyCapabilities(
+              ident, Capability.Scope.MODEL, 
GravitinoEnv.getInstance().catalogManager());
       ownerManager.setOwner(
-          ident.name(),
-          NameIdentifierUtil.toMetadataObject(ident, Entity.EntityType.MODEL),
+          normalizedIdent.namespace().level(0),
+          NameIdentifierUtil.toMetadataObject(normalizedIdent, 
Entity.EntityType.MODEL),
           PrincipalUtils.getCurrentUserName(),
           Owner.Type.USER);
     }
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/SchemaHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/SchemaHookDispatcher.java
index a90be09111..6114f7d2fb 100644
--- a/core/src/main/java/org/apache/gravitino/hook/SchemaHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/SchemaHookDispatcher.java
@@ -29,7 +29,9 @@ import org.apache.gravitino.SchemaChange;
 import org.apache.gravitino.authorization.AuthorizationUtils;
 import org.apache.gravitino.authorization.Owner;
 import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.catalog.CapabilityHelpers;
 import org.apache.gravitino.catalog.SchemaDispatcher;
+import org.apache.gravitino.connector.capability.Capability;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NonEmptySchemaException;
@@ -62,9 +64,15 @@ public class SchemaHookDispatcher implements 
SchemaDispatcher {
     // Set the creator as the owner of the schema.
     OwnerDispatcher ownerManager = 
GravitinoEnv.getInstance().ownerDispatcher();
     if (ownerManager != null) {
+      // The inner NormalizeDispatcher case-folds the schema name based on 
catalog capabilities,
+      // so the entity is stored under the normalized identifier. Apply the 
same normalization
+      // here so the owner is attached to the same identifier the manager sees.
+      NameIdentifier normalizedIdent =
+          CapabilityHelpers.applyCapabilities(
+              ident, Capability.Scope.SCHEMA, 
GravitinoEnv.getInstance().catalogManager());
       ownerManager.setOwner(
-          ident.namespace().level(0),
-          NameIdentifierUtil.toMetadataObject(ident, Entity.EntityType.SCHEMA),
+          normalizedIdent.namespace().level(0),
+          NameIdentifierUtil.toMetadataObject(normalizedIdent, 
Entity.EntityType.SCHEMA),
           PrincipalUtils.getCurrentUserName(),
           Owner.Type.USER);
     }
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java
index dc2bd0fb64..e898a549cb 100644
--- a/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java
@@ -27,7 +27,9 @@ import org.apache.gravitino.Namespace;
 import org.apache.gravitino.authorization.AuthorizationUtils;
 import org.apache.gravitino.authorization.Owner;
 import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.catalog.CapabilityHelpers;
 import org.apache.gravitino.catalog.TableDispatcher;
+import org.apache.gravitino.connector.capability.Capability;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
@@ -81,9 +83,16 @@ public class TableHookDispatcher implements TableDispatcher {
     // Set the creator as the owner of the table.
     OwnerDispatcher ownerManager = 
GravitinoEnv.getInstance().ownerDispatcher();
     if (ownerManager != null) {
+      // The inner NormalizeDispatcher case-folds the table name (and its 
schema namespace)
+      // based on catalog capabilities, so the entity is stored under the 
normalized identifier.
+      // Apply the same normalization here so the owner is attached to the 
same identifier the
+      // manager sees.
+      NameIdentifier normalizedIdent =
+          CapabilityHelpers.applyCapabilities(
+              ident, Capability.Scope.TABLE, 
GravitinoEnv.getInstance().catalogManager());
       ownerManager.setOwner(
-          ident.namespace().level(0),
-          NameIdentifierUtil.toMetadataObject(ident, Entity.EntityType.TABLE),
+          normalizedIdent.namespace().level(0),
+          NameIdentifierUtil.toMetadataObject(normalizedIdent, 
Entity.EntityType.TABLE),
           PrincipalUtils.getCurrentUserName(),
           Owner.Type.USER);
     }
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/TagHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/TagHookDispatcher.java
index 444f597fe4..75bfbd1fb3 100644
--- a/core/src/main/java/org/apache/gravitino/hook/TagHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/TagHookDispatcher.java
@@ -59,7 +59,7 @@ public class TagHookDispatcher implements TagDispatcher {
       String metalake, String name, String comment, Map<String, String> 
properties) {
     Tag tag = dispatcher.createTag(metalake, name, comment, properties);
 
-    // Set the creator as the owner of the catalog.
+    // Set the creator as the owner of the tag.
     OwnerDispatcher ownerDispatcher = 
GravitinoEnv.getInstance().ownerDispatcher();
     if (ownerDispatcher != null) {
       ownerDispatcher.setOwner(
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/TopicHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/TopicHookDispatcher.java
index 8926e01829..a9ac3b2dc7 100644
--- a/core/src/main/java/org/apache/gravitino/hook/TopicHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/TopicHookDispatcher.java
@@ -27,7 +27,9 @@ import org.apache.gravitino.Namespace;
 import org.apache.gravitino.authorization.AuthorizationUtils;
 import org.apache.gravitino.authorization.Owner;
 import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.catalog.CapabilityHelpers;
 import org.apache.gravitino.catalog.TopicDispatcher;
+import org.apache.gravitino.connector.capability.Capability;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTopicException;
 import org.apache.gravitino.exceptions.TopicAlreadyExistsException;
@@ -68,9 +70,16 @@ public class TopicHookDispatcher implements TopicDispatcher {
     // Set the creator as the owner of the topic.
     OwnerDispatcher ownerManager = 
GravitinoEnv.getInstance().ownerDispatcher();
     if (ownerManager != null) {
+      // The inner NormalizeDispatcher case-folds the topic name (and its 
schema namespace)
+      // based on catalog capabilities, so the entity is stored under the 
normalized identifier.
+      // Apply the same normalization here so the owner is attached to the 
same identifier the
+      // manager sees.
+      NameIdentifier normalizedIdent =
+          CapabilityHelpers.applyCapabilities(
+              ident, Capability.Scope.TOPIC, 
GravitinoEnv.getInstance().catalogManager());
       ownerManager.setOwner(
-          ident.namespace().level(0),
-          NameIdentifierUtil.toMetadataObject(ident, Entity.EntityType.TOPIC),
+          normalizedIdent.namespace().level(0),
+          NameIdentifierUtil.toMetadataObject(normalizedIdent, 
Entity.EntityType.TOPIC),
           PrincipalUtils.getCurrentUserName(),
           Owner.Type.USER);
     }
diff --git 
a/core/src/test/java/org/apache/gravitino/hook/TestAccessControlHookDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/hook/TestAccessControlHookDispatcher.java
new file mode 100644
index 0000000000..d0d1f1519e
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/hook/TestAccessControlHookDispatcher.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.authorization.AccessControlDispatcher;
+import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.authorization.Role;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestAccessControlHookDispatcher {
+
+  private AccessControlHookDispatcher hookDispatcher;
+  private AccessControlDispatcher mockDispatcher;
+  private OwnerDispatcher mockOwnerDispatcher;
+  // Save the original ownerDispatcher before each test and restore it in 
tearDown so we do not
+  // leak null state into the GravitinoEnv singleton across tests.
+  private OwnerDispatcher savedOwnerDispatcher;
+
+  @BeforeEach
+  public void setUp() throws IllegalAccessException {
+    mockDispatcher = mock(AccessControlDispatcher.class);
+    mockOwnerDispatcher = mock(OwnerDispatcher.class);
+    savedOwnerDispatcher = GravitinoEnv.getInstance().ownerDispatcher();
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "ownerDispatcher", 
mockOwnerDispatcher, true);
+    hookDispatcher = new AccessControlHookDispatcher(mockDispatcher);
+  }
+
+  @AfterEach
+  public void tearDown() throws IllegalAccessException {
+    FieldUtils.writeField(
+        GravitinoEnv.getInstance(), "ownerDispatcher", savedOwnerDispatcher, 
true);
+  }
+
+  @Test
+  public void testCreateRoleThrowsWhenSetOwnerFails() {
+    Role mockRole = mock(Role.class);
+    when(mockDispatcher.createRole(any(), any(), any(), 
any())).thenReturn(mockRole);
+
+    doThrow(new RuntimeException("Set owner failed"))
+        .when(mockOwnerDispatcher)
+        .setOwner(any(), any(), any(), any());
+
+    RuntimeException thrown =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () ->
+                hookDispatcher.createRole(
+                    "test_metalake", "test_role", Collections.emptyMap(), 
Collections.emptyList()));
+    Assertions.assertEquals("Set owner failed", thrown.getMessage());
+    verify(mockDispatcher).createRole(any(), any(), any(), any());
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/hook/TestFilesetHookDispatcher.java 
b/core/src/test/java/org/apache/gravitino/hook/TestFilesetHookDispatcher.java
index 390f0f05dc..9ea21d23e7 100644
--- 
a/core/src/test/java/org/apache/gravitino/hook/TestFilesetHookDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/hook/TestFilesetHookDispatcher.java
@@ -35,27 +35,35 @@ import static 
org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
 import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
 import static org.apache.gravitino.Configs.VERSION_RETENTION_COUNT;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import java.io.IOException;
 import java.util.Map;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.authorization.AccessControlManager;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerDispatcher;
 import org.apache.gravitino.catalog.CatalogManager;
+import org.apache.gravitino.catalog.FilesetDispatcher;
 import org.apache.gravitino.catalog.TestFilesetOperationDispatcher;
 import org.apache.gravitino.catalog.TestOperationDispatcher;
 import org.apache.gravitino.connector.BaseCatalog;
 import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
+import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.connector.capability.CapabilityResult;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetChange;
 import org.apache.gravitino.lock.LockManager;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 public class TestFilesetHookDispatcher extends TestOperationDispatcher {
@@ -67,7 +75,7 @@ public class TestFilesetHookDispatcher extends 
TestOperationDispatcher {
   private static AuthorizationPlugin authorizationPlugin;
 
   @BeforeAll
-  public static void initialize() throws IOException, IllegalAccessException {
+  public static void initialize() throws Exception {
     TestFilesetOperationDispatcher.initialize();
 
     filesetHookDispatcher =
@@ -80,11 +88,103 @@ public class TestFilesetHookDispatcher extends 
TestOperationDispatcher {
     catalogManager = Mockito.mock(CatalogManager.class);
     FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", 
catalogManager, true);
     BaseCatalog catalog = Mockito.mock(BaseCatalog.class);
+    Mockito.when(catalog.capability()).thenReturn(Capability.DEFAULT);
+    CatalogManager.CatalogWrapper catalogWrapper =
+        Mockito.mock(CatalogManager.CatalogWrapper.class);
+    Mockito.when(catalogWrapper.catalog()).thenReturn(catalog);
+    Mockito.when(catalogWrapper.capabilities()).thenReturn(Capability.DEFAULT);
     Mockito.when(catalogManager.loadCatalog(any())).thenReturn(catalog);
+    
Mockito.when(catalogManager.loadCatalogAndWrap(any())).thenReturn(catalogWrapper);
     authorizationPlugin = Mockito.mock(AuthorizationPlugin.class);
     
Mockito.when(catalog.getAuthorizationPlugin()).thenReturn(authorizationPlugin);
   }
 
+  @Test
+  public void testCreateFilesetSetsOwnerWithNormalizedIdentifier() throws 
Exception {
+    // Self-contained: use a fresh hook with a directly-mocked 
FilesetDispatcher and a case-
+    // insensitive catalog so we can verify the helper passes a normalized 
ident to setOwner.
+    CatalogManager savedCatalogManager = 
GravitinoEnv.getInstance().catalogManager();
+    OwnerDispatcher savedOwnerDispatcher = 
GravitinoEnv.getInstance().ownerDispatcher();
+
+    CatalogManager mockCatalogManager = Mockito.mock(CatalogManager.class);
+    CatalogManager.CatalogWrapper mockWrapper = 
Mockito.mock(CatalogManager.CatalogWrapper.class);
+    Mockito.when(mockWrapper.capabilities()).thenReturn(new 
CaseInsensitiveCapability());
+    
Mockito.when(mockCatalogManager.loadCatalogAndWrap(any())).thenReturn(mockWrapper);
+
+    OwnerDispatcher mockOwnerDispatcher = Mockito.mock(OwnerDispatcher.class);
+    FilesetDispatcher mockFilesetDispatcher = 
Mockito.mock(FilesetDispatcher.class);
+    Mockito.when(
+            mockFilesetDispatcher.createMultipleLocationFileset(any(), any(), 
any(), any(), any()))
+        .thenReturn(Mockito.mock(Fileset.class));
+
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", 
mockCatalogManager, true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "ownerDispatcher", 
mockOwnerDispatcher, true);
+
+    try {
+      FilesetHookDispatcher localHook = new 
FilesetHookDispatcher(mockFilesetDispatcher);
+      NameIdentifier ident = NameIdentifier.of(metalake, catalog, 
"SCHEMA_NORM", "MY_FILESET");
+      localHook.createMultipleLocationFileset(
+          ident,
+          "comment",
+          Fileset.Type.MANAGED,
+          ImmutableMap.of("default", "/tmp/loc"),
+          ImmutableMap.of());
+
+      ArgumentCaptor<MetadataObject> captor = 
ArgumentCaptor.forClass(MetadataObject.class);
+      Mockito.verify(mockOwnerDispatcher)
+          .setOwner(eq(metalake), captor.capture(), any(), 
eq(Owner.Type.USER));
+      Assertions.assertEquals(
+          "my_fileset",
+          captor.getValue().name(),
+          "Fileset name passed to setOwner must be lowercased by 
Capability.Scope.FILESET"
+              + " normalization");
+      Assertions.assertEquals(
+          catalog + ".schema_norm",
+          captor.getValue().parent(),
+          "Fileset parent (catalog.schema) must have its schema component 
lowercased by"
+              + " Capability.Scope.FILESET namespace normalization");
+    } finally {
+      FieldUtils.writeField(
+          GravitinoEnv.getInstance(), "catalogManager", savedCatalogManager, 
true);
+      FieldUtils.writeField(
+          GravitinoEnv.getInstance(), "ownerDispatcher", savedOwnerDispatcher, 
true);
+    }
+  }
+
+  @Test
+  public void testCreateFilesetThrowsWhenSetOwnerFails() throws 
IllegalAccessException {
+    // Save the original ownerDispatcher so we can restore it in the finally 
block instead of
+    // wiping it to null and leaking that into other tests in the suite.
+    OwnerDispatcher savedOwnerDispatcher = 
GravitinoEnv.getInstance().ownerDispatcher();
+
+    // Create the schema first with the existing (non-throwing) 
ownerDispatcher, then swap to the
+    // throwing mock only for the fileset create we actually want to exercise. 
Otherwise the
+    // throwing mock would fire during schema creation and we would never 
reach the fileset call.
+    Namespace filesetNs = Namespace.of(metalake, catalog, "schema_owner_fail");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    schemaHookDispatcher.createSchema(NameIdentifier.of(filesetNs.levels()), 
"comment", props);
+
+    OwnerDispatcher mockOwnerDispatcher = Mockito.mock(OwnerDispatcher.class);
+    Mockito.doThrow(new RuntimeException("Set owner failed"))
+        .when(mockOwnerDispatcher)
+        .setOwner(any(), any(), any(), any());
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "ownerDispatcher", 
mockOwnerDispatcher, true);
+
+    try {
+      NameIdentifier filesetIdent = NameIdentifier.of(filesetNs, 
"fileset_owner_fail");
+      RuntimeException thrown =
+          Assertions.assertThrows(
+              RuntimeException.class,
+              () ->
+                  filesetHookDispatcher.createFileset(
+                      filesetIdent, "comment", Fileset.Type.MANAGED, 
"fileset_owner", props));
+      Assertions.assertEquals("Set owner failed", thrown.getMessage());
+    } finally {
+      FieldUtils.writeField(
+          GravitinoEnv.getInstance(), "ownerDispatcher", savedOwnerDispatcher, 
true);
+    }
+  }
+
   @Test
   public void testDropAuthorizationPrivilege() {
     Namespace filesetNs = Namespace.of(metalake, catalog, "schema11212");
@@ -149,4 +249,11 @@ public class TestFilesetHookDispatcher extends 
TestOperationDispatcher {
     filesetHookDispatcher.alterFileset(filesetIdent, renameChange);
     Mockito.verify(authorizationPlugin).onMetadataUpdated(any());
   }
+
+  private static class CaseInsensitiveCapability implements Capability {
+    @Override
+    public CapabilityResult caseSensitiveOnName(Scope scope) {
+      return CapabilityResult.unsupported("case-insensitive");
+    }
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/hook/TestFunctionHookDispatcher.java 
b/core/src/test/java/org/apache/gravitino/hook/TestFunctionHookDispatcher.java
index f567d3764f..faba932cd6 100644
--- 
a/core/src/test/java/org/apache/gravitino/hook/TestFunctionHookDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/hook/TestFunctionHookDispatcher.java
@@ -20,6 +20,9 @@ package org.apache.gravitino.hook;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.GravitinoEnv;
@@ -28,7 +31,10 @@ import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.auth.AuthConstants;
 import org.apache.gravitino.authorization.Owner;
 import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.catalog.CatalogManager;
 import org.apache.gravitino.catalog.FunctionDispatcher;
+import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.connector.capability.CapabilityResult;
 import org.apache.gravitino.function.Function;
 import org.apache.gravitino.function.FunctionDefinition;
 import org.apache.gravitino.function.FunctionType;
@@ -42,6 +48,7 @@ public class TestFunctionHookDispatcher {
   public void testRegisterFunctionSetOwnerAfterRegister() throws Exception {
     GravitinoEnv gravitinoEnv = GravitinoEnv.getInstance();
     Object originalOwnerDispatcher = FieldUtils.readField(gravitinoEnv, 
"ownerDispatcher", true);
+    Object originalCatalogManager = FieldUtils.readField(gravitinoEnv, 
"catalogManager", true);
 
     NameIdentifier functionIdentifier =
         NameIdentifier.of("metalake1", "catalog1", "schema1", "func1");
@@ -50,6 +57,14 @@ public class TestFunctionHookDispatcher {
     Function registeredFunction = Mockito.mock(Function.class);
     OwnerDispatcher ownerDispatcher = Mockito.mock(OwnerDispatcher.class);
 
+    // Wire a case-sensitive capability so the un-normalized identifier 
reaches setOwner unchanged,
+    // while still exercising the normalization codepath in the hook.
+    CatalogManager catalogManager = Mockito.mock(CatalogManager.class);
+    CatalogManager.CatalogWrapper catalogWrapper =
+        Mockito.mock(CatalogManager.CatalogWrapper.class);
+    Mockito.when(catalogWrapper.capabilities()).thenReturn(Capability.DEFAULT);
+    
Mockito.when(catalogManager.loadCatalogAndWrap(any())).thenReturn(catalogWrapper);
+
     Mockito.when(
             dispatcher.registerFunction(
                 Mockito.eq(functionIdentifier),
@@ -60,6 +75,7 @@ public class TestFunctionHookDispatcher {
         .thenReturn(registeredFunction);
 
     FieldUtils.writeField(gravitinoEnv, "ownerDispatcher", ownerDispatcher, 
true);
+    FieldUtils.writeField(gravitinoEnv, "catalogManager", catalogManager, 
true);
     try {
       FunctionHookDispatcher hookDispatcher = new 
FunctionHookDispatcher(dispatcher);
       Function result =
@@ -80,6 +96,7 @@ public class TestFunctionHookDispatcher {
       assertEquals("catalog1.schema1.func1", 
metadataObjectCaptor.getValue().fullName());
     } finally {
       FieldUtils.writeField(gravitinoEnv, "ownerDispatcher", 
originalOwnerDispatcher, true);
+      FieldUtils.writeField(gravitinoEnv, "catalogManager", 
originalCatalogManager, true);
     }
   }
 
@@ -117,4 +134,104 @@ public class TestFunctionHookDispatcher {
       FieldUtils.writeField(gravitinoEnv, "ownerDispatcher", 
originalOwnerDispatcher, true);
     }
   }
+
+  @Test
+  public void testRegisterFunctionSetsOwnerWithNormalizedIdentifier() throws 
Exception {
+    // Verifies the hook applies Capability.Scope.FUNCTION normalization 
before setOwner, so the
+    // owner relation references the same identifier that NormalizeDispatcher 
persists under.
+    GravitinoEnv gravitinoEnv = GravitinoEnv.getInstance();
+    Object originalOwnerDispatcher = FieldUtils.readField(gravitinoEnv, 
"ownerDispatcher", true);
+    Object originalCatalogManager = FieldUtils.readField(gravitinoEnv, 
"catalogManager", true);
+
+    CatalogManager mockCatalogManager = Mockito.mock(CatalogManager.class);
+    CatalogManager.CatalogWrapper mockWrapper = 
Mockito.mock(CatalogManager.CatalogWrapper.class);
+    Mockito.when(mockWrapper.capabilities()).thenReturn(new 
CaseInsensitiveCapability());
+    
Mockito.when(mockCatalogManager.loadCatalogAndWrap(any())).thenReturn(mockWrapper);
+
+    OwnerDispatcher mockOwnerDispatcher = Mockito.mock(OwnerDispatcher.class);
+    FunctionDispatcher mockFunctionDispatcher = 
Mockito.mock(FunctionDispatcher.class);
+    Function mockFunction = Mockito.mock(Function.class);
+    FunctionDefinition[] definitions = new FunctionDefinition[] {};
+    Mockito.when(
+            mockFunctionDispatcher.registerFunction(
+                any(), any(), any(), Mockito.anyBoolean(), any()))
+        .thenReturn(mockFunction);
+
+    FieldUtils.writeField(gravitinoEnv, "catalogManager", mockCatalogManager, 
true);
+    FieldUtils.writeField(gravitinoEnv, "ownerDispatcher", 
mockOwnerDispatcher, true);
+
+    try {
+      FunctionHookDispatcher hook = new 
FunctionHookDispatcher(mockFunctionDispatcher);
+      NameIdentifier ident = NameIdentifier.of("metalake1", "catalog1", 
"SCHEMA_NORM", "MY_FUNC");
+      hook.registerFunction(ident, "comment", FunctionType.SCALAR, true, 
definitions);
+
+      ArgumentCaptor<MetadataObject> captor = 
ArgumentCaptor.forClass(MetadataObject.class);
+      Mockito.verify(mockOwnerDispatcher)
+          .setOwner(eq("metalake1"), captor.capture(), any(), 
eq(Owner.Type.USER));
+      assertEquals(
+          "my_func",
+          captor.getValue().name(),
+          "Function name passed to setOwner must be lowercased by 
Capability.Scope.FUNCTION"
+              + " normalization");
+      assertEquals(
+          "catalog1.schema_norm",
+          captor.getValue().parent(),
+          "Function parent (catalog.schema) must have its schema component 
lowercased by"
+              + " Capability.Scope.FUNCTION namespace normalization");
+    } finally {
+      FieldUtils.writeField(gravitinoEnv, "ownerDispatcher", 
originalOwnerDispatcher, true);
+      FieldUtils.writeField(gravitinoEnv, "catalogManager", 
originalCatalogManager, true);
+    }
+  }
+
+  @Test
+  public void testRegisterFunctionThrowsWhenSetOwnerFails() throws Exception {
+    GravitinoEnv gravitinoEnv = GravitinoEnv.getInstance();
+    Object originalOwnerDispatcher = FieldUtils.readField(gravitinoEnv, 
"ownerDispatcher", true);
+    Object originalCatalogManager = FieldUtils.readField(gravitinoEnv, 
"catalogManager", true);
+
+    OwnerDispatcher mockOwnerDispatcher = Mockito.mock(OwnerDispatcher.class);
+    Mockito.doThrow(new RuntimeException("Set owner failed"))
+        .when(mockOwnerDispatcher)
+        .setOwner(any(), any(), any(), any());
+
+    CatalogManager catalogManager = Mockito.mock(CatalogManager.class);
+    CatalogManager.CatalogWrapper catalogWrapper =
+        Mockito.mock(CatalogManager.CatalogWrapper.class);
+    Mockito.when(catalogWrapper.capabilities()).thenReturn(Capability.DEFAULT);
+    
Mockito.when(catalogManager.loadCatalogAndWrap(any())).thenReturn(catalogWrapper);
+
+    FunctionDispatcher mockFunctionDispatcher = 
Mockito.mock(FunctionDispatcher.class);
+    Function mockFunction = Mockito.mock(Function.class);
+    FunctionDefinition[] definitions = new FunctionDefinition[] {};
+    Mockito.when(
+            mockFunctionDispatcher.registerFunction(
+                any(), any(), any(), Mockito.anyBoolean(), any()))
+        .thenReturn(mockFunction);
+
+    FieldUtils.writeField(gravitinoEnv, "ownerDispatcher", 
mockOwnerDispatcher, true);
+    FieldUtils.writeField(gravitinoEnv, "catalogManager", catalogManager, 
true);
+
+    try {
+      FunctionHookDispatcher hook = new 
FunctionHookDispatcher(mockFunctionDispatcher);
+      NameIdentifier ident =
+          NameIdentifier.of("metalake1", "catalog1", "schema_owner_fail", 
"func_owner_fail");
+      RuntimeException thrown =
+          assertThrows(
+              RuntimeException.class,
+              () ->
+                  hook.registerFunction(ident, "comment", FunctionType.SCALAR, 
true, definitions));
+      assertEquals("Set owner failed", thrown.getMessage());
+    } finally {
+      FieldUtils.writeField(gravitinoEnv, "ownerDispatcher", 
originalOwnerDispatcher, true);
+      FieldUtils.writeField(gravitinoEnv, "catalogManager", 
originalCatalogManager, true);
+    }
+  }
+
+  private static class CaseInsensitiveCapability implements Capability {
+    @Override
+    public CapabilityResult caseSensitiveOnName(Scope scope) {
+      return CapabilityResult.unsupported("case-insensitive");
+    }
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/hook/TestJobHookDispatcher.java 
b/core/src/test/java/org/apache/gravitino/hook/TestJobHookDispatcher.java
new file mode 100644
index 0000000000..404c79456c
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/hook/TestJobHookDispatcher.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.job.JobOperationDispatcher;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestJobHookDispatcher {
+
+  private JobHookDispatcher hookDispatcher;
+  private JobOperationDispatcher mockDispatcher;
+  private OwnerDispatcher mockOwnerDispatcher;
+  // Save the original ownerDispatcher before each test and restore it in 
tearDown so we do not
+  // leak null state into the GravitinoEnv singleton across tests.
+  private OwnerDispatcher savedOwnerDispatcher;
+
+  @BeforeEach
+  public void setUp() throws IllegalAccessException {
+    mockDispatcher = mock(JobOperationDispatcher.class);
+    mockOwnerDispatcher = mock(OwnerDispatcher.class);
+    savedOwnerDispatcher = GravitinoEnv.getInstance().ownerDispatcher();
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "ownerDispatcher", 
mockOwnerDispatcher, true);
+    hookDispatcher = new JobHookDispatcher(mockDispatcher);
+  }
+
+  @AfterEach
+  public void tearDown() throws IllegalAccessException {
+    FieldUtils.writeField(
+        GravitinoEnv.getInstance(), "ownerDispatcher", savedOwnerDispatcher, 
true);
+  }
+
+  @Test
+  public void testRegisterJobTemplateThrowsWhenSetOwnerFails() {
+    JobTemplateEntity mockTemplate = mock(JobTemplateEntity.class);
+    // Job templates live under the reserved job-template virtual namespace, 
which is required
+    // to have 3 levels (metalake, system-catalog, job-template-schema).
+    when(mockTemplate.nameIdentifier())
+        .thenReturn(
+            NameIdentifier.of(NamespaceUtil.ofJobTemplate("test_metalake"), 
"test_template"));
+    when(mockTemplate.name()).thenReturn("test_template");
+
+    doThrow(new RuntimeException("Set owner failed"))
+        .when(mockOwnerDispatcher)
+        .setOwner(any(), any(), any(), any());
+
+    RuntimeException thrown =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () -> hookDispatcher.registerJobTemplate("test_metalake", 
mockTemplate));
+    Assertions.assertEquals("Set owner failed", thrown.getMessage());
+    verify(mockDispatcher).registerJobTemplate(any(), any());
+  }
+
+  @Test
+  public void testRunJobThrowsWhenSetOwnerFails() {
+    JobEntity mockJob = mock(JobEntity.class);
+    when(mockJob.nameIdentifier())
+        .thenReturn(NameIdentifier.of(NamespaceUtil.ofJob("test_metalake"), 
"test_job"));
+    when(mockJob.name()).thenReturn("test_job");
+    when(mockDispatcher.runJob(any(), any(), any())).thenReturn(mockJob);
+
+    doThrow(new RuntimeException("Set owner failed"))
+        .when(mockOwnerDispatcher)
+        .setOwner(any(), any(), any(), any());
+
+    RuntimeException thrown =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () -> hookDispatcher.runJob("test_metalake", "test_template", 
Collections.emptyMap()));
+    Assertions.assertEquals("Set owner failed", thrown.getMessage());
+    verify(mockDispatcher).runJob(any(), any(), any());
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/hook/TestMetalakeHookDispatcher.java 
b/core/src/test/java/org/apache/gravitino/hook/TestMetalakeHookDispatcher.java
new file mode 100644
index 0000000000..144a3b89df
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/hook/TestMetalakeHookDispatcher.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.Metalake;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.authorization.AccessControlDispatcher;
+import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.exceptions.UserAlreadyExistsException;
+import org.apache.gravitino.metalake.MetalakeDispatcher;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestMetalakeHookDispatcher {
+
+  private MetalakeHookDispatcher hookDispatcher;
+  private MetalakeDispatcher mockDispatcher;
+  private OwnerDispatcher mockOwnerDispatcher;
+  private AccessControlDispatcher mockAccessControlDispatcher;
+  // Save the originals before each test and restore them in tearDown so we do 
not leak null
+  // state into the GravitinoEnv singleton across tests.
+  private OwnerDispatcher savedOwnerDispatcher;
+  private AccessControlDispatcher savedAccessControlDispatcher;
+
+  @BeforeEach
+  public void setUp() throws IllegalAccessException {
+    mockDispatcher = mock(MetalakeDispatcher.class);
+    mockOwnerDispatcher = mock(OwnerDispatcher.class);
+    mockAccessControlDispatcher = mock(AccessControlDispatcher.class);
+    savedOwnerDispatcher = GravitinoEnv.getInstance().ownerDispatcher();
+    savedAccessControlDispatcher = 
GravitinoEnv.getInstance().accessControlDispatcher();
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "ownerDispatcher", 
mockOwnerDispatcher, true);
+    FieldUtils.writeField(
+        GravitinoEnv.getInstance(), "accessControlDispatcher", 
mockAccessControlDispatcher, true);
+    hookDispatcher = new MetalakeHookDispatcher(mockDispatcher);
+  }
+
+  @AfterEach
+  public void tearDown() throws IllegalAccessException {
+    FieldUtils.writeField(
+        GravitinoEnv.getInstance(), "ownerDispatcher", savedOwnerDispatcher, 
true);
+    FieldUtils.writeField(
+        GravitinoEnv.getInstance(), "accessControlDispatcher", 
savedAccessControlDispatcher, true);
+  }
+
+  @Test
+  public void testCreateMetalakeThrowsWhenSetOwnerFails() {
+    NameIdentifier ident = NameIdentifier.of("test_metalake");
+    Metalake mockMetalake = mock(Metalake.class);
+    when(mockDispatcher.createMetalake(any(), any(), 
any())).thenReturn(mockMetalake);
+
+    doThrow(new RuntimeException("Set owner failed"))
+        .when(mockOwnerDispatcher)
+        .setOwner(any(), any(), any(), any());
+
+    RuntimeException thrown =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () -> hookDispatcher.createMetalake(ident, "comment", 
Collections.emptyMap()));
+    Assertions.assertEquals("Set owner failed", thrown.getMessage());
+    verify(mockDispatcher).createMetalake(any(), any(), any());
+  }
+
+  @Test
+  public void testCreateMetalakeThrowsWhenUserAlreadyExists() {
+    NameIdentifier ident = NameIdentifier.of("test_metalake");
+    Metalake mockMetalake = mock(Metalake.class);
+    when(mockDispatcher.createMetalake(any(), any(), 
any())).thenReturn(mockMetalake);
+
+    // With the addUser try-catch removed, UserAlreadyExistsException now 
propagates to the caller.
+    // The caller can treat "already exists" as idempotent if they want; the 
server no longer
+    // silently swallows it.
+    doThrow(new UserAlreadyExistsException("User already exists"))
+        .when(mockAccessControlDispatcher)
+        .addUser(any(), any());
+
+    UserAlreadyExistsException thrown =
+        Assertions.assertThrows(
+            UserAlreadyExistsException.class,
+            () -> hookDispatcher.createMetalake(ident, "comment", 
Collections.emptyMap()));
+    Assertions.assertEquals("User already exists", thrown.getMessage());
+    verify(mockAccessControlDispatcher).addUser(any(), any());
+    verify(mockOwnerDispatcher, never()).setOwner(any(), any(), any(), any());
+  }
+
+  @Test
+  public void testCreateMetalakeThrowsWhenAddUserFails() {
+    NameIdentifier ident = NameIdentifier.of("test_metalake");
+    Metalake mockMetalake = mock(Metalake.class);
+    when(mockDispatcher.createMetalake(any(), any(), 
any())).thenReturn(mockMetalake);
+
+    // addUser failure (e.g. storage I/O) now propagates to the caller; 
setOwner is unreachable.
+    doThrow(new RuntimeException("Add user failed"))
+        .when(mockAccessControlDispatcher)
+        .addUser(any(), any());
+
+    RuntimeException thrown =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () -> hookDispatcher.createMetalake(ident, "comment", 
Collections.emptyMap()));
+    Assertions.assertEquals("Add user failed", thrown.getMessage());
+    verify(mockAccessControlDispatcher).addUser(any(), any());
+    verify(mockOwnerDispatcher, never()).setOwner(any(), any(), any(), any());
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/hook/TestModelHookDispatcher.java 
b/core/src/test/java/org/apache/gravitino/hook/TestModelHookDispatcher.java
new file mode 100644
index 0000000000..c6777b5891
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/hook/TestModelHookDispatcher.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.catalog.CatalogManager;
+import org.apache.gravitino.catalog.ModelDispatcher;
+import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.connector.capability.CapabilityResult;
+import org.apache.gravitino.model.Model;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestModelHookDispatcher {
+
+  private ModelHookDispatcher hookDispatcher;
+  private ModelDispatcher mockDispatcher;
+  private OwnerDispatcher mockOwnerDispatcher;
+  private CatalogManager mockCatalogManager;
+  private CatalogManager.CatalogWrapper mockCatalogWrapper;
+  // Save the originals before each test and restore them in tearDown so we do 
not leak null
+  // state into the GravitinoEnv singleton across tests.
+  private OwnerDispatcher savedOwnerDispatcher;
+  private CatalogManager savedCatalogManager;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    mockDispatcher = mock(ModelDispatcher.class);
+    mockOwnerDispatcher = mock(OwnerDispatcher.class);
+    mockCatalogManager = mock(CatalogManager.class);
+    mockCatalogWrapper = mock(CatalogManager.CatalogWrapper.class);
+    
when(mockCatalogManager.loadCatalogAndWrap(any())).thenReturn(mockCatalogWrapper);
+    when(mockCatalogWrapper.capabilities()).thenReturn(Capability.DEFAULT);
+    savedOwnerDispatcher = GravitinoEnv.getInstance().ownerDispatcher();
+    // Read the catalogManager field directly via reflection because the 
public accessor
+    // Preconditions-checks for non-null, which would fail when GravitinoEnv 
has not been
+    // initialized for this test class.
+    savedCatalogManager =
+        (CatalogManager) FieldUtils.readField(GravitinoEnv.getInstance(), 
"catalogManager", true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "ownerDispatcher", 
mockOwnerDispatcher, true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", 
mockCatalogManager, true);
+    hookDispatcher = new ModelHookDispatcher(mockDispatcher);
+  }
+
+  @AfterEach
+  public void tearDown() throws IllegalAccessException {
+    FieldUtils.writeField(
+        GravitinoEnv.getInstance(), "ownerDispatcher", savedOwnerDispatcher, 
true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", 
savedCatalogManager, true);
+  }
+
+  @Test
+  public void testRegisterModelThrowsWhenSetOwnerFails() {
+    NameIdentifier ident =
+        NameIdentifier.of("test_metalake", "test_catalog", "test_schema", 
"test_model");
+    Model mockModel = mock(Model.class);
+    when(mockDispatcher.registerModel(any(NameIdentifier.class), 
any(String.class), any()))
+        .thenReturn(mockModel);
+
+    doThrow(new RuntimeException("Set owner failed"))
+        .when(mockOwnerDispatcher)
+        .setOwner(any(), any(), any(), any());
+
+    RuntimeException thrown =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () -> hookDispatcher.registerModel(ident, "comment", 
Collections.emptyMap()));
+    Assertions.assertEquals("Set owner failed", thrown.getMessage());
+    verify(mockDispatcher).registerModel(any(NameIdentifier.class), 
any(String.class), any());
+  }
+
+  @Test
+  public void testRegisterModelSetsOwnerWithNormalizedIdentifier() throws 
Exception {
+    when(mockCatalogWrapper.capabilities()).thenReturn(new 
CaseInsensitiveCapability());
+
+    NameIdentifier ident =
+        NameIdentifier.of("test_metalake", "test_catalog", "TEST_SCHEMA", 
"MY_MODEL");
+    Model mockModel = mock(Model.class);
+    when(mockDispatcher.registerModel(any(NameIdentifier.class), 
any(String.class), any()))
+        .thenReturn(mockModel);
+
+    hookDispatcher.registerModel(ident, "comment", Collections.emptyMap());
+
+    ArgumentCaptor<MetadataObject> captor = 
ArgumentCaptor.forClass(MetadataObject.class);
+    verify(mockOwnerDispatcher)
+        .setOwner(eq("test_metalake"), captor.capture(), any(), 
eq(Owner.Type.USER));
+    Assertions.assertEquals(
+        "my_model",
+        captor.getValue().name(),
+        "Model name passed to setOwner must be lowercased by 
Capability.Scope.MODEL normalization");
+    // MODEL scope is intentionally excluded from 
CapabilityHelpers.applyCapabilities(Namespace,
+    // Scope, Capability), so the schema component in the namespace is NOT 
lowercased -- the
+    // captured parent reflects exactly what ModelNormalizeDispatcher would 
also pass to the
+    // manager. This assertion locks that behavior in.
+    Assertions.assertEquals(
+        "test_catalog.TEST_SCHEMA",
+        captor.getValue().parent(),
+        "Model parent must keep its schema component as-is: 
Capability.Scope.MODEL is excluded"
+            + " from namespace normalization in CapabilityHelpers; if this 
changes, ownership"
+            + " attachment will diverge from what ModelNormalizeDispatcher 
passes to the manager");
+  }
+
+  @Test
+  public void testRegisterModelWithVersionPassesMetalakeAsFirstSetOwnerArg() {
+    NameIdentifier ident =
+        NameIdentifier.of("test_metalake", "test_catalog", "test_schema", 
"test_model");
+    Model mockModel = mock(Model.class);
+    when(mockDispatcher.registerModel(
+            any(NameIdentifier.class), any(java.util.Map.class), 
any(String[].class), any(), any()))
+        .thenReturn(mockModel);
+
+    hookDispatcher.registerModel(
+        ident,
+        Collections.singletonMap("location", "s3://bucket/model"),
+        new String[0],
+        "comment",
+        Collections.emptyMap());
+
+    // Verify setOwner is called with the metalake name (level(0) of 
namespace), not the model
+    // name. Previously this method incorrectly passed ident.name() as the 
first argument.
+    ArgumentCaptor<MetadataObject> captor = 
ArgumentCaptor.forClass(MetadataObject.class);
+    verify(mockOwnerDispatcher)
+        .setOwner(eq("test_metalake"), captor.capture(), any(), 
eq(Owner.Type.USER));
+    Assertions.assertEquals(
+        "test_model",
+        captor.getValue().name(),
+        "5-arg registerModel must pass the model name to setOwner unchanged 
when input is already"
+            + " lowercase");
+    Assertions.assertEquals(
+        "test_catalog.test_schema",
+        captor.getValue().parent(),
+        "5-arg registerModel must build parent as <catalog>.<schema> 
(level(1).level(2)), not"
+            + " level(0) or the model name; this regression test guards the 
previous bug where"
+            + " ident.name() was used as the metalake arg");
+  }
+
+  private static class CaseInsensitiveCapability implements Capability {
+    @Override
+    public CapabilityResult caseSensitiveOnName(Scope scope) {
+      return CapabilityResult.unsupported("case-insensitive");
+    }
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/hook/TestPolicyHookDispatcher.java 
b/core/src/test/java/org/apache/gravitino/hook/TestPolicyHookDispatcher.java
new file mode 100644
index 0000000000..23eaa84f52
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/hook/TestPolicyHookDispatcher.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.meta.PolicyEntity;
+import org.apache.gravitino.policy.PolicyDispatcher;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestPolicyHookDispatcher {
+
+  private PolicyHookDispatcher hookDispatcher;
+  private PolicyDispatcher mockDispatcher;
+  private OwnerDispatcher mockOwnerDispatcher;
+  // Save the original ownerDispatcher before each test and restore it in 
tearDown so we do not
+  // leak null state into the GravitinoEnv singleton across tests.
+  private OwnerDispatcher savedOwnerDispatcher;
+
+  @BeforeEach
+  public void setUp() throws IllegalAccessException {
+    mockDispatcher = mock(PolicyDispatcher.class);
+    mockOwnerDispatcher = mock(OwnerDispatcher.class);
+    savedOwnerDispatcher = GravitinoEnv.getInstance().ownerDispatcher();
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "ownerDispatcher", 
mockOwnerDispatcher, true);
+    hookDispatcher = new PolicyHookDispatcher(mockDispatcher);
+  }
+
+  @AfterEach
+  public void tearDown() throws IllegalAccessException {
+    FieldUtils.writeField(
+        GravitinoEnv.getInstance(), "ownerDispatcher", savedOwnerDispatcher, 
true);
+  }
+
+  @Test
+  public void testCreatePolicyThrowsWhenSetOwnerFails() {
+    PolicyEntity mockPolicy = mock(PolicyEntity.class);
+    when(mockDispatcher.createPolicy(any(), any(), any(), any(), anyBoolean(), 
any()))
+        .thenReturn(mockPolicy);
+
+    doThrow(new RuntimeException("Set owner failed"))
+        .when(mockOwnerDispatcher)
+        .setOwner(any(), any(), any(), any());
+
+    RuntimeException thrown =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () ->
+                hookDispatcher.createPolicy(
+                    "test_metalake", "test_policy", null, "comment", true, 
null));
+    Assertions.assertEquals("Set owner failed", thrown.getMessage());
+    verify(mockDispatcher).createPolicy(any(), any(), any(), any(), 
anyBoolean(), any());
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/hook/TestSchemaHookDispatcher.java 
b/core/src/test/java/org/apache/gravitino/hook/TestSchemaHookDispatcher.java
new file mode 100644
index 0000000000..ce14f9f4d0
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/hook/TestSchemaHookDispatcher.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.catalog.CatalogManager;
+import org.apache.gravitino.catalog.SchemaDispatcher;
+import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.connector.capability.CapabilityResult;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestSchemaHookDispatcher {
+
+  private SchemaHookDispatcher hookDispatcher;
+  private SchemaDispatcher mockDispatcher;
+  private OwnerDispatcher mockOwnerDispatcher;
+  private CatalogManager mockCatalogManager;
+  private CatalogManager.CatalogWrapper mockCatalogWrapper;
+  // Save the originals before each test and restore them in tearDown so we do 
not leak null
+  // state into the GravitinoEnv singleton across tests.
+  private OwnerDispatcher savedOwnerDispatcher;
+  private CatalogManager savedCatalogManager;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    mockDispatcher = mock(SchemaDispatcher.class);
+    mockOwnerDispatcher = mock(OwnerDispatcher.class);
+    mockCatalogManager = mock(CatalogManager.class);
+    mockCatalogWrapper = mock(CatalogManager.CatalogWrapper.class);
+    
when(mockCatalogManager.loadCatalogAndWrap(any())).thenReturn(mockCatalogWrapper);
+    when(mockCatalogWrapper.capabilities()).thenReturn(Capability.DEFAULT);
+    savedOwnerDispatcher = GravitinoEnv.getInstance().ownerDispatcher();
+    // Tests in this class that rely on the singleton catalogManager always go 
through
+    // GravitinoEnv.getInstance().catalogManager(), but we cannot call the 
public accessor here
+    // because it Preconditions-checks for non-null and would fail when 
GravitinoEnv has not been
+    // initialized. Read the field directly via reflection to capture the 
current value safely.
+    savedCatalogManager =
+        (CatalogManager) FieldUtils.readField(GravitinoEnv.getInstance(), 
"catalogManager", true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "ownerDispatcher", 
mockOwnerDispatcher, true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", 
mockCatalogManager, true);
+    hookDispatcher = new SchemaHookDispatcher(mockDispatcher);
+  }
+
+  @AfterEach
+  public void tearDown() throws IllegalAccessException {
+    FieldUtils.writeField(
+        GravitinoEnv.getInstance(), "ownerDispatcher", savedOwnerDispatcher, 
true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", 
savedCatalogManager, true);
+  }
+
+  @Test
+  public void testCreateSchemaThrowsWhenSetOwnerFails() {
+    NameIdentifier ident = NameIdentifier.of("test_metalake", "test_catalog", 
"test_schema");
+    Schema mockSchema = mock(Schema.class);
+    when(mockDispatcher.createSchema(any(), any(), 
any())).thenReturn(mockSchema);
+
+    doThrow(new RuntimeException("Set owner failed"))
+        .when(mockOwnerDispatcher)
+        .setOwner(any(), any(), any(), any());
+
+    RuntimeException thrown =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () -> hookDispatcher.createSchema(ident, "comment", 
Collections.emptyMap()));
+    Assertions.assertEquals("Set owner failed", thrown.getMessage());
+    verify(mockDispatcher).createSchema(any(), any(), any());
+  }
+
+  @Test
+  public void testCreateSchemaSetsOwnerWithNormalizedIdentifier() throws 
Exception {
+    // Use a case-insensitive capability so the schema name is normalized to 
lower case before
+    // setOwner is called, mirroring what NormalizeDispatcher would do for the 
manager.
+    when(mockCatalogWrapper.capabilities()).thenReturn(new 
CaseInsensitiveCapability());
+
+    NameIdentifier ident = NameIdentifier.of("test_metalake", "test_catalog", 
"MY_SCHEMA");
+    Schema mockSchema = mock(Schema.class);
+    when(mockDispatcher.createSchema(any(), any(), 
any())).thenReturn(mockSchema);
+
+    hookDispatcher.createSchema(ident, "comment", Collections.emptyMap());
+
+    ArgumentCaptor<MetadataObject> captor = 
ArgumentCaptor.forClass(MetadataObject.class);
+    verify(mockOwnerDispatcher)
+        .setOwner(eq("test_metalake"), captor.capture(), any(), 
eq(Owner.Type.USER));
+    Assertions.assertEquals(
+        "my_schema",
+        captor.getValue().name(),
+        "Schema name passed to setOwner must be lowercased by 
Capability.Scope.SCHEMA"
+            + " normalization");
+    // Schema's namespace is [metalake, catalog]; 
NameIdentifierUtil.toMetadataObject uses
+    // level(1) as parent. Catalog is not subject to per-scope name 
normalization here, so
+    // parent is just the catalog name -- there is no schema component to 
normalize.
+    Assertions.assertEquals(
+        "test_catalog",
+        captor.getValue().parent(),
+        "Schema parent must be the catalog name (level(1) of the namespace); 
SCHEMA's namespace"
+            + " has no schema component to normalize");
+  }
+
+  private static class CaseInsensitiveCapability implements Capability {
+    @Override
+    public CapabilityResult caseSensitiveOnName(Scope scope) {
+      return CapabilityResult.unsupported("case-insensitive");
+    }
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/hook/TestTableHookDispatcher.java 
b/core/src/test/java/org/apache/gravitino/hook/TestTableHookDispatcher.java
index 92c0e16131..fe5d5d05b7 100644
--- a/core/src/test/java/org/apache/gravitino/hook/TestTableHookDispatcher.java
+++ b/core/src/test/java/org/apache/gravitino/hook/TestTableHookDispatcher.java
@@ -35,26 +35,32 @@ import static 
org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
 import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
 import static org.apache.gravitino.Configs.VERSION_RETENTION_COUNT;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import java.io.IOException;
 import java.util.Map;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.TestColumn;
 import org.apache.gravitino.authorization.AccessControlManager;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerDispatcher;
 import org.apache.gravitino.catalog.CatalogManager;
+import org.apache.gravitino.catalog.TableDispatcher;
 import org.apache.gravitino.catalog.TestOperationDispatcher;
 import org.apache.gravitino.catalog.TestTableOperationDispatcher;
 import org.apache.gravitino.connector.BaseCatalog;
 import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
 import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.connector.capability.CapabilityResult;
 import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.NamedReference;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
@@ -70,8 +76,10 @@ import org.apache.gravitino.rel.indexes.Indexes;
 import org.apache.gravitino.rel.partitions.Partitions;
 import org.apache.gravitino.rel.partitions.RangePartition;
 import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 public class TestTableHookDispatcher extends TestOperationDispatcher {
@@ -83,7 +91,7 @@ public class TestTableHookDispatcher extends 
TestOperationDispatcher {
   private static AuthorizationPlugin authorizationPlugin;
 
   @BeforeAll
-  public static void initialize() throws IOException, IllegalAccessException {
+  public static void initialize() throws Exception {
     TestTableOperationDispatcher.initialize();
 
     tableHookDispatcher =
@@ -100,6 +108,7 @@ public class TestTableHookDispatcher extends 
TestOperationDispatcher {
     CatalogManager.CatalogWrapper catalogWrapper =
         Mockito.mock(CatalogManager.CatalogWrapper.class);
     Mockito.when(catalogWrapper.catalog()).thenReturn(catalog);
+    Mockito.when(catalogWrapper.capabilities()).thenReturn(Capability.DEFAULT);
 
     Mockito.when(catalogManager.loadCatalog(any())).thenReturn(catalog);
     
Mockito.when(catalogManager.loadCatalogAndWrap(any())).thenReturn(catalogWrapper);
@@ -180,6 +189,109 @@ public class TestTableHookDispatcher extends 
TestOperationDispatcher {
         });
   }
 
+  @Test
+  public void testCreateTableSetsOwnerWithNormalizedIdentifier() throws 
Exception {
+    // Self-contained: use a fresh hook with a directly-mocked TableDispatcher 
and a case-
+    // insensitive catalog so we can verify the helper passes a normalized 
ident to setOwner.
+    CatalogManager savedCatalogManager = 
GravitinoEnv.getInstance().catalogManager();
+    OwnerDispatcher savedOwnerDispatcher = 
GravitinoEnv.getInstance().ownerDispatcher();
+
+    CatalogManager mockCatalogManager = Mockito.mock(CatalogManager.class);
+    CatalogManager.CatalogWrapper mockWrapper = 
Mockito.mock(CatalogManager.CatalogWrapper.class);
+    Mockito.when(mockWrapper.capabilities()).thenReturn(new 
CaseInsensitiveCapability());
+    
Mockito.when(mockCatalogManager.loadCatalogAndWrap(any())).thenReturn(mockWrapper);
+
+    OwnerDispatcher mockOwnerDispatcher = Mockito.mock(OwnerDispatcher.class);
+    TableDispatcher mockTableDispatcher = Mockito.mock(TableDispatcher.class);
+    Mockito.when(
+            mockTableDispatcher.createTable(any(), any(), any(), any(), any(), 
any(), any(), any()))
+        .thenReturn(Mockito.mock(Table.class));
+
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", 
mockCatalogManager, true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "ownerDispatcher", 
mockOwnerDispatcher, true);
+
+    try {
+      TableHookDispatcher localHook = new 
TableHookDispatcher(mockTableDispatcher);
+      NameIdentifier ident = NameIdentifier.of(metalake, catalog, 
"SCHEMA_NORM", "MY_TABLE");
+      localHook.createTable(
+          ident,
+          new Column[0],
+          "comment",
+          ImmutableMap.of(),
+          new Transform[0],
+          Distributions.NONE,
+          new SortOrder[0],
+          new Index[0]);
+
+      ArgumentCaptor<MetadataObject> captor = 
ArgumentCaptor.forClass(MetadataObject.class);
+      Mockito.verify(mockOwnerDispatcher)
+          .setOwner(eq(metalake), captor.capture(), any(), 
eq(Owner.Type.USER));
+      Assertions.assertEquals(
+          "my_table",
+          captor.getValue().name(),
+          "Table name passed to setOwner must be lowercased by 
Capability.Scope.TABLE normalization");
+      Assertions.assertEquals(
+          catalog + ".schema_norm",
+          captor.getValue().parent(),
+          "Table parent (catalog.schema) must have its schema component 
lowercased by"
+              + " Capability.Scope.TABLE namespace normalization");
+    } finally {
+      FieldUtils.writeField(
+          GravitinoEnv.getInstance(), "catalogManager", savedCatalogManager, 
true);
+      FieldUtils.writeField(
+          GravitinoEnv.getInstance(), "ownerDispatcher", savedOwnerDispatcher, 
true);
+    }
+  }
+
+  @Test
+  public void testCreateTableThrowsWhenSetOwnerFails() throws 
IllegalAccessException {
+    // Save the original ownerDispatcher so we can restore it in the finally 
block instead of
+    // wiping it to null and leaking that into other tests in the suite.
+    OwnerDispatcher savedOwnerDispatcher = 
GravitinoEnv.getInstance().ownerDispatcher();
+
+    // Create the schema first with the existing (non-throwing) 
ownerDispatcher, then swap to the
+    // throwing mock only for the table create we actually want to exercise. 
Otherwise the throwing
+    // mock would fire during schema creation and we would never reach the 
table call.
+    Namespace tableNs = Namespace.of(metalake, catalog, "schema_owner_fail");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    schemaHookDispatcher.createSchema(NameIdentifier.of(tableNs.levels()), 
"comment", props);
+
+    OwnerDispatcher mockOwnerDispatcher = Mockito.mock(OwnerDispatcher.class);
+    Mockito.doThrow(new RuntimeException("Set owner failed"))
+        .when(mockOwnerDispatcher)
+        .setOwner(any(), any(), any(), any());
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "ownerDispatcher", 
mockOwnerDispatcher, true);
+
+    try {
+      NameIdentifier tableIdent = NameIdentifier.of(tableNs, 
"table_owner_fail");
+      Column[] columns =
+          new Column[] {
+            TestColumn.builder()
+                .withName("col1")
+                .withPosition(0)
+                .withType(Types.StringType.get())
+                .build()
+          };
+      RuntimeException thrown =
+          Assertions.assertThrows(
+              RuntimeException.class,
+              () ->
+                  tableHookDispatcher.createTable(
+                      tableIdent,
+                      columns,
+                      "comment",
+                      props,
+                      new Transform[0],
+                      Distributions.NONE,
+                      new SortOrder[0],
+                      new Index[0]));
+      Assertions.assertEquals("Set owner failed", thrown.getMessage());
+    } finally {
+      FieldUtils.writeField(
+          GravitinoEnv.getInstance(), "ownerDispatcher", savedOwnerDispatcher, 
true);
+    }
+  }
+
   @Test
   public void testRenameAuthorizationPrivilege() {
     Namespace tableNs = Namespace.of(metalake, catalog, "schema1124");
@@ -229,4 +341,11 @@ public class TestTableHookDispatcher extends 
TestOperationDispatcher {
     tableHookDispatcher.alterTable(tableIdent, renameChange);
     Mockito.verify(authorizationPlugin).onMetadataUpdated(any());
   }
+
+  private static class CaseInsensitiveCapability implements Capability {
+    @Override
+    public CapabilityResult caseSensitiveOnName(Scope scope) {
+      return CapabilityResult.unsupported("case-insensitive");
+    }
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/hook/TestTagHookDispatcher.java 
b/core/src/test/java/org/apache/gravitino/hook/TestTagHookDispatcher.java
new file mode 100644
index 0000000000..2819decf62
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/hook/TestTagHookDispatcher.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.tag.Tag;
+import org.apache.gravitino.tag.TagDispatcher;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestTagHookDispatcher {
+
+  private TagHookDispatcher hookDispatcher;
+  private TagDispatcher mockDispatcher;
+  private OwnerDispatcher mockOwnerDispatcher;
+  // Save the original ownerDispatcher before each test and restore it in 
tearDown so we do not
+  // leak null state into the GravitinoEnv singleton across tests.
+  private OwnerDispatcher savedOwnerDispatcher;
+
+  @BeforeEach
+  public void setUp() throws IllegalAccessException {
+    mockDispatcher = mock(TagDispatcher.class);
+    mockOwnerDispatcher = mock(OwnerDispatcher.class);
+    savedOwnerDispatcher = GravitinoEnv.getInstance().ownerDispatcher();
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "ownerDispatcher", 
mockOwnerDispatcher, true);
+    hookDispatcher = new TagHookDispatcher(mockDispatcher);
+  }
+
+  @AfterEach
+  public void tearDown() throws IllegalAccessException {
+    FieldUtils.writeField(
+        GravitinoEnv.getInstance(), "ownerDispatcher", savedOwnerDispatcher, 
true);
+  }
+
+  @Test
+  public void testCreateTagThrowsWhenSetOwnerFails() {
+    Tag mockTag = mock(Tag.class);
+    when(mockDispatcher.createTag(any(), any(), any(), 
any())).thenReturn(mockTag);
+
+    doThrow(new RuntimeException("Set owner failed"))
+        .when(mockOwnerDispatcher)
+        .setOwner(any(), any(), any(), any());
+
+    RuntimeException thrown =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () ->
+                hookDispatcher.createTag(
+                    "test_metalake", "test_tag", "comment", 
Collections.emptyMap()));
+    Assertions.assertEquals("Set owner failed", thrown.getMessage());
+    verify(mockDispatcher).createTag(any(), any(), any(), any());
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/hook/TestTopicHookDispatcher.java 
b/core/src/test/java/org/apache/gravitino/hook/TestTopicHookDispatcher.java
index dab37bee05..539af87f5a 100644
--- a/core/src/test/java/org/apache/gravitino/hook/TestTopicHookDispatcher.java
+++ b/core/src/test/java/org/apache/gravitino/hook/TestTopicHookDispatcher.java
@@ -19,22 +19,31 @@
 package org.apache.gravitino.hook;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 
 import com.google.common.collect.ImmutableMap;
-import java.io.IOException;
 import java.util.Map;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.authorization.AccessControlManager;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerDispatcher;
 import org.apache.gravitino.catalog.CatalogManager;
 import org.apache.gravitino.catalog.TestOperationDispatcher;
 import org.apache.gravitino.catalog.TestTopicOperationDispatcher;
+import org.apache.gravitino.catalog.TopicDispatcher;
 import org.apache.gravitino.connector.BaseCatalog;
 import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
+import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.connector.capability.CapabilityResult;
+import org.apache.gravitino.messaging.Topic;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 public class TestTopicHookDispatcher extends TestOperationDispatcher {
@@ -45,7 +54,7 @@ public class TestTopicHookDispatcher extends 
TestOperationDispatcher {
   private static AuthorizationPlugin authorizationPlugin;
 
   @BeforeAll
-  public static void initialize() throws IOException, IllegalAccessException {
+  public static void initialize() throws Exception {
     TestTopicOperationDispatcher.initialize();
 
     topicHookDispatcher =
@@ -58,11 +67,94 @@ public class TestTopicHookDispatcher extends 
TestOperationDispatcher {
     catalogManager = Mockito.mock(CatalogManager.class);
     FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", 
catalogManager, true);
     BaseCatalog catalog = Mockito.mock(BaseCatalog.class);
+    Mockito.when(catalog.capability()).thenReturn(Capability.DEFAULT);
+    CatalogManager.CatalogWrapper catalogWrapper =
+        Mockito.mock(CatalogManager.CatalogWrapper.class);
+    Mockito.when(catalogWrapper.catalog()).thenReturn(catalog);
+    Mockito.when(catalogWrapper.capabilities()).thenReturn(Capability.DEFAULT);
     Mockito.when(catalogManager.loadCatalog(any())).thenReturn(catalog);
+    
Mockito.when(catalogManager.loadCatalogAndWrap(any())).thenReturn(catalogWrapper);
     authorizationPlugin = Mockito.mock(AuthorizationPlugin.class);
     
Mockito.when(catalog.getAuthorizationPlugin()).thenReturn(authorizationPlugin);
   }
 
+  @Test
+  public void testCreateTopicSetsOwnerWithNormalizedIdentifier() throws 
Exception {
+    // Self-contained: use a fresh hook with a directly-mocked TopicDispatcher 
and a case-
+    // insensitive catalog so we can verify the helper passes a normalized 
ident to setOwner.
+    CatalogManager savedCatalogManager = 
GravitinoEnv.getInstance().catalogManager();
+    OwnerDispatcher savedOwnerDispatcher = 
GravitinoEnv.getInstance().ownerDispatcher();
+
+    CatalogManager mockCatalogManager = Mockito.mock(CatalogManager.class);
+    CatalogManager.CatalogWrapper mockWrapper = 
Mockito.mock(CatalogManager.CatalogWrapper.class);
+    Mockito.when(mockWrapper.capabilities()).thenReturn(new 
CaseInsensitiveCapability());
+    
Mockito.when(mockCatalogManager.loadCatalogAndWrap(any())).thenReturn(mockWrapper);
+
+    OwnerDispatcher mockOwnerDispatcher = Mockito.mock(OwnerDispatcher.class);
+    TopicDispatcher mockTopicDispatcher = Mockito.mock(TopicDispatcher.class);
+    Mockito.when(mockTopicDispatcher.createTopic(any(), any(), any(), any()))
+        .thenReturn(Mockito.mock(Topic.class));
+
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", 
mockCatalogManager, true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "ownerDispatcher", 
mockOwnerDispatcher, true);
+
+    try {
+      TopicHookDispatcher localHook = new 
TopicHookDispatcher(mockTopicDispatcher);
+      NameIdentifier ident = NameIdentifier.of(metalake, catalog, 
"SCHEMA_NORM", "MY_TOPIC");
+      localHook.createTopic(ident, "comment", null, ImmutableMap.of());
+
+      ArgumentCaptor<MetadataObject> captor = 
ArgumentCaptor.forClass(MetadataObject.class);
+      Mockito.verify(mockOwnerDispatcher)
+          .setOwner(eq(metalake), captor.capture(), any(), 
eq(Owner.Type.USER));
+      Assertions.assertEquals(
+          "my_topic",
+          captor.getValue().name(),
+          "Topic name passed to setOwner must be lowercased by 
Capability.Scope.TOPIC normalization");
+      Assertions.assertEquals(
+          catalog + ".schema_norm",
+          captor.getValue().parent(),
+          "Topic parent (catalog.schema) must have its schema component 
lowercased by"
+              + " Capability.Scope.TOPIC namespace normalization");
+    } finally {
+      FieldUtils.writeField(
+          GravitinoEnv.getInstance(), "catalogManager", savedCatalogManager, 
true);
+      FieldUtils.writeField(
+          GravitinoEnv.getInstance(), "ownerDispatcher", savedOwnerDispatcher, 
true);
+    }
+  }
+
+  @Test
+  public void testCreateTopicThrowsWhenSetOwnerFails() throws 
IllegalAccessException {
+    // Save the original ownerDispatcher so we can restore it in the finally 
block instead of
+    // wiping it to null and leaking that into other tests in the suite.
+    OwnerDispatcher savedOwnerDispatcher = 
GravitinoEnv.getInstance().ownerDispatcher();
+
+    // Create the schema first with the existing (non-throwing) 
ownerDispatcher, then swap to the
+    // throwing mock only for the topic create we actually want to exercise. 
Otherwise the throwing
+    // mock would fire during schema creation and we would never reach the 
topic call.
+    Namespace topicNs = Namespace.of(metalake, catalog, "schema_owner_fail");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    schemaHookDispatcher.createSchema(NameIdentifier.of(topicNs.levels()), 
"comment", props);
+
+    OwnerDispatcher mockOwnerDispatcher = Mockito.mock(OwnerDispatcher.class);
+    Mockito.doThrow(new RuntimeException("Set owner failed"))
+        .when(mockOwnerDispatcher)
+        .setOwner(any(), any(), any(), any());
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "ownerDispatcher", 
mockOwnerDispatcher, true);
+
+    try {
+      NameIdentifier topicIdent = NameIdentifier.of(topicNs, 
"topic_owner_fail");
+      RuntimeException thrown =
+          Assertions.assertThrows(
+              RuntimeException.class,
+              () -> topicHookDispatcher.createTopic(topicIdent, "comment", 
null, props));
+      Assertions.assertEquals("Set owner failed", thrown.getMessage());
+    } finally {
+      FieldUtils.writeField(
+          GravitinoEnv.getInstance(), "ownerDispatcher", savedOwnerDispatcher, 
true);
+    }
+  }
+
   @Test
   public void testDropAuthorizationPrivilege() {
     Namespace topicNs = Namespace.of(metalake, catalog, "schema1123");
@@ -77,4 +169,11 @@ public class TestTopicHookDispatcher extends 
TestOperationDispatcher {
           topicHookDispatcher.dropTopic(topicIdent);
         });
   }
+
+  private static class CaseInsensitiveCapability implements Capability {
+    @Override
+    public CapabilityResult caseSensitiveOnName(Scope scope) {
+      return CapabilityResult.unsupported("case-insensitive");
+    }
+  }
 }
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
index 06655e541c..5be7b4db4b 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
@@ -118,31 +118,38 @@ public class RESTService implements 
GravitinoAuxiliaryService {
             skipAuthorizationForRestBackend,
             icebergCatalogWrapperManager);
     this.icebergMetricsManager = new IcebergMetricsManager(icebergConfig);
+    // Table: HookDispatcher -> EventDispatcher -> OperationExecutor
     IcebergTableOperationDispatcher icebergTableOperationDispatcher =
         new IcebergTableOperationExecutor(icebergCatalogWrapperManager);
+    IcebergTableOperationDispatcher icebergTableEventDispatcher =
+        new IcebergTableEventDispatcher(icebergTableOperationDispatcher, 
eventBus, metalakeName);
     if (authorizationContext.isAuthorizationEnabled()) {
-      icebergTableOperationDispatcher =
-          new IcebergTableHookDispatcher(icebergTableOperationDispatcher);
+      icebergTableEventDispatcher = new 
IcebergTableHookDispatcher(icebergTableEventDispatcher);
     }
-    IcebergTableEventDispatcher icebergTableEventDispatcher =
-        new IcebergTableEventDispatcher(icebergTableOperationDispatcher, 
eventBus, metalakeName);
+    IcebergTableOperationDispatcher icebergTableDispatcher = 
icebergTableEventDispatcher;
+
+    // View: HookDispatcher -> EventDispatcher -> OperationExecutor
     IcebergViewOperationDispatcher icebergViewOperationDispatcher =
         new IcebergViewOperationExecutor(icebergCatalogWrapperManager);
+    IcebergViewOperationDispatcher icebergViewEventDispatcher =
+        new IcebergViewEventDispatcher(icebergViewOperationDispatcher, 
eventBus, metalakeName);
     if (authorizationContext.isAuthorizationEnabled()) {
-      icebergViewOperationDispatcher =
-          new IcebergViewHookDispatcher(icebergViewOperationDispatcher, 
metalakeName);
+      icebergViewEventDispatcher =
+          new IcebergViewHookDispatcher(icebergViewEventDispatcher, 
metalakeName);
     }
-    IcebergViewEventDispatcher icebergViewEventDispatcher =
-        new IcebergViewEventDispatcher(icebergViewOperationDispatcher, 
eventBus, metalakeName);
+    IcebergViewOperationDispatcher icebergViewDispatcher = 
icebergViewEventDispatcher;
 
+    // Namespace: HookDispatcher -> EventDispatcher -> OperationExecutor
     IcebergNamespaceOperationDispatcher namespaceOperationDispatcher =
         new IcebergNamespaceOperationExecutor(icebergCatalogWrapperManager);
+    IcebergNamespaceOperationDispatcher icebergNamespaceEventDispatcher =
+        new IcebergNamespaceEventDispatcher(namespaceOperationDispatcher, 
eventBus, metalakeName);
     if (authorizationContext.isAuthorizationEnabled()) {
-      namespaceOperationDispatcher =
-          new IcebergNamespaceHookDispatcher(namespaceOperationDispatcher);
+      icebergNamespaceEventDispatcher =
+          new IcebergNamespaceHookDispatcher(icebergNamespaceEventDispatcher);
     }
-    IcebergNamespaceEventDispatcher icebergNamespaceEventDispatcher =
-        new IcebergNamespaceEventDispatcher(namespaceOperationDispatcher, 
eventBus, metalakeName);
+    IcebergNamespaceOperationDispatcher icebergNamespaceDispatcher =
+        icebergNamespaceEventDispatcher;
 
     config.register(
         new AbstractBinder() {
@@ -155,9 +162,9 @@ public class RESTService implements 
GravitinoAuxiliaryService {
             }
             
bind(icebergCatalogWrapperManager).to(IcebergCatalogWrapperManager.class).ranked(1);
             
bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(1);
-            
bind(icebergTableEventDispatcher).to(IcebergTableOperationDispatcher.class).ranked(1);
-            
bind(icebergViewEventDispatcher).to(IcebergViewOperationDispatcher.class).ranked(1);
-            bind(icebergNamespaceEventDispatcher)
+            
bind(icebergTableDispatcher).to(IcebergTableOperationDispatcher.class).ranked(1);
+            
bind(icebergViewDispatcher).to(IcebergViewOperationDispatcher.class).ranked(1);
+            bind(icebergNamespaceDispatcher)
                 .to(IcebergNamespaceOperationDispatcher.class)
                 .ranked(1);
           }
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergNamespaceHookDispatcher.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergNamespaceHookDispatcher.java
index 87932dea20..59a3c4756b 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergNamespaceHookDispatcher.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergNamespaceHookDispatcher.java
@@ -46,6 +46,7 @@ import 
org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
  * operations.
  */
 public class IcebergNamespaceHookDispatcher implements 
IcebergNamespaceOperationDispatcher {
+
   private final IcebergNamespaceOperationDispatcher dispatcher;
   private final String metalake;
 
@@ -59,7 +60,11 @@ public class IcebergNamespaceHookDispatcher implements 
IcebergNamespaceOperation
       IcebergRequestContext context, CreateNamespaceRequest createRequest) {
     CreateNamespaceResponse response = dispatcher.createNamespace(context, 
createRequest);
 
+    // Import is intentionally NOT wrapped in try-catch: if it fails the 
namespace exists in
+    // Iceberg but not in Gravitino, and silently swallowing that would 
mislead callers into
+    // thinking the entity is registered. Surface the failure so the caller 
can react.
     importSchema(context.catalogName(), createRequest.namespace());
+
     IcebergOwnershipUtils.setSchemaOwner(
         metalake,
         context.catalogName(),
@@ -122,10 +127,11 @@ public class IcebergNamespaceHookDispatcher implements 
IcebergNamespaceOperation
       RegisterTableRequest registerTableRequest) {
     LoadTableResponse response = dispatcher.registerTable(context, namespace, 
registerTableRequest);
 
-    // Import the registered table into Gravitino's catalog so it exists as a 
metadata object
+    // Import is intentionally NOT wrapped in try-catch: if it fails the table 
exists in Iceberg
+    // but not in Gravitino, and silently swallowing that would mislead 
callers into thinking the
+    // entity is registered. Surface the failure so the caller can react.
     importTable(context.catalogName(), namespace, registerTableRequest.name());
 
-    // Set the owner of the registered table to the current user
     IcebergOwnershipUtils.setTableOwner(
         metalake,
         context.catalogName(),
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
index 3a52e2428d..ed5d415627 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
@@ -197,12 +197,16 @@ public class IcebergTableHookDispatcher implements 
IcebergTableOperationDispatch
 
   private void importTableAndSetOwner(
       IcebergRequestContext context, Namespace namespace, String tableName) {
+    // Import is intentionally NOT wrapped in try-catch: if it fails the table 
exists in Iceberg
+    // but not in Gravitino, and silently swallowing that would mislead 
callers into thinking the
+    // entity is registered. Surface the failure so the caller can react.
     TableDispatcher tableDispatcher = 
GravitinoEnv.getInstance().tableDispatcher();
     if (tableDispatcher != null) {
       tableDispatcher.loadTable(
           IcebergIdentifierUtils.toGravitinoTableIdentifier(
               metalake, context.catalogName(), TableIdentifier.of(namespace, 
tableName)));
     }
+
     IcebergOwnershipUtils.setTableOwner(
         metalake,
         context.catalogName(),
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergViewHookDispatcher.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergViewHookDispatcher.java
index edf525dc1f..e737c81a4e 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergViewHookDispatcher.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergViewHookDispatcher.java
@@ -68,7 +68,7 @@ public class IcebergViewHookDispatcher implements 
IcebergViewOperationDispatcher
     // Then import it into Gravitino so Gravitino is aware of the view
     importView(context.catalogName(), namespace, createViewRequest.name());
 
-    // Set ownership for the newly created view
+    // Set ownership for the newly created view.
     IcebergOwnershipUtils.setViewOwner(
         metalake,
         context.catalogName(),
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergNamespaceHookDispatcher.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergNamespaceHookDispatcher.java
new file mode 100644
index 0000000000..792f85fba1
--- /dev/null
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergNamespaceHookDispatcher.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.service.dispatcher;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.catalog.SchemaDispatcher;
+import org.apache.gravitino.catalog.TableDispatcher;
+import 
org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerContext;
+import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
+import org.apache.gravitino.listener.api.event.IcebergRequestContext;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
+import org.apache.iceberg.rest.requests.RegisterTableRequest;
+import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestIcebergNamespaceHookDispatcher {
+
+  private static final String TEST_METALAKE = "test_metalake";
+  private static final String TEST_CATALOG = "test_catalog";
+  private static final String TEST_USER = "test_user";
+
+  private IcebergNamespaceHookDispatcher hookDispatcher;
+  private IcebergNamespaceOperationDispatcher mockDispatcher;
+  private OwnerDispatcher mockOwnerDispatcher;
+  private IcebergRequestContext mockContext;
+
+  @BeforeEach
+  public void setUp() throws IllegalAccessException {
+    mockDispatcher = mock(IcebergNamespaceOperationDispatcher.class);
+    mockOwnerDispatcher = mock(OwnerDispatcher.class);
+    SchemaDispatcher mockSchemaDispatcher = mock(SchemaDispatcher.class);
+    TableDispatcher mockTableDispatcher = mock(TableDispatcher.class);
+
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "ownerDispatcher", 
mockOwnerDispatcher, true);
+    FieldUtils.writeField(
+        GravitinoEnv.getInstance(), "schemaDispatcher", mockSchemaDispatcher, 
true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "tableDispatcher", 
mockTableDispatcher, true);
+
+    IcebergConfigProvider mockConfigProvider = 
mock(IcebergConfigProvider.class);
+    when(mockConfigProvider.getMetalakeName()).thenReturn(TEST_METALAKE);
+    when(mockConfigProvider.getDefaultCatalogName()).thenReturn(TEST_CATALOG);
+    IcebergRESTServerContext.create(mockConfigProvider, false, false, true, 
null);
+
+    hookDispatcher = new IcebergNamespaceHookDispatcher(mockDispatcher);
+
+    mockContext = mock(IcebergRequestContext.class);
+    when(mockContext.catalogName()).thenReturn(TEST_CATALOG);
+    when(mockContext.userName()).thenReturn(TEST_USER);
+  }
+
+  @AfterEach
+  public void tearDown() throws IllegalAccessException {
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "ownerDispatcher", null, 
true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "schemaDispatcher", 
null, true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "tableDispatcher", null, 
true);
+
+    Class<?> holderClass =
+        Arrays.stream(IcebergRESTServerContext.class.getDeclaredClasses())
+            .filter(c -> c.getSimpleName().equals("InstanceHolder"))
+            .findFirst()
+            .orElseThrow(() -> new RuntimeException("InstanceHolder class not 
found"));
+    FieldUtils.writeStaticField(holderClass, "INSTANCE", null, true);
+  }
+
+  @Test
+  public void testCreateNamespaceThrowsWhenSetOwnerFails() {
+    Namespace namespace = Namespace.of("test_schema");
+    CreateNamespaceRequest mockRequest = mock(CreateNamespaceRequest.class);
+    when(mockRequest.namespace()).thenReturn(namespace);
+
+    CreateNamespaceResponse mockResponse = mock(CreateNamespaceResponse.class);
+    when(mockDispatcher.createNamespace(mockContext, 
mockRequest)).thenReturn(mockResponse);
+
+    doThrow(new RuntimeException("Set owner failed"))
+        .when(mockOwnerDispatcher)
+        .setOwner(any(), any(), any(), any());
+
+    RuntimeException thrown =
+        Assertions.assertThrows(
+            RuntimeException.class, () -> 
hookDispatcher.createNamespace(mockContext, mockRequest));
+    Assertions.assertEquals("Set owner failed", thrown.getMessage());
+    verify(mockDispatcher).createNamespace(mockContext, mockRequest);
+  }
+
+  @Test
+  public void testRegisterTableThrowsWhenSetOwnerFails() {
+    Namespace namespace = Namespace.of("test_schema");
+    RegisterTableRequest mockRequest = mock(RegisterTableRequest.class);
+    when(mockRequest.name()).thenReturn("test_table");
+
+    LoadTableResponse mockResponse = mock(LoadTableResponse.class);
+    when(mockDispatcher.registerTable(mockContext, namespace, mockRequest))
+        .thenReturn(mockResponse);
+
+    doThrow(new RuntimeException("Set owner failed"))
+        .when(mockOwnerDispatcher)
+        .setOwner(any(), any(), any(), any());
+
+    RuntimeException thrown =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () -> hookDispatcher.registerTable(mockContext, namespace, 
mockRequest));
+    Assertions.assertEquals("Set owner failed", thrown.getMessage());
+    verify(mockDispatcher).registerTable(mockContext, namespace, mockRequest);
+  }
+
+  @Test
+  public void testCreateNamespacePropagatesImportFailure() {
+    Namespace namespace = Namespace.of("test_schema");
+    CreateNamespaceRequest mockRequest = mock(CreateNamespaceRequest.class);
+    when(mockRequest.namespace()).thenReturn(namespace);
+
+    CreateNamespaceResponse mockResponse = mock(CreateNamespaceResponse.class);
+    when(mockDispatcher.createNamespace(mockContext, 
mockRequest)).thenReturn(mockResponse);
+
+    // Schema import (loadSchema) throwing must propagate so the caller learns 
the namespace
+    // exists in Iceberg but is not registered in Gravitino. setOwner is 
therefore unreachable.
+    SchemaDispatcher schemaDispatcher = 
GravitinoEnv.getInstance().schemaDispatcher();
+    doThrow(new RuntimeException("Import 
failed")).when(schemaDispatcher).loadSchema(any());
+
+    RuntimeException thrown =
+        Assertions.assertThrows(
+            RuntimeException.class, () -> 
hookDispatcher.createNamespace(mockContext, mockRequest));
+
+    Assertions.assertEquals("Import failed", thrown.getMessage());
+    verify(mockOwnerDispatcher, never()).setOwner(any(), any(), any(), any());
+  }
+
+  @Test
+  public void testRegisterTablePropagatesImportFailure() {
+    Namespace namespace = Namespace.of("test_schema");
+    RegisterTableRequest mockRequest = mock(RegisterTableRequest.class);
+    when(mockRequest.name()).thenReturn("test_table");
+
+    LoadTableResponse mockResponse = mock(LoadTableResponse.class);
+    when(mockDispatcher.registerTable(mockContext, namespace, mockRequest))
+        .thenReturn(mockResponse);
+
+    // Table import (loadTable) throwing must propagate so the caller learns 
the table exists in
+    // Iceberg but is not registered in Gravitino. setOwner is therefore 
unreachable.
+    TableDispatcher tableDispatcher = 
GravitinoEnv.getInstance().tableDispatcher();
+    doThrow(new RuntimeException("Import 
failed")).when(tableDispatcher).loadTable(any());
+
+    RuntimeException thrown =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () -> hookDispatcher.registerTable(mockContext, namespace, 
mockRequest));
+
+    Assertions.assertEquals("Import failed", thrown.getMessage());
+    verify(mockOwnerDispatcher, never()).setOwner(any(), any(), any(), any());
+  }
+}
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
index 837f5e1b8c..cc5f915475 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
@@ -253,6 +253,27 @@ public class TestIcebergTableHookDispatcher {
     verify(mockDispatcher).renameTable(mockContext, request);
   }
 
+  @Test
+  public void testCreateTableThrowsWhenSetOwnerFails() {
+    Namespace namespace = Namespace.of("test_schema");
+    CreateTableRequest request =
+        
CreateTableRequest.builder().withName("test_table").withSchema(TABLE_SCHEMA).build();
+
+    LoadTableResponse mockResponse = mock(LoadTableResponse.class);
+    when(mockDispatcher.createTable(mockContext, namespace, 
request)).thenReturn(mockResponse);
+
+    doThrow(new RuntimeException("Set owner failed"))
+        .when(mockOwnerDispatcher)
+        .setOwner(any(), any(), any(), any());
+
+    RuntimeException thrown =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () -> hookDispatcher.createTable(mockContext, namespace, request));
+    Assertions.assertEquals("Set owner failed", thrown.getMessage());
+    verify(mockDispatcher).createTable(mockContext, namespace, request);
+  }
+
   @Test
   public void testCreateTableSkipsImportAndOwnershipForStageCreate() {
     Namespace namespace = Namespace.of("test_schema");
@@ -341,4 +362,28 @@ public class TestIcebergTableHookDispatcher {
     Assertions.assertEquals(mockResponse, result);
     verify(mockDispatcher).loadTable(mockContext, tableId);
   }
+
+  @Test
+  public void testCreateTablePropagatesImportFailure() {
+    Namespace namespace = Namespace.of("test_schema");
+    CreateTableRequest request =
+        
CreateTableRequest.builder().withName("test_table").withSchema(TABLE_SCHEMA).build();
+
+    LoadTableResponse mockResponse = mock(LoadTableResponse.class);
+    when(mockDispatcher.createTable(mockContext, namespace, 
request)).thenReturn(mockResponse);
+
+    // Import failure (the loadTable call) must propagate so the caller learns 
the table exists in
+    // Iceberg but is not registered in Gravitino. setOwner is therefore 
unreachable.
+    doThrow(new RuntimeException("Import 
failed")).when(mockTableDispatcher).loadTable(any());
+
+    RuntimeException thrown =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () -> hookDispatcher.createTable(mockContext, namespace, request));
+
+    Assertions.assertEquals("Import failed", thrown.getMessage());
+    verify(mockDispatcher).createTable(mockContext, namespace, request);
+    verify(mockTableDispatcher).loadTable(any());
+    verify(mockOwnerDispatcher, never()).setOwner(any(), any(), any(), any());
+  }
 }
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergViewHookDispatcher.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergViewHookDispatcher.java
index cf74f760b3..195f7673e3 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergViewHookDispatcher.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergViewHookDispatcher.java
@@ -187,6 +187,43 @@ public class TestIcebergViewHookDispatcher {
     verify(mockExecutor, times(1)).createView(mockContext, namespace, 
createRequest);
   }
 
+  @Test
+  public void testCreateViewThrowsWhenSetOwnerFails() throws Exception {
+    Namespace namespace = Namespace.of(SCHEMA_NAME);
+    CreateViewRequest createRequest =
+        ImmutableCreateViewRequest.builder()
+            .name(VIEW_NAME)
+            .schema(VIEW_SCHEMA)
+            .viewVersion(
+                ImmutableViewVersion.builder()
+                    .versionId(1)
+                    .timestampMillis(System.currentTimeMillis())
+                    .schemaId(1)
+                    .defaultNamespace(namespace)
+                    .addRepresentations(
+                        ImmutableSQLViewRepresentation.builder()
+                            .sql("SELECT * FROM test")
+                            .dialect("spark")
+                            .build())
+                    .build())
+            .build();
+
+    LoadViewResponse mockResponse = mock(LoadViewResponse.class);
+    when(mockExecutor.createView(mockContext, namespace, 
createRequest)).thenReturn(mockResponse);
+
+    doThrow(new RuntimeException("Set owner failed"))
+        .when(mockOwnerDispatcher)
+        .setOwner(any(), any(), any(), any());
+
+    RuntimeException thrown =
+        assertThrows(
+            RuntimeException.class,
+            () -> hookDispatcher.createView(mockContext, namespace, 
createRequest));
+    assertEquals("Set owner failed", thrown.getMessage());
+    verify(mockExecutor, times(1)).createView(mockContext, namespace, 
createRequest);
+    verify(mockViewDispatcher, times(1)).loadView(any(NameIdentifier.class));
+  }
+
   @Test
   public void testDropViewRemovesFromEntityStore() throws Exception {
     TableIdentifier viewIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), 
VIEW_NAME);

Reply via email to