This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new d6721459d [#4195] improvement(core): Decouple `OperationDispatcher` 
from `NormalizeDispatcher` (#4196)
d6721459d is described below

commit d6721459d499d1237cfcb5b0527ed5252e2462ce
Author: mchades <[email protected]>
AuthorDate: Fri Jul 19 18:06:44 2024 +0800

    [#4195] improvement(core): Decouple `OperationDispatcher` from 
`NormalizeDispatcher` (#4196)
    
    ### What changes were proposed in this pull request?
    
     - Decouple `OperationDispatcher` from `NormalizeDispatcher`
    - move `getCatalogCapability` method from `OperationDispatcher` to
    `CapabilityHelpers`
    
    ### Why are the changes needed?
    
    Fix: #4195
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests
---
 .../java/org/apache/gravitino/GravitinoEnv.java    | 11 ++---
 .../gravitino/catalog/CapabilityHelpers.java       | 23 ++++++----
 .../catalog/FilesetNormalizeDispatcher.java        | 48 ++++++++++++++-------
 .../catalog/FilesetOperationDispatcher.java        |  1 +
 .../gravitino/catalog/OperationDispatcher.java     | 40 +-----------------
 .../catalog/PartitionNormalizeDispatcher.java      | 47 ++++++++++-----------
 .../catalog/SchemaNormalizeDispatcher.java         | 38 ++++++++++++-----
 .../catalog/SchemaOperationDispatcher.java         |  1 +
 .../catalog/TableNormalizeDispatcher.java          | 49 +++++++++++++++-------
 .../catalog/TableOperationDispatcher.java          |  1 +
 .../catalog/TopicNormalizeDispatcher.java          | 47 +++++++++++++++------
 .../catalog/TopicOperationDispatcher.java          |  1 +
 .../apache/gravitino/utils/NameIdentifierUtil.java | 32 ++++++++++++++
 .../catalog/TestFilesetNormalizeDispatcher.java    |  6 ++-
 .../gravitino/catalog/TestOperationDispatcher.java | 13 +++---
 .../catalog/TestPartitionNormalizeDispatcher.java  |  2 +-
 .../catalog/TestSchemaNormalizeDispatcher.java     |  2 +-
 .../catalog/TestTableNormalizeDispatcher.java      |  6 ++-
 .../catalog/TestTopicNormalizeDispatcher.java      |  6 ++-
 19 files changed, 232 insertions(+), 142 deletions(-)

diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java 
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index b307cddbd..cf95dd7e7 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -154,30 +154,31 @@ public class GravitinoEnv {
     SchemaOperationDispatcher schemaOperationDispatcher =
         new SchemaOperationDispatcher(catalogManager, entityStore, 
idGenerator);
     SchemaNormalizeDispatcher schemaNormalizeDispatcher =
-        new SchemaNormalizeDispatcher(schemaOperationDispatcher);
+        new SchemaNormalizeDispatcher(schemaOperationDispatcher, 
catalogManager);
     this.schemaDispatcher = new SchemaEventDispatcher(eventBus, 
schemaNormalizeDispatcher);
 
     TableOperationDispatcher tableOperationDispatcher =
         new TableOperationDispatcher(catalogManager, entityStore, idGenerator);
     TableNormalizeDispatcher tableNormalizeDispatcher =
-        new TableNormalizeDispatcher(tableOperationDispatcher);
+        new TableNormalizeDispatcher(tableOperationDispatcher, catalogManager);
     this.tableDispatcher = new TableEventDispatcher(eventBus, 
tableNormalizeDispatcher);
 
     PartitionOperationDispatcher partitionOperationDispatcher =
         new PartitionOperationDispatcher(catalogManager, entityStore, 
idGenerator);
     // todo: support PartitionEventDispatcher
-    this.partitionDispatcher = new 
PartitionNormalizeDispatcher(partitionOperationDispatcher);
+    this.partitionDispatcher =
+        new PartitionNormalizeDispatcher(partitionOperationDispatcher, 
catalogManager);
 
     FilesetOperationDispatcher filesetOperationDispatcher =
         new FilesetOperationDispatcher(catalogManager, entityStore, 
idGenerator);
     FilesetNormalizeDispatcher filesetNormalizeDispatcher =
-        new FilesetNormalizeDispatcher(filesetOperationDispatcher);
+        new FilesetNormalizeDispatcher(filesetOperationDispatcher, 
catalogManager);
     this.filesetDispatcher = new FilesetEventDispatcher(eventBus, 
filesetNormalizeDispatcher);
 
     TopicOperationDispatcher topicOperationDispatcher =
         new TopicOperationDispatcher(catalogManager, entityStore, idGenerator);
     TopicNormalizeDispatcher topicNormalizeDispatcher =
-        new TopicNormalizeDispatcher(topicOperationDispatcher);
+        new TopicNormalizeDispatcher(topicOperationDispatcher, catalogManager);
     this.topicDispatcher = new TopicEventDispatcher(eventBus, 
topicNormalizeDispatcher);
 
     // Create and initialize access control related modules
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java 
b/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java
index 373aef049..266b4a401 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.catalog;
 
 import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
 import static 
org.apache.gravitino.rel.expressions.transforms.Transforms.NAME_OF_IDENTITY;
+import static 
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
 
 import com.google.common.base.Preconditions;
 import java.util.Arrays;
@@ -48,6 +49,16 @@ import org.apache.gravitino.rel.partitions.RangePartition;
 
 public class CapabilityHelpers {
 
+  public static Capability getCapability(NameIdentifier ident, CatalogManager 
catalogManager) {
+    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+    CatalogManager.CatalogWrapper c = 
catalogManager.loadCatalogAndWrap(catalogIdent);
+    try {
+      return c.capabilities();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to get capabilities for catalog: " + 
catalogIdent, e);
+    }
+  }
+
   public static Column[] applyCapabilities(Column[] columns, Capability 
capabilities) {
     return Arrays.stream(columns)
         .map(c -> applyCapabilities(c, capabilities))
@@ -99,30 +110,28 @@ public class CapabilityHelpers {
   }
 
   public static NameIdentifier[] applyCaseSensitive(
-      NameIdentifier[] idents, Capability.Scope scope, OperationDispatcher 
operationDispatcher) {
+      NameIdentifier[] idents, Capability.Scope scope, Capability 
capabilities) {
     return Arrays.stream(idents)
-        .map(ident -> applyCaseSensitive(ident, scope, operationDispatcher))
+        .map(ident -> applyCaseSensitive(ident, scope, capabilities))
         .toArray(NameIdentifier[]::new);
   }
 
   public static NameIdentifier applyCaseSensitive(
-      NameIdentifier ident, Capability.Scope scope, OperationDispatcher 
operationDispatcher) {
-    Capability capabilities = operationDispatcher.getCatalogCapability(ident);
-    Namespace namespace = applyCaseSensitive(ident.namespace(), scope, 
operationDispatcher);
+      NameIdentifier ident, Capability.Scope scope, Capability capabilities) {
+    Namespace namespace = applyCaseSensitive(ident.namespace(), scope, 
capabilities);
 
     String name = applyCaseSensitiveOnName(scope, ident.name(), capabilities);
     return NameIdentifier.of(namespace, name);
   }
 
   public static Namespace applyCaseSensitive(
-      Namespace namespace, Capability.Scope identScope, OperationDispatcher 
operationDispatcher) {
+      Namespace namespace, Capability.Scope identScope, Capability 
capabilities) {
     String metalake = namespace.level(0);
     String catalog = namespace.level(1);
     if (identScope == Capability.Scope.TABLE
         || identScope == Capability.Scope.FILESET
         || identScope == Capability.Scope.TOPIC) {
       String schema = namespace.level(namespace.length() - 1);
-      Capability capabilities = 
operationDispatcher.getCatalogCapability(namespace);
       schema = applyCaseSensitiveOnName(Capability.Scope.SCHEMA, schema, 
capabilities);
       return Namespace.of(metalake, catalog, schema);
     }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
 
b/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
index 6698aeccf..d79630392 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
@@ -20,8 +20,10 @@ package org.apache.gravitino.catalog;
 
 import static org.apache.gravitino.catalog.CapabilityHelpers.applyCapabilities;
 import static 
org.apache.gravitino.catalog.CapabilityHelpers.applyCaseSensitive;
+import static org.apache.gravitino.catalog.CapabilityHelpers.getCapability;
 
 import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.connector.capability.Capability;
@@ -32,35 +34,35 @@ import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetChange;
 
 public class FilesetNormalizeDispatcher implements FilesetDispatcher {
+  private final CatalogManager catalogManager;
+  private final FilesetDispatcher dispatcher;
 
-  private final FilesetOperationDispatcher dispatcher;
-
-  public FilesetNormalizeDispatcher(FilesetOperationDispatcher dispatcher) {
+  public FilesetNormalizeDispatcher(FilesetDispatcher dispatcher, 
CatalogManager catalogManager) {
     this.dispatcher = dispatcher;
+    this.catalogManager = catalogManager;
   }
 
   @Override
   public NameIdentifier[] listFilesets(Namespace namespace) throws 
NoSuchSchemaException {
     // The constraints of the name spec may be more strict than underlying 
catalog,
     // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-    Namespace caseSensitiveNs = applyCaseSensitive(namespace, 
Capability.Scope.FILESET, dispatcher);
+    Namespace caseSensitiveNs = normalizeCaseSensitive(namespace);
     NameIdentifier[] identifiers = dispatcher.listFilesets(caseSensitiveNs);
-    return applyCaseSensitive(identifiers, Capability.Scope.FILESET, 
dispatcher);
+    return normalizeCaseSensitive(identifiers);
   }
 
   @Override
   public Fileset loadFileset(NameIdentifier ident) throws 
NoSuchFilesetException {
     // The constraints of the name spec may be more strict than underlying 
catalog,
     // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-    return dispatcher.loadFileset(applyCaseSensitive(ident, 
Capability.Scope.FILESET, dispatcher));
+    return dispatcher.loadFileset(normalizeCaseSensitive(ident));
   }
 
   @Override
   public boolean filesetExists(NameIdentifier ident) {
     // The constraints of the name spec may be more strict than underlying 
catalog,
     // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-    return dispatcher.filesetExists(
-        applyCaseSensitive(ident, Capability.Scope.FILESET, dispatcher));
+    return dispatcher.filesetExists(normalizeCaseSensitive(ident));
   }
 
   @Override
@@ -78,23 +80,41 @@ public class FilesetNormalizeDispatcher implements 
FilesetDispatcher {
   @Override
   public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes)
       throws NoSuchFilesetException, IllegalArgumentException {
-    Capability capability = dispatcher.getCatalogCapability(ident);
+    Capability capability = getCapability(ident, catalogManager);
     return dispatcher.alterFileset(
         // The constraints of the name spec may be more strict than underlying 
catalog,
         // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-        applyCaseSensitive(ident, Capability.Scope.FILESET, dispatcher),
-        applyCapabilities(capability, changes));
+        normalizeCaseSensitive(ident), applyCapabilities(capability, changes));
   }
 
   @Override
   public boolean dropFileset(NameIdentifier ident) {
     // The constraints of the name spec may be more strict than underlying 
catalog,
     // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-    return dispatcher.dropFileset(applyCaseSensitive(ident, 
Capability.Scope.FILESET, dispatcher));
+    return dispatcher.dropFileset(normalizeCaseSensitive(ident));
   }
 
   private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) {
-    Capability capability = dispatcher.getCatalogCapability(ident);
-    return applyCapabilities(ident, Capability.Scope.FILESET, capability);
+    Capability capabilities = getCapability(ident, catalogManager);
+    return applyCapabilities(ident, Capability.Scope.FILESET, capabilities);
+  }
+
+  private Namespace normalizeCaseSensitive(Namespace namespace) {
+    Capability capabilities = 
getCapability(NameIdentifier.of(namespace.levels()), catalogManager);
+    return applyCaseSensitive(namespace, Capability.Scope.FILESET, 
capabilities);
+  }
+
+  private NameIdentifier normalizeCaseSensitive(NameIdentifier filesetIdent) {
+    Capability capabilities = getCapability(filesetIdent, catalogManager);
+    return applyCaseSensitive(filesetIdent, Capability.Scope.FILESET, 
capabilities);
+  }
+
+  private NameIdentifier[] normalizeCaseSensitive(NameIdentifier[] 
filesetIdents) {
+    if (ArrayUtils.isEmpty(filesetIdents)) {
+      return filesetIdents;
+    }
+
+    Capability capabilities = getCapability(filesetIdents[0], catalogManager);
+    return applyCaseSensitive(filesetIdents, Capability.Scope.FILESET, 
capabilities);
   }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
 
b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
index 1dbe3fcad..e28369f96 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
@@ -19,6 +19,7 @@
 package org.apache.gravitino.catalog;
 
 import static 
org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate;
+import static 
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
 
 import java.util.Map;
 import org.apache.gravitino.EntityStore;
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/OperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/OperationDispatcher.java
index bf8b0746b..a9f875df6 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/OperationDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/OperationDispatcher.java
@@ -19,25 +19,21 @@
 package org.apache.gravitino.catalog;
 
 import static 
org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForAlter;
+import static 
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
 
 import com.google.common.collect.Maps;
-import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.Namespace;
 import org.apache.gravitino.SchemaChange;
 import org.apache.gravitino.StringIdentifier;
 import org.apache.gravitino.connector.HasPropertyMetadata;
 import org.apache.gravitino.connector.PropertiesMetadata;
 import org.apache.gravitino.connector.capability.Capability;
-import org.apache.gravitino.exceptions.IllegalNameIdentifierException;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.file.FilesetChange;
 import org.apache.gravitino.messaging.TopicChange;
@@ -75,20 +71,6 @@ public abstract class OperationDispatcher {
     this.idGenerator = idGenerator;
   }
 
-  protected Capability getCatalogCapability(NameIdentifier ident) {
-    return doWithCatalog(
-        getCatalogIdentifier(ident),
-        CatalogManager.CatalogWrapper::capabilities,
-        IllegalArgumentException.class);
-  }
-
-  protected Capability getCatalogCapability(Namespace namespace) {
-    return doWithCatalog(
-        getCatalogIdentifier(NameIdentifier.of(namespace.levels())),
-        CatalogManager.CatalogWrapper::capabilities,
-        IllegalArgumentException.class);
-  }
-
   protected <R, E extends Throwable> R doWithTable(
       NameIdentifier tableIdent, ThrowableFunction<SupportsPartitions, R> fn, 
Class<E> ex)
       throws E {
@@ -239,26 +221,6 @@ public abstract class OperationDispatcher {
     return ret;
   }
 
-  // TODO(xun): Remove this method when we implement a better way to get the 
catalog identifier
-  //  [#257] Add an explicit get catalog functions in NameIdentifier
-  protected NameIdentifier getCatalogIdentifier(NameIdentifier ident) {
-    NameIdentifier.check(
-        ident.name() != null, "The name variable in the NameIdentifier must 
have value.");
-    Namespace.check(
-        ident.namespace() != null && ident.namespace().length() > 0,
-        "Catalog namespace must be non-null and have 1 level, the input 
namespace is %s",
-        ident.namespace());
-
-    List<String> allElems =
-        Stream.concat(Arrays.stream(ident.namespace().levels()), 
Stream.of(ident.name()))
-            .collect(Collectors.toList());
-    if (allElems.size() < 2) {
-      throw new IllegalNameIdentifierException(
-          "Cannot create a catalog NameIdentifier less than two elements.");
-    }
-    return NameIdentifier.of(allElems.get(0), allElems.get(1));
-  }
-
   boolean isManagedEntity(NameIdentifier catalogIdent, Capability.Scope scope) 
{
     return doWithCatalog(
         catalogIdent,
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/PartitionNormalizeDispatcher.java
 
b/core/src/main/java/org/apache/gravitino/catalog/PartitionNormalizeDispatcher.java
index 8d14e4210..899846fdb 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/PartitionNormalizeDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/PartitionNormalizeDispatcher.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.catalog;
 
 import static 
org.apache.gravitino.catalog.CapabilityHelpers.applyCaseSensitive;
 import static 
org.apache.gravitino.catalog.CapabilityHelpers.applyCaseSensitiveOnName;
+import static org.apache.gravitino.catalog.CapabilityHelpers.getCapability;
 
 import java.util.Arrays;
 import org.apache.gravitino.NameIdentifier;
@@ -29,19 +30,21 @@ import 
org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
 import org.apache.gravitino.rel.partitions.Partition;
 
 public class PartitionNormalizeDispatcher implements PartitionDispatcher {
+  private final CatalogManager catalogManager;
+  private final PartitionDispatcher dispatcher;
 
-  private final PartitionOperationDispatcher dispatcher;
-
-  public PartitionNormalizeDispatcher(PartitionOperationDispatcher dispatcher) 
{
+  public PartitionNormalizeDispatcher(
+      PartitionDispatcher dispatcher, CatalogManager catalogManager) {
     this.dispatcher = dispatcher;
+    this.catalogManager = catalogManager;
   }
 
   @Override
   public String[] listPartitionNames(NameIdentifier tableIdent) {
+    Capability capabilities = getCapability(tableIdent, catalogManager);
     String[] partitionNames =
         dispatcher.listPartitionNames(
-            applyCaseSensitive(tableIdent, Capability.Scope.TABLE, 
dispatcher));
-    Capability capabilities = dispatcher.getCatalogCapability(tableIdent);
+            applyCaseSensitive(tableIdent, Capability.Scope.TABLE, 
capabilities));
     return Arrays.stream(partitionNames)
         .map(
             partitionName ->
@@ -51,49 +54,45 @@ public class PartitionNormalizeDispatcher implements 
PartitionDispatcher {
 
   @Override
   public Partition[] listPartitions(NameIdentifier tableIdent) {
+    Capability capabilities = getCapability(tableIdent, catalogManager);
     Partition[] partitions =
         dispatcher.listPartitions(
-            CapabilityHelpers.applyCaseSensitive(tableIdent, 
Capability.Scope.TABLE, dispatcher));
-    return applyCaseSensitive(partitions, 
dispatcher.getCatalogCapability(tableIdent));
+            CapabilityHelpers.applyCaseSensitive(tableIdent, 
Capability.Scope.TABLE, capabilities));
+    return applyCaseSensitive(partitions, capabilities);
   }
 
   @Override
   public Partition getPartition(NameIdentifier tableIdent, String 
partitionName)
       throws NoSuchPartitionException {
+    Capability capabilities = getCapability(tableIdent, catalogManager);
     return dispatcher.getPartition(
-        CapabilityHelpers.applyCaseSensitive(tableIdent, 
Capability.Scope.TABLE, dispatcher),
-        applyCaseSensitiveOnName(
-            Capability.Scope.PARTITION,
-            partitionName,
-            dispatcher.getCatalogCapability(tableIdent)));
+        CapabilityHelpers.applyCaseSensitive(tableIdent, 
Capability.Scope.TABLE, capabilities),
+        applyCaseSensitiveOnName(Capability.Scope.PARTITION, partitionName, 
capabilities));
   }
 
   @Override
   public Partition addPartition(NameIdentifier tableIdent, Partition partition)
       throws PartitionAlreadyExistsException {
+    Capability capabilities = getCapability(tableIdent, catalogManager);
     return dispatcher.addPartition(
-        CapabilityHelpers.applyCaseSensitive(tableIdent, 
Capability.Scope.TABLE, dispatcher),
-        applyCaseSensitive(partition, 
dispatcher.getCatalogCapability(tableIdent)));
+        CapabilityHelpers.applyCaseSensitive(tableIdent, 
Capability.Scope.TABLE, capabilities),
+        applyCaseSensitive(partition, capabilities));
   }
 
   @Override
   public boolean dropPartition(NameIdentifier tableIdent, String 
partitionName) {
+    Capability capabilities = getCapability(tableIdent, catalogManager);
     return dispatcher.dropPartition(
-        CapabilityHelpers.applyCaseSensitive(tableIdent, 
Capability.Scope.TABLE, dispatcher),
-        applyCaseSensitiveOnName(
-            Capability.Scope.PARTITION,
-            partitionName,
-            dispatcher.getCatalogCapability(tableIdent)));
+        CapabilityHelpers.applyCaseSensitive(tableIdent, 
Capability.Scope.TABLE, capabilities),
+        applyCaseSensitiveOnName(Capability.Scope.PARTITION, partitionName, 
capabilities));
   }
 
   @Override
   public boolean purgePartition(NameIdentifier tableIdent, String 
partitionName)
       throws UnsupportedOperationException {
+    Capability capabilities = getCapability(tableIdent, catalogManager);
     return dispatcher.purgePartition(
-        CapabilityHelpers.applyCaseSensitive(tableIdent, 
Capability.Scope.TABLE, dispatcher),
-        applyCaseSensitiveOnName(
-            Capability.Scope.PARTITION,
-            partitionName,
-            dispatcher.getCatalogCapability(tableIdent)));
+        CapabilityHelpers.applyCaseSensitive(tableIdent, 
Capability.Scope.TABLE, capabilities),
+        applyCaseSensitiveOnName(Capability.Scope.PARTITION, partitionName, 
capabilities));
   }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/SchemaNormalizeDispatcher.java
 
b/core/src/main/java/org/apache/gravitino/catalog/SchemaNormalizeDispatcher.java
index 04e695fe6..25460bb5d 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/SchemaNormalizeDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/SchemaNormalizeDispatcher.java
@@ -20,8 +20,10 @@ package org.apache.gravitino.catalog;
 
 import static org.apache.gravitino.catalog.CapabilityHelpers.applyCapabilities;
 import static 
org.apache.gravitino.catalog.CapabilityHelpers.applyCaseSensitive;
+import static org.apache.gravitino.catalog.CapabilityHelpers.getCapability;
 
 import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.Schema;
@@ -33,11 +35,12 @@ import 
org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
 
 public class SchemaNormalizeDispatcher implements SchemaDispatcher {
+  private final CatalogManager catalogManager;
+  private final SchemaDispatcher dispatcher;
 
-  private final SchemaOperationDispatcher dispatcher;
-
-  public SchemaNormalizeDispatcher(SchemaOperationDispatcher dispatcher) {
+  public SchemaNormalizeDispatcher(SchemaDispatcher dispatcher, CatalogManager 
catalogManager) {
     this.dispatcher = dispatcher;
+    this.catalogManager = catalogManager;
   }
 
   @Override
@@ -45,14 +48,14 @@ public class SchemaNormalizeDispatcher implements 
SchemaDispatcher {
     NameIdentifier[] identifiers = dispatcher.listSchemas(namespace);
     // The constraints of the name spec may be more strict than underlying 
catalog,
     // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-    return applyCaseSensitive(identifiers, Capability.Scope.SCHEMA, 
dispatcher);
+    return normalizeCaseSensitive(identifiers);
   }
 
   @Override
   public boolean schemaExists(NameIdentifier ident) {
     // The constraints of the name spec may be more strict than underlying 
catalog,
     // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-    return dispatcher.schemaExists(applyCaseSensitive(ident, 
Capability.Scope.SCHEMA, dispatcher));
+    return dispatcher.schemaExists(normalizeCaseSensitive(ident));
   }
 
   @Override
@@ -65,7 +68,7 @@ public class SchemaNormalizeDispatcher implements 
SchemaDispatcher {
   public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
     // The constraints of the name spec may be more strict than underlying 
catalog,
     // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-    return dispatcher.loadSchema(applyCaseSensitive(ident, 
Capability.Scope.SCHEMA, dispatcher));
+    return dispatcher.loadSchema(normalizeCaseSensitive(ident));
   }
 
   @Override
@@ -73,8 +76,7 @@ public class SchemaNormalizeDispatcher implements 
SchemaDispatcher {
       throws NoSuchSchemaException {
     // The constraints of the name spec may be more strict than underlying 
catalog,
     // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-    return dispatcher.alterSchema(
-        applyCaseSensitive(ident, Capability.Scope.SCHEMA, dispatcher), 
changes);
+    return dispatcher.alterSchema(normalizeCaseSensitive(ident), changes);
   }
 
   @Override
@@ -82,8 +84,22 @@ public class SchemaNormalizeDispatcher implements 
SchemaDispatcher {
     return dispatcher.dropSchema(normalizeNameIdentifier(ident), cascade);
   }
 
-  private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) {
-    Capability capability = dispatcher.getCatalogCapability(ident);
-    return applyCapabilities(ident, Capability.Scope.SCHEMA, capability);
+  private NameIdentifier normalizeNameIdentifier(NameIdentifier schemaIdent) {
+    Capability capabilities = getCapability(schemaIdent, catalogManager);
+    return applyCapabilities(schemaIdent, Capability.Scope.SCHEMA, 
capabilities);
+  }
+
+  private NameIdentifier normalizeCaseSensitive(NameIdentifier schemaIdent) {
+    Capability capabilities = getCapability(schemaIdent, catalogManager);
+    return applyCaseSensitive(schemaIdent, Capability.Scope.SCHEMA, 
capabilities);
+  }
+
+  private NameIdentifier[] normalizeCaseSensitive(NameIdentifier[] 
schemaIdents) {
+    if (ArrayUtils.isEmpty(schemaIdents)) {
+      return schemaIdents;
+    }
+
+    Capability capabilities = getCapability(schemaIdents[0], catalogManager);
+    return applyCaseSensitive(schemaIdents, Capability.Scope.SCHEMA, 
capabilities);
   }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
 
b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
index 267a68049..758d19737 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.catalog;
 
 import static org.apache.gravitino.Entity.EntityType.SCHEMA;
 import static 
org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate;
+import static 
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
 
 import java.time.Instant;
 import java.util.Map;
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/TableNormalizeDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/TableNormalizeDispatcher.java
index f2b680e13..12fd1cb62 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/TableNormalizeDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/TableNormalizeDispatcher.java
@@ -20,8 +20,10 @@ package org.apache.gravitino.catalog;
 
 import static org.apache.gravitino.catalog.CapabilityHelpers.applyCapabilities;
 import static 
org.apache.gravitino.catalog.CapabilityHelpers.applyCaseSensitive;
+import static org.apache.gravitino.catalog.CapabilityHelpers.getCapability;
 
 import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.connector.capability.Capability;
@@ -37,10 +39,11 @@ import 
org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.indexes.Index;
 
 public class TableNormalizeDispatcher implements TableDispatcher {
+  private final CatalogManager catalogManager;
+  private final TableDispatcher dispatcher;
 
-  private final TableOperationDispatcher dispatcher;
-
-  public TableNormalizeDispatcher(TableOperationDispatcher dispatcher) {
+  public TableNormalizeDispatcher(TableDispatcher dispatcher, CatalogManager 
catalogManager) {
+    this.catalogManager = catalogManager;
     this.dispatcher = dispatcher;
   }
 
@@ -48,16 +51,16 @@ public class TableNormalizeDispatcher implements 
TableDispatcher {
   public NameIdentifier[] listTables(Namespace namespace) throws 
NoSuchSchemaException {
     // The constraints of the name spec may be more strict than underlying 
catalog,
     // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-    Namespace caseSensitiveNs = applyCaseSensitive(namespace, 
Capability.Scope.TABLE, dispatcher);
+    Namespace caseSensitiveNs = normalizeCaseSensitive(namespace);
     NameIdentifier[] identifiers = dispatcher.listTables(caseSensitiveNs);
-    return applyCaseSensitive(identifiers, Capability.Scope.TABLE, dispatcher);
+    return normalizeCaseSensitive(identifiers);
   }
 
   @Override
   public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
     // The constraints of the name spec may be more strict than underlying 
catalog,
     // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-    return dispatcher.loadTable(applyCaseSensitive(ident, 
Capability.Scope.TABLE, dispatcher));
+    return dispatcher.loadTable(normalizeCaseSensitive(ident));
   }
 
   @Override
