This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new e964783749 [#9429] improvement(core): Use join to improve query 
performance on table, topic and so on.  (#9430)
e964783749 is described below

commit e964783749856a65240dbd4b3351893036ea1ce4
Author: Mini Yu <[email protected]>
AuthorDate: Fri Jan 9 21:07:32 2026 +0800

    [#9429] improvement(core): Use join to improve query performance on table, 
topic and so on.  (#9430)
    
    ### What changes were proposed in this pull request?
    
    This pull request introduces major performance and functionality
    improvements to the Metalake and table metadata management system. The
    most significant change is the addition of a Caffeine-based cache for
    Metalake entities, which reduces redundant storage access and improves
    efficiency. Additionally, it adds a new method for querying tables by
    their fully qualified names and optimizes database schema indexing for
    faster lookups.
    
    ### Metalake caching and management improvements
    
    * Introduced a static Caffeine cache (`metalakeCache`) in
    `MetalakeManager` to store `BaseMetalake` objects, with automatic expiry
    and scheduled cleanup. This cache is now used in `metalakeInUse`,
    `loadMetalake`, and is invalidated on alter, drop, enable, and disable
    operations to ensure consistency.
    
[[1]](diffhunk://#diff-21ed56786d4199a9ebee9969aae3195dff5037135c738ff4b8ba2072fa72d6a0R23-R36)
    
[[2]](diffhunk://#diff-21ed56786d4199a9ebee9969aae3195dff5037135c738ff4b8ba2072fa72d6a0R74-R86)
    
[[3]](diffhunk://#diff-21ed56786d4199a9ebee9969aae3195dff5037135c738ff4b8ba2072fa72d6a0L87-R107)
    
[[4]](diffhunk://#diff-21ed56786d4199a9ebee9969aae3195dff5037135c738ff4b8ba2072fa72d6a0R138-R155)
    
[[5]](diffhunk://#diff-21ed56786d4199a9ebee9969aae3195dff5037135c738ff4b8ba2072fa72d6a0R194-R209)
    
[[6]](diffhunk://#diff-21ed56786d4199a9ebee9969aae3195dff5037135c738ff4b8ba2072fa72d6a0R301-R302)
    
[[7]](diffhunk://#diff-21ed56786d4199a9ebee9969aae3195dff5037135c738ff4b8ba2072fa72d6a0R353-R354)
    
[[8]](diffhunk://#diff-21ed56786d4199a9ebee9969aae3195dff5037135c738ff4b8ba2072fa72d6a0R381)
    
[[9]](diffhunk://#diff-21ed56786d4199a9ebee9969aae3195dff5037135c738ff4b8ba2072fa72d6a0R416)
    
    ### Table metadata querying enhancements
    
    * Added a new method `selectTableByFullQualifiedName` to
    `TableMetaMapper`, its SQL provider, and the service layer, allowing
    retrieval of table metadata using metalake, catalog, schema, and table
    names directly, improving query flexibility and performance.
    
[[1]](diffhunk://#diff-464816dcd4652ac061d317d0ed9ab3a98db358f2db517ce70bbf61fa35624db9R59-R67)
    
[[2]](diffhunk://#diff-e160fd550934e3ac047abde64991c6ab5edc44602bf8cfd8587596f10b0b58efR72-R80)
    
[[3]](diffhunk://#diff-19aff10a8b33dded8f085dc93250ba296ca800b5a68630a22537e83b0bd9b730R236-R292)
    
[[4]](diffhunk://#diff-c0bc429f18cd462c3a3c1fe65c7da0966c338a96737d69b6e32b17257d97a52cL88-L92)
    
[[5]](diffhunk://#diff-c0bc429f18cd462c3a3c1fe65c7da0966c338a96737d69b6e32b17257d97a52cR347-R363)
    
    ### Database schema and indexing optimizations
    
    * Added composite indexes (`idx_name_da`) on the `metalake_name`,
    `catalog_name`, `schema_name`, and `table_name` columns (with
    `deleted_at`) in both H2 and MySQL schema scripts, significantly
    improving query performance for lookups by name.
    
[[1]](diffhunk://#diff-3ca2c363f5538fb4838a31233c4c858b4928e9ec65b5b86bf35a8c5f653133bfR31)
    
[[2]](diffhunk://#diff-3ca2c363f5538fb4838a31233c4c858b4928e9ec65b5b86bf35a8c5f653133bfR49)
    
[[3]](diffhunk://#diff-3ca2c363f5538fb4838a31233c4c858b4928e9ec65b5b86bf35a8c5f653133bfR66)
    
[[4]](diffhunk://#diff-3ca2c363f5538fb4838a31233c4c858b4928e9ec65b5b86bf35a8c5f653133bfR83)
    
[[5]](diffhunk://#diff-6fac45e3da22ec7d24ca5ec0b78c5a03d653fabd65dd8040b2c59404fc49fb21R33-R38)
    
[[6]](diffhunk://#diff-ac6c56c7547bd70d3951716f2b543beea23964b906792b527b833e6229404f70R31)
    
[[7]](diffhunk://#diff-ac6c56c7547bd70d3951716f2b543beea23964b906792b527b833e6229404f70R48)
    
[[8]](diffhunk://#diff-ac6c56c7547bd70d3951716f2b543beea23964b906792b527b833e6229404f70R64)
    
[[9]](diffhunk://#diff-ac6c56c7547bd70d3951716f2b543beea23964b906792b527b833e6229404f70R83)
    ### Why are the changes needed?
    
    To improve performance
    
    Fix: #9429
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A
    
    ### How was this patch tested?
    
    Existing test
---
 .../authorization-chain/build.gradle.kts           |   1 +
 .../authorization-ranger/build.gradle.kts          |   1 +
 build.gradle.kts                                   |   5 +
 .../fileset/TestFilesetCatalogOperations.java      |   6 +-
 .../integration/test/FilesetS3CatalogIT.java       |   2 +-
 catalogs/catalog-hive/build.gradle.kts             |   1 +
 .../hive/integration/test/CatalogHiveS3IT.java     |   2 +-
 catalogs/catalog-jdbc-doris/build.gradle.kts       |   1 +
 .../doris/integration/test/CatalogDorisIT.java     |   2 +-
 .../doris/operation/TestDorisTableOperations.java  |   2 +-
 catalogs/catalog-jdbc-mysql/build.gradle.kts       |   1 +
 catalogs/catalog-jdbc-oceanbase/build.gradle.kts   |   1 +
 catalogs/catalog-jdbc-postgresql/build.gradle.kts  |   1 +
 catalogs/catalog-jdbc-starrocks/build.gradle.kts   |   1 +
 .../integration/test/CatalogStarRocksIT.java       |   2 +-
 .../operation/TestStarRocksTableOperations.java    |   2 +-
 catalogs/catalog-kafka/build.gradle.kts            |   2 +
 .../catalog/kafka/TestKafkaCatalogOperations.java  |   6 +-
 .../catalog-lakehouse-generic/build.gradle.kts     |   1 +
 .../generic/TestGenericCatalogOperations.java      |   6 +-
 catalogs/catalog-lakehouse-hudi/build.gradle.kts   |   1 +
 .../catalog-lakehouse-iceberg/build.gradle.kts     |   1 +
 catalogs/catalog-lakehouse-paimon/build.gradle.kts |   3 +
 .../paimon/integration/test/CatalogPaimonS3IT.java |   2 +-
 catalogs/catalog-model/build.gradle.kts            |   1 +
 .../catalog/model/TestModelCatalogOperations.java  |   6 +-
 clients/cli/build.gradle.kts                       |   1 +
 clients/client-java/build.gradle.kts               |   1 +
 .../gravitino/client/integration/test/JobIT.java   |   2 +-
 .../test/GravitinoVirtualFileSystemS3IT.java       |   2 +-
 .../java/org/apache/gravitino/GravitinoEnv.java    |   4 +
 .../relational/mapper/FilesetMetaMapper.java       |  55 +++++++
 .../mapper/FilesetMetaSQLProviderFactory.java      |  16 +++
 .../storage/relational/mapper/ModelMetaMapper.java |  17 +++
 .../mapper/ModelMetaSQLProviderFactory.java        |  16 +++
 .../relational/mapper/SchemaMetaMapper.java        |  14 ++
 .../mapper/SchemaMetaSQLProviderFactory.java       |  12 ++
 .../storage/relational/mapper/TableMetaMapper.java |  17 +++
 .../mapper/TableMetaSQLProviderFactory.java        |  16 +++
 .../storage/relational/mapper/TopicMetaMapper.java |  17 +++
 .../mapper/TopicMetaSQLProviderFactory.java        |  16 +++
 .../provider/base/FilesetMetaBaseSQLProvider.java  | 117 +++++++++++++++
 .../provider/base/ModelMetaBaseSQLProvider.java    |  85 +++++++++++
 .../provider/base/SchemaMetaBaseSQLProvider.java   |  65 +++++++++
 .../provider/base/TableMetaBaseSQLProvider.java    | 107 ++++++++++++++
 .../provider/base/TopicMetaBaseSQLProvider.java    |  87 ++++++++++++
 .../relational/service/FilesetMetaService.java     | 122 ++++++++++++----
 .../relational/service/ModelMetaService.java       | 104 +++++++++++---
 .../relational/service/SchemaMetaService.java      | 127 +++++++++++++----
 .../relational/service/TableMetaService.java       | 158 +++++++++++++++------
 .../relational/service/TopicMetaService.java       | 123 ++++++++++++----
 .../job/TestBuiltInJobTemplateEventListener.java   |   2 +-
 .../gravitino/job/local/TestLocalJobExecutor.java  |   2 +-
 .../gravitino/metalake/TestMetalakeManager.java    |   2 +-
 .../gravitino/storage/TestEntityStorage.java       |  72 ++++++++--
 .../storage/relational/BackendTestExtension.java   |   7 +
 .../relational/service/TestFilesetMetaService.java |  12 +-
 .../relational/service/TestSchemaMetaService.java  |   2 +-
 flink-connector/flink/build.gradle.kts             |   1 +
 iceberg/iceberg-rest-server/build.gradle.kts       |   1 +
 integration-test-common/build.gradle.kts           |   1 +
 .../gravitino/integration/test/util/BaseIT.java    |   2 +-
 lance/lance-rest-server/build.gradle.kts           |   1 +
 spark-connector/v3.3/spark/build.gradle.kts        |   1 +
 spark-connector/v3.4/spark/build.gradle.kts        |   1 +
 spark-connector/v3.5/spark/build.gradle.kts        |   1 +
 66 files changed, 1291 insertions(+), 178 deletions(-)

diff --git a/authorizations/authorization-chain/build.gradle.kts 
b/authorizations/authorization-chain/build.gradle.kts
index ba703e26f6..cf19b23834 100644
--- a/authorizations/authorization-chain/build.gradle.kts
+++ b/authorizations/authorization-chain/build.gradle.kts
@@ -60,6 +60,7 @@ dependencies {
   testImplementation(project(":integration-test-common", "testArtifacts"))
   testImplementation(project(":authorizations:authorization-ranger"))
   testImplementation(project(":authorizations:authorization-ranger", 
"testArtifacts"))
+  testImplementation(libs.awaitility)
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.mockito.core)
   testImplementation(libs.testcontainers)
diff --git a/authorizations/authorization-ranger/build.gradle.kts 
b/authorizations/authorization-ranger/build.gradle.kts
index e1a872ee25..4b6ee3b7c4 100644
--- a/authorizations/authorization-ranger/build.gradle.kts
+++ b/authorizations/authorization-ranger/build.gradle.kts
@@ -80,6 +80,7 @@ dependencies {
   testImplementation(project(":clients:client-java"))
   testImplementation(project(":server"))
   testImplementation(project(":integration-test-common", "testArtifacts"))
+  testImplementation(libs.awaitility)
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.mockito.core)
   testImplementation(libs.mockito.inline)
diff --git a/build.gradle.kts b/build.gradle.kts
index 02ae5cbdf4..ee123d68c7 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -154,6 +154,11 @@ allprojects {
           "import\\s+.*\\.(Logger|LoggerFactory);",
           "import org.slf4j.${'$'}1;"
         )
+        replaceRegex(
+          "Remove Testcontainers shading",
+          "import\\s+org\\.testcontainers\\.shaded\\.([^;]+);",
+          "import $1;"
+        )
 
         targetExclude("**/build/**", "**/.pnpm/***")
       }
diff --git 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
index a55b519f62..d73e3d54dd 100644
--- 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
+++ 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
@@ -111,6 +111,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -121,7 +122,6 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 public class TestFilesetCatalogOperations {
 
@@ -216,7 +216,7 @@ public class TestFilesetCatalogOperations {
   }
 
   @BeforeAll
-  public static void setUp() {
+  public static void setUp() throws IllegalAccessException {
     Config config = Mockito.mock(Config.class);
     when(config.get(ENTITY_STORE)).thenReturn(RELATIONAL_ENTITY_STORE);
     
when(config.get(ENTITY_RELATIONAL_STORE)).thenReturn(DEFAULT_ENTITY_RELATIONAL_STORE);
@@ -336,6 +336,8 @@ public class TestFilesetCatalogOperations {
     schemaMetaServiceMockedStatic
         .when(SchemaMetaService::getInstance)
         .thenReturn(spySchemaMetaService);
+
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, true);
   }
 
   @AfterAll
diff --git 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/integration/test/FilesetS3CatalogIT.java
 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/integration/test/FilesetS3CatalogIT.java
index 2401851974..39fc58e398 100644
--- 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/integration/test/FilesetS3CatalogIT.java
+++ 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/integration/test/FilesetS3CatalogIT.java
@@ -38,6 +38,7 @@ import org.apache.gravitino.storage.S3Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -46,7 +47,6 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 @Tag("gravitino-docker-test")
 public class FilesetS3CatalogIT extends FilesetCatalogIT {
diff --git a/catalogs/catalog-hive/build.gradle.kts 
b/catalogs/catalog-hive/build.gradle.kts
index 15be9c6f59..b63efeb09b 100644
--- a/catalogs/catalog-hive/build.gradle.kts
+++ b/catalogs/catalog-hive/build.gradle.kts
@@ -65,6 +65,7 @@ dependencies {
   annotationProcessor(libs.immutables.value)
   annotationProcessor(libs.lombok)
 
+  testImplementation(libs.awaitility)
   testImplementation(project(":catalogs:hive-metastore-common", 
"testArtifacts"))
   testImplementation(project(":common"))
   testImplementation(project(":clients:client-java"))
diff --git 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveS3IT.java
 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveS3IT.java
index dc4ec212bc..04d1b80e5f 100644
--- 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveS3IT.java
+++ 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveS3IT.java
@@ -29,11 +29,11 @@ import 
org.apache.gravitino.integration.test.container.HiveContainer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.sql.SparkSession;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.TestInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CatalogHiveS3IT extends CatalogHive3IT {
diff --git a/catalogs/catalog-jdbc-doris/build.gradle.kts 
b/catalogs/catalog-jdbc-doris/build.gradle.kts
index 80c909f4c4..0987b75673 100644
--- a/catalogs/catalog-jdbc-doris/build.gradle.kts
+++ b/catalogs/catalog-jdbc-doris/build.gradle.kts
@@ -49,6 +49,7 @@ dependencies {
   testImplementation(project(":server"))
   testImplementation(project(":server-common"))
 
+  testImplementation(libs.awaitility)
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
   testImplementation(libs.mysql.driver)
diff --git 
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
 
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
index 977fbfdf29..58bafed928 100644
--- 
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
+++ 
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
@@ -78,13 +78,13 @@ import org.apache.gravitino.rel.partitions.Partitions;
 import org.apache.gravitino.rel.partitions.RangePartition;
 import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.utils.RandomNameUtils;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 @Tag("gravitino-docker-test")
 public class CatalogDorisIT extends BaseIT {
diff --git 
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/operation/TestDorisTableOperations.java
 
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/operation/TestDorisTableOperations.java
index a35d165843..f976882416 100644
--- 
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/operation/TestDorisTableOperations.java
+++ 
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/operation/TestDorisTableOperations.java
@@ -55,11 +55,11 @@ import org.apache.gravitino.rel.partitions.RangePartition;
 import org.apache.gravitino.rel.types.Type;
 import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.utils.RandomNameUtils;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 @Tag("gravitino-docker-test")
 public class TestDorisTableOperations extends TestDoris {
diff --git a/catalogs/catalog-jdbc-mysql/build.gradle.kts 
b/catalogs/catalog-jdbc-mysql/build.gradle.kts
index e537732b88..5cc8d4f2b7 100644
--- a/catalogs/catalog-jdbc-mysql/build.gradle.kts
+++ b/catalogs/catalog-jdbc-mysql/build.gradle.kts
@@ -52,6 +52,7 @@ dependencies {
   testImplementation(project(":server"))
   testImplementation(project(":server-common"))
 
+  testImplementation(libs.awaitility)
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
   testImplementation(libs.mysql.driver)
diff --git a/catalogs/catalog-jdbc-oceanbase/build.gradle.kts 
b/catalogs/catalog-jdbc-oceanbase/build.gradle.kts
index 2621fec16e..ca6d8aa003 100644
--- a/catalogs/catalog-jdbc-oceanbase/build.gradle.kts
+++ b/catalogs/catalog-jdbc-oceanbase/build.gradle.kts
@@ -52,6 +52,7 @@ dependencies {
   testImplementation(project(":server"))
   testImplementation(project(":server-common"))
 
+  testImplementation(libs.awaitility)
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
   testImplementation(libs.mysql.driver)
diff --git a/catalogs/catalog-jdbc-postgresql/build.gradle.kts 
b/catalogs/catalog-jdbc-postgresql/build.gradle.kts
index 817e7dff68..b05a3279d2 100644
--- a/catalogs/catalog-jdbc-postgresql/build.gradle.kts
+++ b/catalogs/catalog-jdbc-postgresql/build.gradle.kts
@@ -50,6 +50,7 @@ dependencies {
   testImplementation(project(":server"))
   testImplementation(project(":server-common"))
 
+  testImplementation(libs.awaitility)
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
   testImplementation(libs.mysql.driver)
diff --git a/catalogs/catalog-jdbc-starrocks/build.gradle.kts 
b/catalogs/catalog-jdbc-starrocks/build.gradle.kts
index d000007b5d..02e90d8292 100644
--- a/catalogs/catalog-jdbc-starrocks/build.gradle.kts
+++ b/catalogs/catalog-jdbc-starrocks/build.gradle.kts
@@ -49,6 +49,7 @@ dependencies {
   testImplementation(project(":server"))
   testImplementation(project(":server-common"))
 
+  testImplementation(libs.awaitility)
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
   testImplementation(libs.mysql.driver)
diff --git 
a/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/integration/test/CatalogStarRocksIT.java
 
b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/integration/test/CatalogStarRocksIT.java
index a41e92adea..9beb46a71b 100644
--- 
a/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/integration/test/CatalogStarRocksIT.java
+++ 
b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/integration/test/CatalogStarRocksIT.java
@@ -76,13 +76,13 @@ import org.apache.gravitino.rel.partitions.Partitions;
 import org.apache.gravitino.rel.partitions.RangePartition;
 import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.utils.RandomNameUtils;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 @Tag("gravitino-docker-test")
 public class CatalogStarRocksIT extends BaseIT {
diff --git 
a/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksTableOperations.java
 
b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksTableOperations.java
index 4f505e3199..b7726b96fd 100644
--- 
a/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksTableOperations.java
+++ 
b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksTableOperations.java
@@ -57,11 +57,11 @@ import org.apache.gravitino.rel.partitions.RangePartition;
 import org.apache.gravitino.rel.types.Type;
 import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.utils.RandomNameUtils;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 @Tag("gravitino-docker-test")
 public class TestStarRocksTableOperations extends TestStarRocks {
diff --git a/catalogs/catalog-kafka/build.gradle.kts 
b/catalogs/catalog-kafka/build.gradle.kts
index 6a8c104c72..aaee3650a7 100644
--- a/catalogs/catalog-kafka/build.gradle.kts
+++ b/catalogs/catalog-kafka/build.gradle.kts
@@ -44,8 +44,10 @@ dependencies {
   implementation(libs.kafka.clients)
   implementation(libs.slf4j.api)
 
+  testImplementation(libs.awaitility)
   testImplementation(libs.commons.io)
   testImplementation(libs.curator.test)
+  testImplementation(libs.commons.lang3)
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.kafka)
   testImplementation(libs.mockito.core)
diff --git 
a/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/TestKafkaCatalogOperations.java
 
b/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/TestKafkaCatalogOperations.java
index 6b8f87888d..0c0f650c68 100644
--- 
a/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/TestKafkaCatalogOperations.java
+++ 
b/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/TestKafkaCatalogOperations.java
@@ -50,10 +50,12 @@ import java.io.IOException;
 import java.time.Instant;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.EntityStoreFactory;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.Schema;
@@ -134,7 +136,7 @@ public class TestKafkaCatalogOperations extends 
KafkaClusterEmbedded {
   private static KafkaCatalogOperations kafkaCatalogOperations;
 
   @BeforeAll
-  public static void setUp() {
+  public static void setUp() throws IllegalAccessException {
     Config config = Mockito.mock(Config.class);
     
Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L);
     Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 
1000L);
@@ -210,6 +212,8 @@ public class TestKafkaCatalogOperations extends 
KafkaClusterEmbedded {
                     .build())
             .build();
 
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, true);
+
     kafkaCatalogOperations = new KafkaCatalogOperations(store, idGenerator);
     kafkaCatalogOperations.initialize(
         MOCK_CATALOG_PROPERTIES, kafkaCatalogEntity.toCatalogInfo(), 
KAFKA_PROPERTIES_METADATA);
diff --git a/catalogs/catalog-lakehouse-generic/build.gradle.kts 
b/catalogs/catalog-lakehouse-generic/build.gradle.kts
index ca0c2d7907..a3d68b8451 100644
--- a/catalogs/catalog-lakehouse-generic/build.gradle.kts
+++ b/catalogs/catalog-lakehouse-generic/build.gradle.kts
@@ -53,6 +53,7 @@ dependencies {
   testImplementation(project(":server"))
   testImplementation(project(":server-common"))
 
+  testImplementation(libs.awaitility)
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
   testImplementation(libs.lance) // Included in the test runtime classpath for 
test only
diff --git 
a/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/generic/TestGenericCatalogOperations.java
 
b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/generic/TestGenericCatalogOperations.java
index 4c71f4adb1..8f61c12182 100644
--- 
a/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/generic/TestGenericCatalogOperations.java
+++ 
b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/generic/TestGenericCatalogOperations.java
@@ -43,11 +43,13 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.EntityStoreFactory;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.Schema;
@@ -79,7 +81,7 @@ public class TestGenericCatalogOperations {
   private static GenericCatalogOperations ops;
 
   @BeforeAll
-  public static void setUp() throws IOException {
+  public static void setUp() throws IOException, IllegalAccessException {
     Config config = Mockito.mock(Config.class);
     when(config.get(ENTITY_STORE)).thenReturn(RELATIONAL_ENTITY_STORE);
     
when(config.get(ENTITY_RELATIONAL_STORE)).thenReturn(DEFAULT_ENTITY_RELATIONAL_STORE);
@@ -132,6 +134,8 @@ public class TestGenericCatalogOperations {
             .build();
     store.put(catalog, false);
 
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, true);
+
     ops = new GenericCatalogOperations(store, idGenerator);
   }
 
diff --git a/catalogs/catalog-lakehouse-hudi/build.gradle.kts 
b/catalogs/catalog-lakehouse-hudi/build.gradle.kts
index 738e0b2db7..1c2e2e6017 100644
--- a/catalogs/catalog-lakehouse-hudi/build.gradle.kts
+++ b/catalogs/catalog-lakehouse-hudi/build.gradle.kts
@@ -75,6 +75,7 @@ dependencies {
     exclude("org.apache.logging.log4j")
   }
 
+  testImplementation(libs.awaitility)
   testImplementation(libs.bundles.jetty)
   testImplementation(libs.bundles.jersey)
   testImplementation(libs.commons.collections3)
diff --git a/catalogs/catalog-lakehouse-iceberg/build.gradle.kts 
b/catalogs/catalog-lakehouse-iceberg/build.gradle.kts
index d6e74990c2..26d74f2fd5 100644
--- a/catalogs/catalog-lakehouse-iceberg/build.gradle.kts
+++ b/catalogs/catalog-lakehouse-iceberg/build.gradle.kts
@@ -78,6 +78,7 @@ dependencies {
     exclude("org.rocksdb")
   }
 
+  testImplementation(libs.awaitility)
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
   testImplementation(libs.mockito.core)
diff --git a/catalogs/catalog-lakehouse-paimon/build.gradle.kts 
b/catalogs/catalog-lakehouse-paimon/build.gradle.kts
index 37215b94c4..b0cfb6c5f0 100644
--- a/catalogs/catalog-lakehouse-paimon/build.gradle.kts
+++ b/catalogs/catalog-lakehouse-paimon/build.gradle.kts
@@ -148,7 +148,10 @@ dependencies {
   
testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
 {
     exclude("org.apache.hadoop")
   }
+
+  testImplementation(libs.awaitility)
   testImplementation(libs.slf4j.api)
+  testImplementation(libs.awaitility)
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.mysql.driver)
   testImplementation(libs.postgresql.driver)
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonS3IT.java
 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonS3IT.java
index a435de4b57..9a5ac8e64d 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonS3IT.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonS3IT.java
@@ -28,9 +28,9 @@ import 
org.apache.gravitino.integration.test.util.DownloaderUtils;
 import org.apache.gravitino.integration.test.util.ITUtils;
 import org.apache.gravitino.storage.S3Properties;
 import org.apache.spark.sql.SparkSession;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Tag;
 import org.testcontainers.containers.Container;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 @Tag("gravitino-docker-test")
 public class CatalogPaimonS3IT extends CatalogPaimonBaseIT {
diff --git a/catalogs/catalog-model/build.gradle.kts 
b/catalogs/catalog-model/build.gradle.kts
index 67b0545b78..51bd0e0f3a 100644
--- a/catalogs/catalog-model/build.gradle.kts
+++ b/catalogs/catalog-model/build.gradle.kts
@@ -45,6 +45,7 @@ dependencies {
   testImplementation(project(":integration-test-common", "testArtifacts"))
   testImplementation(project(":server"))
   testImplementation(project(":server-common"))
+  testImplementation(libs.awaitility)
   testImplementation(libs.bundles.log4j)
   testImplementation(libs.commons.io)
   testImplementation(libs.commons.lang3)
diff --git 
a/catalogs/catalog-model/src/test/java/org/apache/gravtitino/catalog/model/TestModelCatalogOperations.java
 
b/catalogs/catalog-model/src/test/java/org/apache/gravtitino/catalog/model/TestModelCatalogOperations.java
index f8119b1fec..65ea324266 100644
--- 
a/catalogs/catalog-model/src/test/java/org/apache/gravtitino/catalog/model/TestModelCatalogOperations.java
+++ 
b/catalogs/catalog-model/src/test/java/org/apache/gravtitino/catalog/model/TestModelCatalogOperations.java
@@ -47,11 +47,13 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.EntityStoreFactory;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.Schema;
@@ -100,7 +102,7 @@ public class TestModelCatalogOperations {
   private static ModelCatalogOperations ops;
 
   @BeforeAll
-  public static void setUp() throws IOException {
+  public static void setUp() throws IOException, IllegalAccessException {
     Config config = Mockito.mock(Config.class);
     when(config.get(ENTITY_STORE)).thenReturn(RELATIONAL_ENTITY_STORE);
     
when(config.get(ENTITY_RELATIONAL_STORE)).thenReturn(DEFAULT_ENTITY_RELATIONAL_STORE);
@@ -158,6 +160,8 @@ public class TestModelCatalogOperations {
             .build();
     store.put(catalog, false);
 
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, true);
+
     ops = new ModelCatalogOperations(store);
     ops.initialize(
         Collections.emptyMap(),
diff --git a/clients/cli/build.gradle.kts b/clients/cli/build.gradle.kts
index 9358808f9f..277c559521 100644
--- a/clients/cli/build.gradle.kts
+++ b/clients/cli/build.gradle.kts
@@ -32,6 +32,7 @@ dependencies {
   implementation(project(":clients:client-java"))
   implementation(project(":common"))
 
+  testImplementation(libs.awaitility)
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
   testImplementation(libs.mockito.core)
diff --git a/clients/client-java/build.gradle.kts 
b/clients/client-java/build.gradle.kts
index e723d27638..604f6912ba 100644
--- a/clients/client-java/build.gradle.kts
+++ b/clients/client-java/build.gradle.kts
@@ -47,6 +47,7 @@ dependencies {
   testImplementation(project(":server"))
   testImplementation(project(":server-common"))
 
+  testImplementation(libs.awaitility)
   testImplementation(libs.bundles.jersey)
   testImplementation(libs.bundles.jwt)
   testImplementation(libs.commons.lang3)
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/JobIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/JobIT.java
index 9a0096513f..9dfa11ba8a 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/JobIT.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/JobIT.java
@@ -39,13 +39,13 @@ import org.apache.gravitino.job.JobHandle;
 import org.apache.gravitino.job.JobTemplate;
 import org.apache.gravitino.job.JobTemplateChange;
 import org.apache.gravitino.job.ShellJobTemplate;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 public class JobIT extends BaseIT {
 
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3IT.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3IT.java
index 05fc78475c..cdd0d3eb32 100644
--- 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3IT.java
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3IT.java
@@ -34,6 +34,7 @@ import 
org.apache.gravitino.integration.test.util.GravitinoITUtils;
 import org.apache.gravitino.s3.fs.S3FileSystemProvider;
 import org.apache.gravitino.storage.S3Properties;
 import org.apache.hadoop.conf.Configuration;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -41,7 +42,6 @@ import org.junit.jupiter.api.Disabled;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 public class GravitinoVirtualFileSystemS3IT extends 
GravitinoVirtualFileSystemIT {
   private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoVirtualFileSystemS3IT.class);
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java 
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index b17360423e..6fba49b808 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -442,6 +442,10 @@ public class GravitinoEnv {
     return statisticDispatcher;
   }
 
+  public boolean cacheEnabled() {
+    return config.get(Configs.CACHE_ENABLED);
+  }
+
   public void start() {
     metricsSystem.start();
     eventListenerManager.start();
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaMapper.java
index cd2b70be6a..0c422077ad 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaMapper.java
@@ -85,6 +85,33 @@ public interface FilesetMetaMapper {
   @SelectProvider(type = FilesetMetaSQLProviderFactory.class, method = 
"listFilesetPOsBySchemaId")
   List<FilesetPO> listFilesetPOsBySchemaId(@Param("schemaId") Long schemaId);
 
+  @Results({
+    @Result(property = "filesetId", column = "fileset_id", id = true),
+    @Result(property = "filesetName", column = "fileset_name"),
+    @Result(property = "metalakeId", column = "metalake_id"),
+    @Result(property = "catalogId", column = "catalog_id"),
+    @Result(property = "schemaId", column = "schema_id"),
+    @Result(property = "type", column = "type"),
+    @Result(property = "auditInfo", column = "audit_info"),
+    @Result(property = "currentVersion", column = "current_version"),
+    @Result(property = "lastVersion", column = "last_version"),
+    @Result(property = "deletedAt", column = "deleted_at"),
+    @Result(
+        property = "filesetVersionPOs",
+        javaType = List.class,
+        column =
+            
"{id,version_metalake_id,version_catalog_id,version_schema_id,version_fileset_id,version,"
+                + 
"fileset_comment,properties,storage_location_name,storage_location,version_deleted_at}",
+        many = @Many(resultMap = "mapToFilesetVersionPO"))
+  })
+  @SelectProvider(
+      type = FilesetMetaSQLProviderFactory.class,
+      method = "listFilesetPOsByFullQualifiedName")
+  List<FilesetPO> listFilesetPOsByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName);
+
   @Results({
     @Result(property = "filesetId", column = "fileset_id", id = true),
     @Result(property = "filesetName", column = "fileset_name"),
@@ -160,6 +187,34 @@ public interface FilesetMetaMapper {
   @SelectProvider(type = FilesetMetaSQLProviderFactory.class, method = 
"selectFilesetMetaById")
   FilesetPO selectFilesetMetaById(@Param("filesetId") Long filesetId);
 
+  @Results({
+    @Result(property = "filesetId", column = "fileset_id", id = true),
+    @Result(property = "filesetName", column = "fileset_name"),
+    @Result(property = "metalakeId", column = "metalake_id"),
+    @Result(property = "catalogId", column = "catalog_id"),
+    @Result(property = "schemaId", column = "schema_id"),
+    @Result(property = "type", column = "type"),
+    @Result(property = "auditInfo", column = "audit_info"),
+    @Result(property = "currentVersion", column = "current_version"),
+    @Result(property = "lastVersion", column = "last_version"),
+    @Result(property = "deletedAt", column = "deleted_at"),
+    @Result(
+        property = "filesetVersionPOs",
+        javaType = List.class,
+        column =
+            
"{id,version_metalake_id,version_catalog_id,version_schema_id,version_fileset_id,version,"
+                + 
"fileset_comment,properties,storage_location_name,storage_location,version_deleted_at}",
+        many = @Many(resultMap = "mapToFilesetVersionPO"))
+  })
+  @SelectProvider(
+      type = FilesetMetaSQLProviderFactory.class,
+      method = "selectFilesetByFullQualifiedName")
+  FilesetPO selectFilesetByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("filesetName") String filesetName);
+
   @InsertProvider(type = FilesetMetaSQLProviderFactory.class, method = 
"insertFilesetMeta")
   void insertFilesetMeta(@Param("filesetMeta") FilesetPO filesetPO);
 
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java
index 7729b095c0..1d8f19d4b1 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java
@@ -56,6 +56,13 @@ public class FilesetMetaSQLProviderFactory {
     return getProvider().listFilesetPOsBySchemaId(schemaId);
   }
 
+  public static String listFilesetPOsByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName) {
+    return getProvider().listFilesetPOsByFullQualifiedName(metalakeName, 
catalogName, schemaName);
+  }
+
   public static String listFilesetPOsByFilesetIds(@Param("filesetIds") 
List<Long> filesetIds) {
     return getProvider().listFilesetPOsByFilesetIds(filesetIds);
   }
@@ -74,6 +81,15 @@ public class FilesetMetaSQLProviderFactory {
     return getProvider().selectFilesetMetaById(filesetId);
   }
 
+  public static String selectFilesetByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("filesetName") String filesetName) {
+    return getProvider()
+        .selectFilesetByFullQualifiedName(metalakeName, catalogName, 
schemaName, filesetName);
+  }
+
   public static String insertFilesetMeta(@Param("filesetMeta") FilesetPO 
filesetPO) {
     return getProvider().insertFilesetMeta(filesetPO);
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
index eba3a0a43c..792429b4ec 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
@@ -40,6 +40,14 @@ public interface ModelMetaMapper {
   @SelectProvider(type = ModelMetaSQLProviderFactory.class, method = 
"listModelPOsBySchemaId")
   List<ModelPO> listModelPOsBySchemaId(@Param("schemaId") Long schemaId);
 
+  @SelectProvider(
+      type = ModelMetaSQLProviderFactory.class,
+      method = "listModelPOsByFullQualifiedName")
+  List<ModelPO> listModelPOsByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName);
+
   @SelectProvider(type = ModelMetaSQLProviderFactory.class, method = 
"listModelPOsByModelIds")
   List<ModelPO> listModelPOsByModelIds(@Param("modelIds") List<Long> modelIds);
 
@@ -49,6 +57,15 @@ public interface ModelMetaMapper {
   ModelPO selectModelMetaBySchemaIdAndModelName(
       @Param("schemaId") Long schemaId, @Param("modelName") String modelName);
 
+  @SelectProvider(
+      type = ModelMetaSQLProviderFactory.class,
+      method = "selectModelByFullQualifiedName")
+  ModelPO selectModelByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("modelName") String modelName);
+
   @SelectProvider(
       type = ModelMetaSQLProviderFactory.class,
       method = "selectModelIdBySchemaIdAndModelName")
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
index e4f7fe8161..8d6b53a4fa 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
@@ -63,6 +63,13 @@ public class ModelMetaSQLProviderFactory {
     return getProvider().listModelPOsBySchemaId(schemaId);
   }
 
+  public static String listModelPOsByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName) {
+    return getProvider().listModelPOsByFullQualifiedName(metalakeName, 
catalogName, schemaName);
+  }
+
   public static String listModelPOsByModelIds(@Param("modelIds") List<Long> 
modelIds) {
     return getProvider().listModelPOsByModelIds(modelIds);
   }
@@ -72,6 +79,15 @@ public class ModelMetaSQLProviderFactory {
     return getProvider().selectModelMetaBySchemaIdAndModelName(schemaId, 
modelName);
   }
 
+  public static String selectModelByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("modelName") String modelName) {
+    return getProvider()
+        .selectModelByFullQualifiedName(metalakeName, catalogName, schemaName, 
modelName);
+  }
+
   public static String selectModelIdBySchemaIdAndModelName(
       @Param("schemaId") Long schemaId, @Param("modelName") String modelName) {
     return getProvider().selectModelIdBySchemaIdAndModelName(schemaId, 
modelName);
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaMapper.java
index e1816a3277..f4809b27f1 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaMapper.java
@@ -42,6 +42,12 @@ public interface SchemaMetaMapper {
   @SelectProvider(type = SchemaMetaSQLProviderFactory.class, method = 
"listSchemaPOsByCatalogId")
   List<SchemaPO> listSchemaPOsByCatalogId(@Param("catalogId") Long catalogId);
 
+  @SelectProvider(
+      type = SchemaMetaSQLProviderFactory.class,
+      method = "listSchemaPOsByFullQualifiedName")
+  List<SchemaPO> listSchemaPOsByFullQualifiedName(
+      @Param("metalakeName") String metalakeName, @Param("catalogName") String 
catalogName);
+
   @SelectProvider(type = SchemaMetaSQLProviderFactory.class, method = 
"listSchemaPOsBySchemaIds")
   List<SchemaPO> listSchemaPOsBySchemaIds(@Param("schemaIds") List<Long> 
schemaIds);
 
@@ -57,6 +63,14 @@ public interface SchemaMetaMapper {
   SchemaPO selectSchemaMetaByCatalogIdAndName(
       @Param("catalogId") Long catalogId, @Param("schemaName") String name);
 
+  @SelectProvider(
+      type = SchemaMetaSQLProviderFactory.class,
+      method = "selectSchemaByFullQualifiedName")
+  SchemaPO selectSchemaByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName);
+
   @SelectProvider(type = SchemaMetaSQLProviderFactory.class, method = 
"selectSchemaMetaById")
   SchemaPO selectSchemaMetaById(@Param("schemaId") Long schemaId);
 
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java
index cbab45733c..70e6b0b335 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java
@@ -51,6 +51,11 @@ public class SchemaMetaSQLProviderFactory {
 
   static class SchemaMetaH2Provider extends SchemaMetaBaseSQLProvider {}
 
+  public static String listSchemaPOsByFullQualifiedName(
+      @Param("metalakeName") String metalakeName, @Param("catalogName") String 
catalogName) {
+    return getProvider().listSchemaPOsByFullQualifiedName(metalakeName, 
catalogName);
+  }
+
   public static String listSchemaPOsBySchemaIds(@Param("schemaIds") List<Long> 
schemaIds) {
     return getProvider().listSchemaPOsBySchemaIds(schemaIds);
   }
@@ -69,6 +74,13 @@ public class SchemaMetaSQLProviderFactory {
     return getProvider().selectSchemaMetaByCatalogIdAndName(catalogId, name);
   }
 
+  public static String selectSchemaByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName) {
+    return getProvider().selectSchemaByFullQualifiedName(metalakeName, 
catalogName, schemaName);
+  }
+
   public static String selectSchemaMetaById(@Param("schemaId") Long schemaId) {
     return getProvider().selectSchemaMetaById(schemaId);
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java
index 0eb13e4a1e..c284d0ce3c 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java
@@ -41,6 +41,14 @@ public interface TableMetaMapper {
   @SelectProvider(type = TableMetaSQLProviderFactory.class, method = 
"listTablePOsBySchemaId")
   List<TablePO> listTablePOsBySchemaId(@Param("schemaId") Long schemaId);
 
+  @SelectProvider(
+      type = TableMetaSQLProviderFactory.class,
+      method = "listTablePOsByFullQualifiedName")
+  List<TablePO> listTablePOsByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName);
+
   @SelectProvider(type = TableMetaSQLProviderFactory.class, method = 
"listTablePOsByTableIds")
   List<TablePO> listTablePOsByTableIds(@Param("tableIds") List<Long> tableIds);
 
@@ -56,6 +64,15 @@ public interface TableMetaMapper {
   TablePO selectTableMetaBySchemaIdAndName(
       @Param("schemaId") Long schemaId, @Param("tableName") String name);
 
+  @SelectProvider(
+      type = TableMetaSQLProviderFactory.class,
+      method = "selectTableByFullQualifiedName")
+  TablePO selectTableByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("tableName") String tableName);
+
   @InsertProvider(type = TableMetaSQLProviderFactory.class, method = 
"insertTableMeta")
   void insertTableMeta(@Param("tableMeta") TablePO tablePO);
 
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java
index ad16f69ceb..2a796a6e15 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java
@@ -59,6 +59,13 @@ public class TableMetaSQLProviderFactory {
     return getProvider().listTablePOsByTableIds(tableIds);
   }
 
+  public static String listTablePOsByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName) {
+    return getProvider().listTablePOsByFullQualifiedName(metalakeName, 
catalogName, schemaName);
+  }
+
   public static String selectTableIdBySchemaIdAndName(
       @Param("schemaId") Long schemaId, @Param("tableName") String name) {
     return getProvider().selectTableIdBySchemaIdAndName(schemaId, name);
@@ -69,6 +76,15 @@ public class TableMetaSQLProviderFactory {
     return getProvider().selectTableMetaBySchemaIdAndName(schemaId, name);
   }
 
+  public static String selectTableByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("tableName") String tableName) {
+    return getProvider()
+        .selectTableByFullQualifiedName(metalakeName, catalogName, schemaName, 
tableName);
+  }
+
   public static String selectTableMetaById(@Param("tableId") Long tableId) {
     return getProvider().selectTableMetaById(tableId);
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java
index 014ef229ee..57f775453b 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java
@@ -40,6 +40,14 @@ public interface TopicMetaMapper {
   @SelectProvider(type = TopicMetaSQLProviderFactory.class, method = 
"listTopicPOsBySchemaId")
   List<TopicPO> listTopicPOsBySchemaId(@Param("schemaId") Long schemaId);
 
+  @SelectProvider(
+      type = TopicMetaSQLProviderFactory.class,
+      method = "listTopicPOsByFullQualifiedName")
+  List<TopicPO> listTopicPOsByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName);
+
   @SelectProvider(type = TopicMetaSQLProviderFactory.class, method = 
"listTopicPOsByTopicIds")
   List<TopicPO> listTopicPOsByTopicIds(@Param("topicIds") List<Long> topicIds);
 
@@ -49,6 +57,15 @@ public interface TopicMetaMapper {
   TopicPO selectTopicMetaBySchemaIdAndName(
       @Param("schemaId") Long schemaId, @Param("topicName") String topicName);
 
+  @SelectProvider(
+      type = TopicMetaSQLProviderFactory.class,
+      method = "selectTopicByFullQualifiedName")
+  TopicPO selectTopicByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("topicName") String topicName);
+
   @SelectProvider(type = TopicMetaSQLProviderFactory.class, method = 
"selectTopicMetaById")
   TopicPO selectTopicMetaById(@Param("topicId") Long topicId);
 
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java
index dbbfc21604..bfe98198d2 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java
@@ -64,6 +64,13 @@ public class TopicMetaSQLProviderFactory {
     return getProvider().listTopicPOsBySchemaId(schemaId);
   }
 
+  public static String listTopicPOsByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName) {
+    return getProvider().listTopicPOsByFullQualifiedName(metalakeName, 
catalogName, schemaName);
+  }
+
   public static String listTopicPOsByTopicIds(@Param("topicIds") List<Long> 
topicIds) {
     return getProvider().listTopicPOsByTopicIds(topicIds);
   }
@@ -73,6 +80,15 @@ public class TopicMetaSQLProviderFactory {
     return getProvider().selectTopicMetaBySchemaIdAndName(schemaId, topicName);
   }
 
+  public static String selectTopicByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("topicName") String topicName) {
+    return getProvider()
+        .selectTopicByFullQualifiedName(metalakeName, catalogName, schemaName, 
topicName);
+  }
+
   public static String selectTopicMetaById(@Param("topicId") Long topicId) {
     return getProvider().selectTopicMetaById(topicId);
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FilesetMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FilesetMetaBaseSQLProvider.java
index ae99e34900..f6e61fdef2 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FilesetMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FilesetMetaBaseSQLProvider.java
@@ -23,6 +23,9 @@ import static 
org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper.M
 import static 
org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper.VERSION_TABLE_NAME;
 
 import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.po.FilesetPO;
 import org.apache.ibatis.annotations.Param;
 
@@ -43,6 +46,62 @@ public class FilesetMetaBaseSQLProvider {
         + " WHERE fm.schema_id = #{schemaId} AND fm.deleted_at = 0 AND 
vi.deleted_at = 0";
   }
 
+  public String listFilesetPOsByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName) {
+    return """
+        SELECT
+            mm.metalake_id,
+            cm.catalog_id,
+            sm.schema_id,
+            vi.fileset_id,
+            fm.fileset_name,
+            fm.type,
+            fm.audit_info,
+            fm.current_version,
+            fm.last_version,
+            fm.deleted_at,
+            vi.id,
+            vi.metalake_id as version_metalake_id,
+            vi.catalog_id as version_catalog_id,
+            vi.schema_id as version_schema_id,
+            vi.fileset_id as version_fileset_id,
+            vi.version,
+            vi.fileset_comment,
+            vi.properties,
+            vi.storage_location_name,
+            vi.storage_location,
+            vi.deleted_at as version_deleted_at
+        FROM
+            %s mm
+        INNER JOIN
+            %s cm ON mm.metalake_id = cm.metalake_id
+            AND cm.catalog_name = #{catalogName}
+            AND cm.deleted_at = 0
+        LEFT JOIN
+            %s sm ON cm.catalog_id = sm.catalog_id
+            AND sm.schema_name = #{schemaName}
+            AND sm.deleted_at = 0
+        LEFT JOIN
+            %s fm ON sm.schema_id = fm.schema_id
+            AND fm.deleted_at = 0
+        LEFT JOIN
+            %s vi ON fm.fileset_id = vi.fileset_id
+            AND fm.current_version = vi.version
+            AND vi.deleted_at = 0
+        WHERE
+            mm.metalake_name = #{metalakeName}
+            AND mm.deleted_at = 0;
+            """
+        .formatted(
+            MetalakeMetaMapper.TABLE_NAME,
+            CatalogMetaMapper.TABLE_NAME,
+            SchemaMetaMapper.TABLE_NAME,
+            META_TABLE_NAME,
+            VERSION_TABLE_NAME);
+  }
+
   public String selectFilesetIdBySchemaIdAndName(
       @Param("schemaId") Long schemaId, @Param("filesetName") String name) {
     return "SELECT fileset_id as filesetId FROM "
@@ -90,6 +149,64 @@ public class FilesetMetaBaseSQLProvider {
         + " AND fm.deleted_at = 0 AND vi.deleted_at = 0";
   }
 
+  public String selectFilesetByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("filesetName") String filesetName) {
+    return """
+        SELECT
+            mm.metalake_id,
+            cm.catalog_id,
+            sm.schema_id,
+            vi.fileset_id,
+            fm.fileset_name,
+            fm.type,
+            fm.audit_info,
+            fm.current_version,
+            fm.last_version,
+            fm.deleted_at,
+            vi.id,
+            vi.metalake_id as version_metalake_id,
+            vi.catalog_id as version_catalog_id,
+            vi.schema_id as version_schema_id,
+            vi.fileset_id as version_fileset_id,
+            vi.version,
+            vi.fileset_comment,
+            vi.properties,
+            vi.storage_location_name,
+            vi.storage_location,
+            vi.deleted_at as version_deleted_at
+        FROM
+            %s mm
+        INNER JOIN
+            %s cm ON mm.metalake_id = cm.metalake_id
+            AND cm.catalog_name = #{catalogName}
+            AND cm.deleted_at = 0
+        LEFT JOIN
+            %s sm ON cm.catalog_id = sm.catalog_id
+            AND sm.schema_name = #{schemaName}
+            AND sm.deleted_at = 0
+        LEFT JOIN
+            %s fm ON sm.schema_id = fm.schema_id
+            AND fm.fileset_name = #{filesetName}
+            AND fm.deleted_at = 0
+        LEFT JOIN
+            %s vi ON fm.fileset_id = vi.fileset_id
+            AND fm.current_version = vi.version
+            AND vi.deleted_at = 0
+        WHERE
+            mm.metalake_name = #{metalakeName}
+            AND mm.deleted_at = 0;
+            """
+        .formatted(
+            MetalakeMetaMapper.TABLE_NAME,
+            CatalogMetaMapper.TABLE_NAME,
+            SchemaMetaMapper.TABLE_NAME,
+            META_TABLE_NAME,
+            VERSION_TABLE_NAME);
+  }
+
   public String selectFilesetMetaById(@Param("filesetId") Long filesetId) {
     return "SELECT fm.fileset_id, fm.fileset_name, fm.metalake_id, 
fm.catalog_id, fm.schema_id,"
         + " fm.type, fm.audit_info, fm.current_version, fm.last_version, 
fm.deleted_at,"
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
index ec8e20de21..ed8d9a17ba 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
@@ -19,7 +19,10 @@
 package org.apache.gravitino.storage.relational.mapper.provider.base;
 
 import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.po.ModelPO;
 import org.apache.ibatis.annotations.Param;
 
@@ -67,6 +70,46 @@ public class ModelMetaBaseSQLProvider {
         + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
   }
 
+  public String listModelPOsByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName) {
+    return """
+        SELECT
+            mm.metalake_id AS metalakeId,
+            cm.catalog_id AS catalogId,
+            sm.schema_id AS schemaId,
+            mo.model_id AS modelId,
+            mo.model_name AS modelName,
+            mo.model_comment AS modelComment,
+            mo.model_properties AS modelProperties,
+            mo.model_latest_version AS modelLatestVersion,
+            mo.audit_info AS auditInfo,
+            mo.deleted_at AS deletedAt
+        FROM
+            %s mm
+        INNER JOIN
+            %s cm ON mm.metalake_id = cm.metalake_id
+            AND cm.catalog_name = #{catalogName}
+            AND cm.deleted_at = 0
+        LEFT JOIN
+            %s sm ON cm.catalog_id = sm.catalog_id
+            AND sm.schema_name = #{schemaName}
+            AND sm.deleted_at = 0
+        LEFT JOIN
+            %s mo ON sm.schema_id = mo.schema_id
+            AND mo.deleted_at = 0
+        WHERE
+            mm.metalake_name = #{metalakeName}
+            AND mm.deleted_at = 0;
+            """
+        .formatted(
+            MetalakeMetaMapper.TABLE_NAME,
+            CatalogMetaMapper.TABLE_NAME,
+            SchemaMetaMapper.TABLE_NAME,
+            ModelMetaMapper.TABLE_NAME);
+  }
+
   public String listModelPOsByModelIds(List<Long> modelIds) {
     return "<script>"
         + " SELECT model_id AS modelId, model_name AS modelName, metalake_id 
AS metalakeId,"
@@ -95,6 +138,48 @@ public class ModelMetaBaseSQLProvider {
         + " WHERE schema_id = #{schemaId} AND model_name = #{modelName} AND 
deleted_at = 0";
   }
 
+  public String selectModelByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("modelName") String modelName) {
+    return """
+        SELECT
+            mm.metalake_id AS metalakeId,
+            cm.catalog_id AS catalogId,
+            sm.schema_id AS schemaId,
+            mo.model_id AS modelId,
+            mo.model_name AS modelName,
+            mo.model_comment AS modelComment,
+            mo.model_properties AS modelProperties,
+            mo.model_latest_version AS modelLatestVersion,
+            mo.audit_info AS auditInfo,
+            mo.deleted_at AS deletedAt
+        FROM
+            %s mm
+        INNER JOIN
+            %s cm ON mm.metalake_id = cm.metalake_id
+            AND cm.catalog_name = #{catalogName}
+            AND cm.deleted_at = 0
+        LEFT JOIN
+            %s sm ON cm.catalog_id = sm.catalog_id
+            AND sm.schema_name = #{schemaName}
+            AND sm.deleted_at = 0
+        LEFT JOIN
+            %s mo ON sm.schema_id = mo.schema_id
+            AND mo.model_name = #{modelName}
+            AND mo.deleted_at = 0
+        WHERE
+            mm.metalake_name = #{metalakeName}
+            AND mm.deleted_at = 0;
+            """
+        .formatted(
+            MetalakeMetaMapper.TABLE_NAME,
+            CatalogMetaMapper.TABLE_NAME,
+            SchemaMetaMapper.TABLE_NAME,
+            ModelMetaMapper.TABLE_NAME);
+  }
+
   public String selectModelIdBySchemaIdAndModelName(
       @Param("schemaId") Long schemaId, @Param("modelName") String modelName) {
     return "SELECT model_id"
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/SchemaMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/SchemaMetaBaseSQLProvider.java
index 437c94ce15..0ac2080398 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/SchemaMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/SchemaMetaBaseSQLProvider.java
@@ -21,6 +21,8 @@ package 
org.apache.gravitino.storage.relational.mapper.provider.base;
 import static 
org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper.TABLE_NAME;
 
 import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
 import org.apache.gravitino.storage.relational.po.SchemaPO;
 import org.apache.ibatis.annotations.Param;
 
@@ -36,6 +38,36 @@ public class SchemaMetaBaseSQLProvider {
         + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
   }
 
+  public String listSchemaPOsByFullQualifiedName(
+      @Param("metalakeName") String metalakeName, @Param("catalogName") String 
catalogName) {
+    return """
+        SELECT
+            mm.metalake_id as metalakeId,
+            cm.catalog_id as catalogId,
+            sm.schema_id as schemaId,
+            sm.schema_name as schemaName,
+            sm.schema_comment as schemaComment,
+            sm.properties as properties,
+            sm.audit_info as auditInfo,
+            sm.current_version as currentVersion,
+            sm.last_version as lastVersion,
+            sm.deleted_at as deletedAt
+        FROM
+            %s mm
+        INNER JOIN
+            %s cm ON mm.metalake_id = cm.metalake_id
+            AND cm.catalog_name = #{catalogName}
+            AND cm.deleted_at = 0
+        LEFT JOIN
+            %s sm ON cm.catalog_id = sm.catalog_id
+            AND sm.deleted_at = 0
+        WHERE
+            mm.metalake_name = #{metalakeName}
+            AND mm.deleted_at = 0;
+            """
+        .formatted(MetalakeMetaMapper.TABLE_NAME, 
CatalogMetaMapper.TABLE_NAME, TABLE_NAME);
+  }
+
   public String listSchemaPOsBySchemaIds(@Param("schemaIds") List<Long> 
schemaIds) {
     return "<script>"
         + "SELECT schema_id as schemaId, schema_name as schemaName,"
@@ -74,6 +106,39 @@ public class SchemaMetaBaseSQLProvider {
         + " WHERE catalog_id = #{catalogId} AND schema_name = #{schemaName} 
AND deleted_at = 0";
   }
 
+  public String selectSchemaByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName) {
+    return """
+        SELECT
+            mm.metalake_id as metalakeId,
+            cm.catalog_id as catalogId,
+            sm.schema_id as schemaId,
+            sm.schema_name as schemaName,
+            sm.schema_comment as schemaComment,
+            sm.properties as properties,
+            sm.audit_info as auditInfo,
+            sm.current_version as currentVersion,
+            sm.last_version as lastVersion,
+            sm.deleted_at as deletedAt
+        FROM
+            %s mm
+        INNER JOIN
+            %s cm ON mm.metalake_id = cm.metalake_id
+            AND cm.catalog_name = #{catalogName}
+            AND cm.deleted_at = 0
+        LEFT JOIN
+            %s sm ON cm.catalog_id = sm.catalog_id
+            AND sm.schema_name = #{schemaName}
+            AND sm.deleted_at = 0
+        WHERE
+            mm.metalake_name = #{metalakeName}
+            AND mm.deleted_at = 0;
+            """
+        .formatted(MetalakeMetaMapper.TABLE_NAME, 
CatalogMetaMapper.TABLE_NAME, TABLE_NAME);
+  }
+
   public String selectSchemaMetaById(@Param("schemaId") Long schemaId) {
     return "SELECT schema_id as schemaId, schema_name as schemaName,"
         + " metalake_id as metalakeId, catalog_id as catalogId,"
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
index f8f116c5b3..4312091fca 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
@@ -21,6 +21,9 @@ package 
org.apache.gravitino.storage.relational.mapper.provider.base;
 import static 
org.apache.gravitino.storage.relational.mapper.TableMetaMapper.TABLE_NAME;
 
 import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.TableVersionMapper;
 import org.apache.gravitino.storage.relational.po.TablePO;
 import org.apache.ibatis.annotations.Param;
@@ -72,6 +75,57 @@ public class TableMetaBaseSQLProvider {
         + "</script>";
   }
 
+  public String listTablePOsByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName) {
+    return """
+        SELECT
+            sm.schema_id AS schemaId,
+            cm.catalog_id AS catalogId,
+            tm.table_id AS tableId,
+            tm.table_name AS tableName,
+            tm.metalake_id AS metalakeId,
+            tm.audit_info AS auditInfo,
+            tm.current_version AS currentVersion,
+            tm.last_version AS lastVersion,
+            tm.deleted_at AS deletedAt,
+            tvi.format AS format,
+            tvi.properties AS properties,
+            tvi.partitioning AS partitions,
+            tvi.sort_orders AS sortOrders,
+            tvi.distribution AS distribution,
+            tvi.indexes AS indexes,
+            tvi.comment AS comment
+        FROM
+            %s mm
+        INNER JOIN
+            %s cm ON mm.metalake_id = cm.metalake_id
+            AND cm.catalog_name = #{catalogName}
+            AND cm.deleted_at = 0
+        LEFT JOIN
+            %s sm ON cm.catalog_id = sm.catalog_id
+            AND sm.schema_name = #{schemaName}
+            AND sm.deleted_at = 0
+        LEFT JOIN
+            %s tm ON sm.schema_id = tm.schema_id
+            AND tm.deleted_at = 0
+        LEFT JOIN
+            %s tvi ON tm.table_id = tvi.table_id
+            AND tm.current_version = tvi.version
+            AND tvi.deleted_at = 0
+        WHERE
+            mm.metalake_name = #{metalakeName}
+            AND mm.deleted_at = 0;
+            """
+        .formatted(
+            MetalakeMetaMapper.TABLE_NAME,
+            CatalogMetaMapper.TABLE_NAME,
+            SchemaMetaMapper.TABLE_NAME,
+            TABLE_NAME,
+            TableVersionMapper.TABLE_NAME);
+  }
+
   public String selectTableIdBySchemaIdAndName(
       @Param("schemaId") Long schemaId, @Param("tableName") String name) {
     return "SELECT table_id as tableId FROM "
@@ -230,4 +284,57 @@ public class TableMetaBaseSQLProvider {
         + TABLE_NAME
         + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT 
#{limit}";
   }
+
+  public String selectTableByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("tableName") String tableName) {
+    return """
+        SELECT
+            sm.schema_id AS schemaId,
+            cm.catalog_id AS catalogId,
+            tm.table_id AS tableId,
+            tm.table_name AS tableName,
+            tm.metalake_id AS metalakeId,
+            tm.audit_info AS auditInfo,
+            tm.current_version AS currentVersion,
+            tm.last_version AS lastVersion,
+            tm.deleted_at AS deletedAt,
+            tvi.format AS format,
+            tvi.properties AS properties,
+            tvi.partitioning AS partitions,
+            tvi.sort_orders AS sortOrders,
+            tvi.distribution AS distribution,
+            tvi.indexes AS indexes,
+            tvi.comment AS comment
+        FROM
+            %s mm
+        INNER JOIN
+            %s cm ON mm.metalake_id = cm.metalake_id
+            AND cm.catalog_name = #{catalogName}
+            AND cm.deleted_at = 0
+        LEFT JOIN
+            %s sm ON cm.catalog_id = sm.catalog_id
+            AND sm.schema_name = #{schemaName}
+            AND sm.deleted_at = 0
+        LEFT JOIN
+            %s tm ON sm.schema_id = tm.schema_id
+            AND tm.table_name = #{tableName}
+            AND tm.deleted_at = 0
+        LEFT JOIN
+            %s tvi ON tm.table_id = tvi.table_id
+            AND tm.current_version = tvi.version
+            AND tvi.deleted_at = 0
+        WHERE
+            mm.metalake_name = #{metalakeName}
+            AND mm.deleted_at = 0;
+            """
+        .formatted(
+            MetalakeMetaMapper.TABLE_NAME,
+            CatalogMetaMapper.TABLE_NAME,
+            SchemaMetaMapper.TABLE_NAME,
+            TABLE_NAME,
+            TableVersionMapper.TABLE_NAME);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TopicMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TopicMetaBaseSQLProvider.java
index 2f609f7370..348181a6e6 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TopicMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TopicMetaBaseSQLProvider.java
@@ -22,6 +22,9 @@ package 
org.apache.gravitino.storage.relational.mapper.provider.base;
 import static 
org.apache.gravitino.storage.relational.mapper.TopicMetaMapper.TABLE_NAME;
 
 import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.po.TopicPO;
 import org.apache.ibatis.annotations.Param;
 
@@ -91,6 +94,47 @@ public class TopicMetaBaseSQLProvider {
         + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
   }
 
+  public String listTopicPOsByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName) {
+    return """
+        SELECT
+            mm.metalake_id as metalakeId,
+            sm.schema_id as schemaId,
+            cm.catalog_id as catalogId,
+            tm.topic_id as topicId,
+            tm.topic_name as topicName,
+            tm.comment as comment,
+            tm.properties as properties,
+            tm.audit_info as auditInfo,
+            tm.current_version as currentVersion,
+            tm.last_version as lastVersion,
+            tm.deleted_at as deletedAt
+        FROM
+            %s mm
+        INNER JOIN
+            %s cm ON mm.metalake_id = cm.metalake_id
+            AND cm.catalog_name = #{catalogName}
+            AND cm.deleted_at = 0
+        LEFT JOIN
+            %s sm ON cm.catalog_id = sm.catalog_id
+            AND sm.schema_name = #{schemaName}
+            AND sm.deleted_at = 0
+        LEFT JOIN
+            %s tm ON sm.schema_id = tm.schema_id
+            AND tm.deleted_at = 0
+        WHERE
+            mm.metalake_name = #{metalakeName}
+            AND mm.deleted_at = 0;
+            """
+        .formatted(
+            MetalakeMetaMapper.TABLE_NAME,
+            CatalogMetaMapper.TABLE_NAME,
+            SchemaMetaMapper.TABLE_NAME,
+            TABLE_NAME);
+  }
+
   public String listTopicPOsByTopicIds(@Param("topicIds") List<Long> topicIds) 
{
     return "<script>"
         + " SELECT topic_id as topicId, topic_name as topicName, metalake_id 
as metalakeId,"
@@ -121,6 +165,49 @@ public class TopicMetaBaseSQLProvider {
         + " WHERE schema_id = #{schemaId} AND topic_name = #{topicName} AND 
deleted_at = 0";
   }
 
+  public String selectTopicByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("topicName") String topicName) {
+    return """
+        SELECT
+            mm.metalake_id as metalakeId,
+            sm.schema_id as schemaId,
+            cm.catalog_id as catalogId,
+            tm.topic_id as topicId,
+            tm.topic_name as topicName,
+            tm.comment as comment,
+            tm.properties as properties,
+            tm.audit_info as auditInfo,
+            tm.current_version as currentVersion,
+            tm.last_version as lastVersion,
+            tm.deleted_at as deletedAt
+        FROM
+            %s mm
+        INNER JOIN
+            %s cm ON mm.metalake_id = cm.metalake_id
+            AND cm.catalog_name = #{catalogName}
+            AND cm.deleted_at = 0
+        LEFT JOIN
+            %s sm ON cm.catalog_id = sm.catalog_id
+            AND sm.schema_name = #{schemaName}
+            AND sm.deleted_at = 0
+        LEFT JOIN
+            %s tm ON sm.schema_id = tm.schema_id
+            AND tm.topic_name = #{topicName}
+            AND tm.deleted_at = 0
+        WHERE
+            mm.metalake_name = #{metalakeName}
+            AND mm.deleted_at = 0;
+            """
+        .formatted(
+            MetalakeMetaMapper.TABLE_NAME,
+            CatalogMetaMapper.TABLE_NAME,
+            SchemaMetaMapper.TABLE_NAME,
+            TABLE_NAME);
+  }
+
   public String selectTopicMetaById(@Param("topicId") Long topicId) {
     return "SELECT topic_id as topicId, topic_name as topicName,"
         + " metalake_id as metalakeId, catalog_id as catalogId, schema_id as 
schemaId,"
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
index 6b49bcc447..c4f508e557 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
@@ -25,7 +25,9 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
@@ -106,16 +108,7 @@ public class FilesetMetaService {
       metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
       baseMetricName = "getFilesetByIdentifier")
   public FilesetEntity getFilesetByIdentifier(NameIdentifier identifier) {
-    NameIdentifierUtil.checkFileset(identifier);
-
-    String filesetName = identifier.name();
-
-    Long schemaId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.SCHEMA);
-
-    FilesetPO filesetPO = getFilesetPOBySchemaIdAndName(schemaId, filesetName);
-
+    FilesetPO filesetPO = getFilesetPOByIdentifier(identifier);
     return POConverters.fromFilesetPO(filesetPO, identifier.namespace());
   }
 
@@ -125,15 +118,37 @@ public class FilesetMetaService {
   public List<FilesetEntity> listFilesetsByNamespace(Namespace namespace) {
     NamespaceUtil.checkFileset(namespace);
 
+    List<FilesetPO> filesetPOs = listFilesetPOs(namespace);
+    return POConverters.fromFilesetPOs(filesetPOs, namespace);
+  }
+
+  private List<FilesetPO> listFilesetPOs(Namespace namespace) {
+    return filesetListFetcher().apply(namespace);
+  }
+
+  private List<FilesetPO> listFilesetPOsBySchemaId(Namespace namespace) {
     Long schemaId =
         EntityIdService.getEntityId(
             NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
+    return SessionUtils.getWithoutCommit(
+        FilesetMetaMapper.class, mapper -> 
mapper.listFilesetPOsBySchemaId(schemaId));
+  }
 
+  private List<FilesetPO> listFilesetPOsByFullQualifiedName(Namespace 
namespace) {
+    String[] namespaceLevels = namespace.levels();
     List<FilesetPO> filesetPOs =
         SessionUtils.getWithoutCommit(
-            FilesetMetaMapper.class, mapper -> 
mapper.listFilesetPOsBySchemaId(schemaId));
-
-    return POConverters.fromFilesetPOs(filesetPOs, namespace);
+            FilesetMetaMapper.class,
+            mapper ->
+                mapper.listFilesetPOsByFullQualifiedName(
+                    namespaceLevels[0], namespaceLevels[1], 
namespaceLevels[2]));
+    if (filesetPOs.isEmpty() || filesetPOs.get(0).getSchemaId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.SCHEMA.name().toLowerCase(),
+          namespaceLevels[2]);
+    }
+    return filesetPOs.stream().filter(po -> po.getFilesetId() != 
null).collect(Collectors.toList());
   }
 
   @Monitored(
@@ -182,15 +197,7 @@ public class FilesetMetaService {
       baseMetricName = "updateFileset")
   public <E extends Entity & HasIdentifier> FilesetEntity updateFileset(
       NameIdentifier identifier, Function<E, E> updater) throws IOException {
-    NameIdentifierUtil.checkFileset(identifier);
-
-    String filesetName = identifier.name();
-
-    Long schemaId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.SCHEMA);
-
-    FilesetPO oldFilesetPO = getFilesetPOBySchemaIdAndName(schemaId, 
filesetName);
+    FilesetPO oldFilesetPO = getFilesetPOByIdentifier(identifier);
     FilesetEntity oldFilesetEntity =
         POConverters.fromFilesetPO(oldFilesetPO, identifier.namespace());
     FilesetEntity newEntity = (FilesetEntity) updater.apply((E) 
oldFilesetEntity);
@@ -247,9 +254,8 @@ public class FilesetMetaService {
       metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
       baseMetricName = "deleteFileset")
   public boolean deleteFileset(NameIdentifier identifier) {
-    NameIdentifierUtil.checkFileset(identifier);
-
-    Long filesetId = EntityIdService.getEntityId(identifier, 
Entity.EntityType.FILESET);
+    FilesetPO filesetPO = getFilesetPOByIdentifier(identifier);
+    Long filesetId = filesetPO.getFilesetId();
 
     // We should delete meta and version info
     SessionUtils.doMultipleWithCommit(
@@ -347,6 +353,54 @@ public class FilesetMetaService {
     return totalDeletedCount;
   }
 
+  private FilesetPO getFilesetPOByIdentifier(NameIdentifier identifier) {
+    NameIdentifierUtil.checkFileset(identifier);
+
+    return filesetPOFetcher().apply(identifier);
+  }
+
+  private FilesetPO getFilesetPOBySchemaId(NameIdentifier identifier) {
+    Long schemaId =
+        EntityIdService.getEntityId(
+            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.SCHEMA);
+    return getFilesetPOBySchemaIdAndName(schemaId, identifier.name());
+  }
+
+  private FilesetPO getFilesetPOByFullQualifiedName(NameIdentifier identifier) 
{
+    String[] namespaceLevels = identifier.namespace().levels();
+    FilesetPO filesetPO =
+        getFilesetByFullQualifiedName(
+            namespaceLevels[0], namespaceLevels[1], namespaceLevels[2], 
identifier.name());
+
+    if (filesetPO.getSchemaId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.SCHEMA.name().toLowerCase(),
+          namespaceLevels[2]);
+    }
+
+    if (filesetPO.getFilesetId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.FILESET.name().toLowerCase(),
+          identifier.name());
+    }
+
+    return filesetPO;
+  }
+
+  private Function<Namespace, List<FilesetPO>> filesetListFetcher() {
+    return GravitinoEnv.getInstance().cacheEnabled()
+        ? this::listFilesetPOsBySchemaId
+        : this::listFilesetPOsByFullQualifiedName;
+  }
+
+  private Function<NameIdentifier, FilesetPO> filesetPOFetcher() {
+    return GravitinoEnv.getInstance().cacheEnabled()
+        ? this::getFilesetPOBySchemaId
+        : this::getFilesetPOByFullQualifiedName;
+  }
+
   private void fillFilesetPOBuilderParentEntityId(FilesetPO.Builder builder, 
Namespace namespace) {
     NamespaceUtil.checkFileset(namespace);
     NamespacedEntityId namespacedEntityId =
@@ -356,4 +410,22 @@ public class FilesetMetaService {
     builder.withCatalogId(namespacedEntityId.namespaceIds()[1]);
     builder.withSchemaId(namespacedEntityId.entityId());
   }
+
+  private FilesetPO getFilesetByFullQualifiedName(
+      String metalakeName, String catalogName, String schemaName, String 
filesetName) {
+    FilesetPO filesetPO =
+        SessionUtils.getWithoutCommit(
+            FilesetMetaMapper.class,
+            mapper ->
+                mapper.selectFilesetByFullQualifiedName(
+                    metalakeName, catalogName, schemaName, filesetName));
+    if (filesetPO == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.FILESET.name().toLowerCase(),
+          filesetName);
+    }
+
+    return filesetPO;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
index e7f43b2031..77956dceda 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
@@ -73,13 +74,7 @@ public class ModelMetaService {
   public List<ModelEntity> listModelsByNamespace(Namespace ns) {
     NamespaceUtil.checkModel(ns);
 
-    Long schemaId =
-        EntityIdService.getEntityId(NameIdentifier.of(ns.levels()), 
Entity.EntityType.SCHEMA);
-
-    List<ModelPO> modelPOs =
-        SessionUtils.getWithoutCommit(
-            ModelMetaMapper.class, mapper -> 
mapper.listModelPOsBySchemaId(schemaId));
-
+    List<ModelPO> modelPOs = listModelPOs(ns);
     return modelPOs.stream().map(m -> POConverters.fromModelPO(m, 
ns)).collect(Collectors.toList());
   }
 
@@ -118,19 +113,15 @@ public class ModelMetaService {
 
   @Monitored(metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME, 
baseMetricName = "deleteModel")
   public boolean deleteModel(NameIdentifier ident) {
-    NameIdentifierUtil.checkModel(ident);
-
-    Long schemaId;
-    Long modelId;
+    ModelPO modelPO;
     try {
-      schemaId =
-          EntityIdService.getEntityId(
-              NameIdentifier.of(ident.namespace().levels()), 
Entity.EntityType.SCHEMA);
-      modelId = getModelIdBySchemaIdAndModelName(schemaId, ident.name());
+      modelPO = getModelPOByIdentifier(ident);
     } catch (NoSuchEntityException e) {
       LOG.warn("Failed to delete model: {}", ident, e);
       return false;
     }
+    Long schemaId = modelPO.getSchemaId();
+    Long modelId = modelPO.getModelId();
 
     AtomicInteger modelDeletedCount = new AtomicInteger();
     SessionUtils.doMultipleWithCommit(
@@ -249,24 +240,101 @@ public class ModelMetaService {
   ModelPO getModelPOByIdentifier(NameIdentifier ident) {
     NameIdentifierUtil.checkModel(ident);
 
+    return modelPOFetcher().apply(ident);
+  }
+
+  private List<ModelPO> listModelPOs(Namespace namespace) {
+    return modelListFetcher().apply(namespace);
+  }
+
+  private List<ModelPO> listModelPOsBySchemaId(Namespace namespace) {
+    Long schemaId =
+        EntityIdService.getEntityId(
+            NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
+    return SessionUtils.getWithoutCommit(
+        ModelMetaMapper.class, mapper -> 
mapper.listModelPOsBySchemaId(schemaId));
+  }
+
+  private List<ModelPO> listModelPOsByFullQualifiedName(Namespace namespace) {
+    String[] namespaceLevels = namespace.levels();
+    List<ModelPO> modelPOs =
+        SessionUtils.getWithoutCommit(
+            ModelMetaMapper.class,
+            mapper ->
+                mapper.listModelPOsByFullQualifiedName(
+                    namespaceLevels[0], namespaceLevels[1], 
namespaceLevels[2]));
+    if (modelPOs.isEmpty() || modelPOs.get(0).getSchemaId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.SCHEMA.name().toLowerCase(Locale.ROOT),
+          namespaceLevels[2]);
+    }
+    return modelPOs.stream().filter(po -> po.getModelId() != 
null).collect(Collectors.toList());
+  }
+
+  private ModelPO getModelPOBySchemaId(NameIdentifier identifier) {
     Long schemaId =
         EntityIdService.getEntityId(
-            NameIdentifier.of(ident.namespace().levels()), 
Entity.EntityType.SCHEMA);
+            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.SCHEMA);
+
+    ModelPO modelPO =
+        SessionUtils.getWithoutCommit(
+            ModelMetaMapper.class,
+            mapper -> mapper.selectModelMetaBySchemaIdAndModelName(schemaId, 
identifier.name()));
+
+    if (modelPO == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.MODEL.name().toLowerCase(Locale.ROOT),
+          identifier.toString());
+    }
+    return modelPO;
+  }
 
+  private ModelPO getModelPOByFullQualifiedName(NameIdentifier identifier) {
+    String[] namespaceLevels = identifier.namespace().levels();
     ModelPO modelPO =
         SessionUtils.getWithoutCommit(
             ModelMetaMapper.class,
-            mapper -> mapper.selectModelMetaBySchemaIdAndModelName(schemaId, 
ident.name()));
+            mapper ->
+                mapper.selectModelByFullQualifiedName(
+                    namespaceLevels[0], namespaceLevels[1], 
namespaceLevels[2], identifier.name()));
 
     if (modelPO == null) {
       throw new NoSuchEntityException(
           NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
           Entity.EntityType.MODEL.name().toLowerCase(Locale.ROOT),
-          ident.toString());
+          identifier.toString());
+    }
+
+    if (modelPO.getSchemaId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.SCHEMA.name().toLowerCase(Locale.ROOT),
+          namespaceLevels[2]);
+    }
+
+    if (modelPO.getModelId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.MODEL.name().toLowerCase(Locale.ROOT),
+          identifier.toString());
     }
     return modelPO;
   }
 
+  private Function<Namespace, List<ModelPO>> modelListFetcher() {
+    return GravitinoEnv.getInstance().cacheEnabled()
+        ? this::listModelPOsBySchemaId
+        : this::listModelPOsByFullQualifiedName;
+  }
+
+  private Function<NameIdentifier, ModelPO> modelPOFetcher() {
+    return GravitinoEnv.getInstance().cacheEnabled()
+        ? this::getModelPOBySchemaId
+        : this::getModelPOByFullQualifiedName;
+  }
+
   @Monitored(metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME, 
baseMetricName = "updateModel")
   public <E extends Entity & HasIdentifier> ModelEntity updateModel(
       NameIdentifier identifier, Function<E, E> updater) throws IOException {
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
index c6eb4becf3..c7ffa62816 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
@@ -25,7 +25,10 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
+import org.apache.gravitino.Entity.EntityType;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
@@ -133,15 +136,7 @@ public class SchemaMetaService {
       metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
       baseMetricName = "getSchemaByIdentifier")
   public SchemaEntity getSchemaByIdentifier(NameIdentifier identifier) {
-    NameIdentifierUtil.checkSchema(identifier);
-    String schemaName = identifier.name();
-
-    Long catalogId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.CATALOG);
-
-    SchemaPO schemaPO = getSchemaPOByCatalogIdAndName(catalogId, schemaName);
-
+    SchemaPO schemaPO = getSchemaPOByIdentifier(identifier);
     return POConverters.fromSchemaPO(schemaPO, identifier.namespace());
   }
 
@@ -151,13 +146,7 @@ public class SchemaMetaService {
   public List<SchemaEntity> listSchemasByNamespace(Namespace namespace) {
     NamespaceUtil.checkSchema(namespace);
 
-    Long catalogId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(namespace.levels()), Entity.EntityType.CATALOG);
-
-    List<SchemaPO> schemaPOs =
-        SessionUtils.getWithoutCommit(
-            SchemaMetaMapper.class, mapper -> 
mapper.listSchemaPOsByCatalogId(catalogId));
+    List<SchemaPO> schemaPOs = listSchemaPOs(namespace);
     return POConverters.fromSchemaPOs(schemaPOs, namespace);
   }
 
@@ -193,14 +182,7 @@ public class SchemaMetaService {
       baseMetricName = "updateSchema")
   public <E extends Entity & HasIdentifier> SchemaEntity updateSchema(
       NameIdentifier identifier, Function<E, E> updater) throws IOException {
-    NameIdentifierUtil.checkSchema(identifier);
-
-    String schemaName = identifier.name();
-    Long catalogId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.CATALOG);
-
-    SchemaPO oldSchemaPO = getSchemaPOByCatalogIdAndName(catalogId, 
schemaName);
+    SchemaPO oldSchemaPO = getSchemaPOByIdentifier(identifier);
 
     SchemaEntity oldSchemaEntity = POConverters.fromSchemaPO(oldSchemaPO, 
identifier.namespace());
     SchemaEntity newEntity = (SchemaEntity) updater.apply((E) oldSchemaEntity);
@@ -238,7 +220,8 @@ public class SchemaMetaService {
     NameIdentifierUtil.checkSchema(identifier);
 
     String schemaName = identifier.name();
-    Long schemaId = EntityIdService.getEntityId(identifier, 
Entity.EntityType.SCHEMA);
+    SchemaPO schemaPO = getSchemaPOByIdentifier(identifier);
+    Long schemaId = schemaPO.getSchemaId();
 
     if (cascade) {
       SessionUtils.doMultipleWithCommit(
@@ -388,6 +371,100 @@ public class SchemaMetaService {
         });
   }
 
+  private SchemaPO getSchemaPOByIdentifier(NameIdentifier identifier) {
+    NameIdentifierUtil.checkSchema(identifier);
+    return schemaPOFetcher().apply(identifier);
+  }
+
+  private SchemaPO getSchemaByFullQualifiedName(
+      String metalakeName, String catalogName, String schemaName) {
+    SchemaPO schemaPO =
+        SessionUtils.getWithoutCommit(
+            SchemaMetaMapper.class,
+            mapper ->
+                mapper.selectSchemaByFullQualifiedName(metalakeName, 
catalogName, schemaName));
+    if (schemaPO == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          EntityType.CATALOG.name().toLowerCase(),
+          schemaName);
+    }
+
+    return schemaPO;
+  }
+
+  private List<SchemaPO> listSchemaPOs(Namespace namespace) {
+    return schemaListFetcher().apply(namespace);
+  }
+
+  private List<SchemaPO> listSchemaPOsByCatalogId(Namespace namespace) {
+    Long catalogId =
+        EntityIdService.getEntityId(
+            NameIdentifier.of(namespace.levels()), Entity.EntityType.CATALOG);
+
+    return SessionUtils.getWithoutCommit(
+        SchemaMetaMapper.class, mapper -> 
mapper.listSchemaPOsByCatalogId(catalogId));
+  }
+
+  private List<SchemaPO> listSchemaPOsByFullQualifiedName(Namespace namespace) 
{
+    String[] namespaceLevels = namespace.levels();
+    List<SchemaPO> schemaPOs =
+        SessionUtils.getWithoutCommit(
+            SchemaMetaMapper.class,
+            mapper ->
+                mapper.listSchemaPOsByFullQualifiedName(namespaceLevels[0], 
namespaceLevels[1]));
+    if (schemaPOs.isEmpty() || schemaPOs.get(0).getCatalogId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.CATALOG.name().toLowerCase(),
+          namespaceLevels[1]);
+    }
+    return schemaPOs.stream().filter(po -> po.getSchemaId() != 
null).collect(Collectors.toList());
+  }
+
+  private SchemaPO getSchemaPOByCatalogId(NameIdentifier identifier) {
+    Long catalogId =
+        EntityIdService.getEntityId(
+            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.CATALOG);
+    return getSchemaPOByCatalogIdAndName(catalogId, identifier.name());
+  }
+
+  private SchemaPO getSchemaPOByFullQualifiedName(NameIdentifier identifier) {
+    String[] namespaceLevels = identifier.namespace().levels();
+    SchemaPO schemaPO =
+        getSchemaByFullQualifiedName(namespaceLevels[0], namespaceLevels[1], 
identifier.name());
+
+    if (schemaPO.getCatalogId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.CATALOG.name().toLowerCase(),
+          namespaceLevels[1]);
+    }
+
+    if (schemaPO.getSchemaId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.SCHEMA.name().toLowerCase(),
+          identifier.name());
+    }
+
+    return schemaPO;
+  }
+
+  private Function<Namespace, List<SchemaPO>> schemaListFetcher() {
+    // If cache is enabled, we can use catalog id to fetch schemas faster or 
else use full qualified
+    // name to join several tables to get the schema list.
+    return GravitinoEnv.getInstance().cacheEnabled()
+        ? this::listSchemaPOsByCatalogId
+        : this::listSchemaPOsByFullQualifiedName;
+  }
+
+  private Function<NameIdentifier, SchemaPO> schemaPOFetcher() {
+    return GravitinoEnv.getInstance().cacheEnabled()
+        ? this::getSchemaPOByCatalogId
+        : this::getSchemaPOByFullQualifiedName;
+  }
+
   private void fillSchemaPOBuilderParentEntityId(SchemaPO.Builder builder, 
Namespace namespace) {
     NamespaceUtil.checkSchema(namespace);
     NamespacedEntityId namespacedEntityId =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
index 7c67cfe460..51653f2792 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
@@ -27,7 +27,10 @@ import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
+import org.apache.gravitino.Entity.EntityType;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
@@ -83,13 +86,8 @@ public class TableMetaService {
       metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
       baseMetricName = "getTableByIdentifier")
   public TableEntity getTableByIdentifier(NameIdentifier identifier) {
-    NameIdentifierUtil.checkTable(identifier);
-
-    Long schemaId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.SCHEMA);
+    TablePO tablePO = getTablePOByIdentifier(identifier);
 
-    TablePO tablePO = getTablePOBySchemaIdAndName(schemaId, identifier.name());
     List<ColumnPO> columnPOs =
         TableColumnMetaService.getInstance()
             .getColumnsByTableIdAndVersion(tablePO.getTableId(), 
tablePO.getCurrentVersion());
@@ -103,14 +101,7 @@ public class TableMetaService {
   public List<TableEntity> listTablesByNamespace(Namespace namespace) {
     NamespaceUtil.checkTable(namespace);
 
-    Long schemaId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
-
-    List<TablePO> tablePOs =
-        SessionUtils.getWithoutCommit(
-            TableMetaMapper.class, mapper -> 
mapper.listTablePOsBySchemaId(schemaId));
-
+    List<TablePO> tablePOs = listTablePOs(namespace);
     return POConverters.fromTablePOs(tablePOs, namespace);
   }
 
@@ -170,15 +161,7 @@ public class TableMetaService {
   @Monitored(metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME, 
baseMetricName = "updateTable")
   public <E extends Entity & HasIdentifier> TableEntity updateTable(
       NameIdentifier identifier, Function<E, E> updater) throws IOException {
-    NameIdentifierUtil.checkTable(identifier);
-
-    String tableName = identifier.name();
-
-    Long schemaId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.SCHEMA);
-
-    TablePO oldTablePO = getTablePOBySchemaIdAndName(schemaId, tableName);
+    TablePO oldTablePO = getTablePOByIdentifier(identifier);
     List<ColumnPO> oldTableColumns =
         TableColumnMetaService.getInstance()
             .getColumnsByTableIdAndVersion(oldTablePO.getTableId(), 
oldTablePO.getCurrentVersion());
@@ -197,7 +180,7 @@ public class TableMetaService {
         isSchemaChanged
             ? EntityIdService.getEntityId(
                 NameIdentifier.of(newTableEntity.namespace().levels()), 
Entity.EntityType.SCHEMA)
-            : schemaId;
+            : oldTablePO.getSchemaId();
 
     TablePO newTablePO =
         POConverters.updateTablePOWithVersionAndSchemaId(oldTablePO, 
newTableEntity, newSchemaId);
@@ -240,60 +223,48 @@ public class TableMetaService {
 
   @Monitored(metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME, 
baseMetricName = "deleteTable")
   public boolean deleteTable(NameIdentifier identifier) {
-    NameIdentifierUtil.checkTable(identifier);
-
-    NamespacedEntityId namespacedEntityId =
-        EntityIdService.getEntityIds(identifier, Entity.EntityType.TABLE);
+    TablePO tablePO = getTablePOByIdentifier(identifier);
 
     AtomicInteger deleteResult = new AtomicInteger(0);
-    TablePO[] tablePOHolder = new TablePO[1];
     SessionUtils.doMultipleWithCommit(
-        () -> {
-          tablePOHolder[0] =
-              
getTablePOBySchemaIdAndName(namespacedEntityId.namespaceIds()[2], 
identifier.name());
-        },
         () ->
             deleteResult.set(
                 SessionUtils.getWithoutCommit(
                     TableMetaMapper.class,
-                    mapper -> 
mapper.softDeleteTableMetasByTableId(namespacedEntityId.entityId()))),
+                    mapper -> 
mapper.softDeleteTableMetasByTableId(tablePO.getTableId()))),
         () -> {
           if (deleteResult.get() > 0) {
             SessionUtils.doWithoutCommit(
                 OwnerMetaMapper.class,
                 mapper ->
                     mapper.softDeleteOwnerRelByMetadataObjectIdAndType(
-                        namespacedEntityId.entityId(), 
MetadataObject.Type.TABLE.name()));
-            TableColumnMetaService.getInstance()
-                .deleteColumnsByTableId(namespacedEntityId.entityId());
+                        tablePO.getTableId(), 
MetadataObject.Type.TABLE.name()));
+            
TableColumnMetaService.getInstance().deleteColumnsByTableId(tablePO.getTableId());
             SessionUtils.doWithoutCommit(
                 SecurableObjectMapper.class,
                 mapper ->
                     mapper.softDeleteObjectRelsByMetadataObject(
-                        namespacedEntityId.entityId(), 
MetadataObject.Type.TABLE.name()));
+                        tablePO.getTableId(), 
MetadataObject.Type.TABLE.name()));
             SessionUtils.doWithoutCommit(
                 TagMetadataObjectRelMapper.class,
                 mapper ->
                     mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
-                        namespacedEntityId.entityId(), 
MetadataObject.Type.TABLE.name()));
+                        tablePO.getTableId(), 
MetadataObject.Type.TABLE.name()));
             SessionUtils.doWithoutCommit(
                 TagMetadataObjectRelMapper.class,
-                mapper ->
-                    
mapper.softDeleteTagMetadataObjectRelsByTableId(namespacedEntityId.entityId()));
+                mapper -> 
mapper.softDeleteTagMetadataObjectRelsByTableId(tablePO.getTableId()));
 
             SessionUtils.doWithoutCommit(
                 StatisticMetaMapper.class,
-                mapper -> 
mapper.softDeleteStatisticsByEntityId(namespacedEntityId.entityId()));
+                mapper -> 
mapper.softDeleteStatisticsByEntityId(tablePO.getTableId()));
             SessionUtils.doWithoutCommit(
                 PolicyMetadataObjectRelMapper.class,
-                mapper ->
-                    mapper.softDeletePolicyMetadataObjectRelsByTableId(
-                        namespacedEntityId.entityId()));
+                mapper -> 
mapper.softDeletePolicyMetadataObjectRelsByTableId(tablePO.getTableId()));
             SessionUtils.doWithoutCommit(
                 TableVersionMapper.class,
                 mapper ->
                     mapper.softDeleteTableVersionByTableIdAndVersion(
-                        namespacedEntityId.entityId(), 
tablePOHolder[0].getCurrentVersion()));
+                        tablePO.getTableId(), tablePO.getCurrentVersion()));
           }
         });
 
@@ -329,6 +300,12 @@ public class TableMetaService {
     builder.withSchemaId(namespacedEntityId.entityId());
   }
 
+  private TablePO getTablePOByIdentifier(NameIdentifier identifier) {
+    NameIdentifierUtil.checkTable(identifier);
+
+    return tablePOFetcher().apply(identifier);
+  }
+
   private TablePO getTablePOBySchemaIdAndName(Long schemaId, String tableName) 
{
     TablePO tablePO =
         SessionUtils.getWithoutCommit(
@@ -342,4 +319,93 @@ public class TableMetaService {
     }
     return tablePO;
   }
+
+  private TablePO getTableByFullQualifiedName(
+      String metalakeName, String catalogName, String schemaName, String 
tableName) {
+    TablePO tablePO =
+        SessionUtils.getWithoutCommit(
+            TableMetaMapper.class,
+            mapper ->
+                mapper.selectTableByFullQualifiedName(
+                    metalakeName, catalogName, schemaName, tableName));
+    if (tablePO == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.TABLE.name().toLowerCase(),
+          tableName);
+    }
+
+    return tablePO;
+  }
+
+  private List<TablePO> listTablePOs(Namespace namespace) {
+    return tableListFetcher().apply(namespace);
+  }
+
+  private List<TablePO> listTablePOsBySchemaId(Namespace namespace) {
+    Long schemaId =
+        EntityIdService.getEntityId(
+            NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
+    return SessionUtils.getWithoutCommit(
+        TableMetaMapper.class, mapper -> 
mapper.listTablePOsBySchemaId(schemaId));
+  }
+
+  private List<TablePO> listTablePOsByFullQualifiedName(Namespace namespace) {
+    String[] namespaceLevels = namespace.levels();
+    List<TablePO> tablePOs =
+        SessionUtils.getWithoutCommit(
+            TableMetaMapper.class,
+            mapper ->
+                mapper.listTablePOsByFullQualifiedName(
+                    namespaceLevels[0], namespaceLevels[1], 
namespaceLevels[2]));
+    if (tablePOs.isEmpty() || tablePOs.get(0).getSchemaId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          EntityType.SCHEMA.name().toLowerCase(),
+          namespaceLevels[2]);
+    }
+    return tablePOs.stream().filter(po -> po.getTableId() != 
null).collect(Collectors.toList());
+  }
+
+  private TablePO getTablePOBySchemaId(NameIdentifier identifier) {
+    Long schemaId =
+        EntityIdService.getEntityId(
+            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.SCHEMA);
+    return getTablePOBySchemaIdAndName(schemaId, identifier.name());
+  }
+
+  private TablePO getTablePOByFullQualifiedName(NameIdentifier identifier) {
+    String[] namespaceLevels = identifier.namespace().levels();
+    TablePO tablePO =
+        getTableByFullQualifiedName(
+            namespaceLevels[0], namespaceLevels[1], namespaceLevels[2], 
identifier.name());
+
+    if (tablePO.getSchemaId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          EntityType.SCHEMA.name().toLowerCase(),
+          namespaceLevels[2]);
+    }
+
+    if (tablePO.getTableId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          EntityType.TABLE.name().toLowerCase(),
+          identifier.name());
+    }
+
+    return tablePO;
+  }
+
+  private Function<Namespace, List<TablePO>> tableListFetcher() {
+    return GravitinoEnv.getInstance().cacheEnabled()
+        ? this::listTablePOsBySchemaId
+        : this::listTablePOsByFullQualifiedName;
+  }
+
+  private Function<NameIdentifier, TablePO> tablePOFetcher() {
+    return GravitinoEnv.getInstance().cacheEnabled()
+        ? this::getTablePOBySchemaId
+        : this::getTablePOByFullQualifiedName;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
index 4282742c28..967f00c40c 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
@@ -25,7 +25,9 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
@@ -92,29 +94,14 @@ public class TopicMetaService {
   public List<TopicEntity> listTopicsByNamespace(Namespace namespace) {
     NamespaceUtil.checkTopic(namespace);
 
-    Long schemaId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
-
-    List<TopicPO> topicPOs =
-        SessionUtils.getWithoutCommit(
-            TopicMetaMapper.class, mapper -> 
mapper.listTopicPOsBySchemaId(schemaId));
-
+    List<TopicPO> topicPOs = listTopicPOs(namespace);
     return POConverters.fromTopicPOs(topicPOs, namespace);
   }
 
   @Monitored(metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME, 
baseMetricName = "updateTopic")
   public <E extends Entity & HasIdentifier> TopicEntity updateTopic(
       NameIdentifier ident, Function<E, E> updater) throws IOException {
-    NameIdentifierUtil.checkTopic(ident);
-
-    String topicName = ident.name();
-
-    Long schemaId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(ident.namespace().levels()), 
Entity.EntityType.SCHEMA);
-
-    TopicPO oldTopicPO = getTopicPOBySchemaIdAndName(schemaId, topicName);
+    TopicPO oldTopicPO = getTopicPOByIdentifier(ident);
     TopicEntity oldTopicEntity = POConverters.fromTopicPO(oldTopicPO, 
ident.namespace());
     TopicEntity newEntity = (TopicEntity) updater.apply((E) oldTopicEntity);
     Preconditions.checkArgument(
@@ -159,6 +146,94 @@ public class TopicMetaService {
     return topicPO;
   }
 
+  private TopicPO getTopicPOByIdentifier(NameIdentifier identifier) {
+    NameIdentifierUtil.checkTopic(identifier);
+
+    return topicPOFetcher().apply(identifier);
+  }
+
+  private List<TopicPO> listTopicPOs(Namespace namespace) {
+    return topicListFetcher().apply(namespace);
+  }
+
+  private List<TopicPO> listTopicPOsBySchemaId(Namespace namespace) {
+    Long schemaId =
+        EntityIdService.getEntityId(
+            NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
+
+    return SessionUtils.getWithoutCommit(
+        TopicMetaMapper.class, mapper -> 
mapper.listTopicPOsBySchemaId(schemaId));
+  }
+
+  private List<TopicPO> listTopicPOsByFullQualifiedName(Namespace namespace) {
+    String[] namespaceLevels = namespace.levels();
+    List<TopicPO> topicPOs =
+        SessionUtils.getWithoutCommit(
+            TopicMetaMapper.class,
+            mapper ->
+                mapper.listTopicPOsByFullQualifiedName(
+                    namespaceLevels[0], namespaceLevels[1], 
namespaceLevels[2]));
+    if (topicPOs.isEmpty() || topicPOs.get(0).getSchemaId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.SCHEMA.name().toLowerCase(),
+          namespaceLevels[2]);
+    }
+    return topicPOs.stream().filter(po -> po.getTopicId() != 
null).collect(Collectors.toList());
+  }
+
+  private TopicPO getTopicPOBySchemaId(NameIdentifier identifier) {
+    Long schemaId =
+        EntityIdService.getEntityId(
+            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.SCHEMA);
+    return getTopicPOBySchemaIdAndName(schemaId, identifier.name());
+  }
+
+  private TopicPO getTopicPOByFullQualifiedName(NameIdentifier identifier) {
+    String[] namespaceLevels = identifier.namespace().levels();
+    TopicPO topicPO =
+        SessionUtils.getWithoutCommit(
+            TopicMetaMapper.class,
+            mapper ->
+                mapper.selectTopicByFullQualifiedName(
+                    namespaceLevels[0], namespaceLevels[1], 
namespaceLevels[2], identifier.name()));
+
+    if (topicPO == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.TOPIC.name().toLowerCase(),
+          identifier.name());
+    }
+
+    if (topicPO.getSchemaId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.SCHEMA.name().toLowerCase(),
+          namespaceLevels[2]);
+    }
+
+    if (topicPO.getTopicId() == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.TOPIC.name().toLowerCase(),
+          identifier.name());
+    }
+
+    return topicPO;
+  }
+
+  private Function<Namespace, List<TopicPO>> topicListFetcher() {
+    return GravitinoEnv.getInstance().cacheEnabled()
+        ? this::listTopicPOsBySchemaId
+        : this::listTopicPOsByFullQualifiedName;
+  }
+
+  private Function<NameIdentifier, TopicPO> topicPOFetcher() {
+    return GravitinoEnv.getInstance().cacheEnabled()
+        ? this::getTopicPOBySchemaId
+        : this::getTopicPOByFullQualifiedName;
+  }
+
   private void fillTopicPOBuilderParentEntityId(TopicPO.Builder builder, 
Namespace namespace) {
     NamespaceUtil.checkTopic(namespace);
     NamespacedEntityId namespacedEntityId =
@@ -173,22 +248,14 @@ public class TopicMetaService {
       metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
       baseMetricName = "getTopicByIdentifier")
   public TopicEntity getTopicByIdentifier(NameIdentifier identifier) {
-    NameIdentifierUtil.checkTopic(identifier);
-
-    Long schemaId =
-        EntityIdService.getEntityId(
-            NameIdentifier.of(identifier.namespace().levels()), 
Entity.EntityType.SCHEMA);
-
-    TopicPO topicPO = getTopicPOBySchemaIdAndName(schemaId, identifier.name());
-
+    TopicPO topicPO = getTopicPOByIdentifier(identifier);
     return POConverters.fromTopicPO(topicPO, identifier.namespace());
   }
 
   @Monitored(metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME, 
baseMetricName = "deleteTopic")
   public boolean deleteTopic(NameIdentifier identifier) {
-    NameIdentifierUtil.checkTopic(identifier);
-
-    Long topicId = EntityIdService.getEntityId(identifier, 
Entity.EntityType.TOPIC);
+    TopicPO topicPO = getTopicPOByIdentifier(identifier);
+    Long topicId = topicPO.getTopicId();
 
     SessionUtils.doMultipleWithCommit(
         () ->
diff --git 
a/core/src/test/java/org/apache/gravitino/job/TestBuiltInJobTemplateEventListener.java
 
b/core/src/test/java/org/apache/gravitino/job/TestBuiltInJobTemplateEventListener.java
index e5273e6f83..a02934c2c4 100644
--- 
a/core/src/test/java/org/apache/gravitino/job/TestBuiltInJobTemplateEventListener.java
+++ 
b/core/src/test/java/org/apache/gravitino/job/TestBuiltInJobTemplateEventListener.java
@@ -42,6 +42,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.function.Consumer;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityStore;
@@ -66,7 +67,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
-import org.testcontainers.shaded.org.apache.commons.lang3.reflect.FieldUtils;
 
 public class TestBuiltInJobTemplateEventListener {
 
diff --git 
a/core/src/test/java/org/apache/gravitino/job/local/TestLocalJobExecutor.java 
b/core/src/test/java/org/apache/gravitino/job/local/TestLocalJobExecutor.java
index bfb3ec3a91..1c7cbfae10 100644
--- 
a/core/src/test/java/org/apache/gravitino/job/local/TestLocalJobExecutor.java
+++ 
b/core/src/test/java/org/apache/gravitino/job/local/TestLocalJobExecutor.java
@@ -36,13 +36,13 @@ import org.apache.gravitino.job.JobTemplate;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.JobTemplateEntity;
 import org.apache.gravitino.utils.NamespaceUtil;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 public class TestLocalJobExecutor {
 
diff --git 
a/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java 
b/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java
index 4a91539d4f..c5be0774ea 100644
--- a/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java
+++ b/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
@@ -49,7 +50,6 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
-import org.testcontainers.shaded.org.apache.commons.lang3.reflect.FieldUtils;
 
 public class TestMetalakeManager {
 
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java 
b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
index 57efd5b1f1..54846ad241 100644
--- a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
+++ b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
@@ -56,6 +56,7 @@ import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
@@ -64,6 +65,7 @@ import org.apache.gravitino.Entity.EntityType;
 import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.EntityStoreFactory;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.SupportsRelationOperations;
@@ -120,7 +122,6 @@ import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.shaded.org.apache.commons.lang3.tuple.Pair;
 
 @Tag("gravitino-docker-test")
 public class TestEntityStorage {
@@ -131,8 +132,15 @@ public class TestEntityStorage {
   private static final String DB_DIR = JDBC_STORE_PATH + "/testdb";
   private static final String H2_FILE = DB_DIR + ".mv.db";
 
-  static Object[] storageProvider() {
-    return new Object[] {"h2", "mysql", "postgresql"};
+  static Object[][] storageProvider() {
+    return new Object[][] {
+      {"h2", true},
+      {"h2", false},
+      {"mysql", true},
+      {"mysql", false},
+      {"postgresql", true},
+      {"postgresql", false}
+    };
   }
 
   @AfterEach
@@ -142,7 +150,7 @@ public class TestEntityStorage {
     ContainerSuite.getInstance().close();
   }
 
-  private void init(String type, Config config) {
+  private void init(String type, Config config) throws IllegalAccessException {
     Preconditions.checkArgument(StringUtils.isNotBlank(type));
     File dir = new File(DB_DIR);
     if (dir.exists() || !dir.isDirectory()) {
@@ -157,7 +165,6 @@ public class TestEntityStorage {
     Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 
1000L);
     Mockito.when(config.get(VERSION_RETENTION_COUNT)).thenReturn(1L);
     // Fix cache config for test
-    Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(true);
     Mockito.when(config.get(Configs.CACHE_MAX_ENTRIES)).thenReturn(10_000);
     
Mockito.when(config.get(Configs.CACHE_EXPIRATION_TIME)).thenReturn(3_600_000L);
     Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
@@ -165,6 +172,8 @@ public class TestEntityStorage {
     
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
     Mockito.when(config.get(Configs.CACHE_LOCK_SEGMENTS)).thenReturn(16);
 
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, true);
+
     BaseIT baseIT = new BaseIT();
 
     try {
@@ -271,8 +280,9 @@ public class TestEntityStorage {
 
   @ParameterizedTest
   @MethodSource("storageProvider")
-  void testRestart(String type) throws IOException {
+  void testRestart(String type, boolean enableCache) throws IOException, 
IllegalAccessException {
     Config config = Mockito.mock(Config.class);
+    Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(enableCache);
     init(type, config);
     AuditInfo auditInfo =
         
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
@@ -524,10 +534,11 @@ public class TestEntityStorage {
 
   @ParameterizedTest
   @MethodSource("storageProvider")
-  void testEntityUpdate(String type) throws Exception {
+  void testEntityUpdate(String type, boolean enableCache) throws Exception {
     Config config = Mockito.mock(Config.class);
-    init(type, config);
+    Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(enableCache);
 
+    init(type, config);
     AuditInfo auditInfo =
         
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
 
@@ -627,8 +638,11 @@ public class TestEntityStorage {
 
   @ParameterizedTest
   @MethodSource("storageProvider")
-  public void testAuthorizationEntityDelete(String type) throws IOException {
+  public void testAuthorizationEntityDelete(String type, boolean enableCache)
+      throws IOException, IllegalAccessException {
     Config config = Mockito.mock(Config.class);
+    Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(enableCache);
+
     init(type, config);
 
     AuditInfo auditInfo =
@@ -668,13 +682,17 @@ public class TestEntityStorage {
       Assertions.assertFalse(store.exists(anotherGroup.nameIdentifier(), 
Entity.EntityType.GROUP));
       Assertions.assertFalse(store.exists(oneRole.nameIdentifier(), 
Entity.EntityType.ROLE));
       Assertions.assertFalse(store.exists(anotherRole.nameIdentifier(), 
Entity.EntityType.ROLE));
+      destroy(type);
     }
   }
 
   @ParameterizedTest
   @MethodSource("storageProvider")
-  void testEntityDelete(String type) throws IOException {
+  void testEntityDelete(String type, boolean enableCache)
+      throws IOException, IllegalAccessException {
     Config config = Mockito.mock(Config.class);
+    Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(enableCache);
+
     init(type, config);
 
     AuditInfo auditInfo =
@@ -982,9 +1000,13 @@ public class TestEntityStorage {
 
   @ParameterizedTest
   @MethodSource("storageProvider")
-  void testSameNameUnderANameSpace(String type) throws IOException {
+  void testSameNameUnderANameSpace(String type, boolean enableCache)
+      throws IOException, IllegalAccessException {
     Config config = Mockito.mock(Config.class);
+    Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(enableCache);
+
     init(type, config);
+
     try (EntityStore store = EntityStoreFactory.createEntityStore(config)) {
       store.initialize(config);
 
@@ -1146,8 +1168,11 @@ public class TestEntityStorage {
 
   @ParameterizedTest
   @MethodSource("storageProvider")
-  void testDeleteAndRename(String type) throws IOException {
+  void testDeleteAndRename(String type, boolean enableCache)
+      throws IOException, IllegalAccessException {
     Config config = Mockito.mock(Config.class);
+    Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(enableCache);
+
     init(type, config);
     try (EntityStore store = EntityStoreFactory.createEntityStore(config)) {
       store.initialize(config);
@@ -2591,8 +2616,15 @@ public class TestEntityStorage {
 
   @ParameterizedTest
   @MethodSource("storageProvider")
-  void testInvalidRelationCache(String type) throws Exception {
+  void testInvalidRelationCache(String type, boolean enableCache) throws 
Exception {
+    // This class will test entity cache specifically for relational stores
+    if (!enableCache) {
+      return;
+    }
+
     Config config = Mockito.mock(Config.class);
+    Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(enableCache);
+
     init(type, config);
 
     AuditInfo auditInfo =
@@ -2847,8 +2879,14 @@ public class TestEntityStorage {
 
   @ParameterizedTest
   @MethodSource("storageProvider")
-  void testTagRelationCache(String type) throws Exception {
+  void testTagRelationCache(String type, boolean enableCache) throws Exception 
{
+    if (!enableCache) {
+      return;
+    }
+
     Config config = Mockito.mock(Config.class);
+    Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(enableCache);
+
     init(type, config);
 
     AuditInfo auditInfo =
@@ -3087,9 +3125,13 @@ public class TestEntityStorage {
 
   @ParameterizedTest
   @MethodSource("storageProvider")
-  void testLanceTableCreateAndUpdate(String type) {
+  void testLanceTableCreateAndUpdate(String type, boolean enableCache)
+      throws IllegalAccessException {
     Config config = Mockito.mock(Config.class);
+    Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(enableCache);
+
     init(type, config);
+    
SqlSessionFactoryHelper.getInstance().init(GravitinoEnv.getInstance().config());
 
     AuditInfo auditInfo =
         
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/BackendTestExtension.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/BackendTestExtension.java
index 95906540a3..03c7c43b5a 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/BackendTestExtension.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/BackendTestExtension.java
@@ -19,6 +19,7 @@
 
 package org.apache.gravitino.storage.relational;
 
+import static org.apache.gravitino.Configs.CACHE_ENABLED;
 import static org.apache.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE;
 import static 
org.apache.gravitino.Configs.DEFAULT_RELATIONAL_JDBC_BACKEND_MAX_CONNECTIONS;
 import static 
org.apache.gravitino.Configs.DEFAULT_RELATIONAL_JDBC_BACKEND_MAX_WAIT_MILLISECONDS;
@@ -40,8 +41,10 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Stream;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.integration.test.util.BaseIT;
 import org.apache.gravitino.storage.relational.service.EntityIdService;
 import org.junit.jupiter.api.extension.AfterAllCallback;
@@ -172,6 +175,10 @@ public class BackendTestExtension
       
Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_WAIT_MILLISECONDS))
           .thenReturn(DEFAULT_RELATIONAL_JDBC_BACKEND_MAX_WAIT_MILLISECONDS);
 
+      Mockito.when(config.get(CACHE_ENABLED)).thenReturn(true);
+
+      FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, 
true);
+
       RelationalBackend backend = new JDBCBackend();
       if ("mysql".equals(type)) {
         String url = baseIT.startAndInitMySQLBackend();
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestFilesetMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestFilesetMetaService.java
index 93cc6d63ae..58b0eb7e6d 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestFilesetMetaService.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestFilesetMetaService.java
@@ -37,8 +37,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityAlreadyExistsException;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
@@ -55,7 +60,7 @@ import org.apache.ibatis.session.SqlSession;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
-import org.testcontainers.shaded.org.apache.commons.lang3.tuple.Pair;
+import org.mockito.Mockito;
 
 public class TestFilesetMetaService extends TestJDBCBackend {
   private final String metalakeName = 
GravitinoITUtils.genRandomName("tst_metalake");
@@ -63,7 +68,10 @@ public class TestFilesetMetaService extends TestJDBCBackend {
   private final String schemaName = 
GravitinoITUtils.genRandomName("tst_fs_schema");
 
   @BeforeEach
-  public void prepare() throws IOException {
+  public void prepare() throws IOException, IllegalAccessException {
+    Config config = Mockito.mock(Config.class);
+    Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(false);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, true);
     createAndInsertMakeLake(metalakeName);
     createAndInsertCatalog(metalakeName, catalogName);
     createAndInsertSchema(metalakeName, catalogName, schemaName);
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSchemaMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSchemaMetaService.java
index 5e9a345ee3..ad806f28f4 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSchemaMetaService.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSchemaMetaService.java
@@ -171,7 +171,7 @@ public class TestSchemaMetaService extends TestJDBCBackend {
   }
 
   @TestTemplate
-  public void testDeleteSchemlaaNonCascadingFailsWhenTopicExists() throws 
IOException {
+  public void testDeleteSchemaNonCascadingFailsWhenTopicExists() throws 
IOException {
 
     createAndInsertMakeLake(metalakeName);
     createAndInsertCatalog(metalakeName, catalogName);
diff --git a/flink-connector/flink/build.gradle.kts 
b/flink-connector/flink/build.gradle.kts
index b653814cba..da01614f51 100644
--- a/flink-connector/flink/build.gradle.kts
+++ b/flink-connector/flink/build.gradle.kts
@@ -86,6 +86,7 @@ dependencies {
   testImplementation(project(":integration-test-common", "testArtifacts"))
   testImplementation(project(":server"))
   testImplementation(project(":server-common"))
+  testImplementation(libs.awaitility)
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
   testImplementation(libs.mockito.core)
diff --git a/iceberg/iceberg-rest-server/build.gradle.kts 
b/iceberg/iceberg-rest-server/build.gradle.kts
index e74945ff3e..6f2c73b8d0 100644
--- a/iceberg/iceberg-rest-server/build.gradle.kts
+++ b/iceberg/iceberg-rest-server/build.gradle.kts
@@ -95,6 +95,7 @@ dependencies {
     exclude("org.rocksdb")
   }
 
+  testImplementation(libs.awaitility)
   testImplementation(libs.h2db)
   testImplementation(libs.mysql.driver)
   testImplementation(libs.postgresql.driver)
diff --git a/integration-test-common/build.gradle.kts 
b/integration-test-common/build.gradle.kts
index 8aac2cafad..7de6bf080b 100644
--- a/integration-test-common/build.gradle.kts
+++ b/integration-test-common/build.gradle.kts
@@ -35,6 +35,7 @@ dependencies {
   }
   testImplementation(project(":server"))
   testImplementation(project(":server-common"))
+  testImplementation(libs.awaitility)
   testImplementation(libs.bundles.jetty)
   testImplementation(libs.bundles.jersey)
   testImplementation(libs.bundles.jwt)
diff --git 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/BaseIT.java
 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/BaseIT.java
index b39bf40662..d6be20683d 100644
--- 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/BaseIT.java
+++ 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/BaseIT.java
@@ -68,13 +68,13 @@ import org.apache.gravitino.server.GravitinoServer;
 import org.apache.gravitino.server.ServerConfig;
 import org.apache.gravitino.server.web.JettyServerConfig;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 /**
  * BaseIT can be used as a base class for integration tests. It will 
automatically start a Gravitino
diff --git a/lance/lance-rest-server/build.gradle.kts 
b/lance/lance-rest-server/build.gradle.kts
index c2c4065b5e..52f045af2b 100644
--- a/lance/lance-rest-server/build.gradle.kts
+++ b/lance/lance-rest-server/build.gradle.kts
@@ -66,6 +66,7 @@ dependencies {
   testImplementation(project(":server"))
   testImplementation(project(":integration-test-common", "testArtifacts"))
 
+  testImplementation(libs.awaitility)
   testImplementation(libs.commons.io)
   testImplementation(libs.jersey.test.framework.core) {
     exclude(group = "org.junit.jupiter")
diff --git a/spark-connector/v3.3/spark/build.gradle.kts 
b/spark-connector/v3.3/spark/build.gradle.kts
index 1cc7f6d1a1..04a85f710e 100644
--- a/spark-connector/v3.3/spark/build.gradle.kts
+++ b/spark-connector/v3.3/spark/build.gradle.kts
@@ -93,6 +93,7 @@ dependencies {
     exclude("com.fasterxml.jackson")
   }
 
+  testImplementation(libs.awaitility)
   testImplementation(libs.hive2.common) {
     exclude("com.sun.jersey")
     exclude("org.apache.curator")
diff --git a/spark-connector/v3.4/spark/build.gradle.kts 
b/spark-connector/v3.4/spark/build.gradle.kts
index 39e1bdcb89..a732aeb757 100644
--- a/spark-connector/v3.4/spark/build.gradle.kts
+++ b/spark-connector/v3.4/spark/build.gradle.kts
@@ -95,6 +95,7 @@ dependencies {
     exclude("com.fasterxml.jackson")
   }
 
+  testImplementation(libs.awaitility)
   testImplementation(libs.hive2.common) {
     exclude("com.sun.jersey")
     exclude("org.apache.curator")
diff --git a/spark-connector/v3.5/spark/build.gradle.kts 
b/spark-connector/v3.5/spark/build.gradle.kts
index 89ac454f25..edcff1e07e 100644
--- a/spark-connector/v3.5/spark/build.gradle.kts
+++ b/spark-connector/v3.5/spark/build.gradle.kts
@@ -97,6 +97,7 @@ dependencies {
     exclude("org.slf4j")
   }
 
+  testImplementation(libs.awaitility)
   testImplementation(libs.hive2.common) {
     exclude("com.sun.jersey")
     exclude("org.apache.curator")


Reply via email to