This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 31e7c4b088 [#11103] feat(core): Add the hierarchy conversion layer 
(#11074)
31e7c4b088 is described below

commit 31e7c4b088a7d94c1ca7e050fc2706ee6ccdea65
Author: roryqi <[email protected]>
AuthorDate: Mon May 18 17:30:14 2026 +0800

    [#11103] feat(core): Add the hierarchy conversion layer (#11074)
    
    ### What changes were proposed in this pull request?
    
    Add abstract class BasePOStorageOps to consolidate all the PO logic into one class
    
    Add SchemaPOStorageOps, TablePOStorageOps, FunctionPOStorageOps, and
    ViewPOStorageOps.
    
    Add the hierarchical conversion class (HierarchicalConversionPOStorageOps)
    
    ### Why are the changes needed?
    
    Fix: #11103
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added tests.
---
 .../fileset/TestFilesetCatalogOperations.java      |   2 +-
 .../gravitino/SupportsRelationOperations.java      |  28 ++
 .../gravitino/storage/relational/JDBCBackend.java  |  23 ++
 .../storage/relational/RelationalEntityStore.java  |  22 +-
 .../relational/service/BasePOStorageOps.java       |  78 +++++
 .../relational/service/FunctionMetaService.java    | 127 ++------
 .../relational/service/FunctionPOStorageOps.java   |  98 ++++++
 .../HierarchicalConversionPOStorageOps.java        | 193 ++++++++++++
 .../relational/service/MetadataObjectService.java  |  33 ++-
 .../relational/service/POStorageReadRouting.java   | 118 ++++++++
 .../relational/service/SchemaMetaService.java      | 222 +++++---------
 .../relational/service/SchemaPOStorageOps.java     | 112 +++++++
 .../relational/service/TableMetaService.java       | 139 ++-------
 .../relational/service/TablePOStorageOps.java      | 128 ++++++++
 .../relational/service/ViewMetaService.java        | 122 ++------
 .../relational/service/ViewPOStorageOps.java       | 120 ++++++++
 .../relational/TestJDBCBackendBatchInsert.java     | 170 +++++++++++
 .../TestHierarchicalConversionPOStorageOps.java    | 327 +++++++++++++++++++++
 .../service/TestPOStorageReadRouting.java          | 152 ++++++++++
 .../relational/service/TestSchemaMetaService.java  |  38 +--
 20 files changed, 1764 insertions(+), 488 deletions(-)

diff --git 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
index a0dbce64a2..492cdcc38b 100644
--- 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
+++ 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
@@ -325,7 +325,7 @@ public class TestFilesetCatalogOperations {
     MockedStatic<CatalogMetaService> catalogMetaServiceMockedStatic =
         Mockito.mockStatic(CatalogMetaService.class);
     MockedStatic<SchemaMetaService> schemaMetaServiceMockedStatic =
-        Mockito.mockStatic(SchemaMetaService.class);
+        Mockito.mockStatic(SchemaMetaService.class, 
Mockito.CALLS_REAL_METHODS);
 
     metalakeMetaServiceMockedStatic
         .when(MetalakeMetaService::getInstance)
diff --git 
a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java 
b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
index 854760060d..25c4fc60b1 100644
--- a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
+++ b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
@@ -165,6 +165,34 @@ public interface SupportsRelationOperations {
       boolean override)
       throws IOException;
 
+  /**
+   * Batch inserts the same relation from many source entities to one 
destination. Parameter order
+   * matches {@link #insertRelation}. All sources share one {@code srcType}. 
Semantics match
+   * repeated {@link #insertRelation} with the same {@code relType}; a 
relational backend may use
+   * fewer round-trips for some relation types (e.g. {@link Type#OWNER_REL}).
+   *
+   * @param relType the relation type (e.g. {@link Type#OWNER_REL})
+   * @param srcIdentifiers identifiers of the source side for each relation row
+   * @param srcType entity type shared by every identifier in {@code 
srcIdentifiers}
+   * @param dstIdentifier destination entity identifier (shared by all rows)
+   * @param dstType destination entity type
+   * @param override if true, replace existing relations of each source entity 
first, per {@link
+   *     #insertRelation}
+   * @throws IOException if the storage operation fails
+   */
+  default void batchInsertRelations(
+      Type relType,
+      List<NameIdentifier> srcIdentifiers,
+      Entity.EntityType srcType,
+      NameIdentifier dstIdentifier,
+      Entity.EntityType dstType,
+      boolean override)
+      throws IOException {
+    for (NameIdentifier srcIdent : srcIdentifiers) {
+      insertRelation(relType, srcIdent, srcType, dstIdentifier, dstType, 
override);
+    }
+  }
+
   /**
    * Updates the relations for a given entity by adding a set of new relations 
and removing another
    * set of relations.
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index 22c7c98330..f401cd50be 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -733,6 +733,29 @@ public class JDBCBackend implements RelationalBackend {
     }
   }
 
+  @Override
+  public void batchInsertRelations(
+      Type relType,
+      List<NameIdentifier> srcIdentifiers,
+      Entity.EntityType srcType,
+      NameIdentifier dstIdentifier,
+      Entity.EntityType dstType,
+      boolean override)
+      throws IOException {
+    if (srcIdentifiers == null || srcIdentifiers.isEmpty()) {
+      return;
+    }
+    switch (relType) {
+      case OWNER_REL:
+        OwnerMetaService.getInstance()
+            .batchSetOwners(srcIdentifiers, srcType, dstIdentifier, dstType);
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format("Doesn't support batch insert for the relation type 
%s", relType));
+    }
+  }
+
   @Override
   public <E extends Entity & HasIdentifier> List<E> updateEntityRelations(
       Type relType,
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
index d89c6c678d..27381659d6 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
@@ -55,7 +55,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Relation store to store entities. This means we can store entities in a 
relational store. I.e.,
  * MySQL, PostgreSQL, etc. If you want to use a different backend, you can 
implement the {@link
- * RelationalBackend} interface
+ * RelationalBackend} interface. The default JDBC backend is {@link 
JDBCBackend}.
  */
 public class RelationalEntityStore implements EntityStore, 
SupportsRelationOperations {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RelationalEntityStore.class);
@@ -327,6 +327,26 @@ public class RelationalEntityStore implements EntityStore, 
SupportsRelationOpera
     cache.invalidate(dstIdentifier, dstType, relType);
   }
 
+  @Override
+  public void batchInsertRelations(
+      Type relType,
+      List<NameIdentifier> srcIdentifiers,
+      Entity.EntityType srcType,
+      NameIdentifier dstIdentifier,
+      Entity.EntityType dstType,
+      boolean override)
+      throws IOException {
+    if (srcIdentifiers == null || srcIdentifiers.isEmpty()) {
+      return;
+    }
+    backend.batchInsertRelations(
+        relType, srcIdentifiers, srcType, dstIdentifier, dstType, override);
+    for (NameIdentifier ident : srcIdentifiers) {
+      cache.invalidate(ident, srcType, relType);
+    }
+    cache.invalidate(dstIdentifier, dstType, relType);
+  }
+
   @Override
   public <E extends Entity & HasIdentifier> List<E> updateEntityRelations(
       Type relType,
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/BasePOStorageOps.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/BasePOStorageOps.java
new file mode 100644
index 0000000000..645b239963
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/BasePOStorageOps.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.util.List;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+
+public abstract class BasePOStorageOps<PO, Mapper> {
+  public void insertPO(Mapper mapper, PO po, boolean overwrite) {
+    throw new UnsupportedOperationException(
+        "insertPO is not supported by " + getClass().getSimpleName());
+  }
+
+  public void batchInsertPOs(Mapper mapper, List<PO> pos, boolean overwrite) {
+    throw new UnsupportedOperationException(
+        "batchInsertPOs is not supported by " + getClass().getSimpleName());
+  }
+
+  public Integer updatePO(Mapper mapper, PO newPO, PO oldPO) {
+    throw new UnsupportedOperationException(
+        "updatePO is not supported by " + getClass().getSimpleName());
+  }
+
+  /**
+   * When {@code true} and the entity-id cache is enabled, callers may resolve 
rows by parent entity
+   * id plus short name via {@link #getPO} and {@link #listPOs}; see {@link 
POStorageReadRouting}.
+   */
+  public boolean supportsParentIdRelationalRead() {
+    return false;
+  }
+
+  public PO getPO(Mapper mapper, Long parentId, String name) {
+    throw new UnsupportedOperationException(
+        "getPO by parent id is not supported by " + 
getClass().getSimpleName());
+  }
+
+  public List<PO> listPOs(Mapper mapper, Long parentId) {
+    throw new UnsupportedOperationException(
+        "listPOs by parent id is not supported by " + 
getClass().getSimpleName());
+  }
+
+  public List<PO> listPOs(Mapper mapper, Namespace namespace, List<String> 
names) {
+    throw new UnsupportedOperationException(
+        "listPOs by namespace and names is not supported by " + 
getClass().getSimpleName());
+  }
+
+  public List<PO> listPOs(Mapper mapper, List<Long> entityIds) {
+    throw new UnsupportedOperationException(
+        "listPOs by entityIds is not supported by " + 
getClass().getSimpleName());
+  }
+
+  public PO getPOByFullName(Mapper mapper, NameIdentifier identifier) {
+    throw new UnsupportedOperationException(
+        "getPOByFullName is not supported by " + getClass().getSimpleName());
+  }
+
+  public List<PO> listPOsByNSFullName(Mapper mapper, Namespace namespace) {
+    throw new UnsupportedOperationException(
+        "listPOsByNSFullName is not supported by " + 
getClass().getSimpleName());
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java
index 0377afed00..33532edad2 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
-import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
@@ -56,14 +55,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class FunctionMetaService {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FunctionMetaService.class);
+  private static final FunctionMetaService INSTANCE = new 
FunctionMetaService();
+  private BasePOStorageOps<FunctionPO, FunctionMetaMapper> ops;
+
   public static FunctionMetaService getInstance() {
     return INSTANCE;
   }
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(FunctionMetaService.class);
-  private static final FunctionMetaService INSTANCE = new 
FunctionMetaService();
-
-  private FunctionMetaService() {}
+  private FunctionMetaService() {
+    this.ops = new HierarchicalConversionPOStorageOps<>(new 
FunctionPOStorageOps());
+  }
 
   @Monitored(
       metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
@@ -87,18 +89,17 @@ public class FunctionMetaService {
       metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
       baseMetricName = "getFunctionIdBySchemaIdAndFunctionName")
   public Long getFunctionIdBySchemaIdAndFunctionName(Long schemaId, String 
functionName) {
-    Long functionId =
+    FunctionPO functionPO =
         SessionUtils.getWithoutCommit(
-            FunctionMetaMapper.class,
-            mapper -> 
mapper.selectFunctionIdBySchemaIdAndFunctionName(schemaId, functionName));
+            FunctionMetaMapper.class, mapper -> ops.getPO(mapper, schemaId, 
functionName));
 
-    if (functionId == null) {
+    if (functionPO == null) {
       throw new NoSuchEntityException(
           NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
           Entity.EntityType.FUNCTION.name().toLowerCase(Locale.ROOT),
           functionName);
     }
-    return functionId;
+    return functionPO.functionId();
   }
 
   @Monitored(
@@ -115,14 +116,7 @@ public class FunctionMetaService {
       SessionUtils.doMultipleWithCommit(
           () ->
               SessionUtils.doWithoutCommit(
-                  FunctionMetaMapper.class,
-                  mapper -> {
-                    if (overwrite) {
-                      mapper.insertFunctionMetaOnDuplicateKeyUpdate(po);
-                    } else {
-                      mapper.insertFunctionMeta(po);
-                    }
-                  }),
+                  FunctionMetaMapper.class, mapper -> ops.insertPO(mapper, po, 
overwrite)),
           () ->
               SessionUtils.doWithoutCommit(
                   FunctionVersionMetaMapper.class,
@@ -231,8 +225,18 @@ public class FunctionMetaService {
       baseMetricName = "getFunctionPOByIdentifier")
   FunctionPO getFunctionPOByIdentifier(NameIdentifier ident) {
     NameIdentifierUtil.checkFunction(ident);
+    FunctionPO functionPO =
+        SessionUtils.getWithoutCommit(
+            FunctionMetaMapper.class,
+            mapper -> POStorageReadRouting.getPO(mapper, ident, ops, 
Entity.EntityType.FUNCTION));
 
-    return functionPOFetcher().apply(ident);
+    if (functionPO == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.FUNCTION.name().toLowerCase(Locale.ROOT),
+          ident.name());
+    }
+    return functionPO;
   }
 
   @Monitored(
@@ -260,7 +264,7 @@ public class FunctionMetaService {
           () ->
               SessionUtils.doWithoutCommit(
                   FunctionMetaMapper.class,
-                  mapper -> mapper.updateFunctionMeta(newFunctionPO, 
oldFunctionPO)));
+                  mapper -> ops.updatePO(mapper, newFunctionPO, 
oldFunctionPO)));
 
       return newEntity;
     } catch (RuntimeException re) {
@@ -270,89 +274,14 @@ public class FunctionMetaService {
     }
   }
 
-  private Function<NameIdentifier, FunctionPO> functionPOFetcher() {
-    return GravitinoEnv.getInstance().cacheEnabled()
-        ? this::getFunctionPOBySchemaId
-        : this::getFunctionPOByFullQualifiedName;
-  }
-
-  private FunctionPO getFunctionPOBySchemaId(NameIdentifier ident) {
-    Long schemaId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(ident.namespace().levels()), 
Entity.EntityType.SCHEMA);
-
-    FunctionPO functionPO =
-        SessionUtils.getWithoutCommit(
-            FunctionMetaMapper.class,
-            mapper -> mapper.selectFunctionMetaBySchemaIdAndName(schemaId, 
ident.name()));
-
-    if (functionPO == null) {
-      throw new NoSuchEntityException(
-          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-          Entity.EntityType.FUNCTION.name().toLowerCase(Locale.ROOT),
-          ident.toString());
-    }
-    return functionPO;
-  }
-
-  private FunctionPO getFunctionPOByFullQualifiedName(NameIdentifier ident) {
-    String[] namespaceLevels = ident.namespace().levels();
-    FunctionPO functionPO =
-        SessionUtils.getWithoutCommit(
-            FunctionMetaMapper.class,
-            mapper ->
-                mapper.selectFunctionMetaByFullQualifiedName(
-                    namespaceLevels[0], namespaceLevels[1], 
namespaceLevels[2], ident.name()));
-
-    if (functionPO == null || functionPO.functionId() == null) {
-      throw new NoSuchEntityException(
-          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-          Entity.EntityType.FUNCTION.name().toLowerCase(Locale.ROOT),
-          ident.name());
-    }
-
-    if (functionPO.schemaId() == null) {
-      throw new NoSuchEntityException(
-          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-          Entity.EntityType.SCHEMA.name().toLowerCase(),
-          namespaceLevels[2]);
-    }
-    return functionPO;
+  public BasePOStorageOps<FunctionPO, FunctionMetaMapper> ops() {
+    return ops;
   }
 
   private List<FunctionPO> listFunctionPOs(Namespace namespace) {
-    return functionListFetcher().apply(namespace);
-  }
-
-  private List<FunctionPO> listFunctionPOsBySchemaId(Namespace namespace) {
-    Long schemaId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
     return SessionUtils.getWithoutCommit(
-        FunctionMetaMapper.class, mapper -> 
mapper.listFunctionPOsBySchemaId(schemaId));
-  }
-
-  private Function<Namespace, List<FunctionPO>> functionListFetcher() {
-    return GravitinoEnv.getInstance().cacheEnabled()
-        ? this::listFunctionPOsBySchemaId
-        : this::listFunctionPOsByFullQualifiedName;
-  }
-
-  private List<FunctionPO> listFunctionPOsByFullQualifiedName(Namespace 
namespace) {
-    String[] namespaceLevels = namespace.levels();
-    List<FunctionPO> functionPOs =
-        SessionUtils.getWithoutCommit(
-            FunctionMetaMapper.class,
-            mapper ->
-                mapper.listFunctionPOsByFullQualifiedName(
-                    namespaceLevels[0], namespaceLevels[1], 
namespaceLevels[2]));
-    if (functionPOs.isEmpty() || functionPOs.get(0).schemaId() == null) {
-      throw new NoSuchEntityException(
-          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-          Entity.EntityType.SCHEMA.name().toLowerCase(),
-          namespaceLevels[2]);
-    }
-    return functionPOs.stream().filter(po -> po.functionId() != 
null).collect(Collectors.toList());
+        FunctionMetaMapper.class,
+        mapper -> POStorageReadRouting.listPOs(mapper, namespace, ops, 
Entity.EntityType.FUNCTION));
   }
 
   private void fillFunctionPOBuilderParentEntityId(
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionPOStorageOps.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionPOStorageOps.java
new file mode 100644
index 0000000000..45f22a6b3e
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionPOStorageOps.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper;
+import org.apache.gravitino.storage.relational.po.FunctionPO;
+
+public class FunctionPOStorageOps extends BasePOStorageOps<FunctionPO, 
FunctionMetaMapper> {
+
+  public FunctionPOStorageOps() {}
+
+  @Override
+  public void insertPO(FunctionMetaMapper mapper, FunctionPO functionPO, 
boolean overwrite) {
+    if (overwrite) {
+      mapper.insertFunctionMetaOnDuplicateKeyUpdate(functionPO);
+    } else {
+      mapper.insertFunctionMeta(functionPO);
+    }
+  }
+
+  @Override
+  public Integer updatePO(FunctionMetaMapper mapper, FunctionPO newPO, 
FunctionPO oldPO) {
+    return mapper.updateFunctionMeta(newPO, oldPO);
+  }
+
+  @Override
+  public FunctionPO getPO(FunctionMetaMapper mapper, Long parentId, String 
name) {
+    return mapper.selectFunctionMetaBySchemaIdAndName(parentId, name);
+  }
+
+  @Override
+  public FunctionPO getPOByFullName(FunctionMetaMapper mapper, NameIdentifier 
identifier) {
+    Namespace namespace = identifier.namespace();
+    return mapper.selectFunctionMetaByFullQualifiedName(
+        namespace.level(0), namespace.level(1), namespace.level(2), 
identifier.name());
+  }
+
+  @Override
+  public List<FunctionPO> listPOs(FunctionMetaMapper mapper, Long parentId) {
+    return mapper.listFunctionPOsBySchemaId(parentId);
+  }
+
+  @Override
+  public List<FunctionPO> listPOs(FunctionMetaMapper mapper, List<Long> 
entityIds) {
+    return mapper.listFunctionPOsByFunctionIds(entityIds);
+  }
+
+  @Override
+  public List<FunctionPO> listPOsByNSFullName(FunctionMetaMapper mapper, 
Namespace namespace) {
+    List<FunctionPO> pos =
+        mapper.listFunctionPOsByFullQualifiedName(
+            namespace.level(0), namespace.level(1), namespace.level(2));
+    // INNER JOIN on metalake/catalog: an empty result means the metalake or 
catalog does not exist.
+    if (pos.isEmpty()) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.CATALOG.name().toLowerCase(),
+          namespace.level(1));
+    }
+    // LEFT JOIN on schema_meta: rows with non-null catalogId but null 
schemaId mean the
+    // catalog exists but the schema does not.
+    if (pos.get(0).schemaId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.SCHEMA.name().toLowerCase(),
+          namespace.level(2));
+    }
+    // LEFT JOIN on function_meta: filter out the placeholder row for a schema 
without functions.
+    return pos.stream().filter(po -> po.functionId() != 
null).collect(Collectors.toList());
+  }
+
+  @Override
+  public boolean supportsParentIdRelationalRead() {
+    return true;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/HierarchicalConversionPOStorageOps.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/HierarchicalConversionPOStorageOps.java
new file mode 100644
index 0000000000..35699c8377
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/HierarchicalConversionPOStorageOps.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.util.List;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.utils.HierarchicalSchemaUtil;
+
+/**
+ * Wraps a {@link BasePOStorageOps} to bridge the hierarchical schema naming 
convention.
+ * Identifiers and namespace segments in logical form (logical separator) are 
translated to physical
+ * form (physical separator) before delegating. Two optional PO rewriters 
allow callers to translate
+ * a PO field across the boundary: {@code physicalToLogicalRewriter} is 
applied to POs returned from
+ * read methods (typically physical→logical), and {@code 
logicalToPhysicalRewriter} is applied to
+ * POs passed into write methods (typically logical→physical) so the SQL still 
receives storage-form
+ * values.
+ *
+ * @param <PO> persistent object type
+ * @param <Mapper> MyBatis mapper type
+ */
+public class HierarchicalConversionPOStorageOps<PO, Mapper> extends 
BasePOStorageOps<PO, Mapper> {
+
+  private final BasePOStorageOps<PO, Mapper> delegate;
+  private final UnaryOperator<PO> physicalToLogicalRewriter;
+  private final UnaryOperator<PO> logicalToPhysicalRewriter;
+
+  public HierarchicalConversionPOStorageOps(BasePOStorageOps<PO, Mapper> 
delegate) {
+    this(delegate, UnaryOperator.identity(), UnaryOperator.identity());
+  }
+
+  public HierarchicalConversionPOStorageOps(
+      BasePOStorageOps<PO, Mapper> delegate,
+      UnaryOperator<PO> physicalToLogicalRewriter,
+      UnaryOperator<PO> logicalToPhysicalRewriter) {
+    this.delegate = delegate;
+    this.physicalToLogicalRewriter = physicalToLogicalRewriter;
+    this.logicalToPhysicalRewriter = logicalToPhysicalRewriter;
+  }
+
+  @Override
+  public void insertPO(Mapper mapper, PO po, boolean overwrite) {
+    delegate.insertPO(mapper, logicalToPhysicalRewriter.apply(po), overwrite);
+  }
+
+  @Override
+  public void batchInsertPOs(Mapper mapper, List<PO> pos, boolean overwrite) {
+    delegate.batchInsertPOs(mapper, convertLogicalToPhysical(pos), overwrite);
+  }
+
+  @Override
+  public Integer updatePO(Mapper mapper, PO newPO, PO oldPO) {
+    return delegate.updatePO(
+        mapper, logicalToPhysicalRewriter.apply(newPO), 
logicalToPhysicalRewriter.apply(oldPO));
+  }
+
+  @Override
+  public PO getPO(Mapper mapper, Long parentId, String name) {
+    return convertPhysicalToLogical(
+        delegate.getPO(mapper, parentId, toPhysicalIfHierarchical(name)));
+  }
+
+  @Override
+  public List<PO> listPOs(Mapper mapper, Long parentId) {
+    return convertPhysicalToLogical(delegate.listPOs(mapper, parentId));
+  }
+
+  @Override
+  public List<PO> listPOs(Mapper mapper, Namespace logicalNamespace, 
List<String> names) {
+    Namespace physicalNamespace = logicalToPhysicalNamespace(logicalNamespace);
+    List<String> physicalNames =
+        names.stream()
+            .map(HierarchicalConversionPOStorageOps::toPhysicalIfHierarchical)
+            .collect(Collectors.toList());
+    return convertPhysicalToLogical(delegate.listPOs(mapper, 
physicalNamespace, physicalNames));
+  }
+
+  @Override
+  public List<PO> listPOs(Mapper mapper, List<Long> entityIds) {
+    return convertPhysicalToLogical(delegate.listPOs(mapper, entityIds));
+  }
+
+  @Override
+  public PO getPOByFullName(Mapper mapper, NameIdentifier logical) {
+    NameIdentifier physical = logicalToPhysicalIdentifier(logical);
+    return convertPhysicalToLogical(delegate.getPOByFullName(mapper, 
physical));
+  }
+
+  @Override
+  public List<PO> listPOsByNSFullName(Mapper mapper, Namespace logical) {
+    Namespace physical = logicalToPhysicalNamespace(logical);
+    return convertPhysicalToLogical(delegate.listPOsByNSFullName(mapper, 
physical));
+  }
+
+  @Override
+  public boolean supportsParentIdRelationalRead() {
+    return delegate.supportsParentIdRelationalRead();
+  }
+
+  private PO convertPhysicalToLogical(PO po) {
+    return po == null ? null : physicalToLogicalRewriter.apply(po);
+  }
+
+  private List<PO> convertPhysicalToLogical(List<PO> pos) {
+    if (pos == null || pos.isEmpty()) {
+      return pos;
+    }
+    return 
pos.stream().map(this::convertPhysicalToLogical).collect(Collectors.toList());
+  }
+
+  private List<PO> convertLogicalToPhysical(List<PO> pos) {
+    if (pos == null || pos.isEmpty()) {
+      return pos;
+    }
+    return 
pos.stream().map(logicalToPhysicalRewriter).collect(Collectors.toList());
+  }
+
+  private static String toPhysicalIfHierarchical(String name) {
+    if (StringUtils.isBlank(name)) {
+      return name;
+    }
+    String sep = HierarchicalSchemaUtil.schemaSeparator();
+    if (!name.contains(sep)) {
+      return name;
+    }
+    return HierarchicalSchemaUtil.logicalToPhysical(name, sep);
+  }
+
+  private static NameIdentifier logicalToPhysicalIdentifier(NameIdentifier 
logical) {
+    String[] levels = logical.namespace().levels();
+    if (levels.length == 2) {
+      String rawName = logical.name();
+      String physicalName =
+          StringUtils.isNotBlank(rawName)
+              ? HierarchicalSchemaUtil.logicalToPhysical(
+                  rawName, HierarchicalSchemaUtil.schemaSeparator())
+              : rawName;
+      if (physicalName.equals(logical.name())) {
+        return logical;
+      }
+      return NameIdentifier.of(logical.namespace(), physicalName);
+    }
+    if (levels.length == 3) {
+      String rawSeg = levels[2];
+      String physicalSchema =
+          StringUtils.isNotBlank(rawSeg)
+              ? HierarchicalSchemaUtil.logicalToPhysical(
+                  rawSeg, HierarchicalSchemaUtil.schemaSeparator())
+              : rawSeg;
+      if (physicalSchema.equals(levels[2])) {
+        return logical;
+      }
+      return NameIdentifier.of(Namespace.of(levels[0], levels[1], 
physicalSchema), logical.name());
+    }
+    return logical;
+  }
+
+  private static Namespace logicalToPhysicalNamespace(Namespace logical) {
+    String[] levels = logical.levels();
+    if (levels.length != 3) {
+      return logical;
+    }
+    String rawSeg = levels[2];
+    String physicalSchema =
+        StringUtils.isNotBlank(rawSeg)
+            ? HierarchicalSchemaUtil.logicalToPhysical(
+                rawSeg, HierarchicalSchemaUtil.schemaSeparator())
+            : rawSeg;
+    if (physicalSchema.equals(levels[2])) {
+      return logical;
+    }
+    return Namespace.of(levels[0], levels[1], physicalSchema);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
index ee8f7ff8ad..634f63be01 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
@@ -96,6 +96,14 @@ public class MetadataObjectService {
                   MetadataObjectService::getJobTemplateObjectsFullName)
               .build();
 
+  static final Map<MetadataObject.Type, BasePOStorageOps<?, ?>> 
TYPE_TO_STORAGE_OPS_MAP =
+      ImmutableMap.<MetadataObject.Type, BasePOStorageOps<?, ?>>builder()
+          .put(MetadataObject.Type.SCHEMA, 
SchemaMetaService.getInstance().ops())
+          .put(MetadataObject.Type.TABLE, TableMetaService.getInstance().ops())
+          .put(MetadataObject.Type.VIEW, ViewMetaService.getInstance().ops())
+          .put(MetadataObject.Type.FUNCTION, 
FunctionMetaService.getInstance().ops())
+          .build();
+
   private static Map<Long, String> getPolicyObjectsFullName(List<Long> 
policyIds) {
     if (policyIds == null || policyIds.isEmpty()) {
       return Maps.newHashMap();
@@ -345,9 +353,13 @@ public class MetadataObjectService {
       return Maps.newHashMap();
     }
 
+    @SuppressWarnings("unchecked")
+    BasePOStorageOps<FunctionPO, FunctionMetaMapper> ops =
+        (BasePOStorageOps<FunctionPO, FunctionMetaMapper>)
+            TYPE_TO_STORAGE_OPS_MAP.get(MetadataObject.Type.FUNCTION);
     List<FunctionPO> functionPOs =
         SessionUtils.getWithoutCommit(
-            FunctionMetaMapper.class, mapper -> 
mapper.listFunctionPOsByFunctionIds(functionIds));
+            FunctionMetaMapper.class, mapper -> ops.listPOs(mapper, 
functionIds));
 
     if (functionPOs == null || functionPOs.isEmpty()) {
       return new HashMap<>();
@@ -395,9 +407,13 @@ public class MetadataObjectService {
       return Maps.newHashMap();
     }
 
+    @SuppressWarnings("unchecked")
+    BasePOStorageOps<TablePO, TableMetaMapper> ops =
+        (BasePOStorageOps<TablePO, TableMetaMapper>)
+            TYPE_TO_STORAGE_OPS_MAP.get(MetadataObject.Type.TABLE);
     List<TablePO> tablePOs =
         SessionUtils.getWithoutCommit(
-            TableMetaMapper.class, mapper -> 
mapper.listTablePOsByTableIds(tableIds));
+            TableMetaMapper.class, mapper -> ops.listPOs(mapper, tableIds));
 
     if (tablePOs == null || tablePOs.isEmpty()) {
       return Maps.newHashMap();
@@ -534,9 +550,12 @@ public class MetadataObjectService {
       metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
       baseMetricName = "getViewObjectsFullName")
   public static Map<Long, String> getViewObjectsFullName(List<Long> viewIds) {
+    @SuppressWarnings("unchecked")
+    BasePOStorageOps<ViewPO, ViewMetaMapper> ops =
+        (BasePOStorageOps<ViewPO, ViewMetaMapper>)
+            TYPE_TO_STORAGE_OPS_MAP.get(MetadataObject.Type.VIEW);
     List<ViewPO> viewPOs =
-        SessionUtils.getWithoutCommit(
-            ViewMetaMapper.class, mapper -> 
mapper.listViewPOsByViewIds(viewIds));
+        SessionUtils.getWithoutCommit(ViewMetaMapper.class, mapper -> 
ops.listPOs(mapper, viewIds));
     if (viewPOs == null || viewPOs.isEmpty()) {
       return new HashMap<>();
     }
@@ -601,9 +620,13 @@ public class MetadataObjectService {
       metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
       baseMetricName = "getSchemaObjectsFullName")
   public static Map<Long, String> getSchemaObjectsFullName(List<Long> 
schemaIds) {
+    @SuppressWarnings("unchecked")
+    BasePOStorageOps<SchemaPO, SchemaMetaMapper> ops =
+        (BasePOStorageOps<SchemaPO, SchemaMetaMapper>)
+            TYPE_TO_STORAGE_OPS_MAP.get(MetadataObject.Type.SCHEMA);
     List<SchemaPO> schemaPOs =
         SessionUtils.getWithoutCommit(
-            SchemaMetaMapper.class, mapper -> 
mapper.listSchemaPOsBySchemaIds(schemaIds));
+            SchemaMetaMapper.class, mapper -> ops.listPOs(mapper, schemaIds));
 
     if (schemaPOs == null || schemaPOs.isEmpty()) {
       return new HashMap<>();
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/POStorageReadRouting.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/POStorageReadRouting.java
new file mode 100644
index 0000000000..3b41ecb59c
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/POStorageReadRouting.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.util.List;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+
+/**
+ * Routes relational PO reads between parent-id based SQL (when the entity-id cache is enabled) and
+ * full qualified name SQL. Callers keep cache policy in one place instead of embedding it in {@link
+ * BasePOStorageOps}.
+ */
+public final class POStorageReadRouting {
+
+  private POStorageReadRouting() {}
+
+  /**
+   * Loads a PO using parent-id based SQL when the entity-id cache is enabled and {@code ops}
+   * supports that path; otherwise uses full qualified name SQL.
+   *
+   * @param mapper MyBatis mapper session
+   * @param identifier entity name identifier (logical naming when wrapped by hierarchical ops)
+   * @param ops storage operations delegate
+   * @param entityType entity type for {@code identifier} (used to resolve the parent id)
+   * @param <PO> persistent object type
+   * @param <Mapper> mapper type
+   * @return loaded PO or null when the delegate returns null
+   */
+  public static <PO, Mapper> PO getPO(
+      Mapper mapper,
+      NameIdentifier identifier,
+      BasePOStorageOps<PO, Mapper> ops,
+      Entity.EntityType entityType) {
+    return getPO(mapper, identifier, ops, entityType, GravitinoEnv.getInstance().cacheEnabled());
+  }
+
+  /**
+   * Variant of {@code getPO} that takes an explicit cache flag (typically for tests).
+   *
+   * @param mapper MyBatis mapper session
+   * @param identifier entity name identifier
+   * @param ops storage operations delegate
+   * @param entityType entity type for {@code identifier} (used to resolve the parent id)
+   * @param cacheEnabled when true, prefer parent-id based reads when supported
+   * @param <PO> persistent object type
+   * @param <Mapper> mapper type
+   * @return loaded PO or null when the delegate returns null
+   */
+  public static <PO, Mapper> PO getPO(
+      Mapper mapper,
+      NameIdentifier identifier,
+      BasePOStorageOps<PO, Mapper> ops,
+      Entity.EntityType entityType,
+      boolean cacheEnabled) {
+    if (cacheEnabled && ops.supportsParentIdRelationalRead()) {
+      Long parentId = resolveParentId(identifier.namespace(), entityType);
+      return ops.getPO(mapper, parentId, identifier.name());
+    }
+    return ops.getPOByFullName(mapper, identifier);
+  }
+
+  /**
+   * Lists POs under a namespace using parent-id based SQL when the entity-id cache is enabled and
+   * {@code ops} supports that path; otherwise uses full qualified namespace SQL.
+   *
+   * @param mapper MyBatis mapper session
+   * @param namespace parent namespace for the listed entities
+   * @param ops storage operations delegate
+   * @param entityType entity type stored under {@code namespace} (used to resolve the parent id)
+   * @param <PO> persistent object type
+   * @param <Mapper> mapper type
+   * @return list from the delegate (may be empty)
+   */
+  public static <PO, Mapper> List<PO> listPOs(
+      Mapper mapper,
+      Namespace namespace,
+      BasePOStorageOps<PO, Mapper> ops,
+      Entity.EntityType entityType) {
+    return listPOs(mapper, namespace, ops, entityType, GravitinoEnv.getInstance().cacheEnabled());
+  }
+
+  /**
+   * Variant of {@code listPOs} that takes an explicit cache flag (typically for tests).
+   *
+   * @param mapper MyBatis mapper session
+   * @param namespace parent namespace for the listed entities
+   * @param ops storage operations delegate
+   * @param entityType entity type stored under {@code namespace} (used to resolve the parent id)
+   * @param cacheEnabled when true, prefer parent-id based reads when supported
+   * @param <PO> persistent object type
+   * @param <Mapper> mapper type
+   * @return list from the delegate (may be empty)
+   */
+  public static <PO, Mapper> List<PO> listPOs(
+      Mapper mapper,
+      Namespace namespace,
+      BasePOStorageOps<PO, Mapper> ops,
+      Entity.EntityType entityType,
+      boolean cacheEnabled) {
+    if (cacheEnabled && ops.supportsParentIdRelationalRead()) {
+      Long parentId = resolveParentId(namespace, entityType);
+      return ops.listPOs(mapper, parentId);
+    }
+    return ops.listPOsByNSFullName(mapper, namespace);
+  }
+
+  /**
+   * Resolves the id of the parent entity addressed by {@code parentNamespace}.
+   *
+   * <p>The parent identifier is built from the namespace levels directly instead of parsing the
+   * namespace's string form, so a level whose text contains the separator used by that string
+   * form is not split into extra levels.
+   *
+   * @param parentNamespace namespace whose levels name the parent entity
+   * @param entityType type of the child entity; its parent type selects the id lookup
+   * @return the id returned by {@link EntityIdService#getEntityId}
+   */
+  private static Long resolveParentId(Namespace parentNamespace, Entity.EntityType entityType) {
+    return EntityIdService.getEntityId(
+        NameIdentifier.of(parentNamespace.levels()),
+        NameIdentifierUtil.parentEntityType(entityType));
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
index c7edb6d26f..c8e6972834 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
@@ -33,7 +33,6 @@ import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
-import org.apache.gravitino.Entity.EntityType;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.MetadataObject;
@@ -80,29 +79,18 @@ import org.apache.gravitino.utils.NamespaceUtil;
 /** The service class for schema metadata. It provides the basic database 
operations for schema. */
 public class SchemaMetaService {
   private static final SchemaMetaService INSTANCE = new SchemaMetaService();
+  private final BasePOStorageOps<SchemaPO, SchemaMetaMapper> ops;
 
   public static SchemaMetaService getInstance() {
     return INSTANCE;
   }
 
-  private SchemaMetaService() {}
-
-  @Monitored(
-      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
-      baseMetricName = "getSchemaPOByCatalogIdAndName")
-  public SchemaPO getSchemaPOByCatalogIdAndName(Long catalogId, String 
schemaName) {
-    SchemaPO schemaPO =
-        SessionUtils.getWithoutCommit(
-            SchemaMetaMapper.class,
-            mapper -> mapper.selectSchemaMetaByCatalogIdAndName(catalogId, 
schemaName));
-
-    if (schemaPO == null) {
-      throw new NoSuchEntityException(
-          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-          Entity.EntityType.SCHEMA.name().toLowerCase(),
-          schemaName);
-    }
-    return schemaPO;
+  private SchemaMetaService() {
+    this.ops =
+        new HierarchicalConversionPOStorageOps<>(
+            new SchemaPOStorageOps(),
+            SchemaMetaService::physicalToLogicalSchemaPO,
+            SchemaMetaService::logicalToPhysicalSchemaPO);
   }
 
   @Monitored(
@@ -110,39 +98,21 @@ public class SchemaMetaService {
       baseMetricName = "getSchemaIdByMetalakeNameAndCatalogNameAndSchemaName")
   public SchemaIds getSchemaIdByMetalakeNameAndCatalogNameAndSchemaName(
       String metalakeName, String catalogName, String schemaName) {
-    SchemaIds schemaIds =
+    NameIdentifier identifier = NameIdentifier.of(metalakeName, catalogName, 
schemaName);
+    SchemaPO schemaPO =
         SessionUtils.getWithoutCommit(
             SchemaMetaMapper.class,
             mapper ->
-                mapper.selectSchemaIdByMetalakeNameAndCatalogNameAndSchemaName(
-                    metalakeName, catalogName, schemaName));
+                POStorageReadRouting.getPO(mapper, identifier, ops, 
Entity.EntityType.SCHEMA));
 
-    if (schemaIds == null) {
+    if (schemaPO == null) {
       throw new NoSuchEntityException(
           NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
           Entity.EntityType.SCHEMA.name().toLowerCase(),
           schemaName);
     }
 
-    return schemaIds;
-  }
-
-  @Monitored(
-      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
-      baseMetricName = "getSchemaIdByCatalogIdAndName")
-  public Long getSchemaIdByCatalogIdAndName(Long catalogId, String schemaName) 
{
-    Long schemaId =
-        SessionUtils.getWithoutCommit(
-            SchemaMetaMapper.class,
-            mapper -> mapper.selectSchemaIdByCatalogIdAndName(catalogId, 
schemaName));
-
-    if (schemaId == null) {
-      throw new NoSuchEntityException(
-          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-          Entity.EntityType.SCHEMA.name().toLowerCase(),
-          schemaName);
-    }
-    return schemaId;
+    return new SchemaIds(schemaPO.getMetalakeId(), schemaPO.getCatalogId(), 
schemaPO.getSchemaId());
   }
 
   @Monitored(
@@ -169,23 +139,24 @@ public class SchemaMetaService {
   public void insertSchema(SchemaEntity schemaEntity, boolean overwrite) 
throws IOException {
     try {
       NameIdentifierUtil.checkSchema(schemaEntity.nameIdentifier());
-      // Callers above this service (e.g. JDBCBackend + naming bridge) pass 
storage-form schema
-      // names: nested paths use the internal physical separator, not the 
external logical one.
-      String physicalSep = HierarchicalSchemaUtil.physicalSeparator();
+      // SchemaEntity arrives in API/logical form (separator = 
HierarchicalSchemaUtil
+      // .schemaSeparator()). We split here on the logical separator and build 
ancestor rows in
+      // logical form. HierarchicalConversionPOStorageOps.batchInsertPOs 
applies its write
+      // rewriter to translate each PO's name to storage form before SQL 
execution.
+      String logicalSep = HierarchicalSchemaUtil.schemaSeparator();
       String schemaName = schemaEntity.name();
       List<SchemaEntity> rowsToInsert = new ArrayList<>();
-      if (schemaName == null || !schemaName.contains(physicalSep)) {
+      if (schemaName == null || !schemaName.contains(logicalSep)) {
         rowsToInsert.add(schemaEntity);
       } else {
-        // Segments of the storage-form name; e.g. [A, B, C] -> ancestor rows 
"A", "A"+sep+"B", then
-        // leaf.
-        String[] parts = schemaName.split(Pattern.quote(physicalSep), -1);
+        // Segments of the logical name; e.g. "A:B:C" -> ancestor rows "A", 
"A:B", then leaf.
+        String[] parts = schemaName.split(Pattern.quote(logicalSep), -1);
         for (int nSeg = 1; nSeg < parts.length; nSeg++) {
-          String ancestorPhysical = String.join(physicalSep, 
Arrays.copyOf(parts, nSeg));
+          String ancestorLogical = String.join(logicalSep, 
Arrays.copyOf(parts, nSeg));
           SchemaEntity ancestor =
               SchemaEntity.builder()
                   .withId(nextIdForNestedAncestor())
-                  .withName(ancestorPhysical)
+                  .withName(ancestorLogical)
                   .withNamespace(schemaEntity.namespace())
                   .withComment(null)
                   .withProperties(Collections.emptyMap())
@@ -204,19 +175,16 @@ public class SchemaMetaService {
             if (n > 1) {
               SchemaEntity firstAncestor = rowsToInsert.get(0);
               Namespace ancestorNs = firstAncestor.namespace();
-              List<String> ancestorPhysicalNames =
+              List<String> ancestorNames =
                   rowsToInsert.subList(0, n - 1).stream()
                       .map(SchemaEntity::name)
                       .collect(Collectors.toList());
-              Set<String> existingAncestorNames =
-                  mapper
-                      .batchSelectSchemaByIdentifier(
-                          ancestorNs.level(0), ancestorNs.level(1), 
ancestorPhysicalNames)
-                      .stream()
+              Set<String> existingLogicalNames =
+                  ops.listPOs(mapper, ancestorNs, ancestorNames).stream()
                       .map(SchemaPO::getSchemaName)
                       .collect(Collectors.toSet());
               for (SchemaEntity row : rowsToInsert.subList(0, n - 1)) {
-                if (existingAncestorNames.contains(row.name())) {
+                if (existingLogicalNames.contains(row.name())) {
                   continue;
                 }
                 SchemaPO.Builder builder = SchemaPO.builder();
@@ -230,11 +198,7 @@ public class SchemaMetaService {
             SchemaPO leafPO = 
POConverters.initializeSchemaPOWithVersion(leafRow, leafBuilder);
             List<SchemaPO> schemaPosToInsert = new 
ArrayList<>(missingAncestorPOs);
             schemaPosToInsert.add(leafPO);
-            if (overwrite) {
-              
mapper.batchInsertSchemaMetaOnDuplicateKeyUpdate(schemaPosToInsert);
-            } else {
-              mapper.batchInsertSchemaMeta(schemaPosToInsert);
-            }
+            ops.batchInsertPOs(mapper, schemaPosToInsert, overwrite);
           });
     } catch (RuntimeException re) {
       ExceptionUtils.checkSQLException(
@@ -271,7 +235,8 @@ public class SchemaMetaService {
                   SessionUtils.getWithoutCommit(
                       SchemaMetaMapper.class,
                       mapper ->
-                          mapper.updateSchemaMeta(
+                          ops.updatePO(
+                              mapper,
                               
POConverters.updateSchemaPOWithVersion(oldSchemaPO, newEntity),
                               oldSchemaPO))),
           () -> {
@@ -494,96 +459,24 @@ public class SchemaMetaService {
 
   private SchemaPO getSchemaPOByIdentifier(NameIdentifier identifier) {
     NameIdentifierUtil.checkSchema(identifier);
-    return schemaPOFetcher().apply(identifier);
-  }
-
-  private SchemaPO getSchemaByFullQualifiedName(
-      String metalakeName, String catalogName, String schemaName) {
     SchemaPO schemaPO =
         SessionUtils.getWithoutCommit(
             SchemaMetaMapper.class,
             mapper ->
-                mapper.selectSchemaByFullQualifiedName(metalakeName, 
catalogName, schemaName));
+                POStorageReadRouting.getPO(mapper, identifier, ops, 
Entity.EntityType.SCHEMA));
     if (schemaPO == null) {
-      throw new NoSuchEntityException(
-          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-          EntityType.CATALOG.name().toLowerCase(),
-          schemaName);
-    }
-
-    return schemaPO;
-  }
-
-  private List<SchemaPO> listSchemaPOs(Namespace namespace) {
-    return schemaListFetcher().apply(namespace);
-  }
-
-  private List<SchemaPO> listSchemaPOsByCatalogId(Namespace namespace) {
-    Long catalogId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(namespace.levels()), Entity.EntityType.CATALOG);
-
-    return SessionUtils.getWithoutCommit(
-        SchemaMetaMapper.class, mapper -> 
mapper.listSchemaPOsByCatalogId(catalogId));
-  }
-
-  private List<SchemaPO> listSchemaPOsByFullQualifiedName(Namespace namespace) 
{
-    String[] namespaceLevels = namespace.levels();
-    List<SchemaPO> schemaPOs =
-        SessionUtils.getWithoutCommit(
-            SchemaMetaMapper.class,
-            mapper ->
-                mapper.listSchemaPOsByFullQualifiedName(namespaceLevels[0], 
namespaceLevels[1]));
-    if (schemaPOs.isEmpty() || schemaPOs.get(0).getCatalogId() == null) {
-      throw new NoSuchEntityException(
-          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-          Entity.EntityType.CATALOG.name().toLowerCase(),
-          namespaceLevels[1]);
-    }
-    return schemaPOs.stream().filter(po -> po.getSchemaId() != 
null).collect(Collectors.toList());
-  }
-
-  private SchemaPO getSchemaPOByCatalogId(NameIdentifier identifier) {
-    Long catalogId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.CATALOG);
-    return getSchemaPOByCatalogIdAndName(catalogId, identifier.name());
-  }
-
-  private SchemaPO getSchemaPOByFullQualifiedName(NameIdentifier identifier) {
-    String[] namespaceLevels = identifier.namespace().levels();
-    SchemaPO schemaPO =
-        getSchemaByFullQualifiedName(namespaceLevels[0], namespaceLevels[1], 
identifier.name());
-
-    if (schemaPO.getCatalogId() == null) {
-      throw new NoSuchEntityException(
-          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-          Entity.EntityType.CATALOG.name().toLowerCase(),
-          namespaceLevels[1]);
-    }
-
-    if (schemaPO.getSchemaId() == null) {
       throw new NoSuchEntityException(
           NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
           Entity.EntityType.SCHEMA.name().toLowerCase(),
           identifier.name());
     }
-
     return schemaPO;
   }
 
-  private Function<Namespace, List<SchemaPO>> schemaListFetcher() {
-    // If cache is enabled, we can use catalog id to fetch schemas faster or 
else use full qualified
-    // name to join several tables to get the schema list.
-    return GravitinoEnv.getInstance().cacheEnabled()
-        ? this::listSchemaPOsByCatalogId
-        : this::listSchemaPOsByFullQualifiedName;
-  }
-
-  private Function<NameIdentifier, SchemaPO> schemaPOFetcher() {
-    return GravitinoEnv.getInstance().cacheEnabled()
-        ? this::getSchemaPOByCatalogId
-        : this::getSchemaPOByFullQualifiedName;
+  private List<SchemaPO> listSchemaPOs(Namespace namespace) {
+    return SessionUtils.getWithoutCommit(
+        SchemaMetaMapper.class,
+        mapper -> POStorageReadRouting.listPOs(mapper, namespace, ops, 
Entity.EntityType.SCHEMA));
   }
 
   private void fillSchemaPOBuilderParentEntityId(SchemaPO.Builder builder, 
Namespace namespace) {
@@ -605,16 +498,22 @@ public class SchemaMetaService {
     List<String> schemaNames =
         
identifiers.stream().map(NameIdentifier::name).collect(Collectors.toList());
 
-    return SessionUtils.doWithCommitAndFetchResult(
+    return SessionUtils.getWithoutCommit(
         SchemaMetaMapper.class,
         mapper -> {
           List<SchemaPO> schemaPOs =
-              mapper.batchSelectSchemaByIdentifier(
-                  catalogIdent.namespace().level(0), catalogIdent.name(), 
schemaNames);
+              ops.listPOs(
+                  mapper,
+                  Namespace.of(catalogIdent.namespace().levels()[0], 
catalogIdent.name()),
+                  schemaNames);
           return POConverters.fromSchemaPOs(schemaPOs, firstIdent.namespace());
         });
   }
 
+  public BasePOStorageOps<SchemaPO, SchemaMetaMapper> ops() {
+    return ops;
+  }
+
   private static long nextIdForNestedAncestor() {
     IdGenerator generator = GravitinoEnv.getInstance().idGenerator();
     if (generator == null) {
@@ -623,4 +522,57 @@ public class SchemaMetaService {
     }
     return generator.nextId();
   }
+
+  /**
+   * Converts the schema name of {@code po} from physical to logical form. A PO whose name is null
+   * or does not contain the physical separator is returned as is; otherwise a copy is returned.
+   */
+  private static SchemaPO physicalToLogicalSchemaPO(SchemaPO po) {
+    String physicalName = po.getSchemaName();
+    boolean nested =
+        physicalName != null && physicalName.contains(HierarchicalSchemaUtil.physicalSeparator());
+    if (!nested) {
+      return po;
+    }
+    String logicalName =
+        HierarchicalSchemaUtil.physicalToLogical(
+            physicalName, HierarchicalSchemaUtil.schemaSeparator());
+    return copySchemaPOWithName(po, logicalName);
+  }
+
+  /**
+   * Converts the schema name of {@code po} from logical to physical form. A PO whose name is null
+   * or does not contain the schema separator is returned as is; otherwise a copy is returned.
+   */
+  private static SchemaPO logicalToPhysicalSchemaPO(SchemaPO po) {
+    String logicalName = po.getSchemaName();
+    boolean nested =
+        logicalName != null && logicalName.contains(HierarchicalSchemaUtil.schemaSeparator());
+    if (!nested) {
+      return po;
+    }
+    String physicalName =
+        HierarchicalSchemaUtil.logicalToPhysical(
+            logicalName, HierarchicalSchemaUtil.schemaSeparator());
+    return copySchemaPOWithName(po, physicalName);
+  }
+
+  /**
+   * Builds a new {@link SchemaPO} named {@code name}; ids, comment, properties, audit info,
+   * versions and the deleted-at value are taken from {@code po}.
+   */
+  private static SchemaPO copySchemaPOWithName(SchemaPO po, String name) {
+    SchemaPO.Builder copy = SchemaPO.builder();
+    copy.withSchemaId(po.getSchemaId());
+    copy.withSchemaName(name);
+    copy.withMetalakeId(po.getMetalakeId());
+    copy.withCatalogId(po.getCatalogId());
+    copy.withSchemaComment(po.getSchemaComment());
+    copy.withProperties(po.getProperties());
+    copy.withAuditInfo(po.getAuditInfo());
+    copy.withCurrentVersion(po.getCurrentVersion());
+    copy.withLastVersion(po.getLastVersion());
+    copy.withDeletedAt(po.getDeletedAt());
+    return copy.build();
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaPOStorageOps.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaPOStorageOps.java
new file mode 100644
index 0000000000..7102c4976d
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaPOStorageOps.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
+import org.apache.gravitino.storage.relational.po.SchemaPO;
+
+/** Schema flavor of {@link BasePOStorageOps}; every call goes to {@link SchemaMetaMapper}. */
+public class SchemaPOStorageOps extends BasePOStorageOps<SchemaPO, SchemaMetaMapper> {
+
+  public SchemaPOStorageOps() {}
+
+  @Override
+  public void batchInsertPOs(SchemaMetaMapper mapper, List<SchemaPO> schemaPOs, boolean overwrite) {
+    if (overwrite) {
+      mapper.batchInsertSchemaMetaOnDuplicateKeyUpdate(schemaPOs);
+    } else {
+      mapper.batchInsertSchemaMeta(schemaPOs);
+    }
+  }
+
+  /**
+   * Forwards both POs to {@code SchemaMetaMapper#updateSchemaMeta} in the order received.
+   *
+   * <p>The first PO is the updated state and the second is the previously stored state; that is
+   * the order in which {@code SchemaMetaService} passes them.
+   *
+   * @param mapper MyBatis mapper session
+   * @param newPO PO carrying the updated values
+   * @param oldPO PO as read before the update
+   * @return the value returned by the mapper's update statement
+   */
+  @Override
+  public Integer updatePO(SchemaMetaMapper mapper, SchemaPO newPO, SchemaPO oldPO) {
+    return mapper.updateSchemaMeta(newPO, oldPO);
+  }
+
+  @Override
+  public SchemaPO getPO(SchemaMetaMapper mapper, Long parentId, String name) {
+    return mapper.selectSchemaMetaByCatalogIdAndName(parentId, name);
+  }
+
+  /**
+   * Looks up a schema by metalake name, catalog name and schema name.
+   *
+   * @param mapper MyBatis mapper session
+   * @param identifier schema identifier; namespace levels 0 and 1 are the metalake and catalog
+   * @return the schema PO, or null when the returned row carries no schema id
+   * @throws NoSuchEntityException if no row is returned or the row carries no catalog id; both
+   *     cases are reported as a missing catalog
+   */
+  @Override
+  public SchemaPO getPOByFullName(SchemaMetaMapper mapper, NameIdentifier identifier) {
+    Namespace namespace = identifier.namespace();
+    SchemaPO po =
+        mapper.selectSchemaByFullQualifiedName(
+            namespace.level(0), namespace.level(1), identifier.name());
+    // INNER JOIN on metalake/catalog: a null PO means the metalake or catalog does not exist.
+    // A row without a catalog id is treated the same way, as a defensive guard.
+    if (po == null || po.getCatalogId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.CATALOG.name().toLowerCase(),
+          namespace.level(1));
+    }
+    // LEFT JOIN on schema_meta: a row with non-null catalogId but null schemaId means
+    // the catalog exists but the schema does not.
+    if (po.getSchemaId() == null) {
+      return null;
+    }
+    return po;
+  }
+
+  @Override
+  public List<SchemaPO> listPOs(SchemaMetaMapper schemaMetaMapper, Long parentId) {
+    return schemaMetaMapper.listSchemaPOsByCatalogId(parentId);
+  }
+
+  @Override
+  public List<SchemaPO> listPOs(
+      SchemaMetaMapper schemaMetaMapper, Namespace namespace, List<String> names) {
+    return schemaMetaMapper.batchSelectSchemaByIdentifier(
+        namespace.level(0), namespace.level(1), names);
+  }
+
+  @Override
+  public List<SchemaPO> listPOs(SchemaMetaMapper schemaMetaMapper, List<Long> entityIds) {
+    return schemaMetaMapper.listSchemaPOsBySchemaIds(entityIds);
+  }
+
+  /**
+   * Lists the schemas under a metalake/catalog pair addressed by name.
+   *
+   * @param schemaMetaMapper MyBatis mapper session
+   * @param namespace namespace whose levels 0 and 1 are the metalake and catalog names
+   * @return the rows that carry a schema id (may be empty)
+   * @throws NoSuchEntityException if no row is returned or the first row carries no catalog id;
+   *     both cases are reported as a missing catalog
+   */
+  @Override
+  public List<SchemaPO> listPOsByNSFullName(
+      SchemaMetaMapper schemaMetaMapper, Namespace namespace) {
+    List<SchemaPO> pos =
+        schemaMetaMapper.listSchemaPOsByFullQualifiedName(namespace.level(0), namespace.level(1));
+    // An empty result means the parent metalake or catalog does not exist. A first row without a
+    // catalog id is treated the same way, as a defensive guard.
+    if (pos.isEmpty() || pos.get(0).getCatalogId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.CATALOG.name().toLowerCase(),
+          namespace.level(1));
+    }
+    // Same LEFT JOIN behavior as getPOByFullName: filter out the placeholder row that
+    // represents an existing catalog without any matching schema.
+    return pos.stream().filter(po -> po.getSchemaId() != null).collect(Collectors.toList());
+  }
+
+  @Override
+  public boolean supportsParentIdRelationalRead() {
+    return true;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
index 02c094a509..72391bc31b 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
@@ -28,10 +28,7 @@ import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
-import org.apache.gravitino.Entity.EntityType;
-import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
@@ -60,29 +57,31 @@ import org.apache.gravitino.utils.NamespaceUtil;
 /** The service class for table metadata. It provides the basic database 
operations for table. */
 public class TableMetaService {
   private static final TableMetaService INSTANCE = new TableMetaService();
+  private final BasePOStorageOps<TablePO, TableMetaMapper> ops;
 
   public static TableMetaService getInstance() {
     return INSTANCE;
   }
 
-  private TableMetaService() {}
+  private TableMetaService() {
+    this.ops = new HierarchicalConversionPOStorageOps<>(new 
TablePOStorageOps());
+  }
 
   @Monitored(
       metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
       baseMetricName = "getTableIdBySchemaIdAndName")
   public Long getTableIdBySchemaIdAndName(Long schemaId, String tableName) {
-    Long tableId =
+    TablePO tablePO =
         SessionUtils.getWithoutCommit(
-            TableMetaMapper.class,
-            mapper -> mapper.selectTableIdBySchemaIdAndName(schemaId, 
tableName));
+            TableMetaMapper.class, mapper -> ops.getPO(mapper, schemaId, 
tableName));
 
-    if (tableId == null) {
+    if (tablePO == null) {
       throw new NoSuchEntityException(
           NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
           Entity.EntityType.TABLE.name().toLowerCase(),
           tableName);
     }
-    return tableId;
+    return tablePO.getTableId();
   }
 
   @Monitored(
@@ -124,11 +123,7 @@ public class TableMetaService {
                   TableMetaMapper.class,
                   mapper -> {
                     tablePORef.set(po);
-                    if (overwrite) {
-                      mapper.insertTableMetaOnDuplicateKeyUpdate(po);
-                    } else {
-                      mapper.insertTableMeta(po);
-                    }
+                    ops.insertPO(mapper, po, overwrite);
                   }),
           () ->
               SessionUtils.doWithoutCommit(
@@ -204,7 +199,7 @@ public class TableMetaService {
               updateResult.set(
                   SessionUtils.getWithoutCommit(
                       TableMetaMapper.class,
-                      mapper -> mapper.updateTableMeta(newTablePO, oldTablePO, 
newSchemaId))),
+                      mapper -> ops.updatePO(mapper, newTablePO, oldTablePO))),
           () ->
               SessionUtils.doWithoutCommit(
                   TableVersionMapper.class,
@@ -347,131 +342,47 @@ public class TableMetaService {
           Objects.equals(schemaIdent, 
NameIdentifierUtil.getSchemaIdentifier(ident)));
       tableNames.add(ident.name());
     }
-    Long schemaId = EntityIdService.getEntityId(schemaIdent, 
Entity.EntityType.SCHEMA);
     return SessionUtils.doWithCommitAndFetchResult(
         TableMetaMapper.class,
         mapper -> {
-          List<TablePO> tableList = 
mapper.batchSelectTableByIdentifier(schemaId, tableNames);
+          List<TablePO> tableList = ops.listPOs(mapper, 
firstIdent.namespace(), tableNames);
           return POConverters.fromTablePOs(tableList, firstIdent.namespace());
         });
   }
 
-  private void fillTablePOBuilderParentEntityId(TablePO.Builder builder, 
Namespace namespace) {
-    NamespaceUtil.checkTable(namespace);
-    NamespacedEntityId namespacedEntityId =
-        EntityIdService.getEntityIds(
-            NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
-    builder.withMetalakeId(namespacedEntityId.namespaceIds()[0]);
-    builder.withCatalogId(namespacedEntityId.namespaceIds()[1]);
-    builder.withSchemaId(namespacedEntityId.entityId());
+  public BasePOStorageOps<TablePO, TableMetaMapper> ops() {
+    return ops;
   }
 
   private TablePO getTablePOByIdentifier(NameIdentifier identifier) {
     NameIdentifierUtil.checkTable(identifier);
-
-    return tablePOFetcher().apply(identifier);
-  }
-
-  private TablePO getTablePOBySchemaIdAndName(Long schemaId, String tableName) 
{
     TablePO tablePO =
         SessionUtils.getWithoutCommit(
             TableMetaMapper.class,
-            mapper -> mapper.selectTableMetaBySchemaIdAndName(schemaId, 
tableName));
+            mapper -> POStorageReadRouting.getPO(mapper, identifier, ops, 
Entity.EntityType.TABLE));
     if (tablePO == null) {
       throw new NoSuchEntityException(
           NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
           Entity.EntityType.TABLE.name().toLowerCase(),
-          tableName);
-    }
-    return tablePO;
-  }
-
-  private TablePO getTableByFullQualifiedName(
-      String metalakeName, String catalogName, String schemaName, String 
tableName) {
-    TablePO tablePO =
-        SessionUtils.getWithoutCommit(
-            TableMetaMapper.class,
-            mapper ->
-                mapper.selectTableByFullQualifiedName(
-                    metalakeName, catalogName, schemaName, tableName));
-    if (tablePO == null) {
-      throw new NoSuchEntityException(
-          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-          Entity.EntityType.TABLE.name().toLowerCase(),
-          tableName);
+          identifier.name());
     }
 
     return tablePO;
   }
 
   private List<TablePO> listTablePOs(Namespace namespace) {
-    return tableListFetcher().apply(namespace);
-  }
-
-  private List<TablePO> listTablePOsBySchemaId(Namespace namespace) {
-    Long schemaId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
     return SessionUtils.getWithoutCommit(
-        TableMetaMapper.class, mapper -> 
mapper.listTablePOsBySchemaId(schemaId));
-  }
-
-  private List<TablePO> listTablePOsByFullQualifiedName(Namespace namespace) {
-    String[] namespaceLevels = namespace.levels();
-    List<TablePO> tablePOs =
-        SessionUtils.getWithoutCommit(
-            TableMetaMapper.class,
-            mapper ->
-                mapper.listTablePOsByFullQualifiedName(
-                    namespaceLevels[0], namespaceLevels[1], 
namespaceLevels[2]));
-    if (tablePOs.isEmpty() || tablePOs.get(0).getSchemaId() == null) {
-      throw new NoSuchEntityException(
-          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-          EntityType.SCHEMA.name().toLowerCase(),
-          namespaceLevels[2]);
-    }
-    return tablePOs.stream().filter(po -> po.getTableId() != 
null).collect(Collectors.toList());
-  }
-
-  private TablePO getTablePOBySchemaId(NameIdentifier identifier) {
-    Long schemaId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.SCHEMA);
-    return getTablePOBySchemaIdAndName(schemaId, identifier.name());
-  }
-
-  private TablePO getTablePOByFullQualifiedName(NameIdentifier identifier) {
-    String[] namespaceLevels = identifier.namespace().levels();
-    TablePO tablePO =
-        getTableByFullQualifiedName(
-            namespaceLevels[0], namespaceLevels[1], namespaceLevels[2], 
identifier.name());
-
-    if (tablePO.getSchemaId() == null) {
-      throw new NoSuchEntityException(
-          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-          EntityType.SCHEMA.name().toLowerCase(),
-          namespaceLevels[2]);
-    }
-
-    if (tablePO.getTableId() == null) {
-      throw new NoSuchEntityException(
-          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-          EntityType.TABLE.name().toLowerCase(),
-          identifier.name());
-    }
-
-    return tablePO;
-  }
-
-  private Function<Namespace, List<TablePO>> tableListFetcher() {
-    return GravitinoEnv.getInstance().cacheEnabled()
-        ? this::listTablePOsBySchemaId
-        : this::listTablePOsByFullQualifiedName;
+        TableMetaMapper.class,
+        mapper -> POStorageReadRouting.listPOs(mapper, namespace, ops, 
Entity.EntityType.TABLE));
   }
 
-  private Function<NameIdentifier, TablePO> tablePOFetcher() {
-    return GravitinoEnv.getInstance().cacheEnabled()
-        ? this::getTablePOBySchemaId
-        : this::getTablePOByFullQualifiedName;
+  private void fillTablePOBuilderParentEntityId(TablePO.Builder builder, 
Namespace namespace) {
+    NamespaceUtil.checkTable(namespace);
+    NamespacedEntityId namespacedEntityId =
+        EntityIdService.getEntityIds(
+            NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
+    builder.withMetalakeId(namespacedEntityId.namespaceIds()[0]);
+    builder.withCatalogId(namespacedEntityId.namespaceIds()[1]);
+    builder.withSchemaId(namespacedEntityId.entityId());
   }
 }
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/TablePOStorageOps.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/TablePOStorageOps.java
new file mode 100644
index 0000000000..599a25fded
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/TablePOStorageOps.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
+import org.apache.gravitino.storage.relational.po.TablePO;
+
+/**
+ * {@link BasePOStorageOps} implementation for table rows: each operation is forwarded to the
+ * corresponding {@link TableMetaMapper} statement.
+ *
+ * <p>The full-name reads, {@link #getPOByFullName} and {@link #listPOsByNSFullName}, additionally
+ * inspect the joined result so that a missing catalog or schema is reported through a {@link
+ * NoSuchEntityException} rather than being mistaken for a missing table.
+ */
+public class TablePOStorageOps extends BasePOStorageOps<TablePO, TableMetaMapper> {
+
+  public TablePOStorageOps() {}
+
+  /**
+   * Inserts {@code tablePO}, calling {@code insertTableMetaOnDuplicateKeyUpdate} when {@code
+   * overwrite} is set and {@code insertTableMeta} otherwise.
+   */
+  @Override
+  public void insertPO(TableMetaMapper mapper, TablePO tablePO, boolean overwrite) {
+    if (overwrite) {
+      mapper.insertTableMetaOnDuplicateKeyUpdate(tablePO);
+    } else {
+      mapper.insertTableMeta(tablePO);
+    }
+  }
+
+  /**
+   * Forwards to {@code updateTableMeta}, passing the schema id of {@code newPO} as its third
+   * argument.
+   *
+   * @return the value returned by {@code updateTableMeta}
+   */
+  @Override
+  public Integer updatePO(TableMetaMapper mapper, TablePO newPO, TablePO oldPO) {
+    return mapper.updateTableMeta(newPO, oldPO, newPO.getSchemaId());
+  }
+
+  /** Fetches the table row with the given name under the schema id {@code parentId}. */
+  @Override
+  public TablePO getPO(TableMetaMapper mapper, Long parentId, String name) {
+    return mapper.selectTableMetaBySchemaIdAndName(parentId, name);
+  }
+
+  /**
+   * Fetches a table row by the metalake, catalog, schema and table names in {@code identifier}.
+   *
+   * @return the table row, or {@code null} when the schema exists but the table does not
+   * @throws NoSuchEntityException if the query result shows the catalog or schema to be missing
+   */
+  @Override
+  public TablePO getPOByFullName(TableMetaMapper mapper, NameIdentifier identifier) {
+    Namespace namespace = identifier.namespace();
+    TablePO po =
+        mapper.selectTableByFullQualifiedName(
+            namespace.level(0), namespace.level(1), namespace.level(2), identifier.name());
+    // INNER JOIN on metalake/catalog: a null PO means the metalake or catalog does not exist.
+    if (po == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.CATALOG.name().toLowerCase(),
+          namespace.level(1));
+    }
+    // LEFT JOIN on schema_meta: a row with non-null catalogId but null schemaId means
+    // the catalog exists but the schema does not.
+    if (po.getSchemaId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.SCHEMA.name().toLowerCase(),
+          namespace.level(2));
+    }
+    // LEFT JOIN on table_meta: a row with non-null schemaId but null tableId means
+    // the schema exists but the table does not.
+    if (po.getTableId() == null) {
+      return null;
+    }
+    return po;
+  }
+
+  /** Lists the table rows under the schema id {@code parentId}. */
+  @Override
+  public List<TablePO> listPOs(TableMetaMapper mapper, Long parentId) {
+    return mapper.listTablePOsBySchemaId(parentId);
+  }
+
+  /**
+   * Batch-selects table rows by name; the schema id is resolved from {@code namespace} through
+   * {@link EntityIdService}.
+   */
+  @Override
+  public List<TablePO> listPOs(TableMetaMapper mapper, Namespace namespace, List<String> names) {
+    Long schemaId =
+        EntityIdService.getEntityId(
+            NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
+    return mapper.batchSelectTableByIdentifier(schemaId, names);
+  }
+
+  /** Lists the table rows whose table ids are in {@code entityIds}. */
+  @Override
+  public List<TablePO> listPOs(TableMetaMapper mapper, List<Long> entityIds) {
+    return mapper.listTablePOsByTableIds(entityIds);
+  }
+
+  /**
+   * Lists the table rows of the schema addressed by the metalake, catalog and schema names.
+   *
+   * @return the table rows; empty when the schema exists but has no tables
+   * @throws NoSuchEntityException if the query result shows the catalog or schema to be missing
+   */
+  @Override
+  public List<TablePO> listPOsByNSFullName(TableMetaMapper mapper, Namespace namespace) {
+    List<TablePO> pos =
+        mapper.listTablePOsByFullQualifiedName(
+            namespace.level(0), namespace.level(1), namespace.level(2));
+    // INNER JOIN on metalake/catalog: an empty result means the metalake or catalog does not exist.
+    if (pos.isEmpty()) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.CATALOG.name().toLowerCase(),
+          namespace.level(1));
+    }
+    // LEFT JOIN on schema_meta: rows with non-null catalogId but null schemaId mean the
+    // catalog exists but the schema does not.
+    if (pos.get(0).getSchemaId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.SCHEMA.name().toLowerCase(),
+          namespace.level(2));
+    }
+    // LEFT JOIN on table_meta: filter out the placeholder row for a schema without tables.
+    return pos.stream().filter(po -> po.getTableId() != null).collect(Collectors.toList());
+  }
+
+  /** Always returns {@code true}. */
+  @Override
+  public boolean supportsParentIdRelationalRead() {
+    return true;
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewMetaService.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewMetaService.java
index 177118c811..a305e05645 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewMetaService.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewMetaService.java
@@ -31,8 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
-import org.apache.gravitino.Entity.EntityType;
-import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
@@ -57,29 +55,31 @@ import org.apache.gravitino.utils.NamespaceUtil;
 public class ViewMetaService {
 
   private static final ViewMetaService INSTANCE = new ViewMetaService();
+  private BasePOStorageOps<ViewPO, ViewMetaMapper> ops;
 
   public static ViewMetaService getInstance() {
     return INSTANCE;
   }
 
-  private ViewMetaService() {}
+  private ViewMetaService() {
+    this.ops = new HierarchicalConversionPOStorageOps<>(new 
ViewPOStorageOps());
+  }
 
   @Monitored(
       metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
       baseMetricName = "getViewIdBySchemaIdAndName")
   public Long getViewIdBySchemaIdAndName(Long schemaId, String viewName) {
-    Long viewId =
+    ViewPO viewPO =
         SessionUtils.getWithoutCommit(
-            ViewMetaMapper.class,
-            mapper -> mapper.selectViewIdBySchemaIdAndName(schemaId, 
viewName));
+            ViewMetaMapper.class, mapper -> ops.getPO(mapper, schemaId, 
viewName));
 
-    if (viewId == null) {
+    if (viewPO == null) {
       throw new NoSuchEntityException(
           NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
           Entity.EntityType.VIEW.name().toLowerCase(),
           viewName);
     }
-    return viewId;
+    return viewPO.getViewId();
   }
 
   @Monitored(
@@ -87,7 +87,7 @@ public class ViewMetaService {
       baseMetricName = "listViewsByNamespace")
   public List<ViewEntity> listViewsByNamespace(Namespace namespace) {
     NamespaceUtil.checkView(namespace);
-    List<ViewPO> viewPOs = listViewPOsByNamespace(namespace);
+    List<ViewPO> viewPOs = listViewPOs(namespace);
     return viewPOs.stream().map(po -> fromViewPO(po, 
namespace)).collect(Collectors.toList());
   }
 
@@ -95,8 +95,7 @@ public class ViewMetaService {
       metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
       baseMetricName = "getViewByIdentifier")
   public ViewEntity getViewByIdentifier(NameIdentifier identifier) {
-    NameIdentifierUtil.checkView(identifier);
-    ViewPO viewPO = viewPOFetcher().apply(identifier);
+    ViewPO viewPO = getViewPOByIdentifier(identifier);
     return fromViewPO(viewPO, identifier.namespace());
   }
 
@@ -111,14 +110,7 @@ public class ViewMetaService {
       SessionUtils.doMultipleWithCommit(
           () ->
               SessionUtils.doWithoutCommit(
-                  ViewMetaMapper.class,
-                  mapper -> {
-                    if (overwrite) {
-                      mapper.insertViewMetaOnDuplicateKeyUpdate(po);
-                    } else {
-                      mapper.insertViewMeta(po);
-                    }
-                  }),
+                  ViewMetaMapper.class, mapper -> ops.insertPO(mapper, po, 
overwrite)),
           () ->
               SessionUtils.doWithoutCommit(
                   ViewVersionInfoMapper.class,
@@ -141,9 +133,7 @@ public class ViewMetaService {
       baseMetricName = "updateViewByIdentifier")
   public <E extends Entity & HasIdentifier> ViewEntity updateView(
       NameIdentifier ident, Function<E, E> updater) throws IOException {
-    NameIdentifierUtil.checkView(ident);
-
-    ViewPO oldViewPO = viewPOFetcher().apply(ident);
+    ViewPO oldViewPO = getViewPOByIdentifier(ident);
     ViewEntity oldViewEntity = fromViewPO(oldViewPO, ident.namespace());
     ViewEntity newEntity = (ViewEntity) updater.apply((E) oldViewEntity);
     Preconditions.checkArgument(
@@ -171,7 +161,7 @@ public class ViewMetaService {
           () -> {
             updateResult.set(
                 SessionUtils.getWithoutCommit(
-                    ViewMetaMapper.class, mapper -> 
mapper.updateViewMeta(newViewPO, oldViewPO)));
+                    ViewMetaMapper.class, mapper -> ops.updatePO(mapper, 
newViewPO, oldViewPO)));
             if (updateResult.get() == 0) {
               throw new RuntimeException("Failed to update the entity: " + 
ident);
             }
@@ -203,8 +193,7 @@ public class ViewMetaService {
       metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
       baseMetricName = "deleteViewByIdentifier")
   public boolean deleteView(NameIdentifier ident) {
-    NameIdentifierUtil.checkView(ident);
-    ViewPO viewPO = viewPOFetcher().apply(ident);
+    ViewPO viewPO = getViewPOByIdentifier(ident);
     String metalakeName = ident.namespace().level(0);
     String catalogName = ident.namespace().level(1);
     String schemaName = ident.namespace().level(2);
@@ -231,6 +220,10 @@ public class ViewMetaService {
     return versionDeletedCount + metaDeletedCount;
   }
 
+  public BasePOStorageOps<ViewPO, ViewMetaMapper> ops() {
+    return ops;
+  }
+
   private boolean deleteView(Long viewId, String metalakeName, String 
viewFullName) {
     AtomicInteger deleteResult = new AtomicInteger(0);
     SessionUtils.doMultipleWithCommit(
@@ -287,88 +280,25 @@ public class ViewMetaService {
     return buildViewPO(newEntity, builder, newVersion.intValue());
   }
 
-  private List<ViewPO> listViewPOsByNamespace(Namespace namespace) {
-    return viewListFetcher().apply(namespace);
-  }
-
-  private Function<Namespace, List<ViewPO>> viewListFetcher() {
-    return GravitinoEnv.getInstance().cacheEnabled()
-        ? this::listViewPOsBySchemaId
-        : this::listViewPOsByFullQualifiedName;
-  }
-
-  private Function<NameIdentifier, ViewPO> viewPOFetcher() {
-    return GravitinoEnv.getInstance().cacheEnabled()
-        ? this::getViewPOBySchemaId
-        : this::getViewPOByFullQualifiedName;
-  }
-
-  private List<ViewPO> listViewPOsBySchemaId(Namespace namespace) {
-    Long schemaId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
-    return SessionUtils.getWithoutCommit(
-        ViewMetaMapper.class, mapper -> 
mapper.listViewPOsBySchemaId(schemaId));
-  }
-
-  private List<ViewPO> listViewPOsByFullQualifiedName(Namespace namespace) {
-    String[] namespaceLevels = namespace.levels();
-    List<ViewPO> viewPOs =
-        SessionUtils.getWithoutCommit(
-            ViewMetaMapper.class,
-            mapper ->
-                mapper.listViewPOsByFullQualifiedName(
-                    namespaceLevels[0], namespaceLevels[1], 
namespaceLevels[2]));
-    if (viewPOs.isEmpty() || viewPOs.get(0).getSchemaId() == null) {
-      throw new NoSuchEntityException(
-          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-          EntityType.SCHEMA.name().toLowerCase(),
-          namespaceLevels[2]);
-    }
-    return viewPOs.stream().filter(po -> po.getViewId() != 
null).collect(Collectors.toList());
-  }
-
-  private ViewPO getViewPOBySchemaId(NameIdentifier identifier) {
-    Long schemaId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.SCHEMA);
+  private ViewPO getViewPOByIdentifier(NameIdentifier identifier) {
+    NameIdentifierUtil.checkView(identifier);
     ViewPO viewPO =
         SessionUtils.getWithoutCommit(
             ViewMetaMapper.class,
-            mapper -> mapper.selectViewMetaBySchemaIdAndName(schemaId, 
identifier.name()));
-
+            mapper -> POStorageReadRouting.getPO(mapper, identifier, ops, 
Entity.EntityType.VIEW));
     if (viewPO == null) {
       throw new NoSuchEntityException(
           NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
           Entity.EntityType.VIEW.name().toLowerCase(),
           identifier.name());
     }
+
     return viewPO;
   }
 
-  private ViewPO getViewPOByFullQualifiedName(NameIdentifier identifier) {
-    String[] namespaceLevels = identifier.namespace().levels();
-    ViewPO viewPO =
-        SessionUtils.getWithoutCommit(
-            ViewMetaMapper.class,
-            mapper ->
-                mapper.selectViewByFullQualifiedName(
-                    namespaceLevels[0], namespaceLevels[1], 
namespaceLevels[2], identifier.name()));
-
-    if (viewPO == null || viewPO.getSchemaId() == null) {
-      throw new NoSuchEntityException(
-          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-          EntityType.SCHEMA.name().toLowerCase(),
-          namespaceLevels[2]);
-    }
-
-    if (viewPO.getViewId() == null) {
-      throw new NoSuchEntityException(
-          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-          EntityType.VIEW.name().toLowerCase(),
-          identifier.name());
-    }
-
-    return viewPO;
+  private List<ViewPO> listViewPOs(Namespace namespace) {
+    return SessionUtils.getWithoutCommit(
+        ViewMetaMapper.class,
+        mapper -> POStorageReadRouting.listPOs(mapper, namespace, ops, 
Entity.EntityType.VIEW));
   }
 }
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewPOStorageOps.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewPOStorageOps.java
new file mode 100644
index 0000000000..7294c50887
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewPOStorageOps.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.storage.relational.mapper.ViewMetaMapper;
+import org.apache.gravitino.storage.relational.po.ViewPO;
+
+/**
+ * {@link BasePOStorageOps} implementation for view rows: each operation is forwarded to the
+ * corresponding {@link ViewMetaMapper} statement.
+ *
+ * <p>The full-name reads, {@link #getPOByFullName} and {@link #listPOsByNSFullName}, additionally
+ * inspect the joined result so that a missing catalog or schema is reported through a {@link
+ * NoSuchEntityException} rather than being mistaken for a missing view.
+ */
+public class ViewPOStorageOps extends BasePOStorageOps<ViewPO, ViewMetaMapper> {
+
+  public ViewPOStorageOps() {}
+
+  /**
+   * Inserts {@code viewPO}, calling {@code insertViewMetaOnDuplicateKeyUpdate} when {@code
+   * overwrite} is set and {@code insertViewMeta} otherwise.
+   */
+  @Override
+  public void insertPO(ViewMetaMapper mapper, ViewPO viewPO, boolean overwrite) {
+    if (overwrite) {
+      mapper.insertViewMetaOnDuplicateKeyUpdate(viewPO);
+    } else {
+      mapper.insertViewMeta(viewPO);
+    }
+  }
+
+  /** Forwards to {@code updateViewMeta} with the new and old rows and returns its result. */
+  @Override
+  public Integer updatePO(ViewMetaMapper mapper, ViewPO newPO, ViewPO oldPO) {
+    return mapper.updateViewMeta(newPO, oldPO);
+  }
+
+  /** Fetches the view row with the given name under the schema id {@code parentId}. */
+  @Override
+  public ViewPO getPO(ViewMetaMapper mapper, Long parentId, String name) {
+    return mapper.selectViewMetaBySchemaIdAndName(parentId, name);
+  }
+
+  /**
+   * Fetches a view row by the metalake, catalog, schema and view names in {@code identifier}.
+   *
+   * @return the view row, or {@code null} when the schema exists but the view does not
+   * @throws NoSuchEntityException if the query result shows the catalog or schema to be missing
+   */
+  @Override
+  public ViewPO getPOByFullName(ViewMetaMapper mapper, NameIdentifier identifier) {
+    Namespace namespace = identifier.namespace();
+    ViewPO po =
+        mapper.selectViewByFullQualifiedName(
+            namespace.level(0), namespace.level(1), namespace.level(2), identifier.name());
+    // INNER JOIN on metalake/catalog: a null PO means the metalake or catalog does not exist.
+    if (po == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.CATALOG.name().toLowerCase(),
+          namespace.level(1));
+    }
+    // LEFT JOIN on schema_meta: a row with non-null catalogId but null schemaId means
+    // the catalog exists but the schema does not.
+    if (po.getSchemaId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.SCHEMA.name().toLowerCase(),
+          namespace.level(2));
+    }
+    // LEFT JOIN on view_meta: a row with non-null schemaId but null viewId means
+    // the schema exists but the view does not.
+    if (po.getViewId() == null) {
+      return null;
+    }
+    return po;
+  }
+
+  /** Lists the view rows under the schema id {@code parentId}. */
+  @Override
+  public List<ViewPO> listPOs(ViewMetaMapper mapper, Long parentId) {
+    return mapper.listViewPOsBySchemaId(parentId);
+  }
+
+  /** Lists the view rows whose view ids are in {@code entityIds}. */
+  @Override
+  public List<ViewPO> listPOs(ViewMetaMapper mapper, List<Long> entityIds) {
+    return mapper.listViewPOsByViewIds(entityIds);
+  }
+
+  /**
+   * Lists the view rows of the schema addressed by the metalake, catalog and schema names.
+   *
+   * @return the view rows; empty when the schema exists but has no views
+   * @throws NoSuchEntityException if the query result shows the catalog or schema to be missing
+   */
+  @Override
+  public List<ViewPO> listPOsByNSFullName(ViewMetaMapper mapper, Namespace namespace) {
+    List<ViewPO> pos =
+        mapper.listViewPOsByFullQualifiedName(
+            namespace.level(0), namespace.level(1), namespace.level(2));
+    // INNER JOIN on metalake/catalog: an empty result means the metalake or catalog does not exist.
+    if (pos.isEmpty()) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.CATALOG.name().toLowerCase(),
+          namespace.level(1));
+    }
+    // LEFT JOIN on schema_meta: rows with non-null catalogId but null schemaId mean the
+    // catalog exists but the schema does not.
+    if (pos.get(0).getSchemaId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.SCHEMA.name().toLowerCase(),
+          namespace.level(2));
+    }
+    // LEFT JOIN on view_meta: filter out the placeholder row for a schema without views.
+    return pos.stream().filter(po -> po.getViewId() != null).collect(Collectors.toList());
+  }
+
+  /** Always returns {@code true}. */
+  @Override
+  public boolean supportsParentIdRelationalRead() {
+    return true;
+  }
+}
diff --git a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackendBatchInsert.java b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackendBatchInsert.java
new file mode 100644
index 0000000000..ca822541f7
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackendBatchInsert.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+
+/**
+ * Tests schema insert paths that use relational {@code batchInsertSchemaMeta} (single or batch).
+ */
+public class TestJDBCBackendBatchInsert extends TestJDBCBackend {
+
+  /** A single flat schema inserted through the backend is returned by both get and list. */
+  @TestTemplate
+  public void testBatchInsertSingleSchemaViaBackend() throws IOException {
+    String metalakeName = "metalake_batch_insert_single";
+    String catalogName = "catalog_batch_insert_single";
+    createAndInsertMakeLake(metalakeName);
+    createAndInsertCatalog(metalakeName, catalogName);
+
+    SchemaEntity schema =
+        createSchemaEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofSchema(metalakeName, catalogName),
+            "flat_schema_batch",
+            AUDIT_INFO);
+    backend.insert(schema, false);
+
+    SchemaEntity loaded =
+        (SchemaEntity)
+            backend.get(
+                NameIdentifier.of(metalakeName, catalogName, "flat_schema_batch"),
+                Entity.EntityType.SCHEMA);
+    Assertions.assertEquals(schema.id(), loaded.id());
+    Assertions.assertEquals("flat_schema_batch", loaded.name());
+    Assertions.assertEquals(schema.namespace(), loaded.namespace());
+
+    List<SchemaEntity> listed =
+        backend.list(
+            NamespaceUtil.ofSchema(metalakeName, catalogName), Entity.EntityType.SCHEMA, true);
+    Assertions.assertEquals(1, listed.size());
+    Assertions.assertEquals("flat_schema_batch", listed.get(0).name());
+  }
+
+  /** A schema named {@code ns_a:ns_b:leaf} must be listed together with both of its ancestors. */
+  @TestTemplate
+  public void testBatchInsertHierarchicalSchemaCreatesAncestorsAndLeafViaBackend()
+      throws IOException {
+    String metalakeName = "metalake_batch_insert_nested";
+    String catalogName = "catalog_batch_insert_nested";
+    createAndInsertMakeLake(metalakeName);
+    createAndInsertCatalog(metalakeName, catalogName);
+
+    String logicalLeaf = "ns_a:ns_b:leaf";
+    SchemaEntity hierarchical =
+        SchemaEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName(logicalLeaf)
+            .withNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName))
+            .withComment("nested")
+            .withProperties(Collections.emptyMap())
+            .withAuditInfo(AUDIT_INFO)
+            .build();
+    backend.insert(hierarchical, false);
+
+    List<SchemaEntity> schemas =
+        backend.list(
+            NamespaceUtil.ofSchema(metalakeName, catalogName), Entity.EntityType.SCHEMA, true);
+    Set<String> logicalNames = schemas.stream().map(SchemaEntity::name).collect(Collectors.toSet());
+
+    Assertions.assertTrue(logicalNames.contains("ns_a"));
+    Assertions.assertTrue(logicalNames.contains("ns_a:ns_b"));
+    Assertions.assertTrue(logicalNames.contains(logicalLeaf));
+
+    SchemaEntity loaded =
+        (SchemaEntity)
+            backend.get(
+                NameIdentifier.of(metalakeName, catalogName, logicalLeaf),
+                Entity.EntityType.SCHEMA);
+    Assertions.assertEquals(logicalLeaf, loaded.name());
+    Assertions.assertEquals("nested", loaded.comment());
+  }
+
+  /** Inserting a second leaf under existing ancestors must leave the ancestor ids unchanged. */
+  @TestTemplate
+  public void testBatchInsertHierarchicalSecondLeafReusesAncestorsViaBackend() throws IOException {
+    String metalakeName = "metalake_batch_insert_reuse";
+    String catalogName = "catalog_batch_insert_reuse";
+    createAndInsertMakeLake(metalakeName);
+    createAndInsertCatalog(metalakeName, catalogName);
+
+    String leaf1 = "ns_a:ns_b:leaf1";
+    String leaf2 = "ns_a:ns_b:leaf2";
+    String ancestorA = "ns_a";
+    String ancestorAB = "ns_a:ns_b";
+
+    SchemaEntity first =
+        SchemaEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName(leaf1)
+            .withNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName))
+            .withComment("first")
+            .withProperties(Collections.emptyMap())
+            .withAuditInfo(AUDIT_INFO)
+            .build();
+    backend.insert(first, false);
+
+    long idA =
+        ((SchemaEntity)
+                backend.get(
+                    NameIdentifier.of(metalakeName, catalogName, ancestorA),
+                    Entity.EntityType.SCHEMA))
+            .id();
+    long idAB =
+        ((SchemaEntity)
+                backend.get(
+                    NameIdentifier.of(metalakeName, catalogName, ancestorAB),
+                    Entity.EntityType.SCHEMA))
+            .id();
+
+    SchemaEntity second =
+        SchemaEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName(leaf2)
+            .withNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName))
+            .withComment("second")
+            .withProperties(Collections.emptyMap())
+            .withAuditInfo(AUDIT_INFO)
+            .build();
+    backend.insert(second, false);
+
+    Assertions.assertEquals(
+        idA,
+        ((SchemaEntity)
+                backend.get(
+                    NameIdentifier.of(metalakeName, catalogName, ancestorA),
+                    Entity.EntityType.SCHEMA))
+            .id());
+    Assertions.assertEquals(
+        idAB,
+        ((SchemaEntity)
+                backend.get(
+                    NameIdentifier.of(metalakeName, catalogName, ancestorAB),
+                    Entity.EntityType.SCHEMA))
+            .id());
+  }
+}
diff --git a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestHierarchicalConversionPOStorageOps.java b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestHierarchicalConversionPOStorageOps.java
new file mode 100644
index 0000000000..ec69b86369
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestHierarchicalConversionPOStorageOps.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.UnaryOperator;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.utils.HierarchicalSchemaUtil;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+/**
+ * Unit tests for {@link HierarchicalConversionPOStorageOps}, driven through a Mockito mock of the
+ * wrapped {@link BasePOStorageOps} delegate.
+ *
+ * <p>Covered behavior: names, identifiers and namespaces built with the schema separator
+ * reach the delegate with the physical separator instead; the two rewriters passed to the
+ * three-argument constructor are applied to POs on the read and write paths; other calls are
+ * forwarded to the delegate.
+ */
+public class TestHierarchicalConversionPOStorageOps {
+
+  // Separator used in the hierarchical names the tests pass in, e.g. "ns_a" + SEP + "ns_b".
+  private static final String SEP = HierarchicalSchemaUtil.schemaSeparator();
+  // Separator the tests expect in the names that reach the delegate.
+  private static final String PHYS = HierarchicalSchemaUtil.physicalSeparator();
+
+  private BasePOStorageOps<String, Object> delegate;
+  private Object mapper;
+
+  /** Creates a fresh delegate mock and an opaque mapper instance before each test. */
+  @BeforeEach
+  @SuppressWarnings("unchecked")
+  public void setUp() {
+    delegate = mock(BasePOStorageOps.class);
+    mapper = new Object();
+  }
+
+  // ---------- Input name conversion ----------
+
+  @Test
+  public void getPOByParentIdConvertsHierarchicalName() {
+    when(delegate.getPO(eq(mapper), eq(7L), eq("ns_a" + PHYS + "ns_b"))).thenReturn("found");
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(delegate);
+
+    String result = wrapper.getPO(mapper, 7L, "ns_a" + SEP + "ns_b");
+
+    assertEquals("found", result);
+    verify(delegate).getPO(mapper, 7L, "ns_a" + PHYS + "ns_b");
+  }
+
+  @Test
+  public void getPOByParentIdLeavesSimpleNameUnchanged() {
+    when(delegate.getPO(eq(mapper), eq(7L), eq("plain"))).thenReturn("po");
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(delegate);
+
+    String result = wrapper.getPO(mapper, 7L, "plain");
+
+    assertEquals("po", result);
+    verify(delegate).getPO(mapper, 7L, "plain");
+  }
+
+  @Test
+  public void getPOByParentIdReturnsNullWhenDelegateMisses() {
+    when(delegate.getPO(any(), any(Long.class), any(String.class))).thenReturn(null);
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(
+            delegate, s -> s + "-rewritten", UnaryOperator.identity());
+
+    // Applied to null, the rewriter would yield "null-rewritten"; assertNull rules that out.
+    assertNull(wrapper.getPO(mapper, 1L, "missing"));
+  }
+
+  @Test
+  public void listPOsByNamespaceAndNamesConvertsEachHierarchicalName() {
+    // Only the middle name is built with SEP; the other two are expected to arrive as-is.
+    List<String> names = Arrays.asList("plain", "ns_a" + SEP + "ns_b", "other");
+    Namespace ns = Namespace.of("ml", "cat");
+    when(delegate.listPOs(eq(mapper), eq(ns), any(List.class)))
+        .thenReturn(Collections.singletonList("po"));
+
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(delegate);
+
+    wrapper.listPOs(mapper, ns, names);
+
+    ArgumentCaptor<List<String>> namesCaptor = ArgumentCaptor.forClass(List.class);
+    verify(delegate).listPOs(eq(mapper), eq(ns), namesCaptor.capture());
+    assertEquals(Arrays.asList("plain", "ns_a" + PHYS + "ns_b", "other"), namesCaptor.getValue());
+  }
+
+  // ---------- Identifier / namespace logical → physical translation ----------
+
+  @Test
+  public void getPOByFullNameConvertsSchemaIdentifierName() {
+    NameIdentifier ident = NameIdentifier.of(Namespace.of("ml", "cat"), "ns_a" + SEP + "ns_b");
+    when(delegate.getPOByFullName(any(), any(NameIdentifier.class))).thenReturn("po");
+
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(delegate);
+
+    wrapper.getPOByFullName(mapper, ident);
+
+    ArgumentCaptor<NameIdentifier> captor = ArgumentCaptor.forClass(NameIdentifier.class);
+    verify(delegate).getPOByFullName(eq(mapper), captor.capture());
+    NameIdentifier converted = captor.getValue();
+    assertEquals("ns_a" + PHYS + "ns_b", converted.name());
+    assertEquals(Namespace.of("ml", "cat"), converted.namespace());
+  }
+
+  @Test
+  public void getPOByFullNameConvertsSchemaSegmentForChildIdentifier() {
+    NameIdentifier ident =
+        NameIdentifier.of(Namespace.of("ml", "cat", "ns_a" + SEP + "ns_b"), "tbl");
+    when(delegate.getPOByFullName(any(), any(NameIdentifier.class))).thenReturn("po");
+
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(delegate);
+
+    wrapper.getPOByFullName(mapper, ident);
+
+    ArgumentCaptor<NameIdentifier> captor = ArgumentCaptor.forClass(NameIdentifier.class);
+    verify(delegate).getPOByFullName(eq(mapper), captor.capture());
+    NameIdentifier converted = captor.getValue();
+    assertEquals("tbl", converted.name());
+    assertEquals(Namespace.of("ml", "cat", "ns_a" + PHYS + "ns_b"), converted.namespace());
+  }
+
+  @Test
+  public void getPOByFullNamePassesThroughWhenNoSeparatorPresent() {
+    NameIdentifier ident = NameIdentifier.of(Namespace.of("ml", "cat", "schema"), "tbl");
+    when(delegate.getPOByFullName(any(), any(NameIdentifier.class))).thenReturn("po");
+
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(delegate);
+
+    wrapper.getPOByFullName(mapper, ident);
+
+    verify(delegate).getPOByFullName(mapper, ident);
+  }
+
+  @Test
+  public void listPOsByNSFullNameConvertsSchemaSegment() {
+    Namespace ns = Namespace.of("ml", "cat", "ns_a" + SEP + "ns_b");
+    when(delegate.listPOsByNSFullName(any(), any(Namespace.class)))
+        .thenReturn(Collections.singletonList("po"));
+
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(delegate);
+
+    wrapper.listPOsByNSFullName(mapper, ns);
+
+    ArgumentCaptor<Namespace> nsCaptor = ArgumentCaptor.forClass(Namespace.class);
+    verify(delegate).listPOsByNSFullName(eq(mapper), nsCaptor.capture());
+    assertEquals(Namespace.of("ml", "cat", "ns_a" + PHYS + "ns_b"), nsCaptor.getValue());
+  }
+
+  @Test
+  public void listPOsByNSFullNameLeavesShortNamespaceUnchanged() {
+    Namespace ns = Namespace.of("ml", "cat");
+    when(delegate.listPOsByNSFullName(any(), any(Namespace.class)))
+        .thenReturn(Collections.emptyList());
+
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(delegate);
+
+    wrapper.listPOsByNSFullName(mapper, ns);
+
+    verify(delegate).listPOsByNSFullName(mapper, ns);
+  }
+
+  // ---------- physicalToLogicalRewriter (read path) ----------
+
+  @Test
+  public void physicalToLogicalRewriterAppliedToGetPOByParentId() {
+    when(delegate.getPO(any(), any(Long.class), any(String.class))).thenReturn("raw");
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(
+            delegate, s -> s + "-rewritten", UnaryOperator.identity());
+
+    assertEquals("raw-rewritten", wrapper.getPO(mapper, 1L, "plain"));
+  }
+
+  @Test
+  public void physicalToLogicalRewriterAppliedToListPOsByParentId() {
+    when(delegate.listPOs(any(), any(Long.class))).thenReturn(Arrays.asList("a", "b"));
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(
+            delegate, s -> s.toUpperCase(), UnaryOperator.identity());
+
+    assertEquals(Arrays.asList("A", "B"), wrapper.listPOs(mapper, 1L));
+  }
+
+  @Test
+  public void physicalToLogicalRewriterAppliedToListPOsByIds() {
+    when(delegate.listPOs(any(), any(List.class))).thenReturn(Arrays.asList("x", "y"));
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(delegate, s -> s + "!", UnaryOperator.identity());
+
+    List<Long> ids = Arrays.asList(1L, 2L);
+    assertEquals(Arrays.asList("x!", "y!"), wrapper.listPOs(mapper, ids));
+  }
+
+  @Test
+  public void physicalToLogicalRewriterIsNotInvokedOnNullResult() {
+    when(delegate.getPO(any(), any(Long.class), any(String.class))).thenReturn(null);
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(
+            delegate,
+            s -> {
+              throw new AssertionError("rewriter must not be invoked on null");
+            },
+            UnaryOperator.identity());
+
+    assertNull(wrapper.getPO(mapper, 1L, "x"));
+  }
+
+  @Test
+  public void physicalToLogicalRewriterIsNotInvokedOnEmptyList() {
+    when(delegate.listPOs(any(), any(Long.class))).thenReturn(Collections.emptyList());
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(
+            delegate,
+            s -> {
+              throw new AssertionError("rewriter must not be invoked on empty list");
+            },
+            UnaryOperator.identity());
+
+    assertEquals(Collections.emptyList(), wrapper.listPOs(mapper, 1L));
+  }
+
+  // ---------- logicalToPhysicalRewriter (write path) ----------
+
+  @Test
+  public void logicalToPhysicalRewriterAppliedToInsertPO() {
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(
+            delegate, UnaryOperator.identity(), s -> s + "-written");
+
+    wrapper.insertPO(mapper, "logical", true);
+
+    verify(delegate).insertPO(mapper, "logical-written", true);
+  }
+
+  @Test
+  public void logicalToPhysicalRewriterAppliedToBatchInsertPOs() {
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(delegate, UnaryOperator.identity(), s -> s + "-W");
+
+    wrapper.batchInsertPOs(mapper, Arrays.asList("a", "b"), false);
+
+    ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);
+    verify(delegate).batchInsertPOs(eq(mapper), captor.capture(), eq(false));
+    assertEquals(Arrays.asList("a-W", "b-W"), captor.getValue());
+  }
+
+  @Test
+  public void logicalToPhysicalRewriterAppliedToBothPOsInUpdate() {
+    when(delegate.updatePO(any(), eq("new-W"), eq("old-W"))).thenReturn(1);
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(delegate, UnaryOperator.identity(), s -> s + "-W");
+
+    assertEquals(1, wrapper.updatePO(mapper, "new", "old"));
+    verify(delegate).updatePO(mapper, "new-W", "old-W");
+  }
+
+  @Test
+  public void singleArgConstructorUsesIdentityRewritersForBothDirections() {
+    when(delegate.getPO(any(), any(Long.class), any(String.class))).thenReturn("po");
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(delegate);
+
+    assertEquals("po", wrapper.getPO(mapper, 1L, "plain"));
+    wrapper.insertPO(mapper, "raw", false);
+    verify(delegate).insertPO(mapper, "raw", false);
+
+    wrapper.batchInsertPOs(mapper, Arrays.asList("a", "b"), true);
+    ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);
+    verify(delegate).batchInsertPOs(eq(mapper), captor.capture(), eq(true));
+    assertEquals(Arrays.asList("a", "b"), captor.getValue());
+  }
+
+  // ---------- Delegation ----------
+
+  // NOTE(review): despite the "AndEntityType" in the name, only supportsParentIdRelationalRead()
+  // is asserted here; either add the entity-type assertion or shorten the name.
+  @Test
+  public void supportsParentIdRelationalReadAndEntityTypeDelegated() {
+    when(delegate.supportsParentIdRelationalRead()).thenReturn(true);
+
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(delegate);
+
+    assertTrue(wrapper.supportsParentIdRelationalRead());
+  }
+
+  @Test
+  public void batchInsertEmptyListIsForwarded() {
+    HierarchicalConversionPOStorageOps<String, Object> wrapper =
+        new HierarchicalConversionPOStorageOps<>(
+            delegate,
+            UnaryOperator.identity(),
+            s -> {
+              throw new AssertionError("rewriter must not be invoked on empty list");
+            });
+
+    wrapper.batchInsertPOs(mapper, new ArrayList<>(), false);
+
+    // new ArrayList<>() and Collections.emptyList() are equal lists, so eq(...) matches.
+    verify(delegate).batchInsertPOs(eq(mapper), eq(Collections.emptyList()), eq(false));
+  }
+}
diff --git a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPOStorageReadRouting.java b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPOStorageReadRouting.java
new file mode 100644
index 0000000000..b0e09a0780
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPOStorageReadRouting.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mockStatic;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+/**
+ * Unit tests for {@link POStorageReadRouting}.
+ *
+ * <p>Each test checks which {@link BasePOStorageOps} read method is invoked for a combination of
+ * the trailing boolean argument (the cache flag, going by the test names) and the value returned
+ * by {@code supportsParentIdRelationalRead()}. {@link RecordingOps} records the calls it
+ * receives; {@link EntityIdService} is mocked statically where a parent id lookup is expected.
+ */
+public class TestPOStorageReadRouting {
+
+  // Opaque mapper instance; the tests only pass it through to the routing calls.
+  private static final Object MAPPER = new Object();
+
+  @Test
+  public void getPOUsesFullNameWhenCacheDisabled() {
+    RecordingOps ops = new RecordingOps(true);
+    NameIdentifier id = NameIdentifier.of("ml", "cat", "schema");
+    String result = POStorageReadRouting.getPO(MAPPER, id, ops, Entity.EntityType.SCHEMA, false);
+    assertEquals("by-full", result);
+    assertFalse(ops.getPOByParentCalled);
+    assertTrue(ops.getPOByFullNameCalled);
+  }
+
+  @Test
+  public void getPOUsesFullNameWhenCacheEnabledButParentPathUnsupported() {
+    RecordingOps ops = new RecordingOps(false);
+    String result =
+        POStorageReadRouting.getPO(
+            MAPPER, NameIdentifier.of("ml", "cat", "schema"), ops, Entity.EntityType.SCHEMA, true);
+    assertEquals("by-full", result);
+    assertFalse(ops.getPOByParentCalled);
+    assertTrue(ops.getPOByFullNameCalled);
+  }
+
+  @Test
+  public void getPOUsesParentIdWhenCacheEnabledAndSupported() {
+    try (MockedStatic<EntityIdService> entityIds = mockStatic(EntityIdService.class)) {
+      // The catalog id stubbed here is expected to arrive as the parent id of the schema lookup.
+      entityIds
+          .when(() -> EntityIdService.getEntityId(any(), eq(Entity.EntityType.CATALOG)))
+          .thenReturn(42L);
+      RecordingOps ops = new RecordingOps(true);
+      NameIdentifier id = NameIdentifier.of("ml", "cat", "schema");
+      String result = POStorageReadRouting.getPO(MAPPER, id, ops, Entity.EntityType.SCHEMA, true);
+      assertEquals("by-parent", result);
+      assertTrue(ops.getPOByParentCalled);
+      assertFalse(ops.getPOByFullNameCalled);
+      assertEquals(Long.valueOf(42), ops.seenParentId);
+      assertEquals("schema", ops.seenShortName);
+    }
+  }
+
+  @Test
+  public void listPOsUsesFullNamespaceWhenCacheDisabled() {
+    RecordingOps ops = new RecordingOps(true);
+    Namespace ns = Namespace.of("ml", "cat", "sch");
+    List<String> result =
+        POStorageReadRouting.listPOs(MAPPER, ns, ops, Entity.EntityType.TABLE, false);
+    assertTrue(result.isEmpty());
+    assertFalse(ops.listByParentCalled);
+    assertTrue(ops.listByFullNsCalled);
+  }
+
+  @Test
+  public void listPOsUsesParentIdWhenCacheEnabledAndSupported() {
+    try (MockedStatic<EntityIdService> entityIds = mockStatic(EntityIdService.class)) {
+      // The schema id stubbed here is expected to arrive as the parent id of the table listing.
+      entityIds
+          .when(() -> EntityIdService.getEntityId(any(), eq(Entity.EntityType.SCHEMA)))
+          .thenReturn(7L);
+      RecordingOps ops = new RecordingOps(true);
+      Namespace ns = Namespace.of("ml", "cat", "sch");
+      List<String> out =
+          POStorageReadRouting.listPOs(MAPPER, ns, ops, Entity.EntityType.TABLE, true);
+      assertEquals(Collections.singletonList("row"), out);
+      assertTrue(ops.listByParentCalled);
+      assertFalse(ops.listByFullNsCalled);
+      assertEquals(Long.valueOf(7), ops.seenListParentId);
+    }
+  }
+
+  /** Stub recording which read method was called and the parent id / name it was given. */
+  private static final class RecordingOps extends BasePOStorageOps<String, Object> {
+    private final boolean supportsParent;
+    boolean getPOByParentCalled;
+    boolean getPOByFullNameCalled;
+    boolean listByParentCalled;
+    boolean listByFullNsCalled;
+    Long seenParentId;
+    String seenShortName;
+    Long seenListParentId;
+
+    RecordingOps(boolean supportsParent) {
+      this.supportsParent = supportsParent;
+    }
+
+    @Override
+    public boolean supportsParentIdRelationalRead() {
+      return supportsParent;
+    }
+
+    @Override
+    public String getPO(Object mapper, Long parentId, String name) {
+      getPOByParentCalled = true;
+      seenParentId = parentId;
+      seenShortName = name;
+      return "by-parent";
+    }
+
+    @Override
+    public String getPOByFullName(Object mapper, NameIdentifier identifier) {
+      getPOByFullNameCalled = true;
+      return "by-full";
+    }
+
+    @Override
+    public List<String> listPOs(Object mapper, Long parentId) {
+      listByParentCalled = true;
+      seenListParentId = parentId;
+      return Collections.singletonList("row");
+    }
+
+    @Override
+    public List<String> listPOsByNSFullName(Object mapper, Namespace namespace) {
+      listByFullNsCalled = true;
+      return Collections.emptyList();
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSchemaMetaService.java b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSchemaMetaService.java
index 2c19502caa..599c32886e 100644
--- a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSchemaMetaService.java
+++ b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSchemaMetaService.java
@@ -24,11 +24,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
 import java.time.Instant;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityAlreadyExistsException;
@@ -38,7 +36,6 @@ import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.TopicEntity;
 import org.apache.gravitino.storage.RandomIdGenerator;
 import org.apache.gravitino.storage.relational.TestJDBCBackend;
-import org.apache.gravitino.utils.HierarchicalSchemaUtil;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
 import org.junit.jupiter.api.Assertions;
@@ -220,12 +217,10 @@ public class TestSchemaMetaService extends TestJDBCBackend {
 
     SchemaMetaService schemaMetaService = SchemaMetaService.getInstance();
     String logicalLeaf = "ns_a:ns_b:leaf";
-    String sep = HierarchicalSchemaUtil.schemaSeparator();
-    String physicalLeaf = 
HierarchicalSchemaUtil.logicalToPhysical(logicalLeaf, sep);
     SchemaEntity hierarchical =
         SchemaEntity.builder()
             .withId(RandomIdGenerator.INSTANCE.nextId())
-            .withName(physicalLeaf)
+            .withName(logicalLeaf)
             .withNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName))
             .withComment("nested")
             .withProperties(Collections.emptyMap())
@@ -235,17 +230,7 @@ public class TestSchemaMetaService extends TestJDBCBackend {
 
     List<SchemaEntity> schemas =
         
schemaMetaService.listSchemasByNamespace(NamespaceUtil.ofSchema(metalakeName, 
catalogName));
-    Set<String> logicalNames =
-        schemas.stream()
-            .map(SchemaEntity::name)
-            .map(
-                n -> {
-                  if (n != null && 
n.contains(HierarchicalSchemaUtil.physicalSeparator())) {
-                    return HierarchicalSchemaUtil.physicalToLogical(n, sep);
-                  }
-                  return n;
-                })
-            .collect(Collectors.toSet());
+    Set<String> logicalNames = 
schemas.stream().map(SchemaEntity::name).collect(Collectors.toSet());
 
     Assertions.assertTrue(logicalNames.contains("ns_a"));
     Assertions.assertTrue(logicalNames.contains("ns_a:ns_b"));
@@ -253,8 +238,8 @@ public class TestSchemaMetaService extends TestJDBCBackend {
 
     SchemaEntity loaded =
         schemaMetaService.getSchemaByIdentifier(
-            NameIdentifier.of(metalakeName, catalogName, physicalLeaf));
-    Assertions.assertEquals(physicalLeaf, loaded.name());
+            NameIdentifier.of(metalakeName, catalogName, logicalLeaf));
+    Assertions.assertEquals(logicalLeaf, loaded.name());
     Assertions.assertEquals("nested", loaded.comment());
   }
 
@@ -264,18 +249,15 @@ public class TestSchemaMetaService extends TestJDBCBackend {
     createAndInsertCatalog(metalakeName, catalogName);
 
     SchemaMetaService schemaMetaService = SchemaMetaService.getInstance();
-    String sep = HierarchicalSchemaUtil.schemaSeparator();
-    String physSep = HierarchicalSchemaUtil.physicalSeparator();
-    String physicalLeaf1 = 
HierarchicalSchemaUtil.logicalToPhysical("ns_a:ns_b:leaf1", sep);
-    String physicalLeaf2 = 
HierarchicalSchemaUtil.logicalToPhysical("ns_a:ns_b:leaf2", sep);
-    String[] parts = physicalLeaf1.split(Pattern.quote(physSep), -1);
-    String ancestorA = parts[0];
-    String ancestorAB = String.join(physSep, Arrays.copyOfRange(parts, 0, 2));
+    String leaf1 = "ns_a:ns_b:leaf1";
+    String leaf2 = "ns_a:ns_b:leaf2";
+    String ancestorA = "ns_a";
+    String ancestorAB = "ns_a:ns_b";
 
     SchemaEntity first =
         SchemaEntity.builder()
             .withId(RandomIdGenerator.INSTANCE.nextId())
-            .withName(physicalLeaf1)
+            .withName(leaf1)
             .withNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName))
             .withComment("first")
             .withProperties(Collections.emptyMap())
@@ -295,7 +277,7 @@ public class TestSchemaMetaService extends TestJDBCBackend {
     SchemaEntity second =
         SchemaEntity.builder()
             .withId(RandomIdGenerator.INSTANCE.nextId())
-            .withName(physicalLeaf2)
+            .withName(leaf2)
             .withNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName))
             .withComment("second")
             .withProperties(Collections.emptyMap())

Reply via email to