@@ -71,7 +74,7 @@ public class TableNormalizeDispatcher implements 
TableDispatcher {
       SortOrder[] sortOrders,
       Index[] indexes)
       throws NoSuchSchemaException, TableAlreadyExistsException {
-    Capability capability = dispatcher.getCatalogCapability(ident);
+    Capability capability = getCapability(ident, catalogManager);
     return dispatcher.createTable(
         applyCapabilities(ident, Capability.Scope.TABLE, capability),
         applyCapabilities(columns, capability),
@@ -86,12 +89,11 @@ public class TableNormalizeDispatcher implements 
TableDispatcher {
   @Override
   public Table alterTable(NameIdentifier ident, TableChange... changes)
       throws NoSuchTableException, IllegalArgumentException {
-    Capability capability = dispatcher.getCatalogCapability(ident);
+    Capability capability = getCapability(ident, catalogManager);
     return dispatcher.alterTable(
         // The constraints of the name spec may be more strict than underlying 
catalog,
         // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-        applyCaseSensitive(ident, Capability.Scope.TABLE, dispatcher),
-        applyCapabilities(capability, changes));
+        normalizeCaseSensitive(ident), applyCapabilities(capability, changes));
   }
 
   @Override
@@ -108,11 +110,30 @@ public class TableNormalizeDispatcher implements 
TableDispatcher {
   public boolean tableExists(NameIdentifier ident) {
     // The constraints of the name spec may be more strict than underlying 
catalog,
     // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-    return dispatcher.tableExists(applyCaseSensitive(ident, 
Capability.Scope.TABLE, dispatcher));
+    return dispatcher.tableExists(normalizeCaseSensitive(ident));
+  }
+
+  private Namespace normalizeCaseSensitive(Namespace namespace) {
+    Capability capabilities = 
getCapability(NameIdentifier.of(namespace.levels()), catalogManager);
+    return applyCaseSensitive(namespace, Capability.Scope.TABLE, capabilities);
+  }
+
+  private NameIdentifier normalizeCaseSensitive(NameIdentifier tableIdent) {
+    Capability capability = getCapability(tableIdent, catalogManager);
+    return applyCaseSensitive(tableIdent, Capability.Scope.TABLE, capability);
+  }
+
+  private NameIdentifier[] normalizeCaseSensitive(NameIdentifier[] 
tableIdents) {
+    if (ArrayUtils.isEmpty(tableIdents)) {
+      return tableIdents;
+    }
+
+    Capability capabilities = getCapability(tableIdents[0], catalogManager);
+    return applyCaseSensitive(tableIdents, Capability.Scope.TABLE, 
capabilities);
   }
 
-  private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) {
-    Capability capability = dispatcher.getCatalogCapability(ident);
-    return applyCapabilities(ident, Capability.Scope.TABLE, capability);
+  private NameIdentifier normalizeNameIdentifier(NameIdentifier tableIdent) {
+    Capability capability = getCapability(tableIdent, catalogManager);
+    return applyCapabilities(tableIdent, Capability.Scope.TABLE, capability);
   }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index 8a0697a37..404af695a 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -22,6 +22,7 @@ import static org.apache.gravitino.Entity.EntityType.TABLE;
 import static org.apache.gravitino.catalog.CapabilityHelpers.applyCapabilities;
 import static 
org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate;
 import static 
org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM;
+import static 
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
 
 import java.time.Instant;
 import java.util.Arrays;
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/TopicNormalizeDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/TopicNormalizeDispatcher.java
index ebcaee981..a104b4f67 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/TopicNormalizeDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/TopicNormalizeDispatcher.java
@@ -20,8 +20,10 @@ package org.apache.gravitino.catalog;
 
 import static org.apache.gravitino.catalog.CapabilityHelpers.applyCapabilities;
 import static 
org.apache.gravitino.catalog.CapabilityHelpers.applyCaseSensitive;
+import static org.apache.gravitino.catalog.CapabilityHelpers.getCapability;
 
 import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.connector.capability.Capability;
@@ -33,34 +35,35 @@ import org.apache.gravitino.messaging.Topic;
 import org.apache.gravitino.messaging.TopicChange;
 
 public class TopicNormalizeDispatcher implements TopicDispatcher {
+  private final CatalogManager catalogManager;
+  private final TopicDispatcher dispatcher;
 
-  private final TopicOperationDispatcher dispatcher;
-
-  public TopicNormalizeDispatcher(TopicOperationDispatcher dispatcher) {
+  public TopicNormalizeDispatcher(TopicDispatcher dispatcher, CatalogManager 
catalogManager) {
     this.dispatcher = dispatcher;
+    this.catalogManager = catalogManager;
   }
 
   @Override
   public NameIdentifier[] listTopics(Namespace namespace) throws 
NoSuchSchemaException {
     // The constraints of the name spec may be more strict than underlying 
catalog,
     // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-    Namespace caseSensitiveNs = applyCaseSensitive(namespace, 
Capability.Scope.TOPIC, dispatcher);
+    Namespace caseSensitiveNs = normalizeCaseSensitive(namespace);
     NameIdentifier[] identifiers = dispatcher.listTopics(caseSensitiveNs);
-    return applyCaseSensitive(identifiers, Capability.Scope.TOPIC, dispatcher);
+    return normalizeCaseSensitive(identifiers);
   }
 
   @Override
   public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
     // The constraints of the name spec may be more strict than underlying 
catalog,
     // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-    return dispatcher.loadTopic(applyCaseSensitive(ident, 
Capability.Scope.TOPIC, dispatcher));
+    return dispatcher.loadTopic(normalizeCaseSensitive(ident));
   }
 
   @Override
   public boolean topicExists(NameIdentifier ident) {
     // The constraints of the name spec may be more strict than underlying 
catalog,
     // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-    return dispatcher.topicExists(applyCaseSensitive(ident, 
Capability.Scope.TOPIC, dispatcher));
+    return dispatcher.topicExists(normalizeCaseSensitive(ident));
   }
 
   @Override
