This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch branch-1.1
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-1.1 by this push:
     new 448d019ebf [#9893] improvement: Supports `batchGet` metadata objects 
(#9896) (#9950)
448d019ebf is described below

commit 448d019ebf34aca92c7a31108e8654a6543be139
Author: roryqi <[email protected]>
AuthorDate: Wed Feb 11 13:57:36 2026 +0800

    [#9893] improvement: Supports `batchGet` metadata objects (#9896) (#9950)
    
    ### What changes were proposed in this pull request?
    
    Support batch retrieval (`batchGet`) of metadata objects.
    
    ### Why are the changes needed?
    
    Fix: #9893
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added new tests; also covered by existing tests.
---
 .../gravitino/storage/relational/JDBCBackend.java  |  34 +-
 .../relational/mapper/CatalogMetaMapper.java       |   6 +
 .../mapper/CatalogMetaSQLProviderFactory.java      |   6 +
 .../relational/mapper/FilesetMetaMapper.java       |  56 ++
 .../mapper/FilesetMetaSQLProviderFactory.java      |  18 +
 .../storage/relational/mapper/JobMetaMapper.java   |  10 +
 .../mapper/JobMetaSQLProviderFactory.java          |  13 +
 .../relational/mapper/JobTemplateMetaMapper.java   |   7 +
 .../mapper/JobTemplateMetaSQLProviderFactory.java  |   7 +
 .../relational/mapper/MetalakeMetaMapper.java      |   3 +
 .../mapper/MetalakeMetaSQLProviderFactory.java     |   5 +
 .../storage/relational/mapper/ModelMetaMapper.java |   7 +
 .../mapper/ModelMetaSQLProviderFactory.java        |   9 +
 .../relational/mapper/PolicyMetaMapper.java        |  27 +-
 .../mapper/PolicyMetaSQLProviderFactory.java       |   5 +
 .../relational/mapper/SchemaMetaMapper.java        |   8 +
 .../mapper/SchemaMetaSQLProviderFactory.java       |   7 +
 .../storage/relational/mapper/TagMetaMapper.java   |   4 +
 .../mapper/TagMetaSQLProviderFactory.java          |   5 +
 .../storage/relational/mapper/TopicMetaMapper.java |   7 +
 .../mapper/TopicMetaSQLProviderFactory.java        |   9 +
 .../provider/base/CatalogMetaBaseSQLProvider.java  |  24 +
 .../provider/base/FilesetMetaBaseSQLProvider.java  | 101 +++
 .../provider/base/JobMetaBaseSQLProvider.java      |  57 ++
 .../base/JobTemplateMetaBaseSQLProvider.java       |  26 +
 .../provider/base/MetalakeMetaBaseSQLProvider.java |  17 +
 .../provider/base/ModelMetaBaseSQLProvider.java    |  38 +
 .../provider/base/PolicyMetaBaseSQLProvider.java   |  26 +
 .../provider/base/SchemaMetaBaseSQLProvider.java   |  32 +
 .../provider/base/TagMetaBaseSQLProvider.java      |  27 +
 .../provider/base/TopicMetaBaseSQLProvider.java    |  38 +
 .../relational/service/CatalogMetaService.java     |  19 +
 .../relational/service/FilesetMetaService.java     |  41 +
 .../storage/relational/service/JobMetaService.java |  30 +
 .../relational/service/JobTemplateMetaService.java |  20 +
 .../relational/service/MetalakeMetaService.java    |  17 +
 .../relational/service/ModelMetaService.java       |  22 +
 .../relational/service/PolicyMetaService.java      |  18 +
 .../relational/service/SchemaMetaService.java      |  21 +
 .../relational/service/TableMetaService.java       |   5 -
 .../storage/relational/service/TagMetaService.java |  17 +
 .../relational/service/TopicMetaService.java       |  23 +
 .../storage/relational/utils/POConverters.java     |  12 +
 .../apache/gravitino/utils/EntityClassMapper.java  |  84 ++
 .../relational/TestJDBCBackendBatchGet.java        | 850 +++++++++++++++++++++
 .../server/authorization/MetadataAuthzHelper.java  |  41 +-
 .../server/authorization/MetadataIdConverter.java  |  46 +-
 .../server/web/rest/TestCatalogOperations.java     |   5 +
 .../server/web/rest/TestFilesetOperations.java     |   5 +
 .../server/web/rest/TestJobOperations.java         |   9 +
 .../server/web/rest/TestMetalakeOperations.java    |   5 +
 .../server/web/rest/TestModelOperations.java       |  15 +
 .../server/web/rest/TestPolicyOperations.java      |  15 +
 .../server/web/rest/TestSchemaOperations.java      |   5 +
 .../server/web/rest/TestTableOperations.java       |   5 +
 .../server/web/rest/TestTagOperations.java         |  15 +
 .../server/web/rest/TestTopicOperations.java       |   5 +
 57 files changed, 1931 insertions(+), 58 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index db2a279c27..f312420eee 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -289,12 +289,44 @@ public class JDBCBackend implements RelationalBackend {
   @Override
   public <E extends Entity & HasIdentifier> List<E> batchGet(
       List<NameIdentifier> identifiers, Entity.EntityType entityType) {
+    if (identifiers == null || identifiers.isEmpty()) {
+      return Lists.newArrayList();
+    }
+
+    // Verify all identifiers have the same namespace
+    Namespace firstNamespace = identifiers.get(0).namespace();
+    Preconditions.checkArgument(
+        identifiers.stream().allMatch(id -> 
id.namespace().equals(firstNamespace)),
+        "All identifiers must have the same namespace for batch get 
operation");
+
     switch (entityType) {
+      case METALAKE:
+        return (List<E>)
+            
MetalakeMetaService.getInstance().batchGetMetalakeByIdentifier(identifiers);
+      case CATALOG:
+        return (List<E>) 
CatalogMetaService.getInstance().batchGetCatalogByIdentifier(identifiers);
+      case SCHEMA:
+        return (List<E>) 
SchemaMetaService.getInstance().batchGetSchemaByIdentifier(identifiers);
       case TABLE:
         return (List<E>) 
TableMetaService.getInstance().batchGetTableByIdentifier(identifiers);
+      case FILESET:
+        return (List<E>) 
FilesetMetaService.getInstance().batchGetFilesetByIdentifier(identifiers);
+      case TOPIC:
+        return (List<E>) 
TopicMetaService.getInstance().batchGetTopicByIdentifier(identifiers);
+      case MODEL:
+        return (List<E>) 
ModelMetaService.getInstance().batchGetModelByIdentifier(identifiers);
+      case TAG:
+        return (List<E>) 
TagMetaService.getInstance().batchGetTagByIdentifier(identifiers);
+      case POLICY:
+        return (List<E>) 
PolicyMetaService.getInstance().batchGetPolicyByIdentifier(identifiers);
+      case JOB:
+        return (List<E>) 
JobMetaService.getInstance().batchGetJobByIdentifier(identifiers);
+      case JOB_TEMPLATE:
+        return (List<E>)
+            
JobTemplateMetaService.getInstance().batchGetJobTemplateByIdentifier(identifiers);
       default:
         throw new UnsupportedEntityTypeException(
-            "Unsupported entity type: %s for get operation", entityType);
+            "Unsupported entity type: %s for batch get operation", entityType);
     }
   }
 
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaMapper.java
index 90a86e26cc..9f19d1a6a8 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaMapper.java
@@ -107,4 +107,10 @@ public interface CatalogMetaMapper {
       method = "selectCatalogIdByMetalakeNameAndCatalogName")
   CatalogIds selectCatalogIdByMetalakeNameAndCatalogName(
       @Param("metalakeName") String metalakeName, @Param("catalogName") String 
catalogName);
+
+  @SelectProvider(
+      type = CatalogMetaSQLProviderFactory.class,
+      method = "batchSelectCatalogByIdentifier")
+  List<CatalogPO> batchSelectCatalogByIdentifier(
+      @Param("metalakeName") String metalakeName, @Param("catalogNames") 
List<String> catalogNames);
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaSQLProviderFactory.java
index 9d81957617..c3a7954a25 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaSQLProviderFactory.java
@@ -121,4 +121,10 @@ public class CatalogMetaSQLProviderFactory {
       @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
     return getProvider().deleteCatalogMetasByLegacyTimeline(legacyTimeline, 
limit);
   }
+
+  public static String batchSelectCatalogByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogNames") List<String> catalogNames) {
+    return getProvider().batchSelectCatalogByIdentifier(metalakeName, 
catalogNames);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaMapper.java
index cd2b70be6a..ed83f1511d 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaMapper.java
@@ -198,4 +198,60 @@ public interface FilesetMetaMapper {
       method = "deleteFilesetMetasByLegacyTimeline")
   Integer deleteFilesetMetasByLegacyTimeline(
       @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+
+  @Results({
+    @Result(property = "filesetId", column = "fileset_id", id = true),
+    @Result(property = "filesetName", column = "fileset_name"),
+    @Result(property = "metalakeId", column = "metalake_id"),
+    @Result(property = "catalogId", column = "catalog_id"),
+    @Result(property = "schemaId", column = "schema_id"),
+    @Result(property = "type", column = "type"),
+    @Result(property = "auditInfo", column = "audit_info"),
+    @Result(property = "currentVersion", column = "current_version"),
+    @Result(property = "lastVersion", column = "last_version"),
+    @Result(property = "deletedAt", column = "deleted_at"),
+    @Result(
+        property = "filesetVersionPOs",
+        javaType = List.class,
+        column =
+            
"{id,version_metalake_id,version_catalog_id,version_schema_id,version_fileset_id,version,"
+                + 
"fileset_comment,properties,storage_location_name,storage_location,version_deleted_at}",
+        many = @Many(resultMap = "mapToFilesetVersionPO"))
+  })
+  @SelectProvider(
+      type = FilesetMetaSQLProviderFactory.class,
+      method = "batchSelectFilesetByIdentifier")
+  List<FilesetPO> batchSelectFilesetByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("filesetNames") List<String> filesetNames);
+
+  @Results({
+    @Result(property = "filesetId", column = "fileset_id", id = true),
+    @Result(property = "filesetName", column = "fileset_name"),
+    @Result(property = "metalakeId", column = "metalake_id"),
+    @Result(property = "catalogId", column = "catalog_id"),
+    @Result(property = "schemaId", column = "schema_id"),
+    @Result(property = "type", column = "type"),
+    @Result(property = "auditInfo", column = "audit_info"),
+    @Result(property = "currentVersion", column = "current_version"),
+    @Result(property = "lastVersion", column = "last_version"),
+    @Result(property = "deletedAt", column = "deleted_at"),
+    @Result(
+        property = "filesetVersionPOs",
+        javaType = List.class,
+        column =
+            
"{id,version_metalake_id,version_catalog_id,version_schema_id,version_fileset_id,version,"
+                + 
"fileset_comment,properties,storage_location_name,storage_location,version_deleted_at}",
+        many = @Many(resultMap = "mapToFilesetVersionPO"))
+  })
+  @SelectProvider(
+      type = FilesetMetaSQLProviderFactory.class,
+      method = "selectFilesetByFullQualifiedName")
+  FilesetPO selectFilesetByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("filesetName") String filesetName);
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java
index 7729b095c0..311028ad01 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java
@@ -109,4 +109,22 @@ public class FilesetMetaSQLProviderFactory {
       @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
     return getProvider().deleteFilesetMetasByLegacyTimeline(legacyTimeline, 
limit);
   }
+
+  public static String batchSelectFilesetByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("filesetNames") List<String> filesetNames) {
+    return getProvider()
+        .batchSelectFilesetByIdentifier(metalakeName, catalogName, schemaName, 
filesetNames);
+  }
+
+  public static String selectFilesetByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("filesetName") String filesetName) {
+    return getProvider()
+        .selectFilesetByFullQualifiedName(metalakeName, catalogName, 
schemaName, filesetName);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
index 3bc359756d..66ed42469e 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
@@ -70,4 +70,14 @@ public interface JobMetaMapper {
 
   @UpdateProvider(type = JobMetaSQLProviderFactory.class, method = 
"softDeleteJobMetaByRunId")
   Integer softDeleteJobMetaByRunId(@Param("jobRunId") Long jobRunId);
+
+  @SelectProvider(type = JobMetaSQLProviderFactory.class, method = 
"batchSelectJobByRunIds")
+  List<JobPO> batchSelectJobByRunIds(
+      @Param("metalakeName") String metalakeName, @Param("jobRunIds") 
List<Long> jobRunIds);
+
+  @SelectProvider(type = JobMetaSQLProviderFactory.class, method = 
"batchSelectJobByIdentifier")
+  List<JobPO> batchSelectJobByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("jobTemplateName") String jobTemplateName,
+      @Param("jobNames") List<String> jobNames);
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
index fccb116a9c..35e0839572 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
@@ -19,6 +19,7 @@
 package org.apache.gravitino.storage.relational.mapper;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend;
 import 
org.apache.gravitino.storage.relational.mapper.provider.base.JobMetaBaseSQLProvider;
@@ -98,4 +99,16 @@ public class JobMetaSQLProviderFactory {
   public static String softDeleteJobMetaByRunId(@Param("jobRunId") Long 
jobRunId) {
     return getProvider().softDeleteJobMetaByRunId(jobRunId);
   }
+
+  public static String batchSelectJobByRunIds(
+      @Param("metalakeName") String metalakeName, @Param("jobRunIds") 
List<Long> jobRunIds) {
+    return getProvider().batchSelectJobByRunIds(metalakeName, jobRunIds);
+  }
+
+  public static String batchSelectJobByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("jobTemplateName") String jobTemplateName,
+      @Param("jobNames") List<String> jobNames) {
+    return getProvider().batchSelectJobByIdentifier(metalakeName, 
jobTemplateName, jobNames);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaMapper.java
index 1ca511e755..c264294e32 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaMapper.java
@@ -79,4 +79,11 @@ public interface JobTemplateMetaMapper {
 
   @SelectProvider(type = JobTemplateMetaSQLProviderFactory.class, method = 
"selectJobTemplateById")
   JobTemplatePO selectJobTemplateById(Long jobTemplateId);
+
+  @SelectProvider(
+      type = JobTemplateMetaSQLProviderFactory.class,
+      method = "batchSelectJobTemplateByIdentifier")
+  List<JobTemplatePO> batchSelectJobTemplateByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("jobTemplateNames") List<String> jobTemplateNames);
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaSQLProviderFactory.java
index a6b04e5947..0598a9a9a0 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaSQLProviderFactory.java
@@ -19,6 +19,7 @@
 package org.apache.gravitino.storage.relational.mapper;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend;
 import 
org.apache.gravitino.storage.relational.mapper.provider.base.JobTemplateMetaBaseSQLProvider;
@@ -102,4 +103,10 @@ public class JobTemplateMetaSQLProviderFactory {
   public static String selectJobTemplateById(@Param("jobTemplateId") Long 
jobTemplateId) {
     return getProvider().selectJobTemplateById(jobTemplateId);
   }
+
+  public static String batchSelectJobTemplateByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("jobTemplateNames") List<String> jobTemplateNames) {
+    return getProvider().batchSelectJobTemplateByIdentifier(metalakeName, 
jobTemplateNames);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java
index 90027ae151..f705c283ce 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java
@@ -80,4 +80,7 @@ public interface MetalakeMetaMapper {
       method = "deleteMetalakeMetasByLegacyTimeline")
   Integer deleteMetalakeMetasByLegacyTimeline(
       @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+
+  @SelectProvider(type = MetalakeMetaSQLProviderFactory.class, method = 
"batchSelectMetalakeByName")
+  List<MetalakePO> batchSelectMetalakeByName(@Param("metalakeNames") 
List<String> metalakeNames);
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java
index ee2c52a7dc..eba26f9e02 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java
@@ -96,4 +96,9 @@ public class MetalakeMetaSQLProviderFactory {
       @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
     return getProvider().deleteMetalakeMetasByLegacyTimeline(legacyTimeline, 
limit);
   }
+
+  public static String batchSelectMetalakeByName(
+      @Param("metalakeNames") List<String> metalakeNames) {
+    return getProvider().batchSelectMetalakeByName(metalakeNames);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
index eba3a0a43c..419c1edec3 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
@@ -91,4 +91,11 @@ public interface ModelMetaMapper {
   @UpdateProvider(type = ModelMetaSQLProviderFactory.class, method = 
"updateModelMeta")
   Integer updateModelMeta(
       @Param("newModelMeta") ModelPO newModelPO, @Param("oldModelMeta") 
ModelPO oldModelPO);
+
+  @SelectProvider(type = ModelMetaSQLProviderFactory.class, method = 
"batchSelectModelByIdentifier")
+  List<ModelPO> batchSelectModelByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("modelNames") List<String> modelNames);
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
index e4f7fe8161..2d8fac4a28 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
@@ -111,4 +111,13 @@ public class ModelMetaSQLProviderFactory {
       @Param("newModelMeta") ModelPO newModelPO, @Param("oldModelMeta") 
ModelPO oldModelPO) {
     return getProvider().updateModelMeta(newModelPO, oldModelPO);
   }
+
+  public static String batchSelectModelByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("modelNames") List<String> modelNames) {
+    return getProvider()
+        .batchSelectModelByIdentifier(metalakeName, catalogName, schemaName, 
modelNames);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
index 7531f2a52f..e2e35cc6e0 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
@@ -36,9 +36,6 @@ public interface PolicyMetaMapper {
     @Result(property = "policyName", column = "policy_name"),
     @Result(property = "policyType", column = "policy_type"),
     @Result(property = "metalakeId", column = "metalake_id"),
-    @Result(property = "inheritable", column = "inheritable"),
-    @Result(property = "exclusive", column = "exclusive"),
-    @Result(property = "supportedObjectTypes", column = 
"supported_object_types"),
     @Result(property = "auditInfo", column = "audit_info"),
     @Result(property = "currentVersion", column = "current_version"),
     @Result(property = "lastVersion", column = "last_version"),
@@ -161,4 +158,28 @@ public interface PolicyMetaMapper {
   })
   @SelectProvider(type = PolicyMetaSQLProviderFactory.class, method = 
"selectPolicyByPolicyId")
   PolicyPO selectPolicyByPolicyId(@Param("policyId") Long policyId);
+
+  @Results({
+    @Result(property = "policyId", column = "policy_id"),
+    @Result(property = "policyName", column = "policy_name"),
+    @Result(property = "policyType", column = "policy_type"),
+    @Result(property = "metalakeId", column = "metalake_id"),
+    @Result(property = "auditInfo", column = "audit_info"),
+    @Result(property = "currentVersion", column = "current_version"),
+    @Result(property = "lastVersion", column = "last_version"),
+    @Result(property = "deletedAt", column = "deleted_at"),
+    @Result(property = "policyVersionPO.id", column = "id"),
+    @Result(property = "policyVersionPO.metalakeId", column = 
"version_metalake_id"),
+    @Result(property = "policyVersionPO.policyId", column = 
"version_policy_id"),
+    @Result(property = "policyVersionPO.version", column = "version"),
+    @Result(property = "policyVersionPO.policyComment", column = 
"policy_comment"),
+    @Result(property = "policyVersionPO.enabled", column = "enabled"),
+    @Result(property = "policyVersionPO.content", column = "content"),
+    @Result(property = "policyVersionPO.deletedAt", column = 
"version_deleted_at")
+  })
+  @SelectProvider(
+      type = PolicyMetaSQLProviderFactory.class,
+      method = "batchSelectPolicyByIdentifier")
+  List<PolicyPO> batchSelectPolicyByIdentifier(
+      @Param("metalakeName") String metalakeName, @Param("policyNames") 
List<String> policyNames);
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaSQLProviderFactory.java
index e715d6c5ee..afb257227a 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaSQLProviderFactory.java
@@ -100,6 +100,11 @@ public class PolicyMetaSQLProviderFactory {
     return getProvider().selectPolicyByPolicyId(policyId);
   }
 
+  public static String batchSelectPolicyByIdentifier(
+      @Param("metalakeName") String metalakeName, @Param("policyNames") 
List<String> policyNames) {
+    return getProvider().batchSelectPolicyByIdentifier(metalakeName, 
policyNames);
+  }
+
   static class PolicyMetaMySQLProvider extends PolicyMetaBaseSQLProvider {}
 
   static class PolicyMetaH2Provider extends PolicyMetaBaseSQLProvider {}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaMapper.java
index e1816a3277..4e995b33eb 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaMapper.java
@@ -100,4 +100,12 @@ public interface SchemaMetaMapper {
       @Param("metalakeName") String metalakeName,
       @Param("catalogName") String catalogName,
       @Param("schemaName") String schemaName);
+
+  @SelectProvider(
+      type = SchemaMetaSQLProviderFactory.class,
+      method = "batchSelectSchemaByIdentifier")
+  List<SchemaPO> batchSelectSchemaByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaNames") List<String> schemaNames);
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java
index cbab45733c..f07ed6cc5f 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java
@@ -112,4 +112,11 @@ public class SchemaMetaSQLProviderFactory {
         .selectSchemaIdByMetalakeNameAndCatalogNameAndSchemaName(
             metalakeName, catalogName, schemaName);
   }
+
+  public static String batchSelectSchemaByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaNames") List<String> schemaNames) {
+    return getProvider().batchSelectSchemaByIdentifier(metalakeName, 
catalogName, schemaNames);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaMapper.java
index 5e980cb643..7732c8b1a0 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaMapper.java
@@ -79,4 +79,8 @@ public interface TagMetaMapper {
 
   @SelectProvider(type = TagMetaSQLProviderFactory.class, method = 
"selectTagByTagId")
   TagPO selectTagByTagId(@Param("tagId") Long tagId);
+
+  @SelectProvider(type = TagMetaSQLProviderFactory.class, method = 
"batchSelectTagByIdentifier")
+  List<TagPO> batchSelectTagByIdentifier(
+      @Param("metalakeName") String metalakeName, @Param("tagNames") 
List<String> tagNames);
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaSQLProviderFactory.java
index 172434097e..7273279e6b 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaSQLProviderFactory.java
@@ -105,4 +105,9 @@ public class TagMetaSQLProviderFactory {
   public static String selectTagByTagId(@Param("tagId") Long tagId) {
     return getProvider().selectTagByTagId(tagId);
   }
+
+  public static String batchSelectTagByIdentifier(
+      @Param("metalakeName") String metalakeName, @Param("tagNames") 
List<String> tagNames) {
+    return getProvider().batchSelectTagByIdentifier(metalakeName, tagNames);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java
index 014ef229ee..e880a8284a 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java
@@ -87,4 +87,11 @@ public interface TopicMetaMapper {
       method = "deleteTopicMetasByLegacyTimeline")
   Integer deleteTopicMetasByLegacyTimeline(
       @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+
+  @SelectProvider(type = TopicMetaSQLProviderFactory.class, method = 
"batchSelectTopicByIdentifier")
+  List<TopicPO> batchSelectTopicByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("topicNames") List<String> topicNames);
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java
index dbbfc21604..38abe097fb 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java
@@ -107,4 +107,13 @@ public class TopicMetaSQLProviderFactory {
       @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
     return getProvider().deleteTopicMetasByLegacyTimeline(legacyTimeline, 
limit);
   }
+
+  /**
+   * Hands the batch topic lookup over to the provider returned by {@code getProvider()}.
+   *
+   * @param metalakeName forwarded unchanged to the provider
+   * @param catalogName forwarded unchanged to the provider
+   * @param schemaName forwarded unchanged to the provider
+   * @param topicNames forwarded unchanged to the provider
+   * @return the string produced by the provider's {@code batchSelectTopicByIdentifier}
+   */
+  public static String batchSelectTopicByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("topicNames") List<String> topicNames) {
+    return getProvider()
+        .batchSelectTopicByIdentifier(metalakeName, catalogName, schemaName, 
topicNames);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/CatalogMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/CatalogMetaBaseSQLProvider.java
index 4b8fcaa36b..be03900dc2 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/CatalogMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/CatalogMetaBaseSQLProvider.java
@@ -241,4 +241,28 @@ public class CatalogMetaBaseSQLProvider {
         + TABLE_NAME
         + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT 
#{limit}";
   }
+
+  /**
+   * Builds a {@code <script>} statement that selects catalogs of one metalake by name.
+   *
+   * <p>The catalog table ({@code cm}) is joined to the metalake table ({@code mm}) on
+   * {@code metalake_id}. A row is kept when the metalake name equals {@code #{metalakeName}},
+   * the catalog name is in the list built from {@code catalogNames}, and {@code deleted_at = 0}
+   * holds on both tables. Selected columns are aliased to camelCase names.
+   *
+   * <p>Neither argument is read in Java; both are referenced only through placeholders in the
+   * returned text.
+   *
+   * @param metalakeName referenced as {@code #{metalakeName}}
+   * @param catalogNames iterated by {@code <foreach>} to fill the {@code IN (...)} list; with an
+   *     empty list nothing is emitted between the parentheses, so callers presumably must pass
+   *     at least one name
+   * @return the statement text
+   */
+  public String batchSelectCatalogByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogNames") List<String> catalogNames) {
+    return "<script>"
+        + "SELECT cm.catalog_id as catalogId, cm.catalog_name as catalogName,"
+        + " cm.metalake_id as metalakeId, cm.type, cm.provider,"
+        + " cm.catalog_comment as catalogComment, cm.properties, cm.audit_info 
as auditInfo,"
+        + " cm.current_version as currentVersion, cm.last_version as 
lastVersion,"
+        + " cm.deleted_at as deletedAt"
+        + " FROM "
+        + TABLE_NAME
+        + " cm JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON cm.metalake_id = mm.metalake_id"
+        + " WHERE mm.metalake_name = #{metalakeName}"
+        + " AND cm.catalog_name IN ("
+        + "<foreach collection='catalogNames' item='catalogName' 
separator=','>"
+        + "#{catalogName}"
+        + "</foreach>"
+        + " )"
+        + " AND cm.deleted_at = 0 AND mm.deleted_at = 0"
+        + "</script>";
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FilesetMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FilesetMetaBaseSQLProvider.java
index ae99e34900..ce70848e5d 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FilesetMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FilesetMetaBaseSQLProvider.java
@@ -23,6 +23,9 @@ import static 
org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper.M
 import static 
org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper.VERSION_TABLE_NAME;
 
 import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.po.FilesetPO;
 import org.apache.ibatis.annotations.Param;
 
@@ -220,4 +223,102 @@ public class FilesetMetaBaseSQLProvider {
         + META_TABLE_NAME
         + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT 
#{limit}";
   }
+
+  /**
+   * Builds a {@code <script>} statement that selects filesets of one schema by name, together
+   * with the version row matching each fileset's current version.
+   *
+   * <p>The fileset meta table ({@code fm}) is inner-joined to the version table ({@code vi}) on
+   * {@code fileset_id} and {@code current_version = version}, then joined upwards through the
+   * schema ({@code sm}), catalog ({@code cm}) and metalake ({@code mm}) tables. A row is kept
+   * when the metalake, catalog and schema names equal their placeholders, the fileset name is in
+   * the list built from {@code filesetNames}, and {@code deleted_at = 0} holds on all five
+   * tables. Version-table columns that would clash with meta-table columns are aliased with a
+   * {@code version_} prefix; the remaining columns keep their database names.
+   *
+   * <p>None of the arguments is read in Java; they are referenced only through placeholders.
+   *
+   * @param metalakeName referenced as {@code #{metalakeName}}
+   * @param catalogName referenced as {@code #{catalogName}}
+   * @param schemaName referenced as {@code #{schemaName}}
+   * @param filesetNames iterated by {@code <foreach>} to fill the {@code IN (...)} list; with an
+   *     empty list nothing is emitted between the parentheses, so callers presumably must pass
+   *     at least one name
+   * @return the statement text
+   */
+  public String batchSelectFilesetByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("filesetNames") List<String> filesetNames) {
+    return "<script>"
+        + "SELECT fm.fileset_id, fm.fileset_name, fm.metalake_id, 
fm.catalog_id, fm.schema_id,"
+        + " fm.type, fm.audit_info, fm.current_version, fm.last_version, 
fm.deleted_at,"
+        + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as 
version_catalog_id,"
+        + " vi.schema_id as version_schema_id, vi.fileset_id as 
version_fileset_id,"
+        + " vi.version, vi.fileset_comment, vi.properties, 
vi.storage_location_name, vi.storage_location,"
+        + " vi.deleted_at as version_deleted_at"
+        + " FROM "
+        + META_TABLE_NAME
+        + " fm"
+        + " INNER JOIN "
+        + VERSION_TABLE_NAME
+        + " vi ON fm.fileset_id = vi.fileset_id AND fm.current_version = 
vi.version"
+        + " JOIN "
+        + SchemaMetaMapper.TABLE_NAME
+        + " sm ON fm.schema_id = sm.schema_id"
+        + " JOIN "
+        + CatalogMetaMapper.TABLE_NAME
+        + " cm ON sm.catalog_id = cm.catalog_id"
+        + " JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON cm.metalake_id = mm.metalake_id"
+        + " WHERE mm.metalake_name = #{metalakeName}"
+        + " AND cm.catalog_name = #{catalogName}"
+        + " AND sm.schema_name = #{schemaName}"
+        + " AND fm.fileset_name IN ("
+        + "<foreach collection='filesetNames' item='filesetName' 
separator=','>"
+        + "#{filesetName}"
+        + "</foreach>"
+        + " )"
+        + " AND fm.deleted_at = 0 AND vi.deleted_at = 0 AND sm.deleted_at = 0"
+        + " AND cm.deleted_at = 0 AND mm.deleted_at = 0"
+        + "</script>";
+  }
+
+  /**
+   * Builds the statement that resolves a single fileset from its metalake, catalog, schema and
+   * fileset names.
+   *
+   * <p>The query starts at the metalake table ({@code mm}), inner-joins the catalog table
+   * ({@code cm}) and then LEFT JOINs the schema ({@code sm}), fileset meta ({@code fm}) and
+   * fileset version ({@code vi}) tables. The name and {@code deleted_at = 0} conditions for
+   * those four tables sit in the {@code ON} clauses; only the metalake conditions are in
+   * {@code WHERE}. Because of the LEFT JOINs, a row is still produced when the metalake and
+   * catalog match but the schema, fileset or version does not; the unmatched columns are then
+   * NULL.
+   *
+   * <p>The five {@code %s} slots of the text block are filled, in order, with the metalake,
+   * catalog, schema, fileset meta and fileset version table names. The arguments are not read in
+   * Java; they are referenced only through placeholders. Unlike the concatenated statements in
+   * this class, the text ends with a {@code ;}.
+   *
+   * @param metalakeName referenced as {@code #{metalakeName}}
+   * @param catalogName referenced as {@code #{catalogName}}
+   * @param schemaName referenced as {@code #{schemaName}}
+   * @param filesetName referenced as {@code #{filesetName}}
+   * @return the statement text
+   */
+  public String selectFilesetByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("filesetName") String filesetName) {
+    return """
+        SELECT
+            mm.metalake_id,
+            cm.catalog_id,
+            sm.schema_id,
+            vi.fileset_id,
+            fm.fileset_name,
+            fm.type,
+            fm.audit_info,
+            fm.current_version,
+            fm.last_version,
+            fm.deleted_at,
+            vi.id,
+            vi.metalake_id as version_metalake_id,
+            vi.catalog_id as version_catalog_id,
+            vi.schema_id as version_schema_id,
+            vi.fileset_id as version_fileset_id,
+            vi.version,
+            vi.fileset_comment,
+            vi.properties,
+            vi.storage_location_name,
+            vi.storage_location,
+            vi.deleted_at as version_deleted_at
+        FROM
+            %s mm
+        INNER JOIN
+            %s cm ON mm.metalake_id = cm.metalake_id
+            AND cm.catalog_name = #{catalogName}
+            AND cm.deleted_at = 0
+        LEFT JOIN
+            %s sm ON cm.catalog_id = sm.catalog_id
+            AND sm.schema_name = #{schemaName}
+            AND sm.deleted_at = 0
+        LEFT JOIN
+            %s fm ON sm.schema_id = fm.schema_id
+            AND fm.fileset_name = #{filesetName}
+            AND fm.deleted_at = 0
+        LEFT JOIN
+            %s vi ON fm.fileset_id = vi.fileset_id
+            AND fm.current_version = vi.version
+            AND vi.deleted_at = 0
+        WHERE
+            mm.metalake_name = #{metalakeName}
+            AND mm.deleted_at = 0;
+            """
+        .formatted(
+            MetalakeMetaMapper.TABLE_NAME,
+            CatalogMetaMapper.TABLE_NAME,
+            SchemaMetaMapper.TABLE_NAME,
+            META_TABLE_NAME,
+            VERSION_TABLE_NAME);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
index 178b2615b3..3c4c88ae59 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
@@ -18,6 +18,7 @@
  */
 package org.apache.gravitino.storage.relational.mapper.provider.base;
 
+import java.util.List;
 import org.apache.gravitino.storage.relational.mapper.JobMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.JobTemplateMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
@@ -181,4 +182,60 @@ public class JobMetaBaseSQLProvider {
         + JobMetaMapper.TABLE_NAME
         + " WHERE deleted_at < #{legacyTimeline} AND deleted_at > 0 LIMIT 
#{limit}";
   }
+
+  /**
+   * Builds a {@code <script>} statement that selects job runs of one metalake and one job
+   * template.
+   *
+   * <p>The job table ({@code jm}) is joined to the job template table ({@code jtm}) on
+   * {@code job_template_id} and to the metalake table ({@code mm}) on {@code metalake_id}. A row
+   * is kept when the metalake and template names equal their placeholders, {@code job_run_id} is
+   * in the list built from {@code jobNames}, and {@code deleted_at = 0} holds on all three
+   * tables. Columns are returned under their database names, without aliases.
+   *
+   * <p>NOTE(review): the {@code IN} list compares {@code jm.job_run_id} with the entries of
+   * {@code jobNames} (strings), whereas {@code batchSelectJobByRunIds} in this class takes a
+   * {@code List<Long>} for the same column and aliases its columns to camelCase. Confirm
+   * whether this variant is still needed and whether names are the intended values here.
+   *
+   * @param metalakeName referenced as {@code #{metalakeName}}
+   * @param jobTemplateName referenced as {@code #{jobTemplateName}}
+   * @param jobNames iterated by {@code <foreach>} to fill the {@code IN (...)} list; with an
+   *     empty list nothing is emitted between the parentheses
+   * @return the statement text
+   */
+  public String batchSelectJobByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("jobTemplateName") String jobTemplateName,
+      @Param("jobNames") List<String> jobNames) {
+    return "<script>"
+        + "SELECT jm.job_run_id, jm.metalake_id, jm.job_template_id,"
+        + " jm.job_execution_id, jm.job_run_status, jm.job_finished_at,"
+        + " jm.audit_info, jm.current_version, jm.last_version, jm.deleted_at"
+        + " FROM "
+        + JobMetaMapper.TABLE_NAME
+        + " jm"
+        + " JOIN "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " jtm ON jm.job_template_id = jtm.job_template_id"
+        + " JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON jm.metalake_id = mm.metalake_id"
+        + " WHERE mm.metalake_name = #{metalakeName}"
+        + " AND jtm.job_template_name = #{jobTemplateName}"
+        + " AND jm.job_run_id IN ("
+        + "<foreach collection='jobNames' item='jobName' separator=','>"
+        + "#{jobName}"
+        + "</foreach>"
+        + " )"
+        + " AND jm.deleted_at = 0 AND jtm.deleted_at = 0 AND mm.deleted_at = 0"
+        + "</script>";
+  }
+
+  /**
+   * Builds a {@code <script>} statement that selects job runs of one metalake by run id.
+   *
+   * <p>The job table ({@code jrm}) is joined to the job template table ({@code jtm}) on
+   * {@code job_template_id} and to the metalake table ({@code mm}) on {@code metalake_id}. A row
+   * is kept when the metalake name equals {@code #{metalakeName}}, {@code job_run_id} is in the
+   * list built from {@code jobRunIds}, and {@code deleted_at = 0} holds on all three tables.
+   * Columns are aliased to camelCase names, and the template name is taken from {@code jtm}.
+   *
+   * <p>Neither argument is read in Java; both are referenced only through placeholders.
+   *
+   * @param metalakeName referenced as {@code #{metalakeName}}
+   * @param jobRunIds iterated by {@code <foreach>} to fill the {@code IN (...)} list; with an
+   *     empty list nothing is emitted between the parentheses, so callers presumably must pass
+   *     at least one id
+   * @return the statement text
+   */
+  public String batchSelectJobByRunIds(
+      @Param("metalakeName") String metalakeName, @Param("jobRunIds") 
List<Long> jobRunIds) {
+    return "<script>"
+        + "SELECT jrm.job_run_id AS jobRunId, jtm.job_template_name AS 
jobTemplateName,"
+        + " jrm.metalake_id AS metalakeId, jrm.job_execution_id AS 
jobExecutionId,"
+        + " jrm.job_run_status AS jobRunStatus, jrm.job_finished_at AS 
jobFinishedAt,"
+        + " jrm.audit_info AS auditInfo,"
+        + " jrm.current_version AS currentVersion, jrm.last_version AS 
lastVersion,"
+        + " jrm.deleted_at AS deletedAt"
+        + " FROM "
+        + JobMetaMapper.TABLE_NAME
+        + " jrm"
+        + " JOIN "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " jtm ON jrm.job_template_id = jtm.job_template_id"
+        + " JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON jrm.metalake_id = mm.metalake_id"
+        + " WHERE mm.metalake_name = #{metalakeName}"
+        + " AND jrm.job_run_id IN ("
+        + "<foreach collection='jobRunIds' item='jobRunId' separator=','>"
+        + "#{jobRunId}"
+        + "</foreach>"
+        + " )"
+        + " AND jrm.deleted_at = 0 AND jtm.deleted_at = 0 AND mm.deleted_at = 
0"
+        + "</script>";
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobTemplateMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobTemplateMetaBaseSQLProvider.java
index 3c2c2e82c9..782dd362a8 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobTemplateMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobTemplateMetaBaseSQLProvider.java
@@ -18,6 +18,7 @@
  */
 package org.apache.gravitino.storage.relational.mapper.provider.base;
 
+import java.util.List;
 import org.apache.gravitino.storage.relational.mapper.JobTemplateMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
 import org.apache.gravitino.storage.relational.po.JobTemplatePO;
@@ -161,4 +162,29 @@ public class JobTemplateMetaBaseSQLProvider {
         + " WHERE jtm.job_template_id = #{jobTemplateId}"
         + " AND jtm.deleted_at = 0";
   }
+
+  /**
+   * Builds a {@code <script>} statement that selects job templates of one metalake by name.
+   *
+   * <p>The job template table ({@code jtm}) is joined to the metalake table ({@code mm}) on
+   * {@code metalake_id}. A row is kept when the metalake name equals {@code #{metalakeName}},
+   * the template name is in the list built from {@code jobTemplateNames}, and
+   * {@code deleted_at = 0} holds on both tables. Columns are aliased to camelCase names.
+   *
+   * <p>Neither argument is read in Java; both are referenced only through placeholders.
+   *
+   * @param metalakeName referenced as {@code #{metalakeName}}
+   * @param jobTemplateNames iterated by {@code <foreach>} to fill the {@code IN (...)} list;
+   *     with an empty list nothing is emitted between the parentheses, so callers presumably
+   *     must pass at least one name
+   * @return the statement text
+   */
+  public String batchSelectJobTemplateByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("jobTemplateNames") List<String> jobTemplateNames) {
+    return "<script>"
+        + "SELECT jtm.job_template_id AS jobTemplateId, jtm.job_template_name 
AS jobTemplateName,"
+        + " jtm.metalake_id AS metalakeId, jtm.job_template_comment AS 
jobTemplateComment,"
+        + " jtm.job_template_content AS jobTemplateContent, jtm.audit_info AS 
auditInfo,"
+        + " jtm.current_version AS currentVersion, jtm.last_version AS 
lastVersion,"
+        + " jtm.deleted_at AS deletedAt"
+        + " FROM "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " jtm"
+        + " JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON jtm.metalake_id = mm.metalake_id"
+        + " WHERE mm.metalake_name = #{metalakeName}"
+        + " AND jtm.job_template_name IN ("
+        + "<foreach collection='jobTemplateNames' item='jobTemplateName' 
separator=','>"
+        + "#{jobTemplateName}"
+        + "</foreach>"
+        + " )"
+        + " AND jtm.deleted_at = 0 AND mm.deleted_at = 0"
+        + "</script>";
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/MetalakeMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/MetalakeMetaBaseSQLProvider.java
index 3cba3a26cb..2524eda76f 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/MetalakeMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/MetalakeMetaBaseSQLProvider.java
@@ -168,4 +168,21 @@ public class MetalakeMetaBaseSQLProvider {
         + TABLE_NAME
         + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT 
#{limit}";
   }
+
+  /**
+   * Builds a {@code <script>} statement that selects metalakes by name.
+   *
+   * <p>A row is kept when {@code metalake_name} is in the list built from
+   * {@code metalakeNames} and {@code deleted_at = 0}. Columns are aliased to camelCase names.
+   * The argument is not read in Java; it is referenced only through the {@code <foreach>}.
+   *
+   * @param metalakeNames iterated by {@code <foreach>} to fill the {@code IN (...)} list; with
+   *     an empty list nothing is emitted between the parentheses, so callers presumably must
+   *     pass at least one name
+   * @return the statement text
+   */
+  public String batchSelectMetalakeByName(@Param("metalakeNames") List<String> 
metalakeNames) {
+    return "<script>"
+        + "SELECT metalake_id as metalakeId, metalake_name as metalakeName,"
+        + " metalake_comment as metalakeComment, properties, audit_info as 
auditInfo,"
+        + " schema_version as schemaVersion, current_version as 
currentVersion,"
+        + " last_version as lastVersion, deleted_at as deletedAt"
+        + " FROM "
+        + TABLE_NAME
+        + " WHERE metalake_name IN ("
+        + "<foreach collection='metalakeNames' item='metalakeName' 
separator=','>"
+        + "#{metalakeName}"
+        + "</foreach>"
+        + " )"
+        + " AND deleted_at = 0"
+        + "</script>";
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
index ec8e20de21..dd470c77ba 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
@@ -19,7 +19,10 @@
 package org.apache.gravitino.storage.relational.mapper.provider.base;
 
 import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.po.ModelPO;
 import org.apache.ibatis.annotations.Param;
 
@@ -185,4 +188,39 @@ public class ModelMetaBaseSQLProvider {
         + " AND audit_info = #{oldModelMeta.auditInfo}"
         + " AND deleted_at = 0";
   }
+
+  /**
+   * Builds a {@code <script>} statement that selects models of one schema by name.
+   *
+   * <p>The model table is aliased {@code mm}, so the metalake table uses the alias {@code mlm}
+   * here. The model table is joined upwards through the schema ({@code sm}), catalog
+   * ({@code cm}) and metalake ({@code mlm}) tables. A row is kept when the metalake, catalog and
+   * schema names equal their placeholders, the model name is in the list built from
+   * {@code modelNames}, and {@code deleted_at = 0} holds on all four tables. Columns are aliased
+   * to camelCase names.
+   *
+   * <p>None of the arguments is read in Java; they are referenced only through placeholders.
+   *
+   * @param metalakeName referenced as {@code #{metalakeName}}
+   * @param catalogName referenced as {@code #{catalogName}}
+   * @param schemaName referenced as {@code #{schemaName}}
+   * @param modelNames iterated by {@code <foreach>} to fill the {@code IN (...)} list; with an
+   *     empty list nothing is emitted between the parentheses, so callers presumably must pass
+   *     at least one name
+   * @return the statement text
+   */
+  public String batchSelectModelByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("modelNames") List<String> modelNames) {
+    return "<script>"
+        + "SELECT mm.model_id as modelId, mm.model_name as modelName,"
+        + " mm.metalake_id as metalakeId, mm.catalog_id as catalogId, 
mm.schema_id as schemaId,"
+        + " mm.model_comment as modelComment, mm.model_properties as 
modelProperties,"
+        + " mm.model_latest_version as modelLatestVersion, mm.audit_info as 
auditInfo,"
+        + " mm.deleted_at as deletedAt"
+        + " FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " mm"
+        + " JOIN "
+        + SchemaMetaMapper.TABLE_NAME
+        + " sm ON mm.schema_id = sm.schema_id"
+        + " JOIN "
+        + CatalogMetaMapper.TABLE_NAME
+        + " cm ON sm.catalog_id = cm.catalog_id"
+        + " JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mlm ON cm.metalake_id = mlm.metalake_id"
+        + " WHERE mlm.metalake_name = #{metalakeName}"
+        + " AND cm.catalog_name = #{catalogName}"
+        + " AND sm.schema_name = #{schemaName}"
+        + " AND mm.model_name IN ("
+        + "<foreach collection='modelNames' item='modelName' separator=','>"
+        + "#{modelName}"
+        + "</foreach>"
+        + " )"
+        + " AND mm.deleted_at = 0 AND sm.deleted_at = 0 AND cm.deleted_at = 0 
AND mlm.deleted_at = 0"
+        + "</script>";
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetaBaseSQLProvider.java
index 5d37abbdbd..fa25c66aec 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetaBaseSQLProvider.java
@@ -197,4 +197,30 @@ public class PolicyMetaBaseSQLProvider {
         + " AND pm.policy_name = #{policyName}"
         + " AND pm.deleted_at = 0 ";
   }
+
+  /**
+   * Builds a {@code <script>} statement that selects policies of one metalake by name, together
+   * with the version row matching each policy's current version.
+   *
+   * <p>The policy meta table ({@code pm}) is inner-joined to the policy version table
+   * ({@code pv}) on {@code policy_id} and {@code current_version = version}, and joined to the
+   * metalake table ({@code mm}) on {@code metalake_id}. A row is kept when the metalake name
+   * equals {@code #{metalakeName}}, the policy name is in the list built from
+   * {@code policyNames}, and {@code deleted_at = 0} holds on all three tables. Version-table
+   * columns that would clash with meta-table columns are aliased with a {@code version_} prefix.
+   *
+   * <p>Neither argument is read in Java; both are referenced only through placeholders.
+   *
+   * @param metalakeName referenced as {@code #{metalakeName}}
+   * @param policyNames iterated by {@code <foreach>} to fill the {@code IN (...)} list; with an
+   *     empty list nothing is emitted between the parentheses, so callers presumably must pass
+   *     at least one name
+   * @return the statement text
+   */
+  public String batchSelectPolicyByIdentifier(
+      @Param("metalakeName") String metalakeName, @Param("policyNames") 
List<String> policyNames) {
+    return "<script>"
+        + "SELECT pm.policy_id, pm.policy_name, pm.policy_type, 
pm.metalake_id,"
+        + " pm.audit_info, pm.current_version, pm.last_version, pm.deleted_at,"
+        + " pv.id, pv.metalake_id as version_metalake_id, pv.policy_id as 
version_policy_id,"
+        + " pv.version, pv.policy_comment, pv.enabled, pv.content, 
pv.deleted_at as version_deleted_at"
+        + " FROM "
+        + POLICY_META_TABLE_NAME
+        + " pm"
+        + " INNER JOIN "
+        + POLICY_VERSION_TABLE_NAME
+        + " pv ON pm.policy_id = pv.policy_id AND pm.current_version = 
pv.version"
+        + " JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON pm.metalake_id = mm.metalake_id"
+        + " WHERE mm.metalake_name = #{metalakeName}"
+        + " AND pm.policy_name IN ("
+        + "<foreach collection='policyNames' item='policyName' separator=','>"
+        + "#{policyName}"
+        + "</foreach>"
+        + " )"
+        + " AND pm.deleted_at = 0 AND pv.deleted_at = 0 AND mm.deleted_at = 0"
+        + "</script>";
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/SchemaMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/SchemaMetaBaseSQLProvider.java
index 437c94ce15..a95c41f0b6 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/SchemaMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/SchemaMetaBaseSQLProvider.java
@@ -21,6 +21,8 @@ package 
org.apache.gravitino.storage.relational.mapper.provider.base;
 import static 
org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper.TABLE_NAME;
 
 import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
 import org.apache.gravitino.storage.relational.po.SchemaPO;
 import org.apache.ibatis.annotations.Param;
 
@@ -208,4 +210,34 @@ public class SchemaMetaBaseSQLProvider {
         + " AND catalog_meta.deleted_at = 0"
         + " AND metalake_meta.deleted_at = 0";
   }
+
+  /**
+   * Builds a {@code <script>} statement that selects schemas of one catalog by name.
+   *
+   * <p>The schema table ({@code sm}) is joined to the catalog table ({@code cm}) on
+   * {@code catalog_id} and from there to the metalake table ({@code mm}) on {@code metalake_id}.
+   * A row is kept when the metalake and catalog names equal their placeholders, the schema name
+   * is in the list built from {@code schemaNames}, and {@code deleted_at = 0} holds on all three
+   * tables. Columns are aliased to camelCase names.
+   *
+   * <p>None of the arguments is read in Java; they are referenced only through placeholders.
+   *
+   * @param metalakeName referenced as {@code #{metalakeName}}
+   * @param catalogName referenced as {@code #{catalogName}}
+   * @param schemaNames iterated by {@code <foreach>} to fill the {@code IN (...)} list; with an
+   *     empty list nothing is emitted between the parentheses, so callers presumably must pass
+   *     at least one name
+   * @return the statement text
+   */
+  public String batchSelectSchemaByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaNames") List<String> schemaNames) {
+    return "<script>"
+        + "SELECT sm.schema_id as schemaId, sm.schema_name as schemaName,"
+        + " sm.metalake_id as metalakeId, sm.catalog_id as catalogId,"
+        + " sm.schema_comment as schemaComment, sm.properties, sm.audit_info 
as auditInfo,"
+        + " sm.current_version as currentVersion, sm.last_version as 
lastVersion,"
+        + " sm.deleted_at as deletedAt"
+        + " FROM "
+        + TABLE_NAME
+        + " sm"
+        + " JOIN "
+        + CatalogMetaMapper.TABLE_NAME
+        + " cm ON sm.catalog_id = cm.catalog_id"
+        + " JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON cm.metalake_id = mm.metalake_id"
+        + " WHERE mm.metalake_name = #{metalakeName}"
+        + " AND cm.catalog_name = #{catalogName}"
+        + " AND sm.schema_name IN ("
+        + "<foreach collection='schemaNames' item='schemaName' separator=','>"
+        + "#{schemaName}"
+        + "</foreach>"
+        + " )"
+        + " AND sm.deleted_at = 0 AND cm.deleted_at = 0 AND mm.deleted_at = 0"
+        + "</script>";
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TagMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TagMetaBaseSQLProvider.java
index 8872c379f4..74dbfaeeb7 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TagMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TagMetaBaseSQLProvider.java
@@ -225,4 +225,31 @@ public class TagMetaBaseSQLProvider {
         + TAG_TABLE_NAME
         + " WHERE tag_id = #{tagId} and deleted_at = 0";
   }
+
+  /**
+   * Builds a {@code <script>} statement that selects tags of one metalake by name.
+   *
+   * <p>The tag table ({@code tm}) is joined to the metalake table ({@code mm}) on
+   * {@code metalake_id}. A row is kept when the metalake name equals {@code #{metalakeName}},
+   * the tag name is in the list built from {@code tagNames}, and {@code deleted_at = 0} holds on
+   * both tables. Columns are aliased to camelCase names; {@code tag_comment} is exposed as
+   * {@code comment}.
+   *
+   * <p>Neither argument is read in Java; both are referenced only through placeholders.
+   *
+   * @param metalakeName referenced as {@code #{metalakeName}}
+   * @param tagNames iterated by {@code <foreach>} to fill the {@code IN (...)} list; with an
+   *     empty list nothing is emitted between the parentheses, so callers presumably must pass
+   *     at least one name
+   * @return the statement text
+   */
+  public String batchSelectTagByIdentifier(
+      @Param("metalakeName") String metalakeName, @Param("tagNames") 
List<String> tagNames) {
+    return "<script>"
+        + "SELECT tm.tag_id as tagId, tm.tag_name as tagName,"
+        + " tm.metalake_id as metalakeId,"
+        + " tm.tag_comment as comment,"
+        + " tm.properties as properties,"
+        + " tm.audit_info as auditInfo,"
+        + " tm.current_version as currentVersion,"
+        + " tm.last_version as lastVersion,"
+        + " tm.deleted_at as deletedAt"
+        + " FROM "
+        + TAG_TABLE_NAME
+        + " tm"
+        + " JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON tm.metalake_id = mm.metalake_id"
+        + " WHERE mm.metalake_name = #{metalakeName}"
+        + " AND tm.tag_name IN ("
+        + "<foreach collection='tagNames' item='tagName' separator=','>"
+        + "#{tagName}"
+        + "</foreach>"
+        + " )"
+        + " AND tm.deleted_at = 0 AND mm.deleted_at = 0"
+        + "</script>";
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TopicMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TopicMetaBaseSQLProvider.java
index 2f609f7370..11095c6699 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TopicMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TopicMetaBaseSQLProvider.java
@@ -22,6 +22,9 @@ package 
org.apache.gravitino.storage.relational.mapper.provider.base;
 import static 
org.apache.gravitino.storage.relational.mapper.TopicMetaMapper.TABLE_NAME;
 
 import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.po.TopicPO;
 import org.apache.ibatis.annotations.Param;
 
@@ -206,4 +209,39 @@ public class TopicMetaBaseSQLProvider {
         + TABLE_NAME
         + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeline} LIMIT 
#{limit}";
   }
+
+  /**
+   * Builds a {@code <script>} statement that selects topics of one schema by name.
+   *
+   * <p>The topic table ({@code tm}) is joined upwards through the schema ({@code sm}), catalog
+   * ({@code cm}) and metalake ({@code mm}) tables. A row is kept when the metalake, catalog and
+   * schema names equal their placeholders, the topic name is in the list built from
+   * {@code topicNames}, and {@code deleted_at = 0} holds on all four tables. Columns are aliased
+   * to camelCase names.
+   *
+   * <p>None of the arguments is read in Java; they are referenced only through placeholders.
+   *
+   * @param metalakeName referenced as {@code #{metalakeName}}
+   * @param catalogName referenced as {@code #{catalogName}}
+   * @param schemaName referenced as {@code #{schemaName}}
+   * @param topicNames iterated by {@code <foreach>} to fill the {@code IN (...)} list; with an
+   *     empty list nothing is emitted between the parentheses, so callers presumably must pass
+   *     at least one name
+   * @return the statement text
+   */
+  public String batchSelectTopicByIdentifier(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("topicNames") List<String> topicNames) {
+    return "<script>"
+        + "SELECT tm.topic_id as topicId, tm.topic_name as topicName,"
+        + " tm.metalake_id as metalakeId, tm.catalog_id as catalogId, 
tm.schema_id as schemaId,"
+        + " tm.comment as comment, tm.properties as properties, tm.audit_info 
as auditInfo,"
+        + " tm.current_version as currentVersion, tm.last_version as 
lastVersion,"
+        + " tm.deleted_at as deletedAt"
+        + " FROM "
+        + TABLE_NAME
+        + " tm"
+        + " JOIN "
+        + SchemaMetaMapper.TABLE_NAME
+        + " sm ON tm.schema_id = sm.schema_id"
+        + " JOIN "
+        + CatalogMetaMapper.TABLE_NAME
+        + " cm ON sm.catalog_id = cm.catalog_id"
+        + " JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON cm.metalake_id = mm.metalake_id"
+        + " WHERE mm.metalake_name = #{metalakeName}"
+        + " AND cm.catalog_name = #{catalogName}"
+        + " AND sm.schema_name = #{schemaName}"
+        + " AND tm.topic_name IN ("
+        + "<foreach collection='topicNames' item='topicName' separator=','>"
+        + "#{topicName}"
+        + "</foreach>"
+        + " )"
+        + " AND tm.deleted_at = 0 AND sm.deleted_at = 0 AND cm.deleted_at = 0 
AND mm.deleted_at = 0"
+        + "</script>";
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
index 7b6edc46a5..64b7ed4522 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.MetadataObject;
@@ -365,4 +366,22 @@ public class CatalogMetaService {
           return mapper.deleteCatalogMetasByLegacyTimeline(legacyTimeline, 
limit);
         });
   }
+
+  /**
+   * Loads several catalogs with a single mapper call.
+   *
+   * <p>The metalake name used for the lookup and the namespace handed to the converter are both
+   * taken from the first identifier only; every other identifier contributes just its name.
+   *
+   * @param identifiers catalog identifiers; element 0 is read unconditionally
+   * @return the entities converted from the rows the mapper returned
+   */
+  @Monitored(
+      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+      baseMetricName = "batchGetCatalogByIdentifier")
+  public List<CatalogEntity> batchGetCatalogByIdentifier(List<NameIdentifier> identifiers) {
+    NameIdentifier head = identifiers.get(0);
+    String metalake = head.namespace().level(0);
+    List<String> names =
+        identifiers.stream().map(ident -> ident.name()).collect(Collectors.toList());
+
+    return SessionUtils.doWithCommitAndFetchResult(
+        CatalogMetaMapper.class,
+        mapper ->
+            POConverters.fromCatalogPOs(
+                mapper.batchSelectCatalogByIdentifier(metalake, names), head.namespace()));
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
index 6b49bcc447..75afeb422c 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.MetadataObject;
@@ -356,4 +357,44 @@ public class FilesetMetaService {
     builder.withCatalogId(namespacedEntityId.namespaceIds()[1]);
     builder.withSchemaId(namespacedEntityId.entityId());
   }
+
+  /**
+   * Looks up one fileset row by its metalake, catalog, schema and fileset names.
+   *
+   * @return the row object produced by the mapper
+   * @throws NoSuchEntityException if the mapper produced {@code null}
+   */
+  private FilesetPO getFilesetByFullQualifiedName(
+      String metalakeName, String catalogName, String schemaName, String filesetName) {
+    FilesetPO found =
+        SessionUtils.getWithoutCommit(
+            FilesetMetaMapper.class,
+            mapper ->
+                mapper.selectFilesetByFullQualifiedName(
+                    metalakeName, catalogName, schemaName, filesetName));
+    if (found != null) {
+      return found;
+    }
+
+    // Only a missing row object is reported here; the message names the fileset.
+    throw new NoSuchEntityException(
+        NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+        Entity.EntityType.FILESET.name().toLowerCase(),
+        filesetName);
+  }
+
+  /**
+   * Loads several filesets with a single mapper call.
+   *
+   * <p>The metalake, catalog and schema names are derived from the first identifier only, and
+   * its namespace is the one handed to the converter; every other identifier contributes just
+   * its name.
+   *
+   * @param identifiers fileset identifiers; element 0 is read unconditionally
+   * @return the entities converted from the rows the mapper returned
+   */
+  @Monitored(
+      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+      baseMetricName = "batchGetFilesetByIdentifier")
+  public List<FilesetEntity> batchGetFilesetByIdentifier(List<NameIdentifier> identifiers) {
+    NameIdentifier head = identifiers.get(0);
+    NameIdentifier schema = NameIdentifierUtil.getSchemaIdentifier(head);
+    List<String> names =
+        identifiers.stream().map(ident -> ident.name()).collect(Collectors.toList());
+
+    return SessionUtils.doWithCommitAndFetchResult(
+        FilesetMetaMapper.class,
+        mapper -> {
+          // Resolved inside the session callback, in the same order as the mapper arguments.
+          String metalakeName = schema.namespace().level(0);
+          String catalogName = schema.namespace().level(1);
+          String schemaName = schema.name();
+          return POConverters.fromFilesetPOs(
+              mapper.batchSelectFilesetByIdentifier(metalakeName, catalogName, schemaName, names),
+              head.namespace());
+        });
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
index 3431976700..85b33548ec 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
@@ -161,4 +161,34 @@ public class JobMetaService {
       throw new NoSuchEntityException("Invalid job run ID format %s", 
jobRunId);
     }
   }
+
+  /**
+   * Loads several job runs with a single mapper call.
+   *
+   * <p>The namespace of the first identifier is validated with {@code NamespaceUtil.checkJob},
+   * supplies the metalake name, and is the namespace attached to every returned entity. Each
+   * identifier's name is turned into a run id by {@code parseJobRunId}. Rows are selected by
+   * metalake name and run id only; no job template filter is applied.
+   *
+   * @param identifiers job identifiers; element 0 is read unconditionally
+   * @return the entities converted from the rows the mapper returned
+   */
+  @Monitored(
+      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+      baseMetricName = "batchGetJobByIdentifier")
+  public List<JobEntity> batchGetJobByIdentifier(List<NameIdentifier> identifiers) {
+    NameIdentifier head = identifiers.get(0);
+    NamespaceUtil.checkJob(head.namespace());
+    String metalake = head.namespace().level(0);
+
+    // The run id is derived from each identifier's name.
+    List<Long> runIds =
+        identifiers.stream()
+            .map(ident -> parseJobRunId(ident.name()))
+            .collect(Collectors.toList());
+
+    return SessionUtils.doWithCommitAndFetchResult(
+        JobMetaMapper.class,
+        mapper ->
+            mapper.batchSelectJobByRunIds(metalake, runIds).stream()
+                .map(row -> JobPO.fromJobPO(row, head.namespace()))
+                .collect(Collectors.toList()));
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobTemplateMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobTemplateMetaService.java
index cfaf51f2d6..15685af492 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobTemplateMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobTemplateMetaService.java
@@ -219,4 +219,24 @@ public class JobTemplateMetaService {
     }
     return jobTemplateId;
   }
+
+  /**
+   * Fetches the job template entities for the given identifiers with a single mapper query.
+   *
+   * <p>The metalake name and the namespace attached to every returned entity come from the
+   * first identifier only, so the list must be non-empty ({@code identifiers.get(0)} throws
+   * otherwise) and identifiers belonging to other namespaces are not distinguished here.
+   *
+   * @param identifiers job template identifiers; only their names are sent to the mapper
+   * @return the entities built from the rows returned by the mapper
+   */
+  @Monitored(
+      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+      baseMetricName = "batchGetJobTemplateByIdentifier")
+  public List<JobTemplateEntity> batchGetJobTemplateByIdentifier(List<NameIdentifier> identifiers) {
+    NameIdentifier head = identifiers.get(0);
+    String metalake = head.namespace().level(0);
+    List<String> names =
+        identifiers.stream().map(identifier -> identifier.name()).collect(Collectors.toList());
+
+    return SessionUtils.doWithCommitAndFetchResult(
+        JobTemplateMetaMapper.class,
+        templateMapper ->
+            templateMapper.batchSelectJobTemplateByIdentifier(metalake, names).stream()
+                .map(templatePO -> JobTemplatePO.fromJobTemplatePO(templatePO, head.namespace()))
+                .collect(Collectors.toList()));
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
index 14838bbca1..c8627208ca 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.NameIdentifier;
@@ -382,4 +383,20 @@ public class MetalakeMetaService {
                     mapper -> 
mapper.deleteOwnerMetasByLegacyTimeline(legacyTimeline, limit)));
     return metalakeDeleteCount[0] + ownerRelDeleteCount[0];
   }
+
+  /**
+   * Fetches the metalakes named by the given identifiers with a single mapper query.
+   *
+   * <p>Only the name of each identifier is used; the names are handed to {@code
+   * batchSelectMetalakeByName} as one list.
+   *
+   * @param identifiers metalake identifiers
+   * @return the metalakes converted from the rows returned by the mapper
+   */
+  @Monitored(
+      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+      baseMetricName = "batchGetMetalakeByIdentifier")
+  public List<BaseMetalake> batchGetMetalakeByIdentifier(List<NameIdentifier> identifiers) {
+    List<String> names =
+        identifiers.stream().map(identifier -> identifier.name()).collect(Collectors.toList());
+
+    return SessionUtils.doWithCommitAndFetchResult(
+        MetalakeMetaMapper.class,
+        metalakeMapper ->
+            POConverters.fromMetalakePOs(metalakeMapper.batchSelectMetalakeByName(names)));
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
index e7f43b2031..e73ac4ab75 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
@@ -301,4 +301,26 @@ public class ModelMetaService {
       throw new IOException("Failed to update the entity: " + identifier);
     }
   }
+
+  /**
+   * Fetches the model entities for the given identifiers with a single mapper query.
+   *
+   * <p>The schema identifier is derived from the first identifier via {@code
+   * NameIdentifierUtil.getSchemaIdentifier}; its namespace levels 0 and 1 and its name scope the
+   * query. The first identifier's namespace is attached to every returned entity, so the list
+   * must be non-empty ({@code identifiers.get(0)} throws otherwise) and identifiers belonging to
+   * other namespaces are not distinguished here.
+   *
+   * @param identifiers model identifiers; only their names are sent to the mapper
+   * @return the entities converted from the rows returned by the mapper
+   */
+  @Monitored(
+      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+      baseMetricName = "batchGetModelByIdentifier")
+  public List<ModelEntity> batchGetModelByIdentifier(List<NameIdentifier> identifiers) {
+    NameIdentifier head = identifiers.get(0);
+    NameIdentifier parentSchema = NameIdentifierUtil.getSchemaIdentifier(head);
+    List<String> names =
+        identifiers.stream().map(identifier -> identifier.name()).collect(Collectors.toList());
+
+    return SessionUtils.doWithCommitAndFetchResult(
+        ModelMetaMapper.class,
+        modelMapper ->
+            POConverters.fromModelPOs(
+                modelMapper.batchSelectModelByIdentifier(
+                    parentSchema.namespace().level(0),
+                    parentSchema.namespace().level(1),
+                    parentSchema.name(),
+                    names),
+                head.namespace()));
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
index 7ab4de4872..0c623f96c0 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
@@ -482,4 +482,22 @@ public class PolicyMetaService {
     }
     return policyPO.getPolicyId();
   }
+
+  /**
+   * Fetches the policy entities for the given identifiers with a single mapper query.
+   *
+   * <p>The metalake name and the namespace attached to every returned entity come from the
+   * first identifier only, so the list must be non-empty ({@code identifiers.get(0)} throws
+   * otherwise) and identifiers belonging to other namespaces are not distinguished here.
+   *
+   * @param identifiers policy identifiers; only their names are sent to the mapper
+   * @return the entities converted from the rows returned by the mapper
+   */
+  @Monitored(
+      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+      baseMetricName = "batchGetPolicyByIdentifier")
+  public List<PolicyEntity> batchGetPolicyByIdentifier(List<NameIdentifier> identifiers) {
+    NameIdentifier head = identifiers.get(0);
+    String metalake = head.namespace().level(0);
+    List<String> names =
+        identifiers.stream().map(identifier -> identifier.name()).collect(Collectors.toList());
+
+    return SessionUtils.doWithCommitAndFetchResult(
+        PolicyMetaMapper.class,
+        policyMapper ->
+            POConverters.fromPolicyPOs(
+                policyMapper.batchSelectPolicyByIdentifier(metalake, names), head.namespace()));
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
index c6eb4becf3..24d69c1263 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.MetadataObject;
@@ -396,4 +397,24 @@ public class SchemaMetaService {
     builder.withMetalakeId(namespacedEntityId.namespaceIds()[0]);
     builder.withCatalogId(namespacedEntityId.entityId());
   }
+
+  /**
+   * Fetches the schema entities for the given identifiers with a single mapper query.
+   *
+   * <p>The catalog identifier is derived from the first identifier via {@code
+   * NameIdentifierUtil.getCatalogIdentifier}; its namespace level 0 and its name scope the
+   * query. The first identifier's namespace is attached to every returned entity, so the list
+   * must be non-empty ({@code identifiers.get(0)} throws otherwise) and identifiers belonging to
+   * other namespaces are not distinguished here.
+   *
+   * @param identifiers schema identifiers; only their names are sent to the mapper
+   * @return the entities converted from the rows returned by the mapper
+   */
+  @Monitored(
+      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+      baseMetricName = "batchGetSchemaByIdentifier")
+  public List<SchemaEntity> batchGetSchemaByIdentifier(List<NameIdentifier> identifiers) {
+    NameIdentifier head = identifiers.get(0);
+    NameIdentifier parentCatalog = NameIdentifierUtil.getCatalogIdentifier(head);
+    List<String> names =
+        identifiers.stream().map(identifier -> identifier.name()).collect(Collectors.toList());
+
+    return SessionUtils.doWithCommitAndFetchResult(
+        SchemaMetaMapper.class,
+        schemaMapper ->
+            POConverters.fromSchemaPOs(
+                schemaMapper.batchSelectSchemaByIdentifier(
+                    parentCatalog.namespace().level(0), parentCatalog.name(), names),
+                head.namespace()));
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
index 7db50af953..5c9133ff1b 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
@@ -23,13 +23,11 @@ import static 
org.apache.gravitino.metrics.source.MetricsSource.GRAVITINO_RELATI
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
-import org.apache.commons.collections4.CollectionUtils;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.MetadataObject;
@@ -326,9 +324,6 @@ public class TableMetaService {
       metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
       baseMetricName = "batchGetTableByIdentifier")
   public List<TableEntity> batchGetTableByIdentifier(List<NameIdentifier> 
identifiers) {
-    if (CollectionUtils.isEmpty(identifiers)) {
-      return Collections.emptyList();
-    }
     NameIdentifier firstIdent = identifiers.get(0);
     NameIdentifier schemaIdent = 
NameIdentifierUtil.getSchemaIdentifier(firstIdent);
     List<String> tableNames = new ArrayList<>(identifiers.size());
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TagMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TagMetaService.java
index 53687df864..99e575a24d 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TagMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TagMetaService.java
@@ -421,4 +421,21 @@ public class TagMetaService {
         TagMetaMapper.class,
         mapper -> mapper.listTagPOsByMetalakeAndTagNames(metalakeName, 
tagNames));
   }
+
+  /**
+   * Fetches the tag entities for the given identifiers with a single mapper query.
+   *
+   * <p>The metalake name and the namespace attached to every returned entity come from the
+   * first identifier only, so the list must be non-empty ({@code identifiers.get(0)} throws
+   * otherwise) and identifiers belonging to other namespaces are not distinguished here.
+   *
+   * @param identifiers tag identifiers; only their names are sent to the mapper
+   * @return the entities converted from the rows returned by the mapper
+   */
+  @Monitored(
+      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+      baseMetricName = "batchGetTagByIdentifier")
+  public List<TagEntity> batchGetTagByIdentifier(List<NameIdentifier> identifiers) {
+    NameIdentifier head = identifiers.get(0);
+    String metalake = head.namespace().level(0);
+    List<String> names =
+        identifiers.stream().map(identifier -> identifier.name()).collect(Collectors.toList());
+
+    return SessionUtils.doWithCommitAndFetchResult(
+        TagMetaMapper.class,
+        tagMapper ->
+            POConverters.fromTagPOs(
+                tagMapper.batchSelectTagByIdentifier(metalake, names), head.namespace()));
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
index 4282742c28..311559de8b 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.MetadataObject;
@@ -254,4 +255,26 @@ public class TopicMetaService {
     }
     return topicId;
   }
+
+  /**
+   * Fetches the topic entities for the given identifiers with a single mapper query.
+   *
+   * <p>The schema identifier is derived from the first identifier via {@code
+   * NameIdentifierUtil.getSchemaIdentifier}; its namespace levels 0 and 1 and its name scope the
+   * query. The first identifier's namespace is attached to every returned entity, so the list
+   * must be non-empty ({@code identifiers.get(0)} throws otherwise) and identifiers belonging to
+   * other namespaces are not distinguished here.
+   *
+   * @param identifiers topic identifiers; only their names are sent to the mapper
+   * @return the entities converted from the rows returned by the mapper
+   */
+  @Monitored(
+      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+      baseMetricName = "batchGetTopicByIdentifier")
+  public List<TopicEntity> batchGetTopicByIdentifier(List<NameIdentifier> identifiers) {
+    NameIdentifier head = identifiers.get(0);
+    NameIdentifier parentSchema = NameIdentifierUtil.getSchemaIdentifier(head);
+    List<String> names =
+        identifiers.stream().map(identifier -> identifier.name()).collect(Collectors.toList());
+
+    return SessionUtils.doWithCommitAndFetchResult(
+        TopicMetaMapper.class,
+        topicMapper ->
+            POConverters.fromTopicPOs(
+                topicMapper.batchSelectTopicByIdentifier(
+                    parentSchema.namespace().level(0),
+                    parentSchema.namespace().level(1),
+                    parentSchema.name(),
+                    names),
+                head.namespace()));
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
index 87cda80432..a94f9b3cba 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
@@ -1404,6 +1404,10 @@ public class POConverters {
     }
   }
 
+  /**
+   * Converts each {@link TagPO} in the list with {@code fromTagPO}, using the same namespace for
+   * every element.
+   *
+   * @param tagPOs the persistent tag objects to convert
+   * @param namespace the namespace passed to {@code fromTagPO} for each element
+   * @return the converted entities, in the iteration order of {@code tagPOs}
+   */
+  public static List<TagEntity> fromTagPOs(List<TagPO> tagPOs, Namespace namespace) {
+    return tagPOs.stream()
+        .map(tagPO -> fromTagPO(tagPO, namespace))
+        .collect(Collectors.toList());
+  }
+
   public static TagPO initializeTagPOWithVersion(TagEntity tagEntity, 
TagPO.Builder builder) {
     try {
       return builder
@@ -1487,6 +1491,10 @@ public class POConverters {
     }
   }
 
+  /**
+   * Converts each {@link PolicyPO} in the list with {@code fromPolicyPO}, using the same
+   * namespace for every element.
+   *
+   * @param policyPOs the persistent policy objects to convert
+   * @param namespace the namespace passed to {@code fromPolicyPO} for each element
+   * @return the converted entities, in the iteration order of {@code policyPOs}
+   */
+  public static List<PolicyEntity> fromPolicyPOs(List<PolicyPO> policyPOs, Namespace namespace) {
+    return policyPOs.stream()
+        .map(policyPO -> fromPolicyPO(policyPO, namespace))
+        .collect(Collectors.toList());
+  }
+
   public static PolicyPO initializePolicyPOWithVersion(
       PolicyEntity policyEntity, PolicyPO.Builder builder) {
     try {
@@ -1585,6 +1593,10 @@ public class POConverters {
     }
   }
 
+  /**
+   * Converts each {@link ModelPO} in the list with {@code fromModelPO}, using the same namespace
+   * for every element.
+   *
+   * @param modelPOs the persistent model objects to convert
+   * @param namespace the namespace passed to {@code fromModelPO} for each element
+   * @return the converted entities, in the iteration order of {@code modelPOs}
+   */
+  public static List<ModelEntity> fromModelPOs(List<ModelPO> modelPOs, Namespace namespace) {
+    return modelPOs.stream()
+        .map(modelPO -> fromModelPO(modelPO, namespace))
+        .collect(Collectors.toList());
+  }
+
   public static ModelPO initializeModelPO(ModelEntity modelEntity, 
ModelPO.Builder builder) {
     try {
       return builder
diff --git 
a/core/src/main/java/org/apache/gravitino/utils/EntityClassMapper.java 
b/core/src/main/java/org/apache/gravitino/utils/EntityClassMapper.java
new file mode 100644
index 0000000000..7a6a4c5c29
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/utils/EntityClassMapper.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.utils;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.CatalogEntity;
+import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.meta.ModelVersionEntity;
+import org.apache.gravitino.meta.PolicyEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.meta.TagEntity;
+import org.apache.gravitino.meta.TopicEntity;
+import org.apache.gravitino.meta.UserEntity;
+
+/**
+ * Static lookup table from an {@link Entity.EntityType} to the entity class registered for it.
+ *
+ * <p>The class cannot be instantiated; use {@link #getEntityClass(Entity.EntityType)}.
+ */
+public class EntityClassMapper {
+
+  /** Entity class registered for each supported entity type. */
+  private static final Map<Entity.EntityType, Class<?>> CLASS_BY_ENTITY_TYPE =
+      ImmutableMap.<Entity.EntityType, Class<?>>builder()
+          // Metalake, catalog, schema and the objects stored beneath a schema.
+          .put(Entity.EntityType.METALAKE, BaseMetalake.class)
+          .put(Entity.EntityType.CATALOG, CatalogEntity.class)
+          .put(Entity.EntityType.SCHEMA, SchemaEntity.class)
+          .put(Entity.EntityType.TABLE, TableEntity.class)
+          .put(Entity.EntityType.COLUMN, ColumnEntity.class)
+          .put(Entity.EntityType.FILESET, FilesetEntity.class)
+          .put(Entity.EntityType.TOPIC, TopicEntity.class)
+          .put(Entity.EntityType.MODEL, ModelEntity.class)
+          .put(Entity.EntityType.MODEL_VERSION, ModelVersionEntity.class)
+          // Users, groups and roles.
+          .put(Entity.EntityType.USER, UserEntity.class)
+          .put(Entity.EntityType.GROUP, GroupEntity.class)
+          .put(Entity.EntityType.ROLE, RoleEntity.class)
+          // Tags, policies and jobs.
+          .put(Entity.EntityType.TAG, TagEntity.class)
+          .put(Entity.EntityType.POLICY, PolicyEntity.class)
+          .put(Entity.EntityType.JOB_TEMPLATE, JobTemplateEntity.class)
+          .put(Entity.EntityType.JOB, JobEntity.class)
+          .build();
+
+  private EntityClassMapper() {}
+
+  /**
+   * Looks up the entity class registered for an entity type.
+   *
+   * <p>The stored {@code Class<?>} is cast unchecked to {@code Class<E>}, so the caller chooses
+   * {@code E} and is responsible for it matching the requested type.
+   *
+   * @param entityType the entity type to look up
+   * @param <E> the entity class type that extends Entity and HasIdentifier
+   * @return the registered entity class, or null when the type has no entry in the table
+   */
+  @SuppressWarnings("unchecked")
+  public static <E extends Entity & HasIdentifier> Class<E> getEntityClass(
+      Entity.EntityType entityType) {
+    return (Class<E>) CLASS_BY_ENTITY_TYPE.get(entityType);
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackendBatchGet.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackendBatchGet.java
new file mode 100644
index 0000000000..d80f433f7b
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackendBatchGet.java
@@ -0,0 +1,850 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.CatalogEntity;
+import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.meta.PolicyEntity;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.meta.TagEntity;
+import org.apache.gravitino.meta.TopicEntity;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+
+/** Tests for batch-get operations in JDBCBackend. */
+public class TestJDBCBackendBatchGet extends TestJDBCBackend {
+
+  /** Inserts three metalakes and checks that one batch-get call returns all of them. */
+  @TestTemplate
+  public void testBatchGetMetalakes() throws IOException {
+    // Build every metalake first, then store them in the same order.
+    String[] metalakeNames = {"metalake1", "metalake2", "metalake3"};
+    List<BaseMetalake> stored = Lists.newArrayList();
+    for (String metalakeName : metalakeNames) {
+      stored.add(createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, AUDIT_INFO));
+    }
+    for (BaseMetalake metalake : stored) {
+      backend.insert(metalake, false);
+    }
+
+    // Ask for all of them in a single call.
+    List<NameIdentifier> identifiers = Lists.newArrayList();
+    for (BaseMetalake metalake : stored) {
+      identifiers.add(metalake.nameIdentifier());
+    }
+    List<BaseMetalake> fetched = backend.batchGet(identifiers, Entity.EntityType.METALAKE);
+
+    Assertions.assertEquals(3, fetched.size());
+    Map<String, BaseMetalake> fetchedByName =
+        fetched.stream().collect(Collectors.toMap(BaseMetalake::name, m -> m));
+
+    // Each stored metalake must come back under its name with matching id, name and creator.
+    for (int i = 0; i < metalakeNames.length; i++) {
+      BaseMetalake expected = stored.get(i);
+      BaseMetalake actual = fetchedByName.get(metalakeNames[i]);
+      Assertions.assertNotNull(actual);
+      Assertions.assertEquals(expected.id(), actual.id());
+      Assertions.assertEquals(expected.name(), actual.name());
+      Assertions.assertEquals(expected.auditInfo().creator(), actual.auditInfo().creator());
+    }
+  }
+
+  /** Inserts three catalogs under one metalake and checks that one batch-get returns them. */
+  @TestTemplate
+  public void testBatchGetCatalogs() throws IOException {
+    // The catalogs are created under this metalake.
+    String metalakeName = "metalake_for_catalog_batch";
+    createAndInsertMakeLake(metalakeName);
+
+    // Build every catalog first, then store them in the same order.
+    String[] catalogNames = {"catalog1", "catalog2", "catalog3"};
+    List<CatalogEntity> stored = Lists.newArrayList();
+    for (String catalogName : catalogNames) {
+      stored.add(
+          createCatalog(
+              RandomIdGenerator.INSTANCE.nextId(),
+              NamespaceUtil.ofCatalog(metalakeName),
+              catalogName,
+              AUDIT_INFO));
+    }
+    for (CatalogEntity catalog : stored) {
+      backend.insert(catalog, false);
+    }
+
+    // Ask for all of them in a single call.
+    List<NameIdentifier> identifiers = Lists.newArrayList();
+    for (CatalogEntity catalog : stored) {
+      identifiers.add(catalog.nameIdentifier());
+    }
+    List<CatalogEntity> fetched = backend.batchGet(identifiers, Entity.EntityType.CATALOG);
+
+    Assertions.assertEquals(3, fetched.size());
+    Map<String, CatalogEntity> fetchedByName =
+        fetched.stream().collect(Collectors.toMap(CatalogEntity::name, c -> c));
+
+    for (int i = 0; i < catalogNames.length; i++) {
+      CatalogEntity expected = stored.get(i);
+      CatalogEntity actual = fetchedByName.get(catalogNames[i]);
+      Assertions.assertNotNull(actual);
+      Assertions.assertEquals(expected.id(), actual.id());
+      Assertions.assertEquals(expected.name(), actual.name());
+      Assertions.assertEquals(expected.namespace(), actual.namespace());
+      if (i == 0) {
+        // Type and provider are compared for the first catalog only.
+        Assertions.assertEquals(expected.getType(), actual.getType());
+        Assertions.assertEquals(expected.getProvider(), actual.getProvider());
+      }
+    }
+  }
+
+  /** Inserts two schemas under one catalog and checks that one batch-get returns them. */
+  @TestTemplate
+  public void testBatchGetSchemas() throws IOException {
+    // The schemas are created under this metalake and catalog.
+    String metalakeName = "metalake_for_schema_batch";
+    String catalogName = "catalog_for_schema_batch";
+    createAndInsertMakeLake(metalakeName);
+    createAndInsertCatalog(metalakeName, catalogName);
+
+    // Build every schema first, then store them in the same order.
+    String[] schemaNames = {"schema1", "schema2"};
+    List<SchemaEntity> stored = Lists.newArrayList();
+    for (String schemaName : schemaNames) {
+      stored.add(
+          createSchemaEntity(
+              RandomIdGenerator.INSTANCE.nextId(),
+              NamespaceUtil.ofSchema(metalakeName, catalogName),
+              schemaName,
+              AUDIT_INFO));
+    }
+    for (SchemaEntity schema : stored) {
+      backend.insert(schema, false);
+    }
+
+    // Ask for all of them in a single call.
+    List<NameIdentifier> identifiers = Lists.newArrayList();
+    for (SchemaEntity schema : stored) {
+      identifiers.add(schema.nameIdentifier());
+    }
+    List<SchemaEntity> fetched = backend.batchGet(identifiers, Entity.EntityType.SCHEMA);
+
+    Assertions.assertEquals(2, fetched.size());
+    Map<String, SchemaEntity> fetchedByName =
+        fetched.stream().collect(Collectors.toMap(SchemaEntity::name, s -> s));
+
+    for (int i = 0; i < schemaNames.length; i++) {
+      SchemaEntity expected = stored.get(i);
+      SchemaEntity actual = fetchedByName.get(schemaNames[i]);
+      Assertions.assertNotNull(actual);
+      Assertions.assertEquals(expected.id(), actual.id());
+      Assertions.assertEquals(expected.name(), actual.name());
+      Assertions.assertEquals(expected.namespace(), actual.namespace());
+      if (i == 0) {
+        // The creator is compared for the first schema only.
+        Assertions.assertEquals(expected.auditInfo().creator(), actual.auditInfo().creator());
+      }
+    }
+  }
+
+  /** Inserts three tables under one schema and checks that one batch-get returns them. */
+  @TestTemplate
+  public void testBatchGetTables() throws IOException {
+    // The tables are created under this metalake, catalog and schema.
+    String metalakeName = "metalake_for_table_batch";
+    String catalogName = "catalog_for_table_batch";
+    String schemaName = "schema_for_table_batch";
+    createParentEntities(metalakeName, catalogName, schemaName, AUDIT_INFO);
+
+    // Build every table first, then store them in the same order.
+    String[] tableNames = {"table1", "table2", "table3"};
+    List<TableEntity> stored = Lists.newArrayList();
+    for (String tableName : tableNames) {
+      stored.add(
+          createTableEntity(
+              RandomIdGenerator.INSTANCE.nextId(),
+              NamespaceUtil.ofTable(metalakeName, catalogName, schemaName),
+              tableName,
+              AUDIT_INFO));
+    }
+    for (TableEntity table : stored) {
+      backend.insert(table, false);
+    }
+
+    // Ask for all of them in a single call.
+    List<NameIdentifier> identifiers = Lists.newArrayList();
+    for (TableEntity table : stored) {
+      identifiers.add(table.nameIdentifier());
+    }
+    List<TableEntity> fetched = backend.batchGet(identifiers, Entity.EntityType.TABLE);
+
+    Assertions.assertEquals(3, fetched.size());
+    Map<String, TableEntity> fetchedByName =
+        fetched.stream().collect(Collectors.toMap(TableEntity::name, t -> t));
+
+    for (int i = 0; i < tableNames.length; i++) {
+      TableEntity expected = stored.get(i);
+      TableEntity actual = fetchedByName.get(tableNames[i]);
+      Assertions.assertNotNull(actual);
+      Assertions.assertEquals(expected.id(), actual.id());
+      Assertions.assertEquals(expected.name(), actual.name());
+      Assertions.assertEquals(expected.namespace(), actual.namespace());
+      if (i == 0) {
+        // The creator is compared for the first table only.
+        Assertions.assertEquals(expected.auditInfo().creator(), actual.auditInfo().creator());
+      }
+    }
+  }
+
+  /** Inserts two filesets under one schema and checks that one batch-get returns them. */
+  @TestTemplate
+  public void testBatchGetFilesets() throws IOException {
+    // The filesets are created under this metalake, catalog and schema.
+    String metalakeName = "metalake_for_fileset_batch";
+    String catalogName = "catalog_for_fileset_batch";
+    String schemaName = "schema_for_fileset_batch";
+    createParentEntities(metalakeName, catalogName, schemaName, AUDIT_INFO);
+
+    // Build every fileset first, then store them in the same order.
+    String[] filesetNames = {"fileset1", "fileset2"};
+    List<FilesetEntity> stored = Lists.newArrayList();
+    for (String filesetName : filesetNames) {
+      stored.add(
+          createFilesetEntity(
+              RandomIdGenerator.INSTANCE.nextId(),
+              NamespaceUtil.ofFileset(metalakeName, catalogName, schemaName),
+              filesetName,
+              AUDIT_INFO));
+    }
+    for (FilesetEntity fileset : stored) {
+      backend.insert(fileset, false);
+    }
+
+    // Ask for all of them in a single call.
+    List<NameIdentifier> identifiers = Lists.newArrayList();
+    for (FilesetEntity fileset : stored) {
+      identifiers.add(fileset.nameIdentifier());
+    }
+    List<FilesetEntity> fetched = backend.batchGet(identifiers, Entity.EntityType.FILESET);
+
+    Assertions.assertEquals(2, fetched.size());
+    Map<String, FilesetEntity> fetchedByName =
+        fetched.stream().collect(Collectors.toMap(FilesetEntity::name, f -> f));
+
+    for (int i = 0; i < filesetNames.length; i++) {
+      FilesetEntity expected = stored.get(i);
+      FilesetEntity actual = fetchedByName.get(filesetNames[i]);
+      Assertions.assertNotNull(actual);
+      Assertions.assertEquals(expected.id(), actual.id());
+      Assertions.assertEquals(expected.name(), actual.name());
+      Assertions.assertEquals(expected.namespace(), actual.namespace());
+      Assertions.assertEquals(expected.filesetType(), actual.filesetType());
+      if (i == 0) {
+        // The storage location is compared for the first fileset only.
+        Assertions.assertEquals(expected.storageLocation(), actual.storageLocation());
+      }
+    }
+  }
+
+  /** Verifies that batchGet returns both topics with the same fields that were inserted. */
+  @TestTemplate
+  public void testBatchGetTopics() throws IOException {
+    // Setup
+    String metalakeName = "metalake_for_topic_batch";
+    String catalogName = "catalog_for_topic_batch";
+    String schemaName = "schema_for_topic_batch";
+    createParentEntities(metalakeName, catalogName, schemaName, AUDIT_INFO);
+
+    // Create topics
+    TopicEntity topic1 =
+        createTopicEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofTopic(metalakeName, catalogName, schemaName),
+            "topic1",
+            AUDIT_INFO);
+    TopicEntity topic2 =
+        createTopicEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofTopic(metalakeName, catalogName, schemaName),
+            "topic2",
+            AUDIT_INFO);
+
+    backend.insert(topic1, false);
+    backend.insert(topic2, false);
+
+    // Test batch get
+    List<NameIdentifier> identifiers =
+        Lists.newArrayList(topic1.nameIdentifier(), topic2.nameIdentifier());
+
+    List<TopicEntity> result = backend.batchGet(identifiers, Entity.EntityType.TOPIC);
+
+    Assertions.assertEquals(2, result.size());
+    // Index by name: the order of the returned list is not asserted by this test.
+    Map<String, TopicEntity> resultMap =
+        result.stream().collect(Collectors.toMap(TopicEntity::name, t -> t));
+
+    // Verify topic1
+    TopicEntity retrieved1 = resultMap.get("topic1");
+    Assertions.assertNotNull(retrieved1);
+    Assertions.assertEquals(topic1.id(), retrieved1.id());
+    Assertions.assertEquals(topic1.name(), retrieved1.name());
+    Assertions.assertEquals(topic1.namespace(), retrieved1.namespace());
+    Assertions.assertEquals(topic1.comment(), retrieved1.comment());
+    Assertions.assertEquals(topic1.properties(), retrieved1.properties());
+
+    // Verify topic2 with the same set of fields as topic1
+    TopicEntity retrieved2 = resultMap.get("topic2");
+    Assertions.assertNotNull(retrieved2);
+    Assertions.assertEquals(topic2.id(), retrieved2.id());
+    Assertions.assertEquals(topic2.name(), retrieved2.name());
+    Assertions.assertEquals(topic2.namespace(), retrieved2.namespace());
+    Assertions.assertEquals(topic2.comment(), retrieved2.comment());
+    Assertions.assertEquals(topic2.properties(), retrieved2.properties());
+  }
+
+  /** Verifies that batchGet returns both models with the same fields that were inserted. */
+  @TestTemplate
+  public void testBatchGetModels() throws IOException {
+    // Setup
+    String metalakeName = "metalake_for_model_batch";
+    String catalogName = "catalog_for_model_batch";
+    String schemaName = "schema_for_model_batch";
+    createParentEntities(metalakeName, catalogName, schemaName, AUDIT_INFO);
+
+    // Create models; comments and properties differ so a row mix-up would be detected
+    ModelEntity model1 =
+        createModelEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofModel(metalakeName, catalogName, schemaName),
+            "model1",
+            "comment1",
+            0,
+            ImmutableMap.of("key1", "value1"),
+            AUDIT_INFO);
+    ModelEntity model2 =
+        createModelEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofModel(metalakeName, catalogName, schemaName),
+            "model2",
+            "comment2",
+            0,
+            ImmutableMap.of("key2", "value2"),
+            AUDIT_INFO);
+
+    backend.insert(model1, false);
+    backend.insert(model2, false);
+
+    // Test batch get
+    List<NameIdentifier> identifiers =
+        Lists.newArrayList(model1.nameIdentifier(), model2.nameIdentifier());
+
+    List<ModelEntity> result = backend.batchGet(identifiers, Entity.EntityType.MODEL);
+
+    Assertions.assertEquals(2, result.size());
+    // Index by name: the order of the returned list is not asserted by this test.
+    Map<String, ModelEntity> resultMap =
+        result.stream().collect(Collectors.toMap(ModelEntity::name, m -> m));
+
+    // Verify model1
+    ModelEntity retrieved1 = resultMap.get("model1");
+    Assertions.assertNotNull(retrieved1);
+    Assertions.assertEquals(model1.id(), retrieved1.id());
+    Assertions.assertEquals(model1.name(), retrieved1.name());
+    Assertions.assertEquals(model1.namespace(), retrieved1.namespace());
+    Assertions.assertEquals(model1.comment(), retrieved1.comment());
+    Assertions.assertEquals(model1.latestVersion(), retrieved1.latestVersion());
+    Assertions.assertEquals(model1.properties(), retrieved1.properties());
+
+    // Verify model2 with the same set of fields as model1
+    ModelEntity retrieved2 = resultMap.get("model2");
+    Assertions.assertNotNull(retrieved2);
+    Assertions.assertEquals(model2.id(), retrieved2.id());
+    Assertions.assertEquals(model2.name(), retrieved2.name());
+    Assertions.assertEquals(model2.namespace(), retrieved2.namespace());
+    Assertions.assertEquals(model2.comment(), retrieved2.comment());
+    Assertions.assertEquals(model2.latestVersion(), retrieved2.latestVersion());
+    Assertions.assertEquals(model2.properties(), retrieved2.properties());
+  }
+
+  /** Verifies that batchGet returns both tags with the same fields that were inserted. */
+  @TestTemplate
+  public void testBatchGetTags() throws IOException {
+    // Setup
+    String metalakeName = "metalake_for_tag_batch";
+    createAndInsertMakeLake(metalakeName);
+
+    // Create tags
+    TagEntity tag1 =
+        TagEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("tag1")
+            .withNamespace(Namespace.of(metalakeName))
+            .withComment("comment1")
+            .withProperties(new HashMap<>())
+            .withAuditInfo(AUDIT_INFO)
+            .build();
+    TagEntity tag2 =
+        TagEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("tag2")
+            .withNamespace(Namespace.of(metalakeName))
+            .withComment("comment2")
+            .withProperties(new HashMap<>())
+            .withAuditInfo(AUDIT_INFO)
+            .build();
+
+    backend.insert(tag1, false);
+    backend.insert(tag2, false);
+
+    // Test batch get
+    List<NameIdentifier> identifiers =
+        Lists.newArrayList(tag1.nameIdentifier(), tag2.nameIdentifier());
+
+    List<TagEntity> result = backend.batchGet(identifiers, Entity.EntityType.TAG);
+
+    Assertions.assertEquals(2, result.size());
+    // Index by name: the order of the returned list is not asserted by this test.
+    Map<String, TagEntity> resultMap =
+        result.stream().collect(Collectors.toMap(TagEntity::name, t -> t));
+
+    // Verify tag1
+    TagEntity retrieved1 = resultMap.get("tag1");
+    Assertions.assertNotNull(retrieved1);
+    Assertions.assertEquals(tag1.id(), retrieved1.id());
+    Assertions.assertEquals(tag1.name(), retrieved1.name());
+    Assertions.assertEquals(tag1.namespace(), retrieved1.namespace());
+    Assertions.assertEquals(tag1.comment(), retrieved1.comment());
+    Assertions.assertEquals(tag1.properties(), retrieved1.properties());
+
+    // Verify tag2 with the same set of fields as tag1
+    TagEntity retrieved2 = resultMap.get("tag2");
+    Assertions.assertNotNull(retrieved2);
+    Assertions.assertEquals(tag2.id(), retrieved2.id());
+    Assertions.assertEquals(tag2.name(), retrieved2.name());
+    Assertions.assertEquals(tag2.namespace(), retrieved2.namespace());
+    Assertions.assertEquals(tag2.comment(), retrieved2.comment());
+    Assertions.assertEquals(tag2.properties(), retrieved2.properties());
+  }
+
+  /** Verifies that batchGet returns both policies with the same fields that were inserted. */
+  @TestTemplate
+  public void testBatchGetPolicies() throws IOException {
+    // Setup
+    String metalakeName = "metalake_for_policy_batch";
+    createAndInsertMakeLake(metalakeName);
+
+    // Create policies
+    PolicyEntity policy1 =
+        createPolicy(
+            RandomIdGenerator.INSTANCE.nextId(), Namespace.of(metalakeName), "policy1", AUDIT_INFO);
+    PolicyEntity policy2 =
+        createPolicy(
+            RandomIdGenerator.INSTANCE.nextId(), Namespace.of(metalakeName), "policy2", AUDIT_INFO);
+
+    backend.insert(policy1, false);
+    backend.insert(policy2, false);
+
+    // Test batch get
+    List<NameIdentifier> identifiers =
+        Lists.newArrayList(policy1.nameIdentifier(), policy2.nameIdentifier());
+
+    List<PolicyEntity> result = backend.batchGet(identifiers, Entity.EntityType.POLICY);
+
+    Assertions.assertEquals(2, result.size());
+    // Index by name: the order of the returned list is not asserted by this test.
+    Map<String, PolicyEntity> resultMap =
+        result.stream().collect(Collectors.toMap(PolicyEntity::name, p -> p));
+
+    // Verify policy1
+    PolicyEntity retrieved1 = resultMap.get("policy1");
+    Assertions.assertNotNull(retrieved1);
+    Assertions.assertEquals(policy1.id(), retrieved1.id());
+    Assertions.assertEquals(policy1.name(), retrieved1.name());
+    Assertions.assertEquals(policy1.namespace(), retrieved1.namespace());
+    Assertions.assertEquals(policy1.policyType(), retrieved1.policyType());
+    Assertions.assertEquals(policy1.enabled(), retrieved1.enabled());
+
+    // Verify policy2 with the same set of fields as policy1
+    PolicyEntity retrieved2 = resultMap.get("policy2");
+    Assertions.assertNotNull(retrieved2);
+    Assertions.assertEquals(policy2.id(), retrieved2.id());
+    Assertions.assertEquals(policy2.name(), retrieved2.name());
+    Assertions.assertEquals(policy2.namespace(), retrieved2.namespace());
+    Assertions.assertEquals(policy2.policyType(), retrieved2.policyType());
+    Assertions.assertEquals(policy2.enabled(), retrieved2.enabled());
+  }
+
+  /** Verifies that batchGet returns both jobs with the same fields that were inserted. */
+  @TestTemplate
+  public void testBatchGetJobs() throws IOException {
+    // Setup
+    String metalakeName = "metalake_for_job_batch";
+    createAndInsertMakeLake(metalakeName);
+
+    // Create job templates first (jobs require existing templates)
+    JobTemplateEntity.TemplateContent content1 =
+        JobTemplateEntity.TemplateContent.builder()
+            .withJobType(JobTemplate.JobType.SHELL)
+            .withExecutable("/bin/bash")
+            .withArguments(Lists.newArrayList("arg1"))
+            .withEnvironments(ImmutableMap.of("ENV1", "value1"))
+            .withCustomFields(new HashMap<>())
+            .withScripts(Lists.newArrayList("echo 'test1'"))
+            .build();
+
+    JobTemplateEntity.TemplateContent content2 =
+        JobTemplateEntity.TemplateContent.builder()
+            .withJobType(JobTemplate.JobType.SHELL)
+            .withExecutable("/bin/bash")
+            .withArguments(Lists.newArrayList("arg2"))
+            .withEnvironments(ImmutableMap.of("ENV2", "value2"))
+            .withCustomFields(new HashMap<>())
+            .withScripts(Lists.newArrayList("echo 'test2'"))
+            .build();
+
+    JobTemplateEntity template1 =
+        JobTemplateEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("template1")
+            .withNamespace(NamespaceUtil.ofJobTemplate(metalakeName))
+            .withComment("comment1")
+            .withTemplateContent(content1)
+            .withAuditInfo(AUDIT_INFO)
+            .build();
+
+    JobTemplateEntity template2 =
+        JobTemplateEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("template2")
+            .withNamespace(NamespaceUtil.ofJobTemplate(metalakeName))
+            .withComment("comment2")
+            .withTemplateContent(content2)
+            .withAuditInfo(AUDIT_INFO)
+            .build();
+
+    backend.insert(template1, false);
+    backend.insert(template2, false);
+
+    // Now create jobs that reference the templates
+    JobEntity job1 =
+        JobEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withJobExecutionId("exec-id-1")
+            .withNamespace(NamespaceUtil.ofJob(metalakeName))
+            .withStatus(JobHandle.Status.STARTED)
+            .withJobTemplateName("template1")
+            .withAuditInfo(AUDIT_INFO)
+            .build();
+    JobEntity job2 =
+        JobEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withJobExecutionId("exec-id-2")
+            .withNamespace(NamespaceUtil.ofJob(metalakeName))
+            .withStatus(JobHandle.Status.QUEUED)
+            .withJobTemplateName("template2")
+            .withAuditInfo(AUDIT_INFO)
+            .build();
+
+    backend.insert(job1, false);
+    backend.insert(job2, false);
+
+    // Test batch get
+    List<NameIdentifier> identifiers =
+        Lists.newArrayList(job1.nameIdentifier(), job2.nameIdentifier());
+
+    List<JobEntity> result = backend.batchGet(identifiers, Entity.EntityType.JOB);
+
+    Assertions.assertEquals(2, result.size());
+    // Jobs are built without an explicit name, so look them up via job.name().
+    Map<String, JobEntity> resultMap =
+        result.stream().collect(Collectors.toMap(JobEntity::name, j -> j));
+
+    // Verify job1
+    JobEntity retrieved1 = resultMap.get(job1.name());
+    Assertions.assertNotNull(retrieved1);
+    Assertions.assertEquals(job1.id(), retrieved1.id());
+    Assertions.assertEquals(job1.name(), retrieved1.name());
+    Assertions.assertEquals(job1.namespace(), retrieved1.namespace());
+    Assertions.assertEquals(job1.jobExecutionId(), retrieved1.jobExecutionId());
+    Assertions.assertEquals(job1.status(), retrieved1.status());
+    Assertions.assertEquals(job1.jobTemplateName(), retrieved1.jobTemplateName());
+
+    // Verify job2 with the same set of fields as job1
+    JobEntity retrieved2 = resultMap.get(job2.name());
+    Assertions.assertNotNull(retrieved2);
+    Assertions.assertEquals(job2.id(), retrieved2.id());
+    Assertions.assertEquals(job2.name(), retrieved2.name());
+    Assertions.assertEquals(job2.namespace(), retrieved2.namespace());
+    Assertions.assertEquals(job2.jobExecutionId(), retrieved2.jobExecutionId());
+    Assertions.assertEquals(job2.status(), retrieved2.status());
+    Assertions.assertEquals(job2.jobTemplateName(), retrieved2.jobTemplateName());
+  }
+
+  /** Verifies that batchGet returns both job templates with the fields that were inserted. */
+  @TestTemplate
+  public void testBatchGetJobTemplates() throws IOException {
+    // Setup
+    String metalakeName = "metalake_for_jobtemplate_batch";
+    createAndInsertMakeLake(metalakeName);
+
+    // Create job templates with simple template content
+    JobTemplateEntity.TemplateContent content1 =
+        JobTemplateEntity.TemplateContent.builder()
+            .withJobType(JobTemplate.JobType.SHELL)
+            .withExecutable("/bin/bash")
+            .withArguments(Lists.newArrayList("arg1"))
+            .withEnvironments(ImmutableMap.of("ENV1", "value1"))
+            .withCustomFields(new HashMap<>())
+            .withScripts(Lists.newArrayList("echo 'test1'"))
+            .build();
+
+    JobTemplateEntity.TemplateContent content2 =
+        JobTemplateEntity.TemplateContent.builder()
+            .withJobType(JobTemplate.JobType.SHELL)
+            .withExecutable("/bin/bash")
+            .withArguments(Lists.newArrayList("arg2"))
+            .withEnvironments(ImmutableMap.of("ENV2", "value2"))
+            .withCustomFields(new HashMap<>())
+            .withScripts(Lists.newArrayList("echo 'test2'"))
+            .build();
+
+    JobTemplateEntity template1 =
+        JobTemplateEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("template1")
+            .withNamespace(NamespaceUtil.ofJobTemplate(metalakeName))
+            .withComment("comment1")
+            .withTemplateContent(content1)
+            .withAuditInfo(AUDIT_INFO)
+            .build();
+
+    JobTemplateEntity template2 =
+        JobTemplateEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("template2")
+            .withNamespace(NamespaceUtil.ofJobTemplate(metalakeName))
+            .withComment("comment2")
+            .withTemplateContent(content2)
+            .withAuditInfo(AUDIT_INFO)
+            .build();
+
+    backend.insert(template1, false);
+    backend.insert(template2, false);
+
+    // Test batch get
+    List<NameIdentifier> identifiers =
+        Lists.newArrayList(template1.nameIdentifier(), template2.nameIdentifier());
+
+    List<JobTemplateEntity> result = backend.batchGet(identifiers, Entity.EntityType.JOB_TEMPLATE);
+
+    Assertions.assertEquals(2, result.size());
+    // Index by name: the order of the returned list is not asserted by this test.
+    Map<String, JobTemplateEntity> resultMap =
+        result.stream().collect(Collectors.toMap(JobTemplateEntity::name, t -> t));
+
+    // Verify template1
+    JobTemplateEntity retrieved1 = resultMap.get("template1");
+    Assertions.assertNotNull(retrieved1);
+    Assertions.assertEquals(template1.id(), retrieved1.id());
+    Assertions.assertEquals(template1.name(), retrieved1.name());
+    Assertions.assertEquals(template1.namespace(), retrieved1.namespace());
+    Assertions.assertEquals(template1.comment(), retrieved1.comment());
+    Assertions.assertNotNull(retrieved1.templateContent());
+    Assertions.assertEquals(JobTemplate.JobType.SHELL, retrieved1.templateContent().jobType());
+
+    // Verify template2 with the same set of fields as template1
+    JobTemplateEntity retrieved2 = resultMap.get("template2");
+    Assertions.assertNotNull(retrieved2);
+    Assertions.assertEquals(template2.id(), retrieved2.id());
+    Assertions.assertEquals(template2.name(), retrieved2.name());
+    Assertions.assertEquals(template2.namespace(), retrieved2.namespace());
+    Assertions.assertEquals(template2.comment(), retrieved2.comment());
+    Assertions.assertNotNull(retrieved2.templateContent());
+    Assertions.assertEquals(JobTemplate.JobType.SHELL, retrieved2.templateContent().jobType());
+  }
+
+  /** An empty identifier list is expected to yield an empty, non-null result. */
+  @TestTemplate
+  public void testBatchGetEmptyList() {
+    List<NameIdentifier> noIdentifiers = Lists.newArrayList();
+
+    List<TableEntity> fetched = backend.batchGet(noIdentifiers, Entity.EntityType.TABLE);
+
+    Assertions.assertNotNull(fetched);
+    Assertions.assertEquals(0, fetched.size());
+  }
+
+  /** A null identifier list is expected to yield an empty, non-null result. */
+  @TestTemplate
+  public void testBatchGetNullList() {
+    List<TableEntity> fetched = backend.batchGet(null, Entity.EntityType.TABLE);
+
+    Assertions.assertNotNull(fetched);
+    Assertions.assertEquals(0, fetched.size());
+  }
+
+  /** Identifiers that live in different namespaces must be rejected by batchGet. */
+  @TestTemplate
+  public void testBatchGetDifferentNamespaces() throws IOException {
+    // Two metalakes give the two catalogs distinct namespaces.
+    String firstMetalake = "metalake_diff1";
+    String secondMetalake = "metalake_diff2";
+    createAndInsertMakeLake(firstMetalake);
+    createAndInsertMakeLake(secondMetalake);
+
+    CatalogEntity catalogInFirst =
+        createCatalog(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofCatalog(firstMetalake),
+            "catalog1",
+            AUDIT_INFO);
+    CatalogEntity catalogInSecond =
+        createCatalog(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofCatalog(secondMetalake),
+            "catalog2",
+            AUDIT_INFO);
+    backend.insert(catalogInFirst, false);
+    backend.insert(catalogInSecond, false);
+
+    List<NameIdentifier> mixedNamespaceIdents =
+        Lists.newArrayList(catalogInFirst.nameIdentifier(), catalogInSecond.nameIdentifier());
+
+    // The string below is the message reported if nothing is thrown; it is not matched
+    // against the exception's own message.
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> backend.batchGet(mixedNamespaceIdents, Entity.EntityType.CATALOG),
+        "All identifiers must have the same namespace for batch get operation");
+  }
+
+  /** Requests three tables of which only two exist and expects just those two back. */
+  @TestTemplate
+  public void testBatchGetPartialResults() throws IOException {
+    String metalakeName = "metalake_for_partial_batch";
+    String catalogName = "catalog_for_partial_batch";
+    String schemaName = "schema_for_partial_batch";
+    createParentEntities(metalakeName, catalogName, schemaName, AUDIT_INFO);
+
+    // Persist exactly two tables under the schema.
+    TableEntity existingA =
+        createTableEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofTable(metalakeName, catalogName, schemaName),
+            "table1",
+            AUDIT_INFO);
+    TableEntity existingB =
+        createTableEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofTable(metalakeName, catalogName, schemaName),
+            "table2",
+            AUDIT_INFO);
+    backend.insert(existingA, false);
+    backend.insert(existingB, false);
+
+    // The third identifier names a table that was never inserted.
+    List<NameIdentifier> requested =
+        Lists.newArrayList(
+            existingA.nameIdentifier(),
+            existingB.nameIdentifier(),
+            NameIdentifier.of(metalakeName, catalogName, schemaName, "nonexistent_table"));
+
+    List<TableEntity> fetched = backend.batchGet(requested, Entity.EntityType.TABLE);
+
+    // Only the two persisted tables are expected in the result.
+    Assertions.assertEquals(2, fetched.size());
+    Map<String, TableEntity> byName =
+        fetched.stream().collect(Collectors.toMap(TableEntity::name, table -> table));
+    Assertions.assertNotNull(byName.get("table1"));
+    Assertions.assertNotNull(byName.get("table2"));
+    Assertions.assertNull(byName.get("nonexistent_table"));
+  }
+
+  /** A one-element identifier list is expected to return exactly that catalog. */
+  @TestTemplate
+  public void testBatchGetSingleItem() throws IOException {
+    String metalakeName = "metalake_for_single_batch";
+    createAndInsertMakeLake(metalakeName);
+
+    CatalogEntity onlyCatalog =
+        createCatalog(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofCatalog(metalakeName),
+            "catalog_single",
+            AUDIT_INFO);
+    backend.insert(onlyCatalog, false);
+
+    List<NameIdentifier> singleIdent = Lists.newArrayList(onlyCatalog.nameIdentifier());
+    List<CatalogEntity> fetched = backend.batchGet(singleIdent, Entity.EntityType.CATALOG);
+
+    Assertions.assertEquals(1, fetched.size());
+    CatalogEntity first = fetched.get(0);
+    Assertions.assertEquals("catalog_single", first.name());
+  }
+}
diff --git a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
index 6bfb172682..5049d2f4aa 100644
--- a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
+++ b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.server.authorization;
 import java.lang.reflect.Array;
 import java.security.Principal;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -38,9 +39,9 @@ import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.authorization.AuthorizationRequestContext;
 import org.apache.gravitino.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.dto.tag.MetadataObjectDTO;
-import org.apache.gravitino.meta.TableEntity;
 import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants;
 import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionEvaluator;
+import org.apache.gravitino.utils.EntityClassMapper;
 import org.apache.gravitino.utils.MetadataObjectUtil;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
@@ -57,6 +58,24 @@ public class MetadataAuthzHelper {
   private static final Logger LOG = 
LoggerFactory.getLogger(MetadataAuthzHelper.class);
   private static volatile Executor executor = null;
 
+  /**
+   * Entity types that support batch get operations for cache preloading. These types have
+   * implemented the batchGetByIdentifier method in their respective MetaService classes.
+   */
+  private static final List<Entity.EntityType> SUPPORTED_PRELOAD_ENTITY_TYPES =
+      Arrays.asList(
+          Entity.EntityType.METALAKE,
+          Entity.EntityType.CATALOG,
+          Entity.EntityType.SCHEMA,
+          Entity.EntityType.TABLE,
+          Entity.EntityType.FILESET,
+          Entity.EntityType.TOPIC,
+          Entity.EntityType.MODEL,
+          Entity.EntityType.TAG,
+          Entity.EntityType.POLICY,
+          Entity.EntityType.JOB,
+          Entity.EntityType.JOB_TEMPLATE);
+
   private MetadataAuthzHelper() {}
 
   public static Metalake[] filterMetalakes(Metalake[] metalakes, String 
expression) {
@@ -329,12 +348,20 @@ public class MetadataAuthzHelper {
   private static void preloadToCache(
       Entity.EntityType entityType, NameIdentifier[] nameIdentifiers) {
     Config config = GravitinoEnv.getInstance().config();
-    if (config != null && !config.get(Configs.CACHE_ENABLED)) {
-      if (entityType == Entity.EntityType.TABLE) {
-        GravitinoEnv.getInstance()
-            .entityStore()
-            .batchGet(nameIdentifiers, entityType, TableEntity.class);
-      }
+    if (config == null || !config.get(Configs.CACHE_ENABLED)) {
+      return;
     }
+
+    // Only preload entity types that support batch get operations
+    if (!SUPPORTED_PRELOAD_ENTITY_TYPES.contains(entityType)) {
+      return;
+    }
+
+    GravitinoEnv.getInstance()
+        .entityStore()
+        .batchGet(
+            Arrays.asList(nameIdentifiers),
+            entityType,
+            EntityClassMapper.getEntityClass(entityType));
   }
 }
diff --git a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataIdConverter.java b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataIdConverter.java
index 6f674d291a..41febe5218 100644
--- a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataIdConverter.java
+++ b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataIdConverter.java
@@ -33,21 +33,7 @@ import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.catalog.CapabilityHelpers;
 import org.apache.gravitino.catalog.CatalogManager;
 import org.apache.gravitino.connector.capability.Capability;
-import org.apache.gravitino.meta.BaseMetalake;
-import org.apache.gravitino.meta.CatalogEntity;
-import org.apache.gravitino.meta.ColumnEntity;
-import org.apache.gravitino.meta.FilesetEntity;
-import org.apache.gravitino.meta.GroupEntity;
-import org.apache.gravitino.meta.JobEntity;
-import org.apache.gravitino.meta.JobTemplateEntity;
-import org.apache.gravitino.meta.ModelEntity;
-import org.apache.gravitino.meta.ModelVersionEntity;
-import org.apache.gravitino.meta.RoleEntity;
-import org.apache.gravitino.meta.SchemaEntity;
-import org.apache.gravitino.meta.TableEntity;
-import org.apache.gravitino.meta.TagEntity;
-import org.apache.gravitino.meta.TopicEntity;
-import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.utils.EntityClassMapper;
 import org.apache.gravitino.utils.MetadataObjectUtil;
 
 /** It is used to convert MetadataObject to MetadataId */
@@ -62,25 +48,6 @@ public class MetadataIdConverter {
           MetadataObject.Type.FILESET, Capability.Scope.FILESET,
           MetadataObject.Type.TOPIC, Capability.Scope.TOPIC,
           MetadataObject.Type.COLUMN, Capability.Scope.COLUMN);
-  // Maps entity type to entity class.
-  private static final Map<Entity.EntityType, Class<?>> ENTITY_CLASS_MAPPING =
-      ImmutableMap.<Entity.EntityType, Class<?>>builder()
-          .put(Entity.EntityType.METALAKE, BaseMetalake.class)
-          .put(Entity.EntityType.CATALOG, CatalogEntity.class)
-          .put(Entity.EntityType.SCHEMA, SchemaEntity.class)
-          .put(Entity.EntityType.TABLE, TableEntity.class)
-          .put(Entity.EntityType.FILESET, FilesetEntity.class)
-          .put(Entity.EntityType.MODEL, ModelEntity.class)
-          .put(Entity.EntityType.TOPIC, TopicEntity.class)
-          .put(Entity.EntityType.TAG, TagEntity.class)
-          .put(Entity.EntityType.MODEL_VERSION, ModelVersionEntity.class)
-          .put(Entity.EntityType.COLUMN, ColumnEntity.class)
-          .put(Entity.EntityType.USER, UserEntity.class)
-          .put(Entity.EntityType.GROUP, GroupEntity.class)
-          .put(Entity.EntityType.ROLE, RoleEntity.class)
-          .put(Entity.EntityType.JOB_TEMPLATE, JobTemplateEntity.class)
-          .put(Entity.EntityType.JOB, JobEntity.class)
-          .build();
 
   private MetadataIdConverter() {}
 
@@ -106,7 +73,9 @@ public class MetadataIdConverter {
 
     Entity entity;
     try {
-      entity = entityStore.get(normalizedIdent, entityType, 
getEntityClass(entityType));
+      entity =
+          entityStore.get(
+              normalizedIdent, entityType, 
EntityClassMapper.getEntityClass(entityType));
     } catch (IOException e) {
       throw new RuntimeException(
           "failed to load entity from entity store: " + 
metadataObject.fullName(), e);
@@ -132,11 +101,4 @@ public class MetadataIdConverter {
 
     return ((HasIdentifier) entity).id();
   }
-
-  @SuppressWarnings("unchecked")
-  private static <E extends Entity & HasIdentifier> Class<E> getEntityClass(
-      Entity.EntityType entityType) {
-    Class<?> clazz = ENTITY_CLASS_MAPPING.get(entityType);
-    return (Class<E>) clazz;
-  }
 }
diff --git a/server/src/test/java/org/apache/gravitino/server/web/rest/TestCatalogOperations.java b/server/src/test/java/org/apache/gravitino/server/web/rest/TestCatalogOperations.java
index fbf462ba25..4298c5d2c7 100644
--- a/server/src/test/java/org/apache/gravitino/server/web/rest/TestCatalogOperations.java
+++ b/server/src/test/java/org/apache/gravitino/server/web/rest/TestCatalogOperations.java
@@ -20,6 +20,8 @@ package org.apache.gravitino.server.web.rest;
 
 import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
 import static org.apache.gravitino.Catalog.PROPERTY_IN_USE;
+import static org.apache.gravitino.Configs.CACHE_ENABLED;
+import static org.apache.gravitino.Configs.ENABLE_AUTHORIZATION;
 import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
 import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
 import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
@@ -95,7 +97,10 @@ public class TestCatalogOperations extends 
BaseOperationsTest {
     Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
     Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
     Mockito.doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+    Mockito.doReturn(false).when(config).get(CACHE_ENABLED);
+    Mockito.doReturn(false).when(config).get(ENABLE_AUTHORIZATION);
     FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new 
LockManager(config), true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, true);
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/gravitino/server/web/rest/TestFilesetOperations.java b/server/src/test/java/org/apache/gravitino/server/web/rest/TestFilesetOperations.java
index 1a263be311..8bfb5bd6a5 100644
--- a/server/src/test/java/org/apache/gravitino/server/web/rest/TestFilesetOperations.java
+++ b/server/src/test/java/org/apache/gravitino/server/web/rest/TestFilesetOperations.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.server.web.rest;
 
+import static org.apache.gravitino.Configs.CACHE_ENABLED;
+import static org.apache.gravitino.Configs.ENABLE_AUTHORIZATION;
 import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
 import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
 import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
@@ -102,6 +104,9 @@ public class TestFilesetOperations extends 
BaseOperationsTest {
     Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
     Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
     Mockito.doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+    Mockito.doReturn(false).when(config).get(CACHE_ENABLED);
+    Mockito.doReturn(false).when(config).get(ENABLE_AUTHORIZATION);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, true);
     FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new 
LockManager(config), true);
   }
 
diff --git a/server/src/test/java/org/apache/gravitino/server/web/rest/TestJobOperations.java b/server/src/test/java/org/apache/gravitino/server/web/rest/TestJobOperations.java
index 452608c151..651ab955da 100644
--- a/server/src/test/java/org/apache/gravitino/server/web/rest/TestJobOperations.java
+++ b/server/src/test/java/org/apache/gravitino/server/web/rest/TestJobOperations.java
@@ -19,8 +19,11 @@
 package org.apache.gravitino.server.web.rest;
 
 import static javax.ws.rs.core.MediaType.APPLICATION_JSON_TYPE;
+import static org.apache.gravitino.Configs.CACHE_ENABLED;
+import static org.apache.gravitino.Configs.ENABLE_AUTHORIZATION;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -37,6 +40,7 @@ import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.Application;
 import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Config;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.dto.job.JobTemplateDTO;
 import org.apache.gravitino.dto.job.ShellTemplateUpdateDTO;
@@ -126,6 +130,11 @@ public class TestJobOperations extends JerseyTest {
 
   @BeforeAll
   public static void setup() throws IllegalAccessException {
+    Config config = mock(Config.class);
+    doReturn(false).when(config).get(CACHE_ENABLED);
+    doReturn(false).when(config).get(ENABLE_AUTHORIZATION);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, true);
+
     IdGenerator idGenerator = new RandomIdGenerator();
     FieldUtils.writeField(GravitinoEnv.getInstance(), "idGenerator", idGenerator, true);
   }
diff --git a/server/src/test/java/org/apache/gravitino/server/web/rest/TestMetalakeOperations.java b/server/src/test/java/org/apache/gravitino/server/web/rest/TestMetalakeOperations.java
index a3b59dca42..895b4fd492 100644
--- a/server/src/test/java/org/apache/gravitino/server/web/rest/TestMetalakeOperations.java
+++ b/server/src/test/java/org/apache/gravitino/server/web/rest/TestMetalakeOperations.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.server.web.rest;
 
+import static org.apache.gravitino.Configs.CACHE_ENABLED;
+import static org.apache.gravitino.Configs.ENABLE_AUTHORIZATION;
 import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
 import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
 import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
@@ -94,6 +96,9 @@ public class TestMetalakeOperations extends BaseOperationsTest {
     Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
     Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
     Mockito.doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+    Mockito.doReturn(false).when(config).get(CACHE_ENABLED);
+    Mockito.doReturn(false).when(config).get(ENABLE_AUTHORIZATION);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, true);
     FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new LockManager(config), true);
   }
 
diff --git a/server/src/test/java/org/apache/gravitino/server/web/rest/TestModelOperations.java b/server/src/test/java/org/apache/gravitino/server/web/rest/TestModelOperations.java
index 02aaecc764..521096a717 100644
--- a/server/src/test/java/org/apache/gravitino/server/web/rest/TestModelOperations.java
+++ b/server/src/test/java/org/apache/gravitino/server/web/rest/TestModelOperations.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.server.web.rest;
 
+import static org.apache.gravitino.Configs.CACHE_ENABLED;
+import static org.apache.gravitino.Configs.ENABLE_AUTHORIZATION;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -33,6 +35,9 @@ import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.Application;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.catalog.ModelDispatcher;
@@ -68,7 +73,9 @@ import org.glassfish.jersey.internal.inject.AbstractBinder;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.test.TestProperties;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 public class TestModelOperations extends BaseOperationsTest {
 
@@ -97,6 +104,14 @@ public class TestModelOperations extends BaseOperationsTest {
 
   private Namespace modelNs = NamespaceUtil.ofModel(metalake, catalog, schema);
 
+  @BeforeAll
+  public static void setup() throws IllegalAccessException {
+    Config config = mock(Config.class);
+    Mockito.doReturn(false).when(config).get(CACHE_ENABLED);
+    Mockito.doReturn(false).when(config).get(ENABLE_AUTHORIZATION);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, true);
+  }
+
   @Override
   protected Application configure() {
     try {
diff --git a/server/src/test/java/org/apache/gravitino/server/web/rest/TestPolicyOperations.java b/server/src/test/java/org/apache/gravitino/server/web/rest/TestPolicyOperations.java
index 698954057a..5788583b5a 100644
--- a/server/src/test/java/org/apache/gravitino/server/web/rest/TestPolicyOperations.java
+++ b/server/src/test/java/org/apache/gravitino/server/web/rest/TestPolicyOperations.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.server.web.rest;
 
+import static org.apache.gravitino.Configs.CACHE_ENABLED;
+import static org.apache.gravitino.Configs.ENABLE_AUTHORIZATION;
 import static org.apache.gravitino.dto.util.DTOConverters.toDTO;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -37,6 +39,9 @@ import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.Application;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.dto.requests.PolicyCreateRequest;
@@ -68,7 +73,9 @@ import org.glassfish.jersey.internal.inject.AbstractBinder;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.test.TestProperties;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 public class TestPolicyOperations extends BaseOperationsTest {
 
@@ -89,6 +96,14 @@ public class TestPolicyOperations extends BaseOperationsTest {
   private final AuditInfo testAuditInfo1 =
       AuditInfo.builder().withCreator("user1").withCreateTime(Instant.now()).build();
 
+  @BeforeAll
+  public static void setup() throws IllegalAccessException {
+    Config config = mock(Config.class);
+    Mockito.doReturn(false).when(config).get(CACHE_ENABLED);
+    Mockito.doReturn(false).when(config).get(ENABLE_AUTHORIZATION);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, true);
+  }
+
   @Override
   protected Application configure() {
     try {
diff --git a/server/src/test/java/org/apache/gravitino/server/web/rest/TestSchemaOperations.java b/server/src/test/java/org/apache/gravitino/server/web/rest/TestSchemaOperations.java
index c99af56b89..e619e15816 100644
--- a/server/src/test/java/org/apache/gravitino/server/web/rest/TestSchemaOperations.java
+++ b/server/src/test/java/org/apache/gravitino/server/web/rest/TestSchemaOperations.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.server.web.rest;
 
+import static org.apache.gravitino.Configs.CACHE_ENABLED;
+import static org.apache.gravitino.Configs.ENABLE_AUTHORIZATION;
 import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
 import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
 import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
@@ -90,6 +92,9 @@ public class TestSchemaOperations extends BaseOperationsTest {
     Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
     Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
     Mockito.doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+    Mockito.doReturn(false).when(config).get(CACHE_ENABLED);
+    Mockito.doReturn(false).when(config).get(ENABLE_AUTHORIZATION);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, true);
     FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new LockManager(config), true);
   }
 
diff --git a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTableOperations.java b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTableOperations.java
index 33d4ebcbb8..6de2640068 100644
--- a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTableOperations.java
+++ b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTableOperations.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.server.web.rest;
 
+import static org.apache.gravitino.Configs.CACHE_ENABLED;
+import static org.apache.gravitino.Configs.ENABLE_AUTHORIZATION;
 import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
 import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
 import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
@@ -119,6 +121,9 @@ public class TestTableOperations extends BaseOperationsTest {
     Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
     Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
     Mockito.doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+    Mockito.doReturn(false).when(config).get(CACHE_ENABLED);
+    Mockito.doReturn(false).when(config).get(ENABLE_AUTHORIZATION);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, true);
     FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new LockManager(config), true);
   }
 
diff --git a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTagOperations.java b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTagOperations.java
index 75312f4443..14c19791ba 100644
--- a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTagOperations.java
+++ b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTagOperations.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.server.web.rest;
 
+import static org.apache.gravitino.Configs.CACHE_ENABLED;
+import static org.apache.gravitino.Configs.ENABLE_AUTHORIZATION;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -38,6 +40,9 @@ import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.Application;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.dto.requests.TagCreateRequest;
@@ -66,7 +71,9 @@ import org.glassfish.jersey.internal.inject.AbstractBinder;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.test.TestProperties;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 public class TestTagOperations extends BaseOperationsTest {
 
@@ -87,6 +94,14 @@ public class TestTagOperations extends BaseOperationsTest {
   private AuditInfo testAuditInfo1 =
       AuditInfo.builder().withCreator("user1").withCreateTime(Instant.now()).build();
 
+  @BeforeAll
+  public static void setup() throws IllegalAccessException {
+    Config config = mock(Config.class);
+    Mockito.doReturn(false).when(config).get(CACHE_ENABLED);
+    Mockito.doReturn(false).when(config).get(ENABLE_AUTHORIZATION);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, true);
+  }
+
   @Override
   protected Application configure() {
     try {
diff --git a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java
index b838d62744..ab33fdaf2c 100644
--- a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java
+++ b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.server.web.rest;
 
+import static org.apache.gravitino.Configs.CACHE_ENABLED;
+import static org.apache.gravitino.Configs.ENABLE_AUTHORIZATION;
 import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
 import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
 import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
@@ -88,6 +90,9 @@ public class TestTopicOperations extends BaseOperationsTest {
     Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
     Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
     Mockito.doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+    Mockito.doReturn(false).when(config).get(CACHE_ENABLED);
+    Mockito.doReturn(false).when(config).get(ENABLE_AUTHORIZATION);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, true);
     FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new LockManager(config), true);
   }
 


Reply via email to