This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new bdd4b8ac59 [#8546] improvement: Retrieve metadata object id using 
cache (#9091)
bdd4b8ac59 is described below

commit bdd4b8ac59f98e9cfe5efc368f4f96581ab53011
Author: roryqi <[email protected]>
AuthorDate: Tue Nov 25 16:07:15 2025 +0800

    [#8546] improvement: Retrieve metadata object id using cache (#9091)
    
    ### What changes were proposed in this pull request?
    
     Retrieve metadata object id using cache
    
    ### Why are the changes needed?
    
    Fix: #8546
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing test cases.
---
 .../fileset/TestFilesetCatalogOperations.java      |   2 +-
 .../gravitino/cache/CachedEntityIdResolver.java    |  64 +++++
 .../apache/gravitino/meta/EntityIdResolver.java    |  44 ++++
 .../apache/gravitino/meta/NamespacedEntityId.java  |  50 ++++
 .../gravitino/storage/relational/JDBCBackend.java  |   1 -
 .../storage/relational/RelationalEntityStore.java  |  18 +-
 .../RelationalEntityStoreIdResolver.java           | 206 +++++++++++++++
 .../relational/service/CatalogMetaService.java     |  21 +-
 .../relational/service/CommonMetaService.java      | 157 -----------
 .../relational/service/EntityIdService.java        |  50 ++++
 .../relational/service/FilesetMetaService.java     |  29 +-
 .../relational/service/GroupMetaService.java       |  23 +-
 .../storage/relational/service/JobMetaService.java |   3 +-
 .../relational/service/JobTemplateMetaService.java |   4 +-
 .../relational/service/MetadataObjectService.java  |  55 ----
 .../relational/service/ModelMetaService.java       |  23 +-
 .../service/ModelVersionMetaService.java           |   6 +-
 .../relational/service/OwnerMetaService.java       |  27 +-
 .../relational/service/PolicyMetaService.java      |  18 +-
 .../relational/service/RoleMetaService.java        |  36 +--
 .../relational/service/SchemaMetaService.java      | 292 +++++++++++----------
 .../relational/service/StatisticMetaService.java   |  33 +--
 .../relational/service/TableColumnMetaService.java |   2 +-
 .../relational/service/TableMetaService.java       |  59 +++--
 .../storage/relational/service/TagMetaService.java |  15 +-
 .../relational/service/TopicMetaService.java       |  30 ++-
 .../relational/service/UserMetaService.java        |   4 +-
 .../apache/gravitino/utils/NameIdentifierUtil.java |  59 +++++
 .../storage/relational/TestJDBCBackend.java        |   2 +
 .../relational/service/TestModelMetaService.java   |   4 +-
 .../relational/service/TestSecurableObjects.java   |  25 +-
 31 files changed, 813 insertions(+), 549 deletions(-)

diff --git 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
index 993f92acf7..a9d26fdf6a 100644
--- 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
+++ 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
@@ -2887,7 +2887,7 @@ public class TestFilesetCatalogOperations {
 
       NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1", 
name);
       Map<String, String> schemaProps = Maps.newHashMap();
-      StringIdentifier stringId = 
StringIdentifier.fromId(idGenerator.nextId());
+      StringIdentifier stringId = StringIdentifier.fromId(testId);
       schemaProps = 
Maps.newHashMap(StringIdentifier.newPropertiesWithId(stringId, schemaProps));
 
       if (schemaPath != null) {
diff --git 
a/core/src/main/java/org/apache/gravitino/cache/CachedEntityIdResolver.java 
b/core/src/main/java/org/apache/gravitino/cache/CachedEntityIdResolver.java
new file mode 100644
index 0000000000..d77c685d40
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/cache/CachedEntityIdResolver.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.cache;
+
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.meta.EntityIdResolver;
+import org.apache.gravitino.meta.NamespacedEntityId;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+
+public class CachedEntityIdResolver implements EntityIdResolver {
+
+  private final EntityCache entityCache;
+  private final EntityIdResolver underlyingResolver;
+
+  public CachedEntityIdResolver(EntityCache entityCache, EntityIdResolver 
underlyingResolver) {
+    this.entityCache = entityCache;
+    this.underlyingResolver = underlyingResolver;
+  }
+
+  @Override
+  public NamespacedEntityId getEntityIds(NameIdentifier nameIdentifier, 
Entity.EntityType type) {
+    return entityCache
+        .getIfPresent(nameIdentifier, type)
+        .map(
+            entity -> {
+              if (nameIdentifier.hasNamespace()) {
+                NamespacedEntityId namespaceIds =
+                    getEntityIds(
+                        
NameIdentifierUtil.parentNameIdentifier(nameIdentifier, type),
+                        NameIdentifierUtil.parentEntityType(type));
+                return new NamespacedEntityId(entity.id(), 
namespaceIds.fullIds());
+              } else {
+                return new NamespacedEntityId(entity.id());
+              }
+            })
+        .orElseGet(() -> underlyingResolver.getEntityIds(nameIdentifier, 
type));
+  }
+
+  @Override
+  public Long getEntityId(NameIdentifier nameIdentifier, Entity.EntityType 
type) {
+    return entityCache
+        .getIfPresent(nameIdentifier, type)
+        .map(HasIdentifier::id)
+        .orElseGet(() -> underlyingResolver.getEntityId(nameIdentifier, type));
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/meta/EntityIdResolver.java 
b/core/src/main/java/org/apache/gravitino/meta/EntityIdResolver.java
new file mode 100644
index 0000000000..28210266d9
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/meta/EntityIdResolver.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.meta;
+
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+
+/** Interface for resolving entity IDs based on NameIdentifiers and Entity 
types. */
+public interface EntityIdResolver {
+
+  /**
+   * Get EntityIds for the given NameIdentifier and Entity type.
+   *
+   * @param nameIdentifier NameIdentifier of the entity.
+   * @param type Entity type.
+   * @return EntityIds corresponding to the NameIdentifier and Entity type.
+   */
+  NamespacedEntityId getEntityIds(NameIdentifier nameIdentifier, 
Entity.EntityType type);
+
+  /**
+   * Get the entity ID for the given NameIdentifier and Entity type.
+   *
+   * @param nameIdentifier NameIdentifier of the entity.
+   * @param type Entity type.
+   * @return Entity ID corresponding to the NameIdentifier and Entity type.
+   */
+  Long getEntityId(NameIdentifier nameIdentifier, Entity.EntityType type);
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/meta/NamespacedEntityId.java 
b/core/src/main/java/org/apache/gravitino/meta/NamespacedEntityId.java
new file mode 100644
index 0000000000..8f840eae51
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/meta/NamespacedEntityId.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.meta;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.ArrayUtils;
+
+public class NamespacedEntityId {
+
+  private final long entityId;
+  private final long[] namespaceIds;
+
+  public NamespacedEntityId(long entityId, long... namespaceIds) {
+    Preconditions.checkArgument(namespaceIds != null, "namespaceIds cannot be 
null");
+    this.entityId = entityId;
+    this.namespaceIds = namespaceIds;
+  }
+
+  public NamespacedEntityId(long entityId) {
+    this(entityId, new long[0]);
+  }
+
+  public long entityId() {
+    return entityId;
+  }
+
+  public long[] namespaceIds() {
+    return namespaceIds;
+  }
+
+  public long[] fullIds() {
+    return ArrayUtils.add(namespaceIds, entityId);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index e44cb9b2ae..206d9521ff 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -101,7 +101,6 @@ public class JDBCBackend implements RelationalBackend {
   @Override
   public void initialize(Config config) {
     jdbcDatabase = startJDBCDatabaseIfNecessary(config);
-
     SqlSessionFactoryHelper.getInstance().init(config);
     SQLExceptionConverterFactory.initConverter(config);
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
index 57de273755..aea88e9a9e 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
@@ -37,10 +37,12 @@ import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.SupportsRelationOperations;
 import org.apache.gravitino.cache.CacheFactory;
+import org.apache.gravitino.cache.CachedEntityIdResolver;
 import org.apache.gravitino.cache.EntityCache;
 import org.apache.gravitino.cache.EntityCacheRelationKey;
 import org.apache.gravitino.cache.NoOpsCache;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.storage.relational.service.EntityIdService;
 import org.apache.gravitino.utils.Executable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,16 +68,21 @@ public class RelationalEntityStore implements EntityStore, 
SupportsRelationOpera
 
   @Override
   public void initialize(Config config) throws RuntimeException {
+    if (config.get(Configs.CACHE_ENABLED)) {
+      this.cache = CacheFactory.getEntityCache(config);
+      EntityIdService.initialize(
+          new CachedEntityIdResolver(cache, new 
RelationalEntityStoreIdResolver()));
+    } else {
+      this.cache = new NoOpsCache(config);
+      EntityIdService.initialize(new RelationalEntityStoreIdResolver());
+    }
+
     this.backend = createRelationalEntityBackend(config);
     this.garbageCollector = new RelationalGarbageCollector(backend, config);
     this.garbageCollector.start();
-    this.cache =
-        config.get(Configs.CACHE_ENABLED)
-            ? CacheFactory.getEntityCache(config)
-            : new NoOpsCache(config);
   }
 
-  private static RelationalBackend createRelationalEntityBackend(Config 
config) {
+  private RelationalBackend createRelationalEntityBackend(Config config) {
     String backendName = config.get(ENTITY_RELATIONAL_STORE);
     String className =
         RELATIONAL_BACKENDS.getOrDefault(backendName, 
Configs.DEFAULT_ENTITY_RELATIONAL_STORE);
@@ -84,6 +91,7 @@ public class RelationalEntityStore implements EntityStore, 
SupportsRelationOpera
       RelationalBackend relationalBackend =
           (RelationalBackend) 
Class.forName(className).getDeclaredConstructor().newInstance();
       relationalBackend.initialize(config);
+
       return relationalBackend;
     } catch (Exception e) {
       LOGGER.error(
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStoreIdResolver.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStoreIdResolver.java
new file mode 100644
index 0000000000..9f46e47196
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStoreIdResolver.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.meta.EntityIdResolver;
+import org.apache.gravitino.meta.NamespacedEntityId;
+import org.apache.gravitino.storage.relational.helper.CatalogIds;
+import org.apache.gravitino.storage.relational.helper.SchemaIds;
+import org.apache.gravitino.storage.relational.service.CatalogMetaService;
+import org.apache.gravitino.storage.relational.service.FilesetMetaService;
+import org.apache.gravitino.storage.relational.service.GroupMetaService;
+import org.apache.gravitino.storage.relational.service.MetalakeMetaService;
+import org.apache.gravitino.storage.relational.service.ModelMetaService;
+import org.apache.gravitino.storage.relational.service.RoleMetaService;
+import org.apache.gravitino.storage.relational.service.SchemaMetaService;
+import org.apache.gravitino.storage.relational.service.TableColumnMetaService;
+import org.apache.gravitino.storage.relational.service.TableMetaService;
+import org.apache.gravitino.storage.relational.service.TagMetaService;
+import org.apache.gravitino.storage.relational.service.TopicMetaService;
+import org.apache.gravitino.storage.relational.service.UserMetaService;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+
+public class RelationalEntityStoreIdResolver implements EntityIdResolver {
+  private static final Set<Entity.EntityType> 
ENTITY_TYPES_REQUIRING_METALAKE_ID =
+      ImmutableSet.of(
+          Entity.EntityType.METALAKE,
+          Entity.EntityType.ROLE,
+          Entity.EntityType.USER,
+          Entity.EntityType.GROUP,
+          Entity.EntityType.TAG);
+  private static final Set<Entity.EntityType> 
ENTITY_TYPES_REQURING_CATALOG_IDS =
+      ImmutableSet.of(Entity.EntityType.CATALOG);
+  private static final Set<Entity.EntityType> ENTITY_TYPES_REQURING_SCHEMA_IDS 
=
+      ImmutableSet.of(
+          Entity.EntityType.SCHEMA,
+          Entity.EntityType.TABLE,
+          Entity.EntityType.FILESET,
+          Entity.EntityType.TOPIC,
+          Entity.EntityType.MODEL,
+          Entity.EntityType.COLUMN);
+
+  @Override
+  public NamespacedEntityId getEntityIds(NameIdentifier nameIdentifier, 
Entity.EntityType type) {
+    if (ENTITY_TYPES_REQUIRING_METALAKE_ID.contains(type)) {
+      return getEntityIdsRequiringMetalakeId(nameIdentifier, type);
+
+    } else if (ENTITY_TYPES_REQURING_CATALOG_IDS.contains(type)) {
+      CatalogIds catalogIds =
+          CatalogMetaService.getInstance()
+              .getCatalogIdByMetalakeAndCatalogName(
+                  NameIdentifierUtil.getMetalake(nameIdentifier),
+                  
NameIdentifierUtil.getCatalogIdentifier(nameIdentifier).name());
+      return new NamespacedEntityId(catalogIds.getCatalogId(), 
catalogIds.getMetalakeId());
+
+    } else if (ENTITY_TYPES_REQURING_SCHEMA_IDS.contains(type)) {
+      return getEntityIdsRequiringSchemaIds(nameIdentifier, type);
+
+    } else {
+      throw new IllegalArgumentException("Unsupported entity type: " + type);
+    }
+  }
+
+  @Override
+  public Long getEntityId(NameIdentifier nameIdentifier, Entity.EntityType 
type) {
+    if (ENTITY_TYPES_REQUIRING_METALAKE_ID.contains(type)) {
+      return getEntityIdsRequiringMetalakeId(nameIdentifier, type).entityId();
+
+    } else if (ENTITY_TYPES_REQURING_CATALOG_IDS.contains(type)) {
+      return CatalogMetaService.getInstance()
+          .getCatalogIdByMetalakeAndCatalogName(
+              NameIdentifierUtil.getMetalake(nameIdentifier),
+              NameIdentifierUtil.getCatalogIdentifier(nameIdentifier).name())
+          .getCatalogId();
+
+    } else if (ENTITY_TYPES_REQURING_SCHEMA_IDS.contains(type)) {
+      return getEntityIdsRequiringSchemaIds(nameIdentifier, type).entityId();
+
+    } else {
+      throw new IllegalArgumentException("Unsupported entity type: " + type);
+    }
+  }
+
+  private NamespacedEntityId getEntityIdsRequiringMetalakeId(
+      NameIdentifier nameIdentifier, Entity.EntityType type) {
+    long metalakeId =
+        MetalakeMetaService.getInstance()
+            
.getMetalakeIdByName(NameIdentifierUtil.getMetalake(nameIdentifier));
+
+    switch (type) {
+      case METALAKE:
+        return new NamespacedEntityId(metalakeId);
+
+      case ROLE:
+        long roleId =
+            RoleMetaService.getInstance()
+                .getRoleIdByMetalakeIdAndName(metalakeId, 
nameIdentifier.name());
+        return new NamespacedEntityId(roleId, metalakeId);
+
+      case USER:
+        long userId =
+            UserMetaService.getInstance()
+                .getUserIdByMetalakeIdAndName(metalakeId, 
nameIdentifier.name());
+        return new NamespacedEntityId(userId, metalakeId);
+
+      case GROUP:
+        long groupId =
+            GroupMetaService.getInstance()
+                .getGroupIdByMetalakeIdAndName(metalakeId, 
nameIdentifier.name());
+        return new NamespacedEntityId(groupId, metalakeId);
+
+      case TAG:
+        long tagId =
+            TagMetaService.getInstance().getTagIdByTagName(metalakeId, 
nameIdentifier.name());
+        return new NamespacedEntityId(tagId, metalakeId);
+
+      default:
+        throw new IllegalArgumentException("Unsupported entity type: " + type);
+    }
+  }
+
+  private NamespacedEntityId getEntityIdsRequiringSchemaIds(
+      NameIdentifier nameIdentifier, Entity.EntityType type) {
+    SchemaIds schemaIds =
+        SchemaMetaService.getInstance()
+            .getSchemaIdByMetalakeNameAndCatalogNameAndSchemaName(
+                NameIdentifierUtil.getMetalake(nameIdentifier),
+                NameIdentifierUtil.getCatalogIdentifier(nameIdentifier).name(),
+                NameIdentifierUtil.getSchemaIdentifier(nameIdentifier).name());
+
+    switch (type) {
+      case SCHEMA:
+        return new NamespacedEntityId(
+            schemaIds.getSchemaId(), schemaIds.getMetalakeId(), 
schemaIds.getCatalogId());
+
+      case TABLE:
+        long tableId =
+            TableMetaService.getInstance()
+                .getTableIdBySchemaIdAndName(schemaIds.getSchemaId(), 
nameIdentifier.name());
+        return new NamespacedEntityId(
+            tableId, schemaIds.getMetalakeId(), schemaIds.getCatalogId(), 
schemaIds.getSchemaId());
+
+      case COLUMN:
+        long columnTableId =
+            TableMetaService.getInstance()
+                .getTableIdBySchemaIdAndName(
+                    schemaIds.getSchemaId(),
+                    
NameIdentifier.of(nameIdentifier.namespace().levels()).name());
+        long columnId =
+            TableColumnMetaService.getInstance()
+                .getColumnIdByTableIdAndName(columnTableId, 
nameIdentifier.name());
+        return new NamespacedEntityId(
+            columnId,
+            schemaIds.getMetalakeId(),
+            schemaIds.getCatalogId(),
+            schemaIds.getSchemaId(),
+            columnTableId);
+
+      case FILESET:
+        long filesetId =
+            FilesetMetaService.getInstance()
+                .getFilesetIdBySchemaIdAndName(schemaIds.getSchemaId(), 
nameIdentifier.name());
+        return new NamespacedEntityId(
+            filesetId,
+            schemaIds.getMetalakeId(),
+            schemaIds.getCatalogId(),
+            schemaIds.getSchemaId());
+
+      case TOPIC:
+        long topicId =
+            TopicMetaService.getInstance()
+                .getTopicIdBySchemaIdAndName(schemaIds.getSchemaId(), 
nameIdentifier.name());
+        return new NamespacedEntityId(
+            topicId, schemaIds.getMetalakeId(), schemaIds.getCatalogId(), 
schemaIds.getSchemaId());
+
+      case MODEL:
+        long modelId =
+            ModelMetaService.getInstance()
+                .getModelIdBySchemaIdAndModelName(schemaIds.getSchemaId(), 
nameIdentifier.name());
+        return new NamespacedEntityId(
+            modelId, schemaIds.getMetalakeId(), schemaIds.getCatalogId(), 
schemaIds.getSchemaId());
+
+      default:
+        throw new IllegalArgumentException("Unsupported entity type: " + type);
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
index 396ca397fd..7b6edc46a5 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
@@ -92,9 +92,18 @@ public class CatalogMetaService {
       metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
       baseMetricName = "getCatalogIdByMetalakeAndCatalogName")
   public CatalogIds getCatalogIdByMetalakeAndCatalogName(String metalakeName, 
String catalogName) {
-    return SessionUtils.getWithoutCommit(
-        CatalogMetaMapper.class,
-        mapper -> 
mapper.selectCatalogIdByMetalakeNameAndCatalogName(metalakeName, catalogName));
+    CatalogIds catalogIds =
+        SessionUtils.getWithoutCommit(
+            CatalogMetaMapper.class,
+            mapper ->
+                
mapper.selectCatalogIdByMetalakeNameAndCatalogName(metalakeName, catalogName));
+    if (catalogIds == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.CATALOG.name().toLowerCase(),
+          catalogName);
+    }
+    return catalogIds;
   }
 
   @Monitored(
@@ -165,8 +174,9 @@ public class CatalogMetaService {
     try {
       NameIdentifierUtil.checkCatalog(catalogEntity.nameIdentifier());
 
+      String metalake = 
NameIdentifierUtil.getMetalake(catalogEntity.nameIdentifier());
       Long metalakeId =
-          
CommonMetaService.getInstance().getParentEntityIdByNamespace(catalogEntity.namespace());
+          EntityIdService.getEntityId(NameIdentifier.of(metalake), 
Entity.EntityType.METALAKE);
 
       SessionUtils.doWithCommit(
           CatalogMetaMapper.class,
@@ -234,9 +244,8 @@ public class CatalogMetaService {
   public boolean deleteCatalog(NameIdentifier identifier, boolean cascade) {
     NameIdentifierUtil.checkCatalog(identifier);
 
-    String metalakeName = identifier.namespace().level(0);
     String catalogName = identifier.name();
-    long catalogId = getCatalogIdByName(metalakeName, catalogName);
+    long catalogId = EntityIdService.getEntityId(identifier, 
Entity.EntityType.CATALOG);
 
     if (cascade) {
       SessionUtils.doMultipleWithCommit(
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/CommonMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/CommonMetaService.java
deleted file mode 100644
index d070d97163..0000000000
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/CommonMetaService.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.gravitino.storage.relational.service;
-
-import static 
org.apache.gravitino.metrics.source.MetricsSource.GRAVITINO_RELATIONAL_STORE_METRIC_NAME;
-
-import com.google.common.base.Preconditions;
-import org.apache.gravitino.Entity;
-import org.apache.gravitino.Namespace;
-import org.apache.gravitino.exceptions.NoSuchEntityException;
-import org.apache.gravitino.metrics.Monitored;
-import org.apache.gravitino.storage.relational.helper.CatalogIds;
-import org.apache.gravitino.storage.relational.helper.SchemaIds;
-
-/** The service class for common metadata operations. */
-public class CommonMetaService {
-  private static final CommonMetaService INSTANCE = new CommonMetaService();
-
-  public static CommonMetaService getInstance() {
-    return INSTANCE;
-  }
-
-  private CommonMetaService() {}
-
-  @Monitored(
-      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
-      baseMetricName = "getParentEntityIdByNamespace")
-  public Long getParentEntityIdByNamespace(Namespace namespace) {
-    Preconditions.checkArgument(
-        !namespace.isEmpty() && namespace.levels().length <= 3,
-        "Namespace should not be empty and length should be less than or equal 
to 3.");
-
-    int length = namespace.levels().length;
-    Long parentEntityId;
-    switch (length) {
-      case 1:
-        // Parent is a metalake
-        parentEntityId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(namespace.level(0));
-        if (parentEntityId == null) {
-          throw new NoSuchEntityException(
-              NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-              Entity.EntityType.METALAKE.name().toLowerCase(),
-              namespace);
-        }
-
-        return parentEntityId;
-      case 2:
-        // Parent is a catalog
-        CatalogIds catalogIds =
-            CatalogMetaService.getInstance()
-                .getCatalogIdByMetalakeAndCatalogName(namespace.level(0), 
namespace.level(1));
-        parentEntityId = catalogIds == null ? null : catalogIds.getCatalogId();
-        if (parentEntityId == null) {
-          throw new NoSuchEntityException(
-              NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-              Entity.EntityType.CATALOG.name().toLowerCase(),
-              namespace);
-        }
-
-        return parentEntityId;
-      case 3:
-        // Parent is a schema
-        SchemaIds schemaIds =
-            SchemaMetaService.getInstance()
-                .getSchemaIdByMetalakeNameAndCatalogNameAndSchemaName(
-                    namespace.level(0), namespace.level(1), 
namespace.level(2));
-        parentEntityId = schemaIds == null ? null : schemaIds.getSchemaId();
-        if (parentEntityId == null) {
-          throw new NoSuchEntityException(
-              NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-              Entity.EntityType.SCHEMA.name().toLowerCase(),
-              namespace);
-        }
-        return parentEntityId;
-      default:
-        throw new IllegalArgumentException("Namespace length should be less 
than or equal to 3.");
-    }
-  }
-
-  @Monitored(
-      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
-      baseMetricName = "getParentEntityIdsByNamespace")
-  public Long[] getParentEntityIdsByNamespace(Namespace namespace) {
-    Preconditions.checkArgument(
-        !namespace.isEmpty() && namespace.levels().length <= 3,
-        "Namespace should not be empty and length should be less than or equal 
to 3.");
-    Long[] parentEntityIds = new Long[namespace.levels().length];
-
-    int length = namespace.levels().length;
-    switch (length) {
-      case 1:
-        // Parent is a metalake
-        parentEntityIds[0] =
-            
MetalakeMetaService.getInstance().getMetalakeIdByName(namespace.level(0));
-        if (parentEntityIds[0] == null) {
-          throw new NoSuchEntityException(
-              NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-              Entity.EntityType.METALAKE.name().toLowerCase(),
-              namespace);
-        }
-
-        return parentEntityIds;
-      case 2:
-        // Parent is a catalog
-        CatalogIds catalogIds =
-            CatalogMetaService.getInstance()
-                .getCatalogIdByMetalakeAndCatalogName(namespace.level(0), 
namespace.level(1));
-        parentEntityIds[0] = catalogIds == null ? null : 
catalogIds.getMetalakeId();
-        parentEntityIds[1] = catalogIds == null ? null : 
catalogIds.getCatalogId();
-
-        if (parentEntityIds[1] == null) {
-          throw new NoSuchEntityException(
-              NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-              Entity.EntityType.CATALOG.name().toLowerCase(),
-              namespace);
-        }
-        return parentEntityIds;
-      case 3:
-        // Parent is a schema
-        SchemaIds schemaIds =
-            SchemaMetaService.getInstance()
-                .getSchemaIdByMetalakeNameAndCatalogNameAndSchemaName(
-                    namespace.level(0), namespace.level(1), 
namespace.level(2));
-        parentEntityIds[0] = schemaIds == null ? null : 
schemaIds.getMetalakeId();
-        parentEntityIds[1] = schemaIds == null ? null : 
schemaIds.getCatalogId();
-        parentEntityIds[2] = schemaIds == null ? null : 
schemaIds.getSchemaId();
-
-        if (parentEntityIds[2] == null) {
-          throw new NoSuchEntityException(
-              NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-              Entity.EntityType.SCHEMA.name().toLowerCase(),
-              namespace);
-        }
-
-        return parentEntityIds;
-      default:
-        throw new IllegalArgumentException("Namespace length should be less 
than or equal to 3.");
-    }
-  }
-}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/EntityIdService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/EntityIdService.java
new file mode 100644
index 0000000000..f95dc4f478
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/EntityIdService.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.storage.relational.service;
+
+import com.google.common.base.Preconditions;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.meta.EntityIdResolver;
+import org.apache.gravitino.meta.NamespacedEntityId;
+
+/**
+ * Service class to resolve entity IDs based on NameIdentifiers and Entity 
types. This class must be
+ * initialized with an EntityIdResolver before use.
+ */
+public class EntityIdService {
+  private static EntityIdResolver entityIdResolver;
+
+  private EntityIdService() {}
+
+  public static void initialize(EntityIdResolver idResolver) {
+    entityIdResolver = idResolver;
+  }
+
+  public static long getEntityId(NameIdentifier identifier, Entity.EntityType 
type) {
+    Preconditions.checkArgument(entityIdResolver != null, "EntityIdService is 
not initialized");
+    return entityIdResolver.getEntityId(identifier, type);
+  }
+
+  public static NamespacedEntityId getEntityIds(NameIdentifier identifier, 
Entity.EntityType type) {
+    Preconditions.checkArgument(entityIdResolver != null, "EntityIdService is 
not initialized");
+    return entityIdResolver.getEntityIds(identifier, type);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
index 15fa3f6b14..6b49bcc447 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
@@ -32,6 +32,7 @@ import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.NamespacedEntityId;
 import org.apache.gravitino.metrics.Monitored;
 import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper;
@@ -110,7 +111,8 @@ public class FilesetMetaService {
     String filesetName = identifier.name();
 
     Long schemaId =
-        
CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace());
+        EntityIdService.getEntityId(
+            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.SCHEMA);
 
     FilesetPO filesetPO = getFilesetPOBySchemaIdAndName(schemaId, filesetName);
 
@@ -123,7 +125,9 @@ public class FilesetMetaService {
   public List<FilesetEntity> listFilesetsByNamespace(Namespace namespace) {
     NamespaceUtil.checkFileset(namespace);
 
-    Long schemaId = 
CommonMetaService.getInstance().getParentEntityIdByNamespace(namespace);
+    Long schemaId =
+        EntityIdService.getEntityId(
+            NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
 
     List<FilesetPO> filesetPOs =
         SessionUtils.getWithoutCommit(
@@ -183,7 +187,8 @@ public class FilesetMetaService {
     String filesetName = identifier.name();
 
     Long schemaId =
-        
CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace());
+        EntityIdService.getEntityId(
+            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.SCHEMA);
 
     FilesetPO oldFilesetPO = getFilesetPOBySchemaIdAndName(schemaId, 
filesetName);
     FilesetEntity oldFilesetEntity =
@@ -244,12 +249,7 @@ public class FilesetMetaService {
   public boolean deleteFileset(NameIdentifier identifier) {
     NameIdentifierUtil.checkFileset(identifier);
 
-    String filesetName = identifier.name();
-
-    Long schemaId =
-        
CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace());
-
-    Long filesetId = getFilesetIdBySchemaIdAndName(schemaId, filesetName);
+    Long filesetId = EntityIdService.getEntityId(identifier, 
Entity.EntityType.FILESET);
 
     // We should delete meta and version info
     SessionUtils.doMultipleWithCommit(
@@ -349,10 +349,11 @@ public class FilesetMetaService {
 
   private void fillFilesetPOBuilderParentEntityId(FilesetPO.Builder builder, 
Namespace namespace) {
     NamespaceUtil.checkFileset(namespace);
-    Long[] parentEntityIds =
-        
CommonMetaService.getInstance().getParentEntityIdsByNamespace(namespace);
-    builder.withMetalakeId(parentEntityIds[0]);
-    builder.withCatalogId(parentEntityIds[1]);
-    builder.withSchemaId(parentEntityIds[2]);
+    NamespacedEntityId namespacedEntityId =
+        EntityIdService.getEntityIds(
+            NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
+    builder.withMetalakeId(namespacedEntityId.namespaceIds()[0]);
+    builder.withCatalogId(namespacedEntityId.namespaceIds()[1]);
+    builder.withSchemaId(namespacedEntityId.entityId());
   }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/GroupMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/GroupMetaService.java
index a62052c27a..4b42d74a64 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/GroupMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/GroupMetaService.java
@@ -50,6 +50,7 @@ import org.apache.gravitino.storage.relational.po.RolePO;
 import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
 import org.apache.gravitino.storage.relational.utils.POConverters;
 import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.apache.gravitino.utils.NameIdentifierUtil;
 
 /** The service class for group metadata. It provides the basic database 
operations for group. */
 public class GroupMetaService {
@@ -100,8 +101,8 @@ public class GroupMetaService {
   public GroupEntity getGroupByIdentifier(NameIdentifier identifier) {
     AuthorizationUtils.checkGroup(identifier);
 
-    Long metalakeId =
-        
MetalakeMetaService.getInstance().getMetalakeIdByName(identifier.namespace().level(0));
+    NameIdentifier metalakeIdent = 
NameIdentifier.of(NameIdentifierUtil.getMetalake(identifier));
+    long metalakeId = EntityIdService.getEntityId(metalakeIdent, 
Entity.EntityType.METALAKE);
     GroupPO groupPO = getGroupPOByMetalakeIdAndName(metalakeId, 
identifier.name());
     List<RolePO> rolePOs = 
RoleMetaService.getInstance().listRolesByGroupId(groupPO.getGroupId());
 
@@ -131,8 +132,10 @@ public class GroupMetaService {
     try {
       AuthorizationUtils.checkGroup(groupEntity.nameIdentifier());
 
-      Long metalakeId =
-          
MetalakeMetaService.getInstance().getMetalakeIdByName(groupEntity.namespace().level(0));
+      NameIdentifier metalakeIdent =
+          
NameIdentifier.of(NameIdentifierUtil.getMetalake(groupEntity.nameIdentifier()));
+      Long metalakeId = EntityIdService.getEntityId(metalakeIdent, 
Entity.EntityType.METALAKE);
+
       GroupPO.Builder builder = GroupPO.builder().withMetalakeId(metalakeId);
       GroupPO GroupPO = POConverters.initializeGroupPOWithVersion(groupEntity, 
builder);
 
@@ -174,9 +177,7 @@ public class GroupMetaService {
   public boolean deleteGroup(NameIdentifier identifier) {
     AuthorizationUtils.checkGroup(identifier);
 
-    Long metalakeId =
-        
MetalakeMetaService.getInstance().getMetalakeIdByName(identifier.namespace().level(0));
-    Long groupId = getGroupIdByMetalakeIdAndName(metalakeId, 
identifier.name());
+    Long groupId = EntityIdService.getEntityId(identifier, 
Entity.EntityType.GROUP);
 
     SessionUtils.doMultipleWithCommit(
         () ->
@@ -200,8 +201,9 @@ public class GroupMetaService {
       NameIdentifier identifier, Function<E, E> updater) throws IOException {
     AuthorizationUtils.checkGroup(identifier);
 
-    Long metalakeId =
-        
MetalakeMetaService.getInstance().getMetalakeIdByName(identifier.namespace().level(0));
+    NameIdentifier metalakeIdent = 
NameIdentifier.of(NameIdentifierUtil.getMetalake(identifier));
+    Long metalakeId = EntityIdService.getEntityId(metalakeIdent, 
Entity.EntityType.METALAKE);
+
     GroupPO oldGroupPO = getGroupPOByMetalakeIdAndName(metalakeId, 
identifier.name());
     List<RolePO> rolePOs =
         
RoleMetaService.getInstance().listRolesByGroupId(oldGroupPO.getGroupId());
@@ -274,7 +276,8 @@ public class GroupMetaService {
     String metalakeName = namespace.level(0);
 
     if (allFields) {
-      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalakeName);
+      NameIdentifier metalakeIdent = NameIdentifier.of(metalakeName);
+      long metalakeId = EntityIdService.getEntityId(metalakeIdent, 
Entity.EntityType.METALAKE);
       List<ExtendedGroupPO> groupPOs =
           SessionUtils.getWithoutCommit(
               GroupMetaMapper.class, mapper -> 
mapper.listExtendedGroupPOsByMetalakeId(metalakeId));
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
index 8aa641de67..04de839247 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
@@ -111,7 +111,8 @@ public class JobMetaService {
     String metalakeName = jobEntity.namespace().level(0);
 
     try {
-      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalakeName);
+      long metalakeId =
+          EntityIdService.getEntityId(NameIdentifier.of(metalakeName), 
Entity.EntityType.METALAKE);
 
       JobPO.JobPOBuilder builder = JobPO.builder().withMetalakeId(metalakeId);
       JobPO jobPO = JobPO.initializeJobPO(jobEntity, builder);
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobTemplateMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobTemplateMetaService.java
index c6a8657f8b..ad38114a70 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobTemplateMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobTemplateMetaService.java
@@ -84,8 +84,8 @@ public class JobTemplateMetaService {
     String metalakeName = jobTemplateEntity.namespace().level(0);
 
     try {
-      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalakeName);
-
+      Long metalakeId =
+          EntityIdService.getEntityId(NameIdentifier.of(metalakeName), 
Entity.EntityType.METALAKE);
       JobTemplatePO.JobTemplatePOBuilder builder =
           JobTemplatePO.builder().withMetalakeId(metalakeId);
       JobTemplatePO jobTemplatePO =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
index b2dd95803e..51c812f5cc 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
@@ -21,7 +21,6 @@ package org.apache.gravitino.storage.relational.service;
 import static 
org.apache.gravitino.metrics.source.MetricsSource.GRAVITINO_RELATIONAL_STORE_METRIC_NAME;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.util.HashMap;
@@ -63,8 +62,6 @@ public class MetadataObjectService {
 
   private static final String DOT = ".";
   private static final Joiner DOT_JOINER = Joiner.on(DOT);
-  private static final Splitter DOT_SPLITTER = Splitter.on(DOT);
-
   private static final Logger LOG = 
LoggerFactory.getLogger(MetadataObjectService.class);
 
   static final Map<MetadataObject.Type, Function<List<Long>, Map<Long, 
String>>>
@@ -140,58 +137,6 @@ public class MetadataObjectService {
     return metadataObjects;
   }
 
-  @Monitored(
-      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
-      baseMetricName = "getMetadataObjectId")
-  public static long getMetadataObjectId(
-      long metalakeId, String fullName, MetadataObject.Type type) {
-    if (type == MetadataObject.Type.METALAKE) {
-      return MetalakeMetaService.getInstance().getMetalakeIdByName(fullName);
-    }
-
-    if (type == MetadataObject.Type.ROLE) {
-      return 
RoleMetaService.getInstance().getRoleIdByMetalakeIdAndName(metalakeId, 
fullName);
-    }
-    List<String> names = DOT_SPLITTER.splitToList(fullName);
-    if (type == MetadataObject.Type.TAG) {
-      return TagMetaService.getInstance().getTagIdByTagName(metalakeId, 
fullName);
-    }
-
-    long catalogId =
-        
CatalogMetaService.getInstance().getCatalogIdByMetalakeIdAndName(metalakeId, 
names.get(0));
-    if (type == MetadataObject.Type.CATALOG) {
-      return catalogId;
-    }
-
-    long schemaId =
-        
SchemaMetaService.getInstance().getSchemaIdByCatalogIdAndName(catalogId, 
names.get(1));
-    if (type == MetadataObject.Type.SCHEMA) {
-      return schemaId;
-    }
-
-    if (type == MetadataObject.Type.FILESET) {
-      return 
FilesetMetaService.getInstance().getFilesetIdBySchemaIdAndName(schemaId, 
names.get(2));
-    } else if (type == MetadataObject.Type.TOPIC) {
-      return 
TopicMetaService.getInstance().getTopicIdBySchemaIdAndName(schemaId, 
names.get(2));
-    } else if (type == MetadataObject.Type.MODEL) {
-      return ModelMetaService.getInstance()
-          .getModelIdBySchemaIdAndModelName(schemaId, names.get(2));
-    }
-
-    long tableId =
-        TableMetaService.getInstance().getTableIdBySchemaIdAndName(schemaId, 
names.get(2));
-    if (type == MetadataObject.Type.TABLE) {
-      return tableId;
-    }
-
-    if (type == MetadataObject.Type.COLUMN) {
-      return TableColumnMetaService.getInstance()
-          .getColumnIdByTableIdAndName(tableId, names.get(3));
-    }
-
-    throw new IllegalArgumentException(String.format("Doesn't support the type 
%s", type));
-  }
-
   /**
    * Retrieves a map of Metalake object IDs to their full names.
    *
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
index 7687ce49d6..e7f43b2031 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
@@ -36,6 +36,7 @@ import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.meta.NamespacedEntityId;
 import org.apache.gravitino.metrics.Monitored;
 import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
@@ -72,7 +73,8 @@ public class ModelMetaService {
   public List<ModelEntity> listModelsByNamespace(Namespace ns) {
     NamespaceUtil.checkModel(ns);
 
-    Long schemaId = 
CommonMetaService.getInstance().getParentEntityIdByNamespace(ns);
+    Long schemaId =
+        EntityIdService.getEntityId(NameIdentifier.of(ns.levels()), 
Entity.EntityType.SCHEMA);
 
     List<ModelPO> modelPOs =
         SessionUtils.getWithoutCommit(
@@ -121,7 +123,9 @@ public class ModelMetaService {
     Long schemaId;
     Long modelId;
     try {
-      schemaId = 
CommonMetaService.getInstance().getParentEntityIdByNamespace(ident.namespace());
+      schemaId =
+          EntityIdService.getEntityId(
+              NameIdentifier.of(ident.namespace().levels()), 
Entity.EntityType.SCHEMA);
       modelId = getModelIdBySchemaIdAndModelName(schemaId, ident.name());
     } catch (NoSuchEntityException e) {
       LOG.warn("Failed to delete model: {}", ident, e);
@@ -196,7 +200,7 @@ public class ModelMetaService {
   @Monitored(
       metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
       baseMetricName = "getModelIdBySchemaIdAndModelName")
-  Long getModelIdBySchemaIdAndModelName(Long schemaId, String modelName) {
+  public Long getModelIdBySchemaIdAndModelName(Long schemaId, String 
modelName) {
     Long modelId =
         SessionUtils.getWithoutCommit(
             ModelMetaMapper.class,
@@ -232,10 +236,11 @@ public class ModelMetaService {
 
   private void fillModelPOBuilderParentEntityId(ModelPO.Builder builder, 
Namespace ns) {
     NamespaceUtil.checkModel(ns);
-    Long[] parentEntityIds = 
CommonMetaService.getInstance().getParentEntityIdsByNamespace(ns);
-    builder.withMetalakeId(parentEntityIds[0]);
-    builder.withCatalogId(parentEntityIds[1]);
-    builder.withSchemaId(parentEntityIds[2]);
+    NamespacedEntityId namespacedEntityId =
+        EntityIdService.getEntityIds(NameIdentifier.of(ns.levels()), 
Entity.EntityType.SCHEMA);
+    builder.withMetalakeId(namespacedEntityId.namespaceIds()[0]);
+    builder.withCatalogId(namespacedEntityId.namespaceIds()[1]);
+    builder.withSchemaId(namespacedEntityId.entityId());
   }
 
   @Monitored(
@@ -244,7 +249,9 @@ public class ModelMetaService {
   ModelPO getModelPOByIdentifier(NameIdentifier ident) {
     NameIdentifierUtil.checkModel(ident);
 
-    Long schemaId = 
CommonMetaService.getInstance().getParentEntityIdByNamespace(ident.namespace());
+    Long schemaId =
+        EntityIdService.getEntityId(
+            NameIdentifier.of(ident.namespace().levels()), 
Entity.EntityType.SCHEMA);
 
     ModelPO modelPO =
         SessionUtils.getWithoutCommit(
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelVersionMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelVersionMetaService.java
index a95777c007..82f92a7443 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelVersionMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelVersionMetaService.java
@@ -162,11 +162,7 @@ public class ModelVersionMetaService {
     NameIdentifier modelIdent = modelVersionEntity.modelIdentifier();
     NameIdentifierUtil.checkModel(modelIdent);
 
-    Long schemaId =
-        
CommonMetaService.getInstance().getParentEntityIdByNamespace(modelIdent.namespace());
-    Long modelId =
-        ModelMetaService.getInstance()
-            .getModelIdBySchemaIdAndModelName(schemaId, modelIdent.name());
+    Long modelId = EntityIdService.getEntityId(modelIdent, 
Entity.EntityType.MODEL);
 
     List<ModelVersionPO> modelVersionPOs =
         POConverters.initializeModelVersionPO(modelVersionEntity, modelId);
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java
index 11ef4783ed..a68b2ebb3e 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java
@@ -23,7 +23,6 @@ import static 
org.apache.gravitino.metrics.source.MetricsSource.GRAVITINO_RELATI
 import java.util.Collections;
 import java.util.Optional;
 import org.apache.gravitino.Entity;
-import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.authorization.AuthorizationUtils;
 import org.apache.gravitino.metrics.Monitored;
@@ -48,10 +47,8 @@ public class OwnerMetaService {
 
   @Monitored(metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME, 
baseMetricName = "getOwner")
   public Optional<Entity> getOwner(NameIdentifier identifier, 
Entity.EntityType type) {
-    long metalakeId =
-        MetalakeMetaService.getInstance()
-            .getMetalakeIdByName(NameIdentifierUtil.getMetalake(identifier));
-    Long entityId = getEntityId(metalakeId, identifier, type);
+
+    Long entityId = EntityIdService.getEntityId(identifier, type);
 
     UserPO userPO =
         SessionUtils.getWithoutCommit(
@@ -92,8 +89,8 @@ public class OwnerMetaService {
         MetalakeMetaService.getInstance()
             .getMetalakeIdByName(NameIdentifierUtil.getMetalake(entity));
 
-    Long entityId = getEntityId(metalakeId, entity, entityType);
-    Long ownerId = getEntityId(metalakeId, owner, ownerType);
+    Long entityId = EntityIdService.getEntityId(entity, entityType);
+    Long ownerId = EntityIdService.getEntityId(owner, ownerType);
 
     OwnerRelPO ownerRelPO =
         POConverters.initializeOwnerRelPOsWithVersion(
@@ -110,20 +107,4 @@ public class OwnerMetaService {
             SessionUtils.doWithoutCommit(
                 OwnerMetaMapper.class, mapper -> 
mapper.insertOwnerRel(ownerRelPO)));
   }
-
-  private static long getEntityId(
-      long metalakeId, NameIdentifier identifier, Entity.EntityType type) {
-    switch (type) {
-      case USER:
-        return UserMetaService.getInstance()
-            .getUserIdByMetalakeIdAndName(metalakeId, identifier.name());
-      case GROUP:
-        return GroupMetaService.getInstance()
-            .getGroupIdByMetalakeIdAndName(metalakeId, identifier.name());
-      default:
-        MetadataObject object = 
NameIdentifierUtil.toMetadataObject(identifier, type);
-        return MetadataObjectService.getMetadataObjectId(
-            metalakeId, object.fullName(), object.type());
-    }
-  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
index fb921dc986..5735ef8ab5 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
@@ -93,7 +93,8 @@ public class PolicyMetaService {
     String metalakeName = ns.level(0);
 
     try {
-      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalakeName);
+      Long metalakeId =
+          EntityIdService.getEntityId(NameIdentifier.of(metalakeName), 
Entity.EntityType.METALAKE);
 
       PolicyPO.Builder builder = PolicyPO.builder().withMetalakeId(metalakeId);
       PolicyPO policyPO = 
POConverters.initializePolicyPOWithVersion(policyEntity, builder);
@@ -218,10 +219,7 @@ public class PolicyMetaService {
 
     List<PolicyPO> PolicyPOs;
     try {
-      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
-      Long metadataObjectId =
-          MetadataObjectService.getMetadataObjectId(
-              metalakeId, metadataObject.fullName(), metadataObject.type());
+      Long metadataObjectId = EntityIdService.getEntityId(objectIdent, 
objectType);
 
       PolicyPOs =
           SessionUtils.getWithoutCommit(
@@ -250,10 +248,7 @@ public class PolicyMetaService {
 
     PolicyPO policyPO;
     try {
-      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
-      Long metadataObjectId =
-          MetadataObjectService.getMetadataObjectId(
-              metalakeId, metadataObject.fullName(), metadataObject.type());
+      Long metadataObjectId = EntityIdService.getEntityId(objectIdent, 
objectType);
 
       policyPO =
           SessionUtils.getWithoutCommit(
@@ -322,10 +317,7 @@ public class PolicyMetaService {
     String metalake = objectIdent.namespace().level(0);
 
     try {
-      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
-      Long metadataObjectId =
-          MetadataObjectService.getMetadataObjectId(
-              metalakeId, metadataObject.fullName(), metadataObject.type());
+      Long metadataObjectId = EntityIdService.getEntityId(objectIdent, 
objectType);
 
       // Fetch all the policies need to associate with the metadata object.
       List<String> policyNamesToAdd =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
index 87b8c474dd..d0fe1db7dd 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
@@ -54,6 +54,7 @@ import 
org.apache.gravitino.storage.relational.po.SecurableObjectPO;
 import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
 import org.apache.gravitino.storage.relational.utils.POConverters;
 import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.apache.gravitino.utils.MetadataObjectUtil;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,12 +117,9 @@ public class RoleMetaService {
   public List<RoleEntity> listRolesByMetadataObject(
       NameIdentifier metadataObjectIdent, Entity.EntityType 
metadataObjectType, boolean allFields) {
     String metalake = NameIdentifierUtil.getMetalake(metadataObjectIdent);
-    long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
     MetadataObject metadataObject =
         NameIdentifierUtil.toMetadataObject(metadataObjectIdent, 
metadataObjectType);
-    long metadataObjectId =
-        MetadataObjectService.getMetadataObjectId(
-            metalakeId, metadataObject.fullName(), metadataObject.type());
+    long metadataObjectId = EntityIdService.getEntityId(metadataObjectIdent, 
metadataObjectType);
     List<RolePO> rolePOs =
         SessionUtils.getWithoutCommit(
             RoleMetaMapper.class,
@@ -155,18 +153,18 @@ public class RoleMetaService {
     try {
       AuthorizationUtils.checkRole(roleEntity.nameIdentifier());
 
-      Long metalakeId =
-          
MetalakeMetaService.getInstance().getMetalakeIdByName(roleEntity.namespace().level(0));
+      String metalake = 
NameIdentifierUtil.getMetalake(roleEntity.nameIdentifier());
+      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
       RolePO.Builder builder = RolePO.builder().withMetalakeId(metalakeId);
       RolePO rolePO = POConverters.initializeRolePOWithVersion(roleEntity, 
builder);
       List<SecurableObjectPO> securableObjectPOs = Lists.newArrayList();
       for (SecurableObject object : roleEntity.securableObjects()) {
         SecurableObjectPO.Builder objectBuilder =
             POConverters.initializeSecurablePOBuilderWithVersion(
-                roleEntity.id(), object, getEntityType(object));
-        objectBuilder.withMetadataObjectId(
-            MetadataObjectService.getMetadataObjectId(
-                metalakeId, object.fullName(), object.type()));
+                roleEntity.id(), object, getType(object));
+        NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake, 
object);
+        Entity.EntityType entityType = 
MetadataObjectUtil.toEntityType(object.type());
+        
objectBuilder.withMetadataObjectId(EntityIdService.getEntityId(identifier, 
entityType));
         securableObjectPOs.add(objectBuilder.build());
       }
 
@@ -206,7 +204,7 @@ public class RoleMetaService {
     AuthorizationUtils.checkRole(identifier);
 
     try {
-      String metalake = identifier.namespace().level(0);
+      String metalake = NameIdentifierUtil.getMetalake(identifier);
       Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
 
       RolePO rolePO = getRolePOByMetalakeIdAndName(metalakeId, 
identifier.name());
@@ -232,10 +230,10 @@ public class RoleMetaService {
       }
 
       List<SecurableObjectPO> deleteSecurableObjectPOs =
-          toSecurableObjectPOs(deleteObjects, oldRoleEntity, metalakeId);
+          toSecurableObjectPOs(deleteObjects, oldRoleEntity, metalake);
 
       List<SecurableObjectPO> insertSecurableObjectPOs =
-          toSecurableObjectPOs(insertObjects, oldRoleEntity, metalakeId);
+          toSecurableObjectPOs(insertObjects, oldRoleEntity, metalake);
 
       SessionUtils.doMultipleWithCommit(
           () ->
@@ -271,14 +269,16 @@ public class RoleMetaService {
   }
 
   private List<SecurableObjectPO> toSecurableObjectPOs(
-      Set<SecurableObject> deleteObjects, RoleEntity oldRoleEntity, Long 
metalakeId) {
+      Set<SecurableObject> deleteObjects, RoleEntity oldRoleEntity, String 
metalake) {
     List<SecurableObjectPO> securableObjectPOs = Lists.newArrayList();
     for (SecurableObject object : deleteObjects) {
       SecurableObjectPO.Builder objectBuilder =
           POConverters.initializeSecurablePOBuilderWithVersion(
-              oldRoleEntity.id(), object, getEntityType(object));
-      objectBuilder.withMetadataObjectId(
-          MetadataObjectService.getMetadataObjectId(metalakeId, 
object.fullName(), object.type()));
+              oldRoleEntity.id(), object, getType(object));
+      NameIdentifier nameIdentifier = 
MetadataObjectUtil.toEntityIdent(metalake, object);
+      Entity.EntityType entityType = 
MetadataObjectUtil.toEntityType(object.type());
+
+      
objectBuilder.withMetadataObjectId(EntityIdService.getEntityId(nameIdentifier, 
entityType));
       securableObjectPOs.add(objectBuilder.build());
     }
     return securableObjectPOs;
@@ -456,7 +456,7 @@ public class RoleMetaService {
     return MetadataObject.Type.valueOf(type);
   }
 
-  private static String getEntityType(SecurableObject securableObject) {
+  private static String getType(SecurableObject securableObject) {
     return securableObject.type().name();
   }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
index e171923923..44e2e58c7c 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
@@ -34,6 +34,7 @@ import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.exceptions.NonEmptyEntityException;
 import org.apache.gravitino.meta.FilesetEntity;
 import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.meta.NamespacedEntityId;
 import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.metrics.Monitored;
@@ -92,11 +93,21 @@ public class SchemaMetaService {
       baseMetricName = "getSchemaIdByMetalakeNameAndCatalogNameAndSchemaName")
   public SchemaIds getSchemaIdByMetalakeNameAndCatalogNameAndSchemaName(
       String metalakeName, String catalogName, String schemaName) {
-    return SessionUtils.getWithoutCommit(
-        SchemaMetaMapper.class,
-        mapper ->
-            mapper.selectSchemaIdByMetalakeNameAndCatalogNameAndSchemaName(
-                metalakeName, catalogName, schemaName));
+    SchemaIds schemaIds =
+        SessionUtils.getWithoutCommit(
+            SchemaMetaMapper.class,
+            mapper ->
+                mapper.selectSchemaIdByMetalakeNameAndCatalogNameAndSchemaName(
+                    metalakeName, catalogName, schemaName));
+
+    if (schemaIds == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.SCHEMA.name().toLowerCase(),
+          schemaName);
+    }
+
+    return schemaIds;
   }
 
   @Monitored(
@@ -125,7 +136,8 @@ public class SchemaMetaService {
     String schemaName = identifier.name();
 
     Long catalogId =
-        
CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace());
+        EntityIdService.getEntityId(
+            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.CATALOG);
 
     SchemaPO schemaPO = getSchemaPOByCatalogIdAndName(catalogId, schemaName);
 
@@ -138,7 +150,9 @@ public class SchemaMetaService {
   public List<SchemaEntity> listSchemasByNamespace(Namespace namespace) {
     NamespaceUtil.checkSchema(namespace);
 
-    Long catalogId = 
CommonMetaService.getInstance().getParentEntityIdByNamespace(namespace);
+    Long catalogId =
+        EntityIdService.getEntityId(
+            NameIdentifier.of(namespace.levels()), Entity.EntityType.CATALOG);
 
     List<SchemaPO> schemaPOs =
         SessionUtils.getWithoutCommit(
@@ -182,7 +196,8 @@ public class SchemaMetaService {
 
     String schemaName = identifier.name();
     Long catalogId =
-        
CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace());
+        EntityIdService.getEntityId(
+            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.CATALOG);
 
     SchemaPO oldSchemaPO = getSchemaPOByCatalogIdAndName(catalogId, 
schemaName);
 
@@ -222,137 +237,129 @@ public class SchemaMetaService {
     NameIdentifierUtil.checkSchema(identifier);
 
     String schemaName = identifier.name();
-    Long catalogId =
-        
CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace());
-    Long schemaId = getSchemaIdByCatalogIdAndName(catalogId, schemaName);
-
-    if (schemaId != null) {
-      if (cascade) {
-        SessionUtils.doMultipleWithCommit(
-            () ->
-                SessionUtils.doWithoutCommit(
-                    SchemaMetaMapper.class,
-                    mapper -> 
mapper.softDeleteSchemaMetasBySchemaId(schemaId)),
-            () ->
-                SessionUtils.doWithoutCommit(
-                    TableMetaMapper.class,
-                    mapper -> mapper.softDeleteTableMetasBySchemaId(schemaId)),
-            () ->
-                SessionUtils.doWithoutCommit(
-                    TableColumnMapper.class,
-                    mapper -> mapper.softDeleteColumnsBySchemaId(schemaId)),
-            () ->
-                SessionUtils.doWithoutCommit(
-                    FilesetMetaMapper.class,
-                    mapper -> 
mapper.softDeleteFilesetMetasBySchemaId(schemaId)),
-            () ->
-                SessionUtils.doWithoutCommit(
-                    FilesetVersionMapper.class,
-                    mapper -> 
mapper.softDeleteFilesetVersionsBySchemaId(schemaId)),
-            () ->
-                SessionUtils.doWithoutCommit(
-                    TopicMetaMapper.class,
-                    mapper -> mapper.softDeleteTopicMetasBySchemaId(schemaId)),
-            () ->
-                SessionUtils.doWithoutCommit(
-                    OwnerMetaMapper.class, mapper -> 
mapper.softDeleteOwnerRelBySchemaId(schemaId)),
-            () ->
-                SessionUtils.doWithoutCommit(
-                    SecurableObjectMapper.class,
-                    mapper -> mapper.softDeleteObjectRelsBySchemaId(schemaId)),
-            () ->
-                SessionUtils.doWithoutCommit(
-                    TagMetadataObjectRelMapper.class,
-                    mapper -> 
mapper.softDeleteTagMetadataObjectRelsBySchemaId(schemaId)),
-            () ->
-                SessionUtils.doWithoutCommit(
-                    PolicyMetadataObjectRelMapper.class,
-                    mapper -> 
mapper.softDeletePolicyMetadataObjectRelsBySchemaId(schemaId)),
-            () ->
-                SessionUtils.doWithoutCommit(
-                    ModelVersionAliasRelMapper.class,
-                    mapper -> 
mapper.softDeleteModelVersionAliasRelsBySchemaId(schemaId)),
-            () ->
-                SessionUtils.doWithoutCommit(
-                    ModelVersionMetaMapper.class,
-                    mapper -> 
mapper.softDeleteModelVersionMetasBySchemaId(schemaId)),
-            () ->
-                SessionUtils.doWithoutCommit(
-                    ModelMetaMapper.class,
-                    mapper -> mapper.softDeleteModelMetasBySchemaId(schemaId)),
-            () ->
-                SessionUtils.doWithoutCommit(
-                    StatisticMetaMapper.class,
-                    mapper -> 
mapper.softDeleteStatisticsBySchemaId(schemaId)));
-      } else {
-        List<TableEntity> tableEntities =
-            TableMetaService.getInstance()
-                .listTablesByNamespace(
-                    NamespaceUtil.ofTable(
-                        identifier.namespace().level(0),
-                        identifier.namespace().level(1),
-                        schemaName));
-        if (!tableEntities.isEmpty()) {
-          throw new NonEmptyEntityException(
-              "Entity %s has sub-entities, you should remove sub-entities 
first", identifier);
-        }
-        List<FilesetEntity> filesetEntities =
-            FilesetMetaService.getInstance()
-                .listFilesetsByNamespace(
-                    NamespaceUtil.ofFileset(
-                        identifier.namespace().level(0),
-                        identifier.namespace().level(1),
-                        schemaName));
-        if (!filesetEntities.isEmpty()) {
-          throw new NonEmptyEntityException(
-              "Entity %s has sub-entities, you should remove sub-entities 
first", identifier);
-        }
-        List<ModelEntity> modelEntities =
-            ModelMetaService.getInstance()
-                .listModelsByNamespace(
-                    NamespaceUtil.ofModel(
-                        identifier.namespace().level(0),
-                        identifier.namespace().level(1),
-                        schemaName));
-        if (!modelEntities.isEmpty()) {
-          throw new NonEmptyEntityException(
-              "Entity %s has sub-entities, you should remove sub-entities 
first", identifier);
-        }
-
-        SessionUtils.doMultipleWithCommit(
-            () ->
-                SessionUtils.doWithoutCommit(
-                    SchemaMetaMapper.class,
-                    mapper -> 
mapper.softDeleteSchemaMetasBySchemaId(schemaId)),
-            () ->
-                SessionUtils.doWithoutCommit(
-                    OwnerMetaMapper.class,
-                    mapper ->
-                        mapper.softDeleteOwnerRelByMetadataObjectIdAndType(
-                            schemaId, MetadataObject.Type.SCHEMA.name())),
-            () ->
-                SessionUtils.doWithoutCommit(
-                    SecurableObjectMapper.class,
-                    mapper ->
-                        mapper.softDeleteObjectRelsByMetadataObject(
-                            schemaId, MetadataObject.Type.SCHEMA.name())),
-            () ->
-                SessionUtils.doWithoutCommit(
-                    TagMetadataObjectRelMapper.class,
-                    mapper ->
-                        mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
-                            schemaId, MetadataObject.Type.SCHEMA.name())),
-            () ->
-                SessionUtils.doWithoutCommit(
-                    StatisticMetaMapper.class,
-                    mapper -> mapper.softDeleteStatisticsByEntityId(schemaId)),
-            () ->
-                SessionUtils.doWithoutCommit(
-                    PolicyMetadataObjectRelMapper.class,
-                    mapper ->
-                        
mapper.softDeletePolicyMetadataObjectRelsByMetadataObject(
-                            schemaId, MetadataObject.Type.SCHEMA.name())));
+    Long schemaId = EntityIdService.getEntityId(identifier, 
Entity.EntityType.SCHEMA);
+
+    if (cascade) {
+      SessionUtils.doMultipleWithCommit(
+          () ->
+              SessionUtils.doWithoutCommit(
+                  SchemaMetaMapper.class,
+                  mapper -> mapper.softDeleteSchemaMetasBySchemaId(schemaId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  TableMetaMapper.class, mapper -> 
mapper.softDeleteTableMetasBySchemaId(schemaId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  TableColumnMapper.class, mapper -> 
mapper.softDeleteColumnsBySchemaId(schemaId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  FilesetMetaMapper.class,
+                  mapper -> mapper.softDeleteFilesetMetasBySchemaId(schemaId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  FilesetVersionMapper.class,
+                  mapper -> 
mapper.softDeleteFilesetVersionsBySchemaId(schemaId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  TopicMetaMapper.class, mapper -> 
mapper.softDeleteTopicMetasBySchemaId(schemaId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  OwnerMetaMapper.class, mapper -> 
mapper.softDeleteOwnerRelBySchemaId(schemaId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  SecurableObjectMapper.class,
+                  mapper -> mapper.softDeleteObjectRelsBySchemaId(schemaId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  TagMetadataObjectRelMapper.class,
+                  mapper -> 
mapper.softDeleteTagMetadataObjectRelsBySchemaId(schemaId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  PolicyMetadataObjectRelMapper.class,
+                  mapper -> 
mapper.softDeletePolicyMetadataObjectRelsBySchemaId(schemaId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  ModelVersionAliasRelMapper.class,
+                  mapper -> 
mapper.softDeleteModelVersionAliasRelsBySchemaId(schemaId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  ModelVersionMetaMapper.class,
+                  mapper -> 
mapper.softDeleteModelVersionMetasBySchemaId(schemaId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  ModelMetaMapper.class, mapper -> 
mapper.softDeleteModelMetasBySchemaId(schemaId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  StatisticMetaMapper.class,
+                  mapper -> mapper.softDeleteStatisticsBySchemaId(schemaId)));
+    } else {
+      List<TableEntity> tableEntities =
+          TableMetaService.getInstance()
+              .listTablesByNamespace(
+                  NamespaceUtil.ofTable(
+                      identifier.namespace().level(0),
+                      identifier.namespace().level(1),
+                      schemaName));
+      if (!tableEntities.isEmpty()) {
+        throw new NonEmptyEntityException(
+            "Entity %s has sub-entities, you should remove sub-entities 
first", identifier);
+      }
+      List<FilesetEntity> filesetEntities =
+          FilesetMetaService.getInstance()
+              .listFilesetsByNamespace(
+                  NamespaceUtil.ofFileset(
+                      identifier.namespace().level(0),
+                      identifier.namespace().level(1),
+                      schemaName));
+      if (!filesetEntities.isEmpty()) {
+        throw new NonEmptyEntityException(
+            "Entity %s has sub-entities, you should remove sub-entities 
first", identifier);
       }
+      List<ModelEntity> modelEntities =
+          ModelMetaService.getInstance()
+              .listModelsByNamespace(
+                  NamespaceUtil.ofModel(
+                      identifier.namespace().level(0),
+                      identifier.namespace().level(1),
+                      schemaName));
+      if (!modelEntities.isEmpty()) {
+        throw new NonEmptyEntityException(
+            "Entity %s has sub-entities, you should remove sub-entities 
first", identifier);
+      }
+
+      SessionUtils.doMultipleWithCommit(
+          () ->
+              SessionUtils.doWithoutCommit(
+                  SchemaMetaMapper.class,
+                  mapper -> mapper.softDeleteSchemaMetasBySchemaId(schemaId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  OwnerMetaMapper.class,
+                  mapper ->
+                      mapper.softDeleteOwnerRelByMetadataObjectIdAndType(
+                          schemaId, MetadataObject.Type.SCHEMA.name())),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  SecurableObjectMapper.class,
+                  mapper ->
+                      mapper.softDeleteObjectRelsByMetadataObject(
+                          schemaId, MetadataObject.Type.SCHEMA.name())),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  TagMetadataObjectRelMapper.class,
+                  mapper ->
+                      mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
+                          schemaId, MetadataObject.Type.SCHEMA.name())),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  StatisticMetaMapper.class,
+                  mapper -> mapper.softDeleteStatisticsByEntityId(schemaId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  PolicyMetadataObjectRelMapper.class,
+                  mapper ->
+                      
mapper.softDeletePolicyMetadataObjectRelsByMetadataObject(
+                          schemaId, MetadataObject.Type.SCHEMA.name())));
     }
     return true;
   }
@@ -370,9 +377,10 @@ public class SchemaMetaService {
 
   private void fillSchemaPOBuilderParentEntityId(SchemaPO.Builder builder, 
Namespace namespace) {
     NamespaceUtil.checkSchema(namespace);
-    Long[] parentEntityIds =
-        
CommonMetaService.getInstance().getParentEntityIdsByNamespace(namespace);
-    builder.withMetalakeId(parentEntityIds[0]);
-    builder.withCatalogId(parentEntityIds[1]);
+    NamespacedEntityId namespacedEntityId =
+        EntityIdService.getEntityIds(
+            NameIdentifier.of(namespace.levels()), Entity.EntityType.CATALOG);
+    builder.withMetalakeId(namespacedEntityId.namespaceIds()[0]);
+    builder.withCatalogId(namespacedEntityId.entityId());
   }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/StatisticMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/StatisticMetaService.java
index 5e90e5ad24..81880859f3 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/StatisticMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/StatisticMetaService.java
@@ -23,8 +23,8 @@ import static 
org.apache.gravitino.metrics.source.MetricsSource.GRAVITINO_RELATI
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
-import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.meta.NamespacedEntityId;
 import org.apache.gravitino.meta.StatisticEntity;
 import org.apache.gravitino.metrics.Monitored;
 import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
@@ -51,16 +51,13 @@ public class StatisticMetaService {
       baseMetricName = "listStatisticsByEntity")
   public List<StatisticEntity> listStatisticsByEntity(
       NameIdentifier identifier, Entity.EntityType type) {
-    long metalakeId =
-        MetalakeMetaService.getInstance()
-            .getMetalakeIdByName(NameIdentifierUtil.getMetalake(identifier));
-    MetadataObject object = NameIdentifierUtil.toMetadataObject(identifier, 
type);
-    long entityId =
-        MetadataObjectService.getMetadataObjectId(metalakeId, 
object.fullName(), object.type());
+    NamespacedEntityId namespacedEntityId = 
EntityIdService.getEntityIds(identifier, type);
     List<StatisticPO> statisticPOs =
         SessionUtils.getWithoutCommit(
             StatisticMetaMapper.class,
-            mapper -> mapper.listStatisticPOsByEntityId(metalakeId, entityId));
+            mapper ->
+                mapper.listStatisticPOsByEntityId(
+                    namespacedEntityId.namespaceIds()[0], 
namespacedEntityId.entityId()));
     return 
statisticPOs.stream().map(StatisticPO::fromStatisticPO).collect(Collectors.toList());
   }
 
@@ -73,14 +70,13 @@ public class StatisticMetaService {
       return;
     }
 
-    String metalake = NameIdentifierUtil.getMetalake(entity);
-    Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
-    MetadataObject object = NameIdentifierUtil.toMetadataObject(entity, type);
-    Long entityId =
-        MetadataObjectService.getMetadataObjectId(metalakeId, 
object.fullName(), object.type());
-
+    NamespacedEntityId namespacedEntityId = 
EntityIdService.getEntityIds(entity, type);
     List<StatisticPO> pos =
-        StatisticPO.initializeStatisticPOs(statisticEntities, metalakeId, 
entityId, object.type());
+        StatisticPO.initializeStatisticPOs(
+            statisticEntities,
+            namespacedEntityId.namespaceIds()[0],
+            namespacedEntityId.entityId(),
+            NameIdentifierUtil.toMetadataObject(entity, type).type());
     SessionUtils.doWithCommit(
         StatisticMetaMapper.class,
         mapper -> mapper.batchInsertStatisticPOsOnDuplicateKeyUpdate(pos));
@@ -94,12 +90,7 @@ public class StatisticMetaService {
     if (statisticNames == null || statisticNames.isEmpty()) {
       return 0;
     }
-    Long metalakeId =
-        MetalakeMetaService.getInstance()
-            .getMetalakeIdByName(NameIdentifierUtil.getMetalake(identifier));
-    MetadataObject object = NameIdentifierUtil.toMetadataObject(identifier, 
type);
-    Long entityId =
-        MetadataObjectService.getMetadataObjectId(metalakeId, 
object.fullName(), object.type());
+    Long entityId = EntityIdService.getEntityId(identifier, type);
 
     return SessionUtils.doWithCommitAndFetchResult(
         StatisticMetaMapper.class,
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java
index 1e4bcc7fc2..1433f7ac62 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java
@@ -66,7 +66,7 @@ public class TableColumnMetaService {
   @Monitored(
       metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
       baseMetricName = "getColumnIdByTableIdAndName")
-  Long getColumnIdByTableIdAndName(Long tableId, String columnName) {
+  public Long getColumnIdByTableIdAndName(Long tableId, String columnName) {
     Long columnId =
         SessionUtils.getWithoutCommit(
             TableColumnMapper.class,
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
index bfbbdaf39f..092263a990 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
@@ -33,6 +33,7 @@ import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.meta.NamespacedEntityId;
 import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.metrics.Monitored;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
@@ -85,7 +86,8 @@ public class TableMetaService {
     NameIdentifierUtil.checkTable(identifier);
 
     Long schemaId =
-        
CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace());
+        EntityIdService.getEntityId(
+            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.SCHEMA);
 
     TablePO tablePO = getTablePOBySchemaIdAndName(schemaId, identifier.name());
     List<ColumnPO> columnPOs =
@@ -101,7 +103,9 @@ public class TableMetaService {
   public List<TableEntity> listTablesByNamespace(Namespace namespace) {
     NamespaceUtil.checkTable(namespace);
 
-    Long schemaId = 
CommonMetaService.getInstance().getParentEntityIdByNamespace(namespace);
+    Long schemaId =
+        EntityIdService.getEntityId(
+            NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
 
     List<TablePO> tablePOs =
         SessionUtils.getWithoutCommit(
@@ -171,7 +175,8 @@ public class TableMetaService {
     String tableName = identifier.name();
 
     Long schemaId =
-        
CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace());
+        EntityIdService.getEntityId(
+            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.SCHEMA);
 
     TablePO oldTablePO = getTablePOBySchemaIdAndName(schemaId, tableName);
     List<ColumnPO> oldTableColumns =
@@ -190,8 +195,8 @@ public class TableMetaService {
     boolean isSchemaChanged = 
!newTableEntity.namespace().equals(oldTableEntity.namespace());
     Long newSchemaId =
         isSchemaChanged
-            ? CommonMetaService.getInstance()
-                .getParentEntityIdByNamespace(newTableEntity.namespace())
+            ? EntityIdService.getEntityId(
+                NameIdentifier.of(newTableEntity.namespace().levels()), 
Entity.EntityType.SCHEMA)
             : schemaId;
 
     TablePO newTablePO =
@@ -237,57 +242,58 @@ public class TableMetaService {
   public boolean deleteTable(NameIdentifier identifier) {
     NameIdentifierUtil.checkTable(identifier);
 
-    String tableName = identifier.name();
-
-    Long schemaId =
-        
CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace());
-
-    Long tableId = getTableIdBySchemaIdAndName(schemaId, tableName);
+    NamespacedEntityId namespacedEntityId =
+        EntityIdService.getEntityIds(identifier, Entity.EntityType.TABLE);
 
     AtomicInteger deleteResult = new AtomicInteger(0);
     TablePO[] tablePOHolder = new TablePO[1];
     SessionUtils.doMultipleWithCommit(
         () -> {
-          tablePOHolder[0] = getTablePOBySchemaIdAndName(schemaId, tableName);
+          tablePOHolder[0] =
+              
getTablePOBySchemaIdAndName(namespacedEntityId.namespaceIds()[2], 
identifier.name());
         },
         () ->
             deleteResult.set(
                 SessionUtils.getWithoutCommit(
                     TableMetaMapper.class,
-                    mapper -> mapper.softDeleteTableMetasByTableId(tableId))),
+                    mapper -> 
mapper.softDeleteTableMetasByTableId(namespacedEntityId.entityId()))),
         () -> {
           if (deleteResult.get() > 0) {
             SessionUtils.doWithoutCommit(
                 OwnerMetaMapper.class,
                 mapper ->
                     mapper.softDeleteOwnerRelByMetadataObjectIdAndType(
-                        tableId, MetadataObject.Type.TABLE.name()));
-            
TableColumnMetaService.getInstance().deleteColumnsByTableId(tableId);
+                        namespacedEntityId.entityId(), 
MetadataObject.Type.TABLE.name()));
+            TableColumnMetaService.getInstance()
+                .deleteColumnsByTableId(namespacedEntityId.entityId());
             SessionUtils.doWithoutCommit(
                 SecurableObjectMapper.class,
                 mapper ->
                     mapper.softDeleteObjectRelsByMetadataObject(
-                        tableId, MetadataObject.Type.TABLE.name()));
+                        namespacedEntityId.entityId(), 
MetadataObject.Type.TABLE.name()));
             SessionUtils.doWithoutCommit(
                 TagMetadataObjectRelMapper.class,
                 mapper ->
                     mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
-                        tableId, MetadataObject.Type.TABLE.name()));
+                        namespacedEntityId.entityId(), 
MetadataObject.Type.TABLE.name()));
             SessionUtils.doWithoutCommit(
                 TagMetadataObjectRelMapper.class,
-                mapper -> 
mapper.softDeleteTagMetadataObjectRelsByTableId(tableId));
+                mapper ->
+                    
mapper.softDeleteTagMetadataObjectRelsByTableId(namespacedEntityId.entityId()));
 
             SessionUtils.doWithoutCommit(
                 StatisticMetaMapper.class,
-                mapper -> mapper.softDeleteStatisticsByEntityId(tableId));
+                mapper -> 
mapper.softDeleteStatisticsByEntityId(namespacedEntityId.entityId()));
             SessionUtils.doWithoutCommit(
                 PolicyMetadataObjectRelMapper.class,
-                mapper -> 
mapper.softDeletePolicyMetadataObjectRelsByTableId(tableId));
+                mapper ->
+                    mapper.softDeletePolicyMetadataObjectRelsByTableId(
+                        namespacedEntityId.entityId()));
             SessionUtils.doWithCommit(
                 TableVersionMapper.class,
                 mapper ->
                     mapper.softDeleteTableVersionByTableIdAndVersion(
-                        tableId, tablePOHolder[0].getCurrentVersion()));
+                        namespacedEntityId.entityId(), 
tablePOHolder[0].getCurrentVersion()));
           }
         });
 
@@ -315,11 +321,12 @@ public class TableMetaService {
 
   private void fillTablePOBuilderParentEntityId(TablePO.Builder builder, 
Namespace namespace) {
     NamespaceUtil.checkTable(namespace);
-    Long[] parentEntityIds =
-        
CommonMetaService.getInstance().getParentEntityIdsByNamespace(namespace);
-    builder.withMetalakeId(parentEntityIds[0]);
-    builder.withCatalogId(parentEntityIds[1]);
-    builder.withSchemaId(parentEntityIds[2]);
+    NamespacedEntityId namespacedEntityId =
+        EntityIdService.getEntityIds(
+            NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
+    builder.withMetalakeId(namespacedEntityId.namespaceIds()[0]);
+    builder.withCatalogId(namespacedEntityId.namespaceIds()[1]);
+    builder.withSchemaId(namespacedEntityId.entityId());
   }
 
   private TablePO getTablePOBySchemaIdAndName(Long schemaId, String tableName) 
{
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TagMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TagMetaService.java
index 0087b1e16b..53687df864 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TagMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TagMetaService.java
@@ -179,10 +179,7 @@ public class TagMetaService {
 
     List<TagPO> tagPOs = null;
     try {
-      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
-      Long metadataObjectId =
-          MetadataObjectService.getMetadataObjectId(
-              metalakeId, metadataObject.fullName(), metadataObject.type());
+      Long metadataObjectId = EntityIdService.getEntityId(objectIdent, 
objectType);
 
       tagPOs =
           SessionUtils.getWithoutCommit(
@@ -211,10 +208,7 @@ public class TagMetaService {
 
     TagPO tagPO = null;
     try {
-      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
-      Long metadataObjectId =
-          MetadataObjectService.getMetadataObjectId(
-              metalakeId, metadataObject.fullName(), metadataObject.type());
+      Long metadataObjectId = EntityIdService.getEntityId(objectIdent, 
objectType);
 
       tagPO =
           SessionUtils.getWithoutCommit(
@@ -307,10 +301,7 @@ public class TagMetaService {
     String metalake = objectIdent.namespace().level(0);
 
     try {
-      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
-      Long metadataObjectId =
-          MetadataObjectService.getMetadataObjectId(
-              metalakeId, metadataObject.fullName(), metadataObject.type());
+      Long metadataObjectId = EntityIdService.getEntityId(objectIdent, 
objectType);
 
       // Fetch all the tags need to associate with the metadata object.
       List<String> tagNamesToAdd =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
index f467b0d364..4282742c28 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
@@ -31,6 +31,7 @@ import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.meta.NamespacedEntityId;
 import org.apache.gravitino.meta.TopicEntity;
 import org.apache.gravitino.metrics.Monitored;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
@@ -91,7 +92,9 @@ public class TopicMetaService {
   public List<TopicEntity> listTopicsByNamespace(Namespace namespace) {
     NamespaceUtil.checkTopic(namespace);
 
-    Long schemaId = 
CommonMetaService.getInstance().getParentEntityIdByNamespace(namespace);
+    Long schemaId =
+        EntityIdService.getEntityId(
+            NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
 
     List<TopicPO> topicPOs =
         SessionUtils.getWithoutCommit(
@@ -107,7 +110,9 @@ public class TopicMetaService {
 
     String topicName = ident.name();
 
-    Long schemaId = 
CommonMetaService.getInstance().getParentEntityIdByNamespace(ident.namespace());
+    Long schemaId =
+        EntityIdService.getEntityId(
+            NameIdentifier.of(ident.namespace().levels()), 
Entity.EntityType.SCHEMA);
 
     TopicPO oldTopicPO = getTopicPOBySchemaIdAndName(schemaId, topicName);
     TopicEntity oldTopicEntity = POConverters.fromTopicPO(oldTopicPO, 
ident.namespace());
@@ -156,11 +161,12 @@ public class TopicMetaService {
 
   private void fillTopicPOBuilderParentEntityId(TopicPO.Builder builder, 
Namespace namespace) {
     NamespaceUtil.checkTopic(namespace);
-    Long[] parentEntityIds =
-        
CommonMetaService.getInstance().getParentEntityIdsByNamespace(namespace);
-    builder.withMetalakeId(parentEntityIds[0]);
-    builder.withCatalogId(parentEntityIds[1]);
-    builder.withSchemaId(parentEntityIds[2]);
+    NamespacedEntityId namespacedEntityId =
+        EntityIdService.getEntityIds(
+            NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
+    builder.withMetalakeId(namespacedEntityId.namespaceIds()[0]);
+    builder.withCatalogId(namespacedEntityId.namespaceIds()[1]);
+    builder.withSchemaId(namespacedEntityId.entityId());
   }
 
   @Monitored(
@@ -170,7 +176,8 @@ public class TopicMetaService {
     NameIdentifierUtil.checkTopic(identifier);
 
     Long schemaId =
-        
CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace());
+        EntityIdService.getEntityId(
+            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.SCHEMA);
 
     TopicPO topicPO = getTopicPOBySchemaIdAndName(schemaId, identifier.name());
 
@@ -181,12 +188,7 @@ public class TopicMetaService {
   public boolean deleteTopic(NameIdentifier identifier) {
     NameIdentifierUtil.checkTopic(identifier);
 
-    String topicName = identifier.name();
-
-    Long schemaId =
-        
CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace());
-
-    Long topicId = getTopicIdBySchemaIdAndName(schemaId, topicName);
+    Long topicId = EntityIdService.getEntityId(identifier, 
Entity.EntityType.TOPIC);
 
     SessionUtils.doMultipleWithCommit(
         () ->
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/UserMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/UserMetaService.java
index 97d616aa50..709545a53b 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/UserMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/UserMetaService.java
@@ -174,9 +174,7 @@ public class UserMetaService {
   public boolean deleteUser(NameIdentifier identifier) {
     AuthorizationUtils.checkUser(identifier);
 
-    Long metalakeId =
-        
MetalakeMetaService.getInstance().getMetalakeIdByName(identifier.namespace().level(0));
-    Long userId = getUserIdByMetalakeIdAndName(metalakeId, identifier.name());
+    Long userId = EntityIdService.getEntityId(identifier, 
Entity.EntityType.USER);
 
     SessionUtils.doMultipleWithCommit(
         () ->
diff --git 
a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java 
b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
index d319d3b0e1..8979dd254a 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
@@ -18,10 +18,20 @@
  */
 package org.apache.gravitino.utils;
 
+import static org.apache.gravitino.Entity.EntityType.GROUP;
+import static org.apache.gravitino.Entity.EntityType.JOB;
+import static org.apache.gravitino.Entity.EntityType.JOB_TEMPLATE;
+import static org.apache.gravitino.Entity.EntityType.POLICY;
+import static org.apache.gravitino.Entity.EntityType.ROLE;
+import static org.apache.gravitino.Entity.EntityType.TAG;
+import static org.apache.gravitino.Entity.EntityType.USER;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.gravitino.Entity;
@@ -42,6 +52,9 @@ public class NameIdentifierUtil {
 
   private NameIdentifierUtil() {}
 
+  private static final Set<Entity.EntityType> SUPPORT_VIRTUAL_NAMESPACE_TYPES =
+      ImmutableSet.of(USER, GROUP, ROLE, TAG, POLICY, JOB, JOB_TEMPLATE);
+
   /**
    * Create the metalake {@link NameIdentifierUtil} with the given name.
    *
@@ -600,4 +613,50 @@ public class NameIdentifierUtil {
     }
     return NameIdentifier.of(allElems.get(0), allElems.get(1), 
allElems.get(2), allElems.get(3));
   }
+
+  public static NameIdentifier parentNameIdentifier(
+      NameIdentifier nameIdentifier, Entity.EntityType type) {
+
+    if (SUPPORT_VIRTUAL_NAMESPACE_TYPES.contains(type)) {
+      return NameIdentifier.of(NameIdentifierUtil.getMetalake(nameIdentifier));
+
+    } else if (nameIdentifier.hasNamespace()) {
+      return NameIdentifier.of(nameIdentifier.namespace().levels());
+
+    } else {
+      throw new IllegalArgumentException("The entity has no parent name 
identifier");
+    }
+  }
+
+  public static Entity.EntityType parentEntityType(Entity.EntityType type) {
+    switch (type) {
+      case COLUMN:
+        return Entity.EntityType.TABLE;
+      case MODEL_VERSION:
+        return Entity.EntityType.MODEL;
+
+      case TABLE:
+      case FILESET:
+      case MODEL:
+      case TOPIC:
+        return Entity.EntityType.SCHEMA;
+
+      case SCHEMA:
+        return Entity.EntityType.CATALOG;
+
+      case CATALOG:
+      case USER:
+      case GROUP:
+      case ROLE:
+      case TAG:
+      case POLICY:
+      case JOB:
+      case JOB_TEMPLATE:
+        return Entity.EntityType.METALAKE;
+
+      case METALAKE:
+      default:
+        throw new IllegalArgumentException("Metalake has no parent entity 
type");
+    }
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
index 4df794754c..cbe019d2f7 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
@@ -93,6 +93,7 @@ import 
org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.UserMetaMapper;
 import org.apache.gravitino.storage.relational.service.CatalogMetaService;
+import org.apache.gravitino.storage.relational.service.EntityIdService;
 import org.apache.gravitino.storage.relational.service.MetalakeMetaService;
 import org.apache.gravitino.storage.relational.service.RoleMetaService;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
@@ -160,6 +161,7 @@ public class TestJDBCBackend {
     try {
       backend = (RelationalBackend) 
Class.forName(className).getDeclaredConstructor().newInstance();
       backend.initialize(config);
+      EntityIdService.initialize(new RelationalEntityStoreIdResolver());
     } catch (Exception e) {
       throw new RuntimeException(
           "Failed to create and initialize RelationalBackend by name: " + 
backendName, e);
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestModelMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestModelMetaService.java
index b10b1adcab..3b294fb0df 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestModelMetaService.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestModelMetaService.java
@@ -24,6 +24,7 @@ import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
+import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
@@ -110,7 +111,8 @@ public class TestModelMetaService extends TestJDBCBackend {
         NoSuchEntityException.class, () -> 
ModelMetaService.getInstance().getModelPOById(111L));
 
     // Test get model id by name
-    Long schemaId = 
CommonMetaService.getInstance().getParentEntityIdByNamespace(MODEL_NS);
+    Long schemaId =
+        EntityIdService.getEntityId(NameIdentifier.of(MODEL_NS.levels()), 
Entity.EntityType.SCHEMA);
     Long modelId =
         
ModelMetaService.getInstance().getModelIdBySchemaIdAndModelName(schemaId, 
"model2");
     Assertions.assertEquals(modelEntity2.id(), modelId);
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSecurableObjects.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSecurableObjects.java
index 950ed6f845..fcfc34d9f2 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSecurableObjects.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSecurableObjects.java
@@ -52,19 +52,24 @@ public class TestSecurableObjects extends TestJDBCBackend {
     String metalakeName = "metalake";
     AuditInfo auditInfo =
         
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
-    BaseMetalake metalake =
-        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName + 
"2", auditInfo);
-    backend.insert(metalake, false);
 
-    BaseMetalake metalake2 =
+    BaseMetalake metalake =
         createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
-    backend.insert(metalake2, false);
+    backend.insert(metalake, false);
 
     CatalogEntity catalog =
         createCatalog(
             RandomIdGenerator.INSTANCE.nextId(), Namespace.of("metalake"), 
"catalog", auditInfo);
     backend.insert(catalog, false);
 
+    CatalogEntity catalog2 =
+        createCatalog(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of("metalake"),
+            metalake.name(),
+            auditInfo);
+    backend.insert(catalog2, false);
+
     SchemaEntity schema =
         createSchemaEntity(
             RandomIdGenerator.INSTANCE.nextId(),
@@ -101,15 +106,15 @@ public class TestSecurableObjects extends TestJDBCBackend 
{
         SecurableObjects.ofMetalake(
             metalake.name(), 
Lists.newArrayList(Privileges.UseCatalog.allow()));
 
-    SecurableObject metalakeObject2 =
-        SecurableObjects.ofMetalake(
-            metalake2.name(), 
Lists.newArrayList(Privileges.UseCatalog.allow()));
-
     SecurableObject catalogObject =
         SecurableObjects.ofCatalog(
             "catalog",
             Lists.newArrayList(Privileges.UseCatalog.allow(), 
Privileges.CreateSchema.deny()));
 
+    SecurableObject catalogObject2 =
+        SecurableObjects.ofCatalog(
+            metalake.name(), 
Lists.newArrayList(Privileges.UseCatalog.allow()));
+
     SecurableObject schemaObject =
         SecurableObjects.ofSchema(
             catalogObject, "schema", 
Lists.newArrayList(Privileges.UseSchema.allow()));
@@ -129,7 +134,7 @@ public class TestSecurableObjects extends TestJDBCBackend {
     ArrayList<SecurableObject> securableObjects =
         Lists.newArrayList(
             metalakeObject,
-            metalakeObject2,
+            catalogObject2,
             catalogObject,
             schemaObject,
             tableObject,

Reply via email to