@@ -75,19 +78,37 @@ public class TopicNormalizeDispatcher implements 
TopicDispatcher {
       throws NoSuchTopicException, IllegalArgumentException {
     // The constraints of the name spec may be more strict than underlying 
catalog,
     // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-    return dispatcher.alterTopic(
-        applyCaseSensitive(ident, Capability.Scope.TOPIC, dispatcher), 
changes);
+    return dispatcher.alterTopic(normalizeCaseSensitive(ident), changes);
   }
 
   @Override
   public boolean dropTopic(NameIdentifier ident) {
     // The constraints of the name spec may be more strict than underlying 
catalog,
     // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-    return dispatcher.dropTopic(applyCaseSensitive(ident, 
Capability.Scope.TOPIC, dispatcher));
+    return dispatcher.dropTopic(normalizeCaseSensitive(ident));
+  }
+
+  private Namespace normalizeCaseSensitive(Namespace namespace) {
+    Capability capabilities = 
getCapability(NameIdentifier.of(namespace.levels()), catalogManager);
+    return applyCaseSensitive(namespace, Capability.Scope.TOPIC, capabilities);
+  }
+
+  private NameIdentifier normalizeCaseSensitive(NameIdentifier topicIdent) {
+    Capability capabilities = getCapability(topicIdent, catalogManager);
+    return applyCaseSensitive(topicIdent, Capability.Scope.TOPIC, 
capabilities);
+  }
+
+  private NameIdentifier[] normalizeCaseSensitive(NameIdentifier[] 
topicIdents) {
+    if (ArrayUtils.isEmpty(topicIdents)) {
+      return topicIdents;
+    }
+
+    Capability capabilities = getCapability(topicIdents[0], catalogManager);
+    return applyCaseSensitive(topicIdents, Capability.Scope.TOPIC, 
capabilities);
   }
 
-  private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) {
-    Capability capability = dispatcher.getCatalogCapability(ident);
-    return applyCapabilities(ident, Capability.Scope.TOPIC, capability);
+  private NameIdentifier normalizeNameIdentifier(NameIdentifier topicIdent) {
+    Capability capability = getCapability(topicIdent, catalogManager);
+    return applyCapabilities(topicIdent, Capability.Scope.TOPIC, capability);
   }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
index e2ac7a4f6..981d999f9 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.catalog;
 import static org.apache.gravitino.Entity.EntityType.TOPIC;
 import static org.apache.gravitino.StringIdentifier.fromProperties;
 import static 
org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate;
+import static 
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
 
 import java.time.Instant;
 import java.util.Map;
diff --git 
a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java 
b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
index 616deb235..6afed3952 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
@@ -22,10 +22,15 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.errorprone.annotations.FormatMethod;
 import com.google.errorprone.annotations.FormatString;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.IllegalNameIdentifierException;
 import org.apache.gravitino.exceptions.IllegalNamespaceException;
 
@@ -117,6 +122,33 @@ public class NameIdentifierUtil {
     return NameIdentifier.of(metalake, catalog, schema, topic);
   }
 
+  /**
+   * Try to get the catalog {@link NameIdentifier} from the given {@link 
NameIdentifier}.
+   *
+   * @param ident The {@link NameIdentifier} to check.
+   * @return The catalog {@link NameIdentifier}
+   * @throws IllegalNameIdentifierException If the given {@link 
NameIdentifier} does not include
+   *     catalog name
+   */
+  public static NameIdentifier getCatalogIdentifier(NameIdentifier ident)
+      throws IllegalNameIdentifierException {
+    NameIdentifier.check(
+        ident.name() != null, "The name variable in the NameIdentifier must 
have value.");
+    Namespace.check(
+        ident.namespace() != null && !ident.namespace().isEmpty(),
+        "Catalog namespace must be non-null and have 1 level, the input 
namespace is %s",
+        ident.namespace());
+
+    List<String> allElems =
+        Stream.concat(Arrays.stream(ident.namespace().levels()), 
Stream.of(ident.name()))
+            .collect(Collectors.toList());
+    if (allElems.size() < 2) {
+      throw new IllegalNameIdentifierException(
+          "Cannot create a catalog NameIdentifier less than two elements.");
+    }
+    return NameIdentifier.of(allElems.get(0), allElems.get(1));
+  }
+
   /**
    * Check the given {@link NameIdentifier} is a metalake identifier. Throw an 
{@link
    * IllegalNameIdentifierException} if it's not.
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestFilesetNormalizeDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestFilesetNormalizeDispatcher.java
index 7976dbe9b..8781a0299 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestFilesetNormalizeDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestFilesetNormalizeDispatcher.java
@@ -40,9 +40,11 @@ public class TestFilesetNormalizeDispatcher extends 
TestOperationDispatcher {
   public static void initialize() throws IOException {
     TestFilesetOperationDispatcher.initialize();
     filesetNormalizeDispatcher =
-        new 
FilesetNormalizeDispatcher(TestFilesetOperationDispatcher.filesetOperationDispatcher);
+        new FilesetNormalizeDispatcher(
+            TestFilesetOperationDispatcher.filesetOperationDispatcher, 
catalogManager);
     schemaNormalizeDispatcher =
-        new 
SchemaNormalizeDispatcher(TestFilesetOperationDispatcher.schemaOperationDispatcher);
+        new SchemaNormalizeDispatcher(
+            TestFilesetOperationDispatcher.schemaOperationDispatcher, 
catalogManager);
   }
 
   @Test
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestOperationDispatcher.java 
b/core/src/test/java/org/apache/gravitino/catalog/TestOperationDispatcher.java
index eae426c8c..58cd8ce5f 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestOperationDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestOperationDispatcher.java
@@ -19,6 +19,7 @@
 package org.apache.gravitino.catalog;
 
 import static 
org.apache.gravitino.TestFilesetPropertiesMetadata.TEST_FILESET_HIDDEN_KEY;
+import static 
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.reset;
@@ -108,22 +109,20 @@ public abstract class TestOperationDispatcher {
 
   @Test
   public void testGetCatalogIdentifier() {
-    OperationDispatcher dispatcher = new OperationDispatcher(null, null, null) 
{};
-
     NameIdentifier id1 = NameIdentifier.of("a");
-    assertThrows(IllegalNamespaceException.class, () -> 
dispatcher.getCatalogIdentifier(id1));
+    assertThrows(IllegalNamespaceException.class, () -> 
getCatalogIdentifier(id1));
 
     NameIdentifier id2 = NameIdentifier.of("a", "b");
-    assertEquals(dispatcher.getCatalogIdentifier(id2), NameIdentifier.of("a", 
"b"));
+    assertEquals(getCatalogIdentifier(id2), NameIdentifier.of("a", "b"));
 
     NameIdentifier id3 = NameIdentifier.of("a", "b", "c");
-    assertEquals(dispatcher.getCatalogIdentifier(id3), NameIdentifier.of("a", 
"b"));
+    assertEquals(getCatalogIdentifier(id3), NameIdentifier.of("a", "b"));
 
     NameIdentifier id4 = NameIdentifier.of("a", "b", "c", "d");
-    assertEquals(dispatcher.getCatalogIdentifier(id4), NameIdentifier.of("a", 
"b"));
+    assertEquals(getCatalogIdentifier(id4), NameIdentifier.of("a", "b"));
 
     NameIdentifier id5 = NameIdentifier.of("a", "b", "c", "d", "e");
-    assertEquals(dispatcher.getCatalogIdentifier(id5), NameIdentifier.of("a", 
"b"));
+    assertEquals(getCatalogIdentifier(id5), NameIdentifier.of("a", "b"));
   }
 
   void testProperties(Map<String, String> expectedProps, Map<String, String> 
testProps) {
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
index fcd209b6c..2c47d69bc 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
@@ -43,7 +43,7 @@ public class TestPartitionNormalizeDispatcher extends 
TestOperationDispatcher {
     TestPartitionOperationDispatcher.prepareTable();
     partitionNormalizeDispatcher =
         new PartitionNormalizeDispatcher(
-            TestPartitionOperationDispatcher.partitionOperationDispatcher);
+            TestPartitionOperationDispatcher.partitionOperationDispatcher, 
catalogManager);
     NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema(metalake, 
catalog, SCHEMA);
     TestPartitionOperationDispatcher.schemaOperationDispatcher.createSchema(
         schemaIdent, "comment", null);
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestSchemaNormalizeDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestSchemaNormalizeDispatcher.java
index 539560e8e..e4fef5487 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestSchemaNormalizeDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestSchemaNormalizeDispatcher.java
@@ -37,7 +37,7 @@ public class TestSchemaNormalizeDispatcher extends 
TestOperationDispatcher {
   public static void initialize() throws IOException, IllegalAccessException {
     TestSchemaOperationDispatcher.initialize();
     schemaNormalizeDispatcher =
-        new 
SchemaNormalizeDispatcher(TestSchemaOperationDispatcher.dispatcher);
+        new 
SchemaNormalizeDispatcher(TestSchemaOperationDispatcher.dispatcher, 
catalogManager);
   }
 
   @Test
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestTableNormalizeDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestTableNormalizeDispatcher.java
index 25c9203a5..2c8938edc 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestTableNormalizeDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestTableNormalizeDispatcher.java
@@ -56,9 +56,11 @@ public class TestTableNormalizeDispatcher extends 
TestOperationDispatcher {
   public static void initialize() throws IOException, IllegalAccessException {
     TestTableOperationDispatcher.initialize();
     tableNormalizeDispatcher =
-        new 
TableNormalizeDispatcher(TestTableOperationDispatcher.tableOperationDispatcher);
+        new TableNormalizeDispatcher(
+            TestTableOperationDispatcher.tableOperationDispatcher, 
catalogManager);
     schemaNormalizeDispatcher =
-        new 
SchemaNormalizeDispatcher(TestTableOperationDispatcher.schemaOperationDispatcher);
+        new SchemaNormalizeDispatcher(
+            TestTableOperationDispatcher.schemaOperationDispatcher, 
catalogManager);
   }
 
   @Test
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestTopicNormalizeDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestTopicNormalizeDispatcher.java
index 6d493b91c..ef2589241 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestTopicNormalizeDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestTopicNormalizeDispatcher.java
@@ -38,9 +38,11 @@ public class TestTopicNormalizeDispatcher extends 
TestOperationDispatcher {
   public static void initialize() throws IOException, IllegalAccessException {
     TestTopicOperationDispatcher.initialize();
     schemaNormalizeDispatcher =
-        new 
SchemaNormalizeDispatcher(TestTopicOperationDispatcher.schemaOperationDispatcher);
+        new SchemaNormalizeDispatcher(
+            TestTopicOperationDispatcher.schemaOperationDispatcher, 
catalogManager);
     topicNormalizeDispatcher =
-        new 
TopicNormalizeDispatcher(TestTopicOperationDispatcher.topicOperationDispatcher);
+        new TopicNormalizeDispatcher(
+            TestTopicOperationDispatcher.topicOperationDispatcher, 
catalogManager);
   }
 
   @Test

Reply via email to