This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 21ab7d465b [#7721] feat(policy): Add Relational Backend Support for 
Policy (part-4) (#7722)
21ab7d465b is described below

commit 21ab7d465b6765fcc55f524237e1b1afa6c133f7
Author: mchades <[email protected]>
AuthorDate: Mon Aug 4 15:36:40 2025 +0800

    [#7721] feat(policy): Add Relational Backend Support for Policy (part-4) 
(#7722)
    
    ### What changes were proposed in this pull request?
    
    Support policy association operations in the relational backend storage.
    
    ### Why are the changes needed?
    
    Fix: #7721
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    Tests added in TestPolicyMetaService.
---
 .../relational/mapper/PolicyMetaMapper.java        |  29 +-
 .../mapper/PolicyMetaSQLProviderFactory.java       |   6 +
 ...per.java => PolicyMetadataObjectRelMapper.java} |  92 +++-
 .../PolicyMetadataObjectRelSQLProviderFactory.java | 128 +++++
 .../provider/base/PolicyMetaBaseSQLProvider.java   |  27 +
 .../PolicyMetadataObjectRelBaseSQLProvider.java    | 234 ++++++++
 .../PolicyMetadataObjectRelPostgreSQLProvider.java | 201 +++++++
 .../relational/po/PolicyMetadataObjectRelPO.java   | 131 +++++
 .../relational/service/CatalogMetaService.java     |  11 +
 .../relational/service/FilesetMetaService.java     |   7 +
 .../relational/service/ModelMetaService.java       |   7 +
 .../relational/service/PolicyMetaService.java      | 208 ++++++-
 .../relational/service/SchemaMetaService.java      |  11 +
 .../relational/service/TableMetaService.java       |   4 +
 .../relational/service/TopicMetaService.java       |   7 +
 .../storage/relational/utils/POConverters.java     |  24 +
 .../relational/service/TestPolicyMetaService.java  | 606 +++++++++++++++++++++
 17 files changed, 1692 insertions(+), 41 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
index e2fd8058fe..9bbfa345c4 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
@@ -55,6 +55,33 @@ public interface PolicyMetaMapper {
   @SelectProvider(type = PolicyMetaSQLProviderFactory.class, method = 
"listPolicyPOsByMetalake")
   List<PolicyPO> listPolicyPOsByMetalake(@Param("metalakeName") String 
metalakeName);
 
+  @Results({
+    @Result(property = "policyId", column = "policy_id"),
+    @Result(property = "policyName", column = "policy_name"),
+    @Result(property = "policyType", column = "policy_type"),
+    @Result(property = "metalakeId", column = "metalake_id"),
+    @Result(property = "inheritable", column = "inheritable"),
+    @Result(property = "exclusive", column = "exclusive"),
+    @Result(property = "supportedObjectTypes", column = 
"supported_object_types"),
+    @Result(property = "auditInfo", column = "audit_info"),
+    @Result(property = "currentVersion", column = "current_version"),
+    @Result(property = "lastVersion", column = "last_version"),
+    @Result(property = "deletedAt", column = "deleted_at"),
+    @Result(property = "policyVersionPO.id", column = "id"),
+    @Result(property = "policyVersionPO.metalakeId", column = 
"version_metalake_id"),
+    @Result(property = "policyVersionPO.policyId", column = 
"version_policy_id"),
+    @Result(property = "policyVersionPO.version", column = "version"),
+    @Result(property = "policyVersionPO.policyComment", column = 
"policy_comment"),
+    @Result(property = "policyVersionPO.enabled", column = "enabled"),
+    @Result(property = "policyVersionPO.content", column = "content"),
+    @Result(property = "policyVersionPO.deletedAt", column = 
"version_deleted_at")
+  })
+  @SelectProvider(
+      type = PolicyMetaSQLProviderFactory.class,
+      method = "listPolicyPOsByMetalakeAndPolicyNames")
+  List<PolicyPO> listPolicyPOsByMetalakeAndPolicyNames(
+      @Param("metalakeName") String metalakeName, @Param("policyNames") 
List<String> policyNames);
+
   @InsertProvider(
       type = PolicyMetaSQLProviderFactory.class,
       method = "insertPolicyMetaOnDuplicateKeyUpdate")
@@ -87,7 +114,7 @@ public interface PolicyMetaMapper {
   @SelectProvider(
       type = PolicyMetaSQLProviderFactory.class,
       method = "selectPolicyMetaByMetalakeAndName")
-  PolicyPO selectTagMetaByMetalakeAndName(
+  PolicyPO selectPolicyMetaByMetalakeAndName(
       @Param("metalakeName") String metalakeName, @Param("policyName") String 
policyName);
 
   @UpdateProvider(type = PolicyMetaSQLProviderFactory.class, method = 
"updatePolicyMeta")
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaSQLProviderFactory.java
index 7a7243ca24..593e0b40f5 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaSQLProviderFactory.java
@@ -19,6 +19,7 @@
 package org.apache.gravitino.storage.relational.mapper;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
 import 
org.apache.gravitino.storage.relational.mapper.provider.base.PolicyMetaBaseSQLProvider;
@@ -51,6 +52,11 @@ public class PolicyMetaSQLProviderFactory {
     return getProvider().listPolicyPOsByMetalake(metalakeName);
   }
 
+  public static String listPolicyPOsByMetalakeAndPolicyNames(
+      @Param("metalakeName") String metalakeName, @Param("policyNames") 
List<String> policyNames) {
+    return getProvider().listPolicyPOsByMetalakeAndPolicyNames(metalakeName, 
policyNames);
+  }
+
   public static String insertPolicyMetaOnDuplicateKeyUpdate(
       @Param("policyMeta") PolicyPO policyPO) {
     return getProvider().insertPolicyMetaOnDuplicateKeyUpdate(policyPO);
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetadataObjectRelMapper.java
similarity index 56%
copy from 
core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
copy to 
core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetadataObjectRelMapper.java
index e2fd8058fe..08e5a9fa22 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetadataObjectRelMapper.java
@@ -19,6 +19,7 @@
 package org.apache.gravitino.storage.relational.mapper;
 
 import java.util.List;
+import org.apache.gravitino.storage.relational.po.PolicyMetadataObjectRelPO;
 import org.apache.gravitino.storage.relational.po.PolicyPO;
 import org.apache.ibatis.annotations.DeleteProvider;
 import org.apache.ibatis.annotations.InsertProvider;
@@ -28,8 +29,8 @@ import org.apache.ibatis.annotations.Results;
 import org.apache.ibatis.annotations.SelectProvider;
 import org.apache.ibatis.annotations.UpdateProvider;
 
-public interface PolicyMetaMapper {
-  String POLICY_META_TABLE_NAME = "policy_meta";
+public interface PolicyMetadataObjectRelMapper {
+  String POLICY_METADATA_OBJECT_RELATION_TABLE_NAME = "policy_relation_meta";
 
   @Results({
     @Result(property = "policyId", column = "policy_id"),
@@ -52,16 +53,12 @@ public interface PolicyMetaMapper {
     @Result(property = "policyVersionPO.content", column = "content"),
     @Result(property = "policyVersionPO.deletedAt", column = 
"version_deleted_at")
   })
-  @SelectProvider(type = PolicyMetaSQLProviderFactory.class, method = 
"listPolicyPOsByMetalake")
-  List<PolicyPO> listPolicyPOsByMetalake(@Param("metalakeName") String 
metalakeName);
-
-  @InsertProvider(
-      type = PolicyMetaSQLProviderFactory.class,
-      method = "insertPolicyMetaOnDuplicateKeyUpdate")
-  void insertPolicyMetaOnDuplicateKeyUpdate(@Param("policyMeta") PolicyPO 
policyPO);
-
-  @InsertProvider(type = PolicyMetaSQLProviderFactory.class, method = 
"insertPolicyMeta")
-  void insertPolicyMeta(@Param("policyMeta") PolicyPO policyPO);
+  @SelectProvider(
+      type = PolicyMetadataObjectRelSQLProviderFactory.class,
+      method = "listPolicyPOsByMetadataObjectIdAndType")
+  List<PolicyPO> listPolicyPOsByMetadataObjectIdAndType(
+      @Param("metadataObjectId") Long metadataObjectId,
+      @Param("metadataObjectType") String metadataObjectType);
 
   @Results({
     @Result(property = "policyId", column = "policy_id"),
@@ -85,30 +82,69 @@ public interface PolicyMetaMapper {
     @Result(property = "policyVersionPO.deletedAt", column = 
"version_deleted_at")
   })
   @SelectProvider(
-      type = PolicyMetaSQLProviderFactory.class,
-      method = "selectPolicyMetaByMetalakeAndName")
-  PolicyPO selectTagMetaByMetalakeAndName(
+      type = PolicyMetadataObjectRelSQLProviderFactory.class,
+      method = "getPolicyPOsByMetadataObjectAndPolicyName")
+  PolicyPO getPolicyPOsByMetadataObjectAndPolicyName(
+      @Param("metadataObjectId") Long metadataObjectId,
+      @Param("metadataObjectType") String metadataObjectType,
+      @Param("policyName") String policyName);
+
+  @SelectProvider(
+      type = PolicyMetadataObjectRelSQLProviderFactory.class,
+      method = "listPolicyMetadataObjectRelsByMetalakeAndPolicyName")
+  List<PolicyMetadataObjectRelPO> 
listPolicyMetadataObjectRelsByMetalakeAndPolicyName(
       @Param("metalakeName") String metalakeName, @Param("policyName") String 
policyName);
 
-  @UpdateProvider(type = PolicyMetaSQLProviderFactory.class, method = 
"updatePolicyMeta")
-  Integer updatePolicyMeta(
-      @Param("newPolicyMeta") PolicyPO newPolicyMeta,
-      @Param("oldPolicyMeta") PolicyPO oldPolicyMeta);
+  @InsertProvider(
+      type = PolicyMetadataObjectRelSQLProviderFactory.class,
+      method = "batchInsertPolicyMetadataObjectRels")
+  void batchInsertPolicyMetadataObjectRels(
+      @Param("policyRels") List<PolicyMetadataObjectRelPO> policyRelPOs);
+
+  @UpdateProvider(
+      type = PolicyMetadataObjectRelSQLProviderFactory.class,
+      method = 
"batchDeletePolicyMetadataObjectRelsByPolicyIdsAndMetadataObject")
+  void batchDeletePolicyMetadataObjectRelsByPolicyIdsAndMetadataObject(
+      @Param("metadataObjectId") Long metadataObjectId,
+      @Param("metadataObjectType") String metadataObjectType,
+      @Param("policyIds") List<Long> policyIds);
 
   @UpdateProvider(
-      type = PolicyMetaSQLProviderFactory.class,
-      method = "softDeletePolicyByMetalakeAndPolicyName")
-  Integer softDeletePolicyByMetalakeAndPolicyName(
+      type = PolicyMetadataObjectRelSQLProviderFactory.class,
+      method = "softDeletePolicyMetadataObjectRelsByMetalakeAndPolicyName")
+  Integer softDeletePolicyMetadataObjectRelsByMetalakeAndPolicyName(
       @Param("metalakeName") String metalakeName, @Param("policyName") String 
policyName);
 
   @UpdateProvider(
-      type = PolicyMetaSQLProviderFactory.class,
-      method = "softDeletePolicyMetasByMetalakeId")
-  void softDeletePolicyMetasByMetalakeId(@Param("metalakeId") Long metalakeId);
+      type = PolicyMetadataObjectRelSQLProviderFactory.class,
+      method = "softDeletePolicyMetadataObjectRelsByMetalakeId")
+  void softDeletePolicyMetadataObjectRelsByMetalakeId(@Param("metalakeId") 
Long metalakeId);
+
+  @UpdateProvider(
+      type = PolicyMetadataObjectRelSQLProviderFactory.class,
+      method = "softDeletePolicyMetadataObjectRelsByMetadataObject")
+  void softDeletePolicyMetadataObjectRelsByMetadataObject(
+      @Param("metadataObjectId") Long metadataObjectId,
+      @Param("metadataObjectType") String metadataObjectType);
+
+  @UpdateProvider(
+      type = PolicyMetadataObjectRelSQLProviderFactory.class,
+      method = "softDeletePolicyMetadataObjectRelsByCatalogId")
+  void softDeletePolicyMetadataObjectRelsByCatalogId(@Param("catalogId") Long 
catalogId);
+
+  @UpdateProvider(
+      type = PolicyMetadataObjectRelSQLProviderFactory.class,
+      method = "softDeletePolicyMetadataObjectRelsBySchemaId")
+  void softDeletePolicyMetadataObjectRelsBySchemaId(@Param("schemaId") Long 
schemaId);
+
+  @UpdateProvider(
+      type = PolicyMetadataObjectRelSQLProviderFactory.class,
+      method = "softDeletePolicyMetadataObjectRelsByTableId")
+  void softDeletePolicyMetadataObjectRelsByTableId(@Param("tableId") Long 
tableId);
 
   @DeleteProvider(
-      type = PolicyMetaSQLProviderFactory.class,
-      method = "deletePolicyMetasByLegacyTimeline")
-  Integer deletePolicyMetasByLegacyTimeline(
+      type = PolicyMetadataObjectRelSQLProviderFactory.class,
+      method = "deletePolicyEntityRelsByLegacyTimeline")
+  Integer deletePolicyEntityRelsByLegacyTimeline(
       @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetadataObjectRelSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetadataObjectRelSQLProviderFactory.java
new file mode 100644
index 0000000000..166677a10d
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetadataObjectRelSQLProviderFactory.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.PolicyMetadataObjectRelBaseSQLProvider;
+import 
org.apache.gravitino.storage.relational.mapper.provider.postgresql.PolicyMetadataObjectRelPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.PolicyMetadataObjectRelPO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class PolicyMetadataObjectRelSQLProviderFactory {
+
+  private static final Map<JDBCBackendType, 
PolicyMetadataObjectRelBaseSQLProvider>
+      POLICY_METADATA_OBJECT_RELATION_SQL_PROVIDER_MAP =
+          ImmutableMap.of(
+              JDBCBackendType.MYSQL, new 
PolicyMetadataObjectRelMySQLProvider(),
+              JDBCBackendType.H2, new PolicyMetadataObjectRelH2Provider(),
+              JDBCBackendType.POSTGRESQL, new 
PolicyMetadataObjectRelPostgreSQLProvider());
+
+  public static PolicyMetadataObjectRelBaseSQLProvider getProvider() {
+    String databaseId =
+        SqlSessionFactoryHelper.getInstance()
+            .getSqlSessionFactory()
+            .getConfiguration()
+            .getDatabaseId();
+
+    JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId);
+    return 
POLICY_METADATA_OBJECT_RELATION_SQL_PROVIDER_MAP.get(jdbcBackendType);
+  }
+
+  static class PolicyMetadataObjectRelMySQLProvider
+      extends PolicyMetadataObjectRelBaseSQLProvider {}
+
+  static class PolicyMetadataObjectRelH2Provider extends 
PolicyMetadataObjectRelBaseSQLProvider {}
+
+  public static String listPolicyPOsByMetadataObjectIdAndType(
+      @Param("metadataObjectId") Long metadataObjectId,
+      @Param("metadataObjectType") String metadataObjectType) {
+    return getProvider()
+        .listPolicyPOsByMetadataObjectIdAndType(metadataObjectId, 
metadataObjectType);
+  }
+
+  public static String getPolicyPOsByMetadataObjectAndPolicyName(
+      @Param("metadataObjectId") Long metadataObjectId,
+      @Param("metadataObjectType") String metadataObjectType,
+      @Param("policyName") String policyName) {
+    return getProvider()
+        .getPolicyPOsByMetadataObjectAndPolicyName(
+            metadataObjectId, metadataObjectType, policyName);
+  }
+
+  public static String listPolicyMetadataObjectRelsByMetalakeAndPolicyName(
+      @Param("metalakeName") String metalakeName, @Param("policyName") String 
policyName) {
+    return getProvider()
+        .listPolicyMetadataObjectRelsByMetalakeAndPolicyName(metalakeName, 
policyName);
+  }
+
+  public static String batchInsertPolicyMetadataObjectRels(
+      @Param("policyRels") List<PolicyMetadataObjectRelPO> policyRelPOs) {
+    return getProvider().batchInsertPolicyMetadataObjectRels(policyRelPOs);
+  }
+
+  public static String 
batchDeletePolicyMetadataObjectRelsByPolicyIdsAndMetadataObject(
+      @Param("metadataObjectId") Long metadataObjectId,
+      @Param("metadataObjectType") String metadataObjectType,
+      @Param("policyIds") List<Long> policyIds) {
+    return getProvider()
+        .batchDeletePolicyMetadataObjectRelsByPolicyIdsAndMetadataObject(
+            metadataObjectId, metadataObjectType, policyIds);
+  }
+
+  public static String 
softDeletePolicyMetadataObjectRelsByMetalakeAndPolicyName(
+      @Param("metalakeName") String metalakeName, @Param("policyName") String 
policyName) {
+    return getProvider()
+        
.softDeletePolicyMetadataObjectRelsByMetalakeAndPolicyName(metalakeName, 
policyName);
+  }
+
+  public static String softDeletePolicyMetadataObjectRelsByMetalakeId(
+      @Param("metalakeId") Long metalakeId) {
+    return 
getProvider().softDeletePolicyMetadataObjectRelsByMetalakeId(metalakeId);
+  }
+
+  public static String softDeletePolicyMetadataObjectRelsByMetadataObject(
+      @Param("metadataObjectId") Long metadataObjectId,
+      @Param("metadataObjectType") String metadataObjectType) {
+    return getProvider()
+        .softDeletePolicyMetadataObjectRelsByMetadataObject(metadataObjectId, 
metadataObjectType);
+  }
+
+  public static String softDeletePolicyMetadataObjectRelsByCatalogId(
+      @Param("catalogId") Long catalogId) {
+    return 
getProvider().softDeletePolicyMetadataObjectRelsByCatalogId(catalogId);
+  }
+
+  public static String softDeletePolicyMetadataObjectRelsBySchemaId(
+      @Param("schemaId") Long schemaId) {
+    return 
getProvider().softDeletePolicyMetadataObjectRelsBySchemaId(schemaId);
+  }
+
+  public static String 
softDeletePolicyMetadataObjectRelsByTableId(@Param("tableId") Long tableId) {
+    return getProvider().softDeletePolicyMetadataObjectRelsByTableId(tableId);
+  }
+
+  public static String deletePolicyEntityRelsByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return 
getProvider().deletePolicyEntityRelsByLegacyTimeline(legacyTimeline, limit);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetaBaseSQLProvider.java
index 1f1e525071..65f64a05df 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetaBaseSQLProvider.java
@@ -21,6 +21,7 @@ package 
org.apache.gravitino.storage.relational.mapper.provider.base;
 import static 
org.apache.gravitino.storage.relational.mapper.PolicyMetaMapper.POLICY_META_TABLE_NAME;
 import static 
org.apache.gravitino.storage.relational.mapper.PolicyVersionMapper.POLICY_VERSION_TABLE_NAME;
 
+import java.util.List;
 import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
 import org.apache.gravitino.storage.relational.po.PolicyPO;
 import org.apache.ibatis.annotations.Param;
@@ -46,6 +47,32 @@ public class PolicyMetaBaseSQLProvider {
         + " AND pvi.deleted_at = 0";
   }
 
+  public String listPolicyPOsByMetalakeAndPolicyNames(
+      @Param("metalakeName") String metalakeName, @Param("policyNames") 
List<String> policyNames) {
+    return "<script>"
+        + "SELECT pm.policy_id, pm.policy_name, pm.policy_type, 
pm.metalake_id, pm.inheritable,"
+        + " pm.exclusive, pm.supported_object_types, pm.audit_info, 
pm.current_version, pm.last_version,"
+        + " pm.deleted_at, pvi.id, pvi.metalake_id as version_metalake_id, 
pvi.policy_id as version_policy_id,"
+        + " pvi.version, pvi.policy_comment, pvi.enabled, pvi.content, 
pvi.deleted_at as version_deleted_at"
+        + " FROM "
+        + POLICY_META_TABLE_NAME
+        + " pm JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON pm.metalake_id = mm.metalake_id"
+        + " JOIN "
+        + POLICY_VERSION_TABLE_NAME
+        + " pvi ON pm.policy_id = pvi.policy_id"
+        + " AND pm.current_version = pvi.version"
+        + " WHERE mm.metalake_name = #{metalakeName}"
+        + " AND pm.policy_name IN "
+        + " <foreach item='policyName' index='index' collection='policyNames' 
open='(' separator=',' close=')'>"
+        + " #{policyName}"
+        + " </foreach>"
+        + " AND pm.deleted_at = 0 AND mm.deleted_at = 0"
+        + " AND pvi.deleted_at = 0"
+        + "</script>";
+  }
+
   public String insertPolicyMetaOnDuplicateKeyUpdate(@Param("policyMeta") 
PolicyPO policyPO) {
     return "INSERT INTO "
         + POLICY_META_TABLE_NAME
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetadataObjectRelBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetadataObjectRelBaseSQLProvider.java
new file mode 100644
index 0000000000..29150fe024
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetadataObjectRelBaseSQLProvider.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import static 
org.apache.gravitino.storage.relational.mapper.PolicyVersionMapper.POLICY_VERSION_TABLE_NAME;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.PolicyMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
+import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TopicMetaMapper;
+import org.apache.gravitino.storage.relational.po.PolicyMetadataObjectRelPO;
+import org.apache.ibatis.annotations.Param;
+
+public class PolicyMetadataObjectRelBaseSQLProvider {
+
+  public String listPolicyPOsByMetadataObjectIdAndType(
+      @Param("metadataObjectId") Long metadataObjectId,
+      @Param("metadataObjectType") String metadataObjectType) {
+    return "SELECT pm.policy_id, pm.policy_name, pm.policy_type, 
pm.metalake_id, pm.inheritable,"
+        + " pm.exclusive, pm.supported_object_types, pm.audit_info, 
pm.current_version, pm.last_version,"
+        + " pm.deleted_at, pvi.id, pvi.metalake_id as version_metalake_id, 
pvi.policy_id as version_policy_id,"
+        + " pvi.version, pvi.policy_comment, pvi.enabled, pvi.content, 
pvi.deleted_at as version_deleted_at"
+        + " FROM "
+        + PolicyMetaMapper.POLICY_META_TABLE_NAME
+        + " pm JOIN "
+        + 
PolicyMetadataObjectRelMapper.POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " pe ON pm.policy_id = pe.policy_id"
+        + " JOIN "
+        + POLICY_VERSION_TABLE_NAME
+        + " pvi ON pm.policy_id = pvi.policy_id AND pm.current_version = 
pvi.version"
+        + " WHERE pe.metadata_object_id = #{metadataObjectId}"
+        + " AND pe.metadata_object_type = #{metadataObjectType} AND 
pe.deleted_at = 0"
+        + " AND pm.deleted_at = 0 AND pvi.deleted_at = 0";
+  }
+
+  public String getPolicyPOsByMetadataObjectAndPolicyName(
+      @Param("metadataObjectId") Long metadataObjectId,
+      @Param("metadataObjectType") String metadataObjectType,
+      @Param("policyName") String policyName) {
+    return "SELECT pm.policy_id, pm.policy_name, pm.policy_type, 
pm.metalake_id, pm.inheritable,"
+        + " pm.exclusive, pm.supported_object_types, pm.audit_info, 
pm.current_version, pm.last_version,"
+        + " pm.deleted_at, pvi.id, pvi.metalake_id as version_metalake_id, 
pvi.policy_id as version_policy_id,"
+        + " pvi.version, pvi.policy_comment, pvi.enabled, pvi.content, 
pvi.deleted_at as version_deleted_at"
+        + " FROM "
+        + PolicyMetaMapper.POLICY_META_TABLE_NAME
+        + " pm JOIN "
+        + 
PolicyMetadataObjectRelMapper.POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " pe ON pm.policy_id = pe.policy_id"
+        + " JOIN "
+        + POLICY_VERSION_TABLE_NAME
+        + " pvi ON pm.policy_id = pvi.policy_id AND pm.current_version = 
pvi.version"
+        + " WHERE pe.metadata_object_id = #{metadataObjectId}"
+        + " AND pe.metadata_object_type = #{metadataObjectType} AND 
pm.policy_name = #{policyName}"
+        + " AND pe.deleted_at = 0 AND pm.deleted_at = 0 AND pvi.deleted_at = 
0";
+  }
+
+  public String listPolicyMetadataObjectRelsByMetalakeAndPolicyName(
+      @Param("metalakeName") String metalakeName, @Param("policyName") String 
policyName) {
+    return "SELECT pe.policy_id as policyId, pe.metadata_object_id as 
metadataObjectId,"
+        + " pe.metadata_object_type as metadataObjectType, pe.audit_info as 
auditInfo,"
+        + " pe.current_version as currentVersion, pe.last_version as 
lastVersion,"
+        + " pe.deleted_at as deletedAt"
+        + " FROM "
+        + 
PolicyMetadataObjectRelMapper.POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " pe JOIN "
+        + PolicyMetaMapper.POLICY_META_TABLE_NAME
+        + " pm ON pe.policy_id = pm.policy_id"
+        + " JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON pm.metalake_id = mm.metalake_id"
+        + " WHERE mm.metalake_name = #{metalakeName} AND pm.policy_name = 
#{policyName}"
+        + " AND pe.deleted_at = 0 AND pm.deleted_at = 0 AND mm.deleted_at = 0";
+  }
+
+  public String batchInsertPolicyMetadataObjectRels(
+      @Param("policyRels") List<PolicyMetadataObjectRelPO> policyRelPOs) {
+    return "<script>"
+        + "INSERT INTO "
+        + 
PolicyMetadataObjectRelMapper.POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + "(policy_id, metadata_object_id, metadata_object_type, audit_info,"
+        + " current_version, last_version, deleted_at)"
+        + " VALUES "
+        + "<foreach collection='policyRels' item='item' separator=','>"
+        + "(#{item.policyId},"
+        + " #{item.metadataObjectId},"
+        + " #{item.metadataObjectType},"
+        + " #{item.auditInfo},"
+        + " #{item.currentVersion},"
+        + " #{item.lastVersion},"
+        + " #{item.deletedAt})"
+        + "</foreach>"
+        + "</script>";
+  }
+
+  public String 
batchDeletePolicyMetadataObjectRelsByPolicyIdsAndMetadataObject(
+      @Param("metadataObjectId") Long metadataObjectId,
+      @Param("metadataObjectType") String metadataObjectType,
+      @Param("policyIds") List<Long> policyIds) {
+    return "<script>"
+        + "UPDATE "
+        + 
PolicyMetadataObjectRelMapper.POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE policy_id IN "
+        + "<foreach item='policyId' collection='policyIds' open='(' 
separator=',' close=')'>"
+        + "#{policyId}"
+        + "</foreach>"
+        + " AND metadata_object_id = #{metadataObjectId}"
+        + " AND metadata_object_type = #{metadataObjectType} AND deleted_at = 
0"
+        + "</script>";
+  }
+
+  public String softDeletePolicyMetadataObjectRelsByMetalakeAndPolicyName(
+      @Param("metalakeName") String metalakeName, @Param("policyName") String 
policyName) {
+    return "UPDATE "
+        + 
PolicyMetadataObjectRelMapper.POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " pe JOIN "
+        + PolicyMetaMapper.POLICY_META_TABLE_NAME
+        + " pm ON pe.policy_id = pm.policy_id JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON pm.metalake_id = mm.metalake_id"
+        + " SET pe.deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE mm.metalake_name = #{metalakeName} AND pm.policy_name = 
#{policyName}"
+        + " AND pe.deleted_at = 0 AND pm.deleted_at = 0 AND mm.deleted_at = 0";
+  }
+
+  public String softDeletePolicyMetadataObjectRelsByMetalakeId(
+      @Param("metalakeId") Long metalakeId) {
+    return "UPDATE "
+        + 
PolicyMetadataObjectRelMapper.POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " pe JOIN "
+        + PolicyMetaMapper.POLICY_META_TABLE_NAME
+        + " pm ON pe.policy_id = pm.policy_id"
+        + " SET pe.deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE pm.metalake_id = #{metalakeId}"
+        + " AND pe.deleted_at = 0 AND pm.deleted_at = 0";
+  }
+
+  public String softDeletePolicyMetadataObjectRelsByMetadataObject(
+      @Param("metadataObjectId") Long metadataObjectId,
+      @Param("metadataObjectType") String metadataObjectType) {
+    return " UPDATE "
+        + 
PolicyMetadataObjectRelMapper.POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE metadata_object_id = #{metadataObjectId} AND deleted_at = 0"
+        + " AND metadata_object_type = #{metadataObjectType}";
+  }
+
+  public String 
softDeletePolicyMetadataObjectRelsByCatalogId(@Param("catalogId") Long 
catalogId) {
+    return "UPDATE "
+        + 
PolicyMetadataObjectRelMapper.POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0) + EXTRACT(MICROSECOND 
FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE deleted_at = 0 AND ("
+        + "   (metadata_object_type = 'CATALOG' AND metadata_object_id = 
#{catalogId})"
+        + "   OR (metadata_object_type = 'SCHEMA' AND metadata_object_id IN 
(SELECT schema_id FROM "
+        + SchemaMetaMapper.TABLE_NAME
+        + " WHERE catalog_id = #{catalogId}))"
+        + "   OR (metadata_object_type = 'TOPIC' AND metadata_object_id IN 
(SELECT topic_id FROM "
+        + TopicMetaMapper.TABLE_NAME
+        + " WHERE catalog_id = #{catalogId}))"
+        + "   OR (metadata_object_type = 'TABLE' AND metadata_object_id IN 
(SELECT table_id FROM "
+        + TableMetaMapper.TABLE_NAME
+        + " WHERE catalog_id = #{catalogId}))"
+        + "   OR (metadata_object_type = 'FILESET' AND metadata_object_id IN 
(SELECT fileset_id FROM "
+        + FilesetMetaMapper.META_TABLE_NAME
+        + " WHERE catalog_id = #{catalogId}))"
+        + "   OR (metadata_object_type = 'MODEL' AND metadata_object_id IN 
(SELECT model_id FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " WHERE catalog_id = #{catalogId}))"
+        + " )";
+  }
+
+  public String 
softDeletePolicyMetadataObjectRelsBySchemaId(@Param("schemaId") Long schemaId) {
+    return "UPDATE "
+        + 
PolicyMetadataObjectRelMapper.POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0) + EXTRACT(MICROSECOND 
FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE deleted_at = 0 AND ("
+        + "   (metadata_object_type = 'SCHEMA' AND metadata_object_id = 
#{schemaId})"
+        + "   OR (metadata_object_type = 'TOPIC' AND metadata_object_id IN 
(SELECT topic_id FROM "
+        + TopicMetaMapper.TABLE_NAME
+        + " WHERE schema_id = #{schemaId}))"
+        + "   OR (metadata_object_type = 'TABLE' AND metadata_object_id IN 
(SELECT table_id FROM "
+        + TableMetaMapper.TABLE_NAME
+        + " WHERE schema_id = #{schemaId}))"
+        + "   OR (metadata_object_type = 'FILESET' AND metadata_object_id IN 
(SELECT fileset_id FROM "
+        + FilesetMetaMapper.META_TABLE_NAME
+        + " WHERE schema_id = #{schemaId}))"
+        + "   OR (metadata_object_type = 'MODEL' AND metadata_object_id IN 
(SELECT model_id FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " WHERE schema_id = #{schemaId}))"
+        + " )";
+  }
+
+  public String softDeletePolicyMetadataObjectRelsByTableId(@Param("tableId") 
Long tableId) {
+    return "UPDATE "
+        + 
PolicyMetadataObjectRelMapper.POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE metadata_object_id = #{tableId}"
+        + " AND metadata_object_type = 'TABLE'"
+        + " AND deleted_at = 0";
+  }
+
+  public String deletePolicyEntityRelsByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return "DELETE FROM "
+        + 
PolicyMetadataObjectRelMapper.POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT 
#{limit}";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/PolicyMetadataObjectRelPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/PolicyMetadataObjectRelPostgreSQLProvider.java
new file mode 100644
index 0000000000..4e3a7bd48c
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/PolicyMetadataObjectRelPostgreSQLProvider.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper.POLICY_METADATA_OBJECT_RELATION_TABLE_NAME;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.PolicyMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
+import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TopicMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.PolicyMetadataObjectRelBaseSQLProvider;
+import org.apache.ibatis.annotations.Param;
+
+/**
+ * PostgreSQL variants of the SQL text built by {@link PolicyMetadataObjectRelBaseSQLProvider}.
+ *
+ * <p>The soft-delete statements stamp {@code deleted_at} with {@code DELETED_AT_NOW_EXPRESSION},
+ * and the catalog/schema cascades are written in the {@code UPDATE ... FROM} form.
+ */
+public class PolicyMetadataObjectRelPostgreSQLProvider
+    extends PolicyMetadataObjectRelBaseSQLProvider {
+  // SQL expression for "now": floor of the epoch value extracted from
+  // (current_timestamp - 1970-01-01 00:00:00) * 1000. The surrounding blanks let it be
+  // concatenated directly after "deleted_at =".
+  private static final String DELETED_AT_NOW_EXPRESSION =
+      " floor(extract(epoch from((current_timestamp - timestamp '1970-01-01 
00:00:00')*1000))) ";
+
+  /**
+   * Soft-deletes the live relations of the policy identified by metalake name and policy name.
+   * The policy id is resolved through nested subqueries that only consider live
+   * ({@code deleted_at = 0}) policy and metalake rows.
+   */
+  @Override
+  public String softDeletePolicyMetadataObjectRelsByMetalakeAndPolicyName(
+      String metalakeName, String policyName) {
+    return "UPDATE "
+        + POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " te SET deleted_at ="
+        + DELETED_AT_NOW_EXPRESSION
+        + " WHERE te.policy_id IN (SELECT tm.policy_id FROM "
+        + PolicyMetaMapper.POLICY_META_TABLE_NAME
+        + " tm WHERE tm.metalake_id IN (SELECT mm.metalake_id FROM "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm WHERE mm.metalake_name = #{metalakeName} AND mm.deleted_at = 0)"
+        + " AND tm.policy_name = #{policyName} AND tm.deleted_at = 0) AND 
te.deleted_at = 0";
+  }
+
+  /** Soft-deletes the live relations of every live policy that belongs to the given metalake. */
+  @Override
+  public String softDeletePolicyMetadataObjectRelsByMetalakeId(Long 
metalakeId) {
+    return "UPDATE "
+        + POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " te SET deleted_at ="
+        + DELETED_AT_NOW_EXPRESSION
+        + " WHERE te.policy_id IN (SELECT tm.policy_id FROM "
+        + PolicyMetaMapper.POLICY_META_TABLE_NAME
+        + " tm WHERE tm.metalake_id = #{metalakeId} AND tm.deleted_at = 0) AND 
te.deleted_at = 0";
+  }
+
+  /** Soft-deletes the live relations attached to one metadata object, matched by id and type. */
+  @Override
+  public String softDeletePolicyMetadataObjectRelsByMetadataObject(
+      @Param("metadataObjectId") Long metadataObjectId,
+      @Param("metadataObjectType") String metadataObjectType) {
+    return "UPDATE "
+        + POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " SET deleted_at ="
+        + DELETED_AT_NOW_EXPRESSION
+        + " WHERE metadata_object_id = #{metadataObjectId} AND deleted_at = 0"
+        + " AND metadata_object_type = #{metadataObjectType}";
+  }
+
+  /**
+   * Soft-deletes the live relations attached to a catalog or to any schema, topic, table,
+   * fileset or model row whose {@code catalog_id} equals the given id.
+   *
+   * <p>The relation table is joined to itself ({@code pe} is the update target, {@code
+   * pe_alias} carries the LEFT JOINs) and the two are tied together by {@code pe.id =
+   * pe_alias.id}. Each LEFT JOIN is restricted to its own {@code metadata_object_type}, so at
+   * most one of the joined tables contributes a non-null {@code catalog_id} per relation row.
+   */
+  @Override
+  public String 
softDeletePolicyMetadataObjectRelsByCatalogId(@Param("catalogId") Long 
catalogId) {
+    return "UPDATE "
+        + POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " pe SET deleted_at ="
+        + DELETED_AT_NOW_EXPRESSION
+        + " FROM "
+        + POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " pe_alias"
+        + " LEFT JOIN "
+        + CatalogMetaMapper.TABLE_NAME
+        + " ct ON pe_alias.metadata_object_id = ct.catalog_id AND 
pe_alias.metadata_object_type = 'CATALOG'"
+        + " LEFT JOIN "
+        + SchemaMetaMapper.TABLE_NAME
+        + " st ON pe_alias.metadata_object_id = st.schema_id AND 
pe_alias.metadata_object_type = 'SCHEMA'"
+        + " LEFT JOIN "
+        + TopicMetaMapper.TABLE_NAME
+        + " tt ON pe_alias.metadata_object_id = tt.topic_id AND 
pe_alias.metadata_object_type = 'TOPIC'"
+        + " LEFT JOIN "
+        + TableMetaMapper.TABLE_NAME
+        + " tat ON pe_alias.metadata_object_id = tat.table_id AND 
pe_alias.metadata_object_type = 'TABLE'"
+        + " LEFT JOIN "
+        + FilesetMetaMapper.META_TABLE_NAME
+        + " ft ON pe_alias.metadata_object_id = ft.fileset_id AND 
pe_alias.metadata_object_type = 'FILESET'"
+        + " LEFT JOIN "
+        + ModelMetaMapper.TABLE_NAME
+        + " mt ON pe_alias.metadata_object_id = mt.model_id AND 
pe_alias.metadata_object_type = 'MODEL'"
+        + " WHERE pe.id = pe_alias.id AND pe.deleted_at = 0 AND ("
+        + "   ct.catalog_id = #{catalogId} OR st.catalog_id = #{catalogId} OR 
tt.catalog_id = #{catalogId}"
+        + "   OR tat.catalog_id = #{catalogId} OR ft.catalog_id = #{catalogId} 
OR mt.catalog_id = #{catalogId}"
+        + " )";
+  }
+
+  /**
+   * Soft-deletes the live relations attached to a schema or to any topic, table, fileset or
+   * model row whose {@code schema_id} equals the given id. Uses the same self-join layout as
+   * the catalog variant, without the catalog join.
+   */
+  @Override
+  public String 
softDeletePolicyMetadataObjectRelsBySchemaId(@Param("schemaId") Long schemaId) {
+    return "UPDATE "
+        + POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " pe SET deleted_at ="
+        + DELETED_AT_NOW_EXPRESSION
+        + " FROM "
+        + POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " pe_alias"
+        + " LEFT JOIN "
+        + SchemaMetaMapper.TABLE_NAME
+        + " st ON pe_alias.metadata_object_id = st.schema_id AND 
pe_alias.metadata_object_type = 'SCHEMA'"
+        + " LEFT JOIN "
+        + TopicMetaMapper.TABLE_NAME
+        + " tt ON pe_alias.metadata_object_id = tt.topic_id AND 
pe_alias.metadata_object_type = 'TOPIC'"
+        + " LEFT JOIN "
+        + TableMetaMapper.TABLE_NAME
+        + " tat ON pe_alias.metadata_object_id = tat.table_id AND 
pe_alias.metadata_object_type = 'TABLE'"
+        + " LEFT JOIN "
+        + FilesetMetaMapper.META_TABLE_NAME
+        + " ft ON pe_alias.metadata_object_id = ft.fileset_id AND 
pe_alias.metadata_object_type = 'FILESET'"
+        + " LEFT JOIN "
+        + ModelMetaMapper.TABLE_NAME
+        + " mt ON pe_alias.metadata_object_id = mt.model_id AND 
pe_alias.metadata_object_type = 'MODEL'"
+        + " WHERE pe.id = pe_alias.id AND pe.deleted_at = 0 AND ("
+        + "   st.schema_id = #{schemaId} OR tt.schema_id = #{schemaId} OR 
tat.schema_id = #{schemaId}"
+        + "   OR ft.schema_id = #{schemaId} OR mt.schema_id = #{schemaId}"
+        + " )";
+  }
+
+  /**
+   * Soft-deletes the live relations attached to a table and to the table's live columns.
+   *
+   * <p>In the WHERE clause AND binds tighter than OR, so the {@code 'COLUMN'} type filter
+   * applies only to the column-id subquery branch.
+   *
+   * <p>NOTE(review): the base provider's statement for this method, as added in the same
+   * commit, only matches 'TABLE' rows - confirm the COLUMN branch is intended here.
+   */
+  @Override
+  public String softDeletePolicyMetadataObjectRelsByTableId(@Param("tableId") 
Long tableId) {
+    return "UPDATE "
+        + POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " SET deleted_at ="
+        + DELETED_AT_NOW_EXPRESSION
+        + " WHERE deleted_at = 0 AND ("
+        + "   (metadata_object_id = #{tableId} AND metadata_object_type = 
'TABLE') OR "
+        + "   metadata_object_id IN (SELECT column_id FROM "
+        + TableColumnMapper.COLUMN_TABLE_NAME
+        + " WHERE table_id = #{tableId} AND deleted_at = 0)"
+        + " AND metadata_object_type = 'COLUMN'"
+        + ")";
+  }
+
+  /**
+   * Soft-deletes the live relations between one metadata object and a list of policies.
+   *
+   * <p>The statement is a {@code <script>} whose {@code <foreach>} expands {@code policyIds}
+   * into the IN list. NOTE(review): an empty list would presumably leave the IN clause without
+   * operands; the service code in this commit returns early when nothing is to be removed.
+   */
+  @Override
+  public String 
batchDeletePolicyMetadataObjectRelsByPolicyIdsAndMetadataObject(
+      Long metadataObjectId, String metadataObjectType, List<Long> policyIds) {
+    return "<script>"
+        + "UPDATE "
+        + POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " SET deleted_at ="
+        + DELETED_AT_NOW_EXPRESSION
+        + " WHERE policy_id IN "
+        + "<foreach item='policyId' collection='policyIds' open='(' 
separator=',' close=')'>"
+        + "#{policyId}"
+        + "</foreach>"
+        + " AND metadata_object_id = #{metadataObjectId}"
+        + " AND metadata_object_type = #{metadataObjectType} AND deleted_at = 
0"
+        + "</script>";
+  }
+
+  /**
+   * Selects the live relation rows of the policy identified by metalake name and policy name.
+   * Relation, policy and metalake rows must all have {@code deleted_at = 0}; columns are
+   * aliased to camelCase names (policyId, metadataObjectId, ...).
+   */
+  @Override
+  public String listPolicyMetadataObjectRelsByMetalakeAndPolicyName(
+      String metalakeName, String policyName) {
+    return "SELECT te.policy_id as policyId, te.metadata_object_id as 
metadataObjectId,"
+        + " te.metadata_object_type as metadataObjectType, te.audit_info as 
auditInfo,"
+        + " te.current_version as currentVersion, te.last_version as 
lastVersion,"
+        + " te.deleted_at as deletedAt"
+        + " FROM "
+        + POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " te JOIN "
+        + PolicyMetaMapper.POLICY_META_TABLE_NAME
+        + " tm ON te.policy_id = tm.policy_id JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON tm.metalake_id = mm.metalake_id"
+        + " WHERE mm.metalake_name = #{metalakeName} AND tm.policy_name = 
#{policyName}"
+        + " AND te.deleted_at = 0 AND tm.deleted_at = 0 AND mm.deleted_at = 0";
+  }
+
+  /**
+   * Physically deletes soft-deleted relation rows with {@code 0 < deleted_at <
+   * legacyTimeline}. The row cap is applied in an {@code id IN (SELECT ... LIMIT)} subquery
+   * rather than on the DELETE itself (presumably because PostgreSQL's DELETE has no LIMIT
+   * clause).
+   */
+  @Override
+  public String deletePolicyEntityRelsByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return "DELETE FROM "
+        + POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " WHERE id IN (SELECT id FROM "
+        + POLICY_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT 
#{limit})";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/PolicyMetadataObjectRelPO.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/PolicyMetadataObjectRelPO.java
new file mode 100644
index 0000000000..4c24746124
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/PolicyMetadataObjectRelPO.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.po;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Persistent object for one relation between a policy and a metadata object.
+ *
+ * <p>Instances are created through {@link Builder}; getters are generated by Lombok. Equality
+ * and hash code cover all seven fields.
+ */
+@Getter
+public class PolicyMetadataObjectRelPO {
+  private Long policyId;
+  private Long metadataObjectId;
+  private String metadataObjectType;
+  private String auditInfo;
+  private Long currentVersion;
+  private Long lastVersion;
+  private Long deletedAt;
+
+  /** Returns a new, empty {@link Builder}. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof PolicyMetadataObjectRelPO)) {
+      return false;
+    }
+    PolicyMetadataObjectRelPO policyRelPO = (PolicyMetadataObjectRelPO) o;
+    return Objects.equal(policyId, policyRelPO.policyId)
+        && Objects.equal(metadataObjectId, policyRelPO.metadataObjectId)
+        && Objects.equal(metadataObjectType, policyRelPO.metadataObjectType)
+        && Objects.equal(auditInfo, policyRelPO.auditInfo)
+        && Objects.equal(currentVersion, policyRelPO.currentVersion)
+        && Objects.equal(lastVersion, policyRelPO.lastVersion)
+        && Objects.equal(deletedAt, policyRelPO.deletedAt);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(
+        policyId,
+        metadataObjectId,
+        metadataObjectType,
+        auditInfo,
+        currentVersion,
+        lastVersion,
+        deletedAt);
+  }
+
+  /**
+   * Builder for {@link PolicyMetadataObjectRelPO}.
+   *
+   * <p>The builder writes straight into a single PO instance and {@link #build()} returns that
+   * same instance, so calling a {@code withXxx} method after {@code build()} also changes the
+   * object that was already returned.
+   */
+  public static class Builder {
+    private final PolicyMetadataObjectRelPO policyRelPO;
+
+    private Builder() {
+      policyRelPO = new PolicyMetadataObjectRelPO();
+    }
+
+    public Builder withPolicyId(Long policyId) {
+      policyRelPO.policyId = policyId;
+      return this;
+    }
+
+    public Builder withMetadataObjectId(Long metadataObjectId) {
+      policyRelPO.metadataObjectId = metadataObjectId;
+      return this;
+    }
+
+    public Builder withMetadataObjectType(String metadataObjectType) {
+      policyRelPO.metadataObjectType = metadataObjectType;
+      return this;
+    }
+
+    public Builder withAuditInfo(String auditInfo) {
+      policyRelPO.auditInfo = auditInfo;
+      return this;
+    }
+
+    public Builder withCurrentVersion(Long currentVersion) {
+      policyRelPO.currentVersion = currentVersion;
+      return this;
+    }
+
+    public Builder withLastVersion(Long lastVersion) {
+      policyRelPO.lastVersion = lastVersion;
+      return this;
+    }
+
+    public Builder withDeletedAt(Long deletedAt) {
+      policyRelPO.deletedAt = deletedAt;
+      return this;
+    }
+
+    // Every field is mandatory: the type must be non-blank, all others non-null.
+    // Preconditions.checkArgument raises IllegalArgumentException on a violation.
+    private void validate() {
+      Preconditions.checkArgument(policyRelPO.policyId != null, "Policy id is 
required");
+      Preconditions.checkArgument(
+          policyRelPO.metadataObjectId != null, "Metadata object id is 
required");
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(policyRelPO.metadataObjectType),
+          "Metadata object type should not be empty");
+      Preconditions.checkArgument(policyRelPO.auditInfo != null, "Audit info 
is required");
+      Preconditions.checkArgument(
+          policyRelPO.currentVersion != null, "Current version is required");
+      Preconditions.checkArgument(policyRelPO.lastVersion != null, "Last 
version is required");
+      Preconditions.checkArgument(policyRelPO.deletedAt != null, "Deleted at 
is required");
+    }
+
+    /**
+     * Validates the collected values and returns the populated PO.
+     *
+     * @return the PO instance held by this builder
+     * @throws IllegalArgumentException if a required field is missing
+     */
+    public PolicyMetadataObjectRelPO build() {
+      validate();
+      return policyRelPO;
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
index 16b328c351..a297c47afb 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
@@ -41,6 +41,7 @@ import 
org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
 import org.apache.gravitino.storage.relational.mapper.ModelVersionMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
 import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
@@ -258,6 +259,10 @@ public class CatalogMetaService {
               SessionUtils.doWithoutCommit(
                   TagMetadataObjectRelMapper.class,
                   mapper -> 
mapper.softDeleteTagMetadataObjectRelsByCatalogId(catalogId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  PolicyMetadataObjectRelMapper.class,
+                  mapper -> 
mapper.softDeletePolicyMetadataObjectRelsByCatalogId(catalogId)),
           () ->
               SessionUtils.doWithoutCommit(
                   ModelVersionAliasRelMapper.class,
@@ -301,6 +306,12 @@ public class CatalogMetaService {
                   TagMetadataObjectRelMapper.class,
                   mapper ->
                       mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
+                          catalogId, MetadataObject.Type.CATALOG.name())),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  PolicyMetadataObjectRelMapper.class,
+                  mapper ->
+                      
mapper.softDeletePolicyMetadataObjectRelsByMetadataObject(
                           catalogId, MetadataObject.Type.CATALOG.name())));
     }
 
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
index e97cbd356e..669acbbe87 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
@@ -33,6 +33,7 @@ import org.apache.gravitino.meta.FilesetEntity;
 import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
 import 
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.po.FilesetMaxVersionPO;
@@ -260,6 +261,12 @@ public class FilesetMetaService {
                 TagMetadataObjectRelMapper.class,
                 mapper ->
                     mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
+                        filesetId, MetadataObject.Type.FILESET.name())),
+        () ->
+            SessionUtils.doWithoutCommit(
+                PolicyMetadataObjectRelMapper.class,
+                mapper ->
+                    mapper.softDeletePolicyMetadataObjectRelsByMetadataObject(
                         filesetId, MetadataObject.Type.FILESET.name())));
 
     return true;
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
index 79ca6fc1b0..5b98be0504 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
@@ -38,6 +38,7 @@ import 
org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
 import org.apache.gravitino.storage.relational.mapper.ModelVersionMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
 import 
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.po.ModelPO;
@@ -156,6 +157,12 @@ public class ModelMetaService {
                 TagMetadataObjectRelMapper.class,
                 mapper ->
                     mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
+                        modelId, MetadataObject.Type.MODEL.name())),
+        () ->
+            SessionUtils.doWithoutCommit(
+                PolicyMetadataObjectRelMapper.class,
+                mapper ->
+                    mapper.softDeletePolicyMetadataObjectRelsByMetadataObject(
                         modelId, MetadataObject.Type.MODEL.name())));
 
     return modelDeletedCount.get() > 0;
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
index 36ebcd6411..cc59db2400 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
@@ -19,8 +19,12 @@
 package org.apache.gravitino.storage.relational.service;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -28,17 +32,22 @@ import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.meta.PolicyEntity;
 import org.apache.gravitino.storage.relational.mapper.PolicyMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.mapper.PolicyVersionMapper;
 import org.apache.gravitino.storage.relational.po.PolicyMaxVersionPO;
+import org.apache.gravitino.storage.relational.po.PolicyMetadataObjectRelPO;
 import org.apache.gravitino.storage.relational.po.PolicyPO;
 import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
 import org.apache.gravitino.storage.relational.utils.POConverters;
 import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.NamespaceUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -182,33 +191,201 @@ public class PolicyMetaService {
   }
 
   public List<PolicyEntity> listPoliciesForMetadataObject(
-      NameIdentifier objectIdent, MetadataObject.Type objectType)
+      NameIdentifier objectIdent, Entity.EntityType objectType)
       throws NoSuchEntityException, IOException {
-    // todo: implement this method
-    throw new UnsupportedOperationException("Not implemented yet");
+    // Lists the policies associated with the given metadata object.
+    // Steps: convert the identifier to a MetadataObject, resolve the metalake id and the
+    // object's id, query the relation mapper, then convert each PolicyPO to a PolicyEntity
+    // in the metalake's policy namespace.
+    MetadataObject metadataObject = 
NameIdentifierUtil.toMetadataObject(objectIdent, objectType);
+    // The first namespace level of the object identifier is used as the metalake name.
+    String metalake = objectIdent.namespace().level(0);
+
+    List<PolicyPO> policyPOs;
+    try {
+      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
+      Long metadataObjectId =
+          MetadataObjectService.getMetadataObjectId(
+              metalakeId, metadataObject.fullName(), metadataObject.type());
+
+      policyPOs =
+          SessionUtils.doWithoutCommitAndFetchResult(
+              PolicyMetadataObjectRelMapper.class,
+              mapper ->
+                  mapper.listPolicyPOsByMetadataObjectIdAndType(
+                      metadataObjectId, metadataObject.type().toString()));
+    } catch (RuntimeException e) {
+      // Give ExceptionUtils the chance to translate SQL failures, then rethrow as is.
+      ExceptionUtils.checkSQLException(e, Entity.EntityType.POLICY, 
objectIdent.toString());
+      throw e;
+    }
+
+    return policyPOs.stream()
+        .map(policyPO -> POConverters.fromPolicyPO(policyPO, 
NamespaceUtil.ofPolicy(metalake)))
+        .collect(Collectors.toList());
   }
 
   public PolicyEntity getPolicyForMetadataObject(
-      NameIdentifier objectIdent, MetadataObject.Type objectType, 
NameIdentifier policyIdent)
+      NameIdentifier objectIdent, Entity.EntityType objectType, NameIdentifier 
policyIdent)
       throws NoSuchEntityException, IOException {
-    // todo: implement this method
-    throw new UnsupportedOperationException("Not implemented yet");
+    // Returns the policy named by policyIdent if it is associated with the given metadata
+    // object; throws NoSuchEntityException when the mapper finds no such association.
+    MetadataObject metadataObject = 
NameIdentifierUtil.toMetadataObject(objectIdent, objectType);
+    // The first namespace level of the object identifier is used as the metalake name.
+    String metalake = objectIdent.namespace().level(0);
+
+    PolicyPO policyPO;
+    try {
+      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
+      Long metadataObjectId =
+          MetadataObjectService.getMetadataObjectId(
+              metalakeId, metadataObject.fullName(), metadataObject.type());
+
+      // Only the policy's simple name is passed on; its namespace is not consulted here.
+      policyPO =
+          SessionUtils.getWithoutCommit(
+              PolicyMetadataObjectRelMapper.class,
+              mapper ->
+                  mapper.getPolicyPOsByMetadataObjectAndPolicyName(
+                      metadataObjectId, metadataObject.type().toString(), 
policyIdent.name()));
+    } catch (RuntimeException e) {
+      // Give ExceptionUtils the chance to translate SQL failures, then rethrow as is.
+      ExceptionUtils.checkSQLException(e, Entity.EntityType.POLICY, 
policyIdent.toString());
+      throw e;
+    }
+
+    // A null result means the policy is not associated with the object.
+    if (policyPO == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.POLICY.name().toLowerCase(),
+          policyIdent.name());
+    }
+
+    return POConverters.fromPolicyPO(policyPO, 
NamespaceUtil.ofPolicy(metalake));
   }
 
   public List<MetadataObject> 
listAssociatedMetadataObjectsForPolicy(NameIdentifier policyIdent)
       throws IOException {
-    // todo: implement this method
-    throw new UnsupportedOperationException("Not implemented yet");
+    // Lists the metadata objects the given policy is associated with.
+    // The relation rows are grouped by object type so that the full names of each group
+    // can be resolved with a single call to that type's lookup function.
+    String metalakeName = policyIdent.namespace().level(0);
+    String policyName = policyIdent.name();
+
+    try {
+      List<PolicyMetadataObjectRelPO> policyMetadataObjectRelPOs =
+          SessionUtils.doWithCommitAndFetchResult(
+              PolicyMetadataObjectRelMapper.class,
+              mapper ->
+                  mapper.listPolicyMetadataObjectRelsByMetalakeAndPolicyName(
+                      metalakeName, policyName));
+
+      List<MetadataObject> metadataObjects = Lists.newArrayList();
+      Map<String, List<PolicyMetadataObjectRelPO>> 
policyMetadataObjectRelPOsByType =
+          policyMetadataObjectRelPOs.stream()
+              
.collect(Collectors.groupingBy(PolicyMetadataObjectRelPO::getMetadataObjectType));
+
+      for (Map.Entry<String, List<PolicyMetadataObjectRelPO>> entry :
+          policyMetadataObjectRelPOsByType.entrySet()) {
+        // Parse the stored type name once per group instead of once per object.
+        MetadataObject.Type metadataObjectType = MetadataObject.Type.valueOf(entry.getKey());
+        List<PolicyMetadataObjectRelPO> rels = entry.getValue();
+
+        List<Long> metadataObjectIds =
+            rels.stream()
+                .map(PolicyMetadataObjectRelPO::getMetadataObjectId)
+                .collect(Collectors.toList());
+        Map<Long, String> metadataObjectNames =
+            MetadataObjectService.TYPE_TO_FULLNAME_FUNCTION_MAP
+                .get(metadataObjectType)
+                .apply(metadataObjectIds);
+
+        // Only the resolved names are needed, not the ids they are keyed by.
+        for (String fullName : metadataObjectNames.values()) {
+          // The metadata object may have been deleted concurrently, in which case its
+          // name resolves to null; skip such objects.
+          if (fullName != null) {
+            metadataObjects.add(MetadataObjects.parse(fullName, metadataObjectType));
+          }
+        }
+      }
+
+      return metadataObjects;
+    } catch (RuntimeException e) {
+      // Give ExceptionUtils the chance to translate SQL failures, then rethrow as is.
+      ExceptionUtils.checkSQLException(e, Entity.EntityType.POLICY, 
policyIdent.toString());
+      throw e;
+    }
   }
 
   public List<PolicyEntity> associatePoliciesWithMetadataObject(
       NameIdentifier objectIdent,
-      MetadataObject.Type objectType,
+      Entity.EntityType objectType,
       NameIdentifier[] policiesToAdd,
       NameIdentifier[] policiesToRemove)
       throws NoSuchEntityException, EntityAlreadyExistsException, IOException {
-    // todo: implement this method
-    throw new UnsupportedOperationException("Not implemented yet");
+    // Adds and removes policy associations for one metadata object and returns the policies
+    // associated with it afterwards. Both arrays are streamed unconditionally, so they must
+    // be non-null (they may be empty).
+    MetadataObject metadataObject = 
NameIdentifierUtil.toMetadataObject(objectIdent, objectType);
+    String metalake = objectIdent.namespace().level(0);
+
+    try {
+      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
+      Long metadataObjectId =
+          MetadataObjectService.getMetadataObjectId(
+              metalakeId, metadataObject.fullName(), metadataObject.type());
+
+      // Resolve the policies that should be associated with the metadata object.
+      // The lookup is skipped entirely when no names were given.
+      List<String> policyNamesToAdd =
+          
Arrays.stream(policiesToAdd).map(NameIdentifier::name).collect(Collectors.toList());
+      List<PolicyPO> policyPOsToAdd =
+          policyNamesToAdd.isEmpty()
+              ? Collections.emptyList()
+              : getPolicyPOsByMetalakeAndNames(metalake, policyNamesToAdd);
+
+      // Resolve the policies that should be removed from the metadata object.
+      List<String> policyNamesToRemove =
+          
Arrays.stream(policiesToRemove).map(NameIdentifier::name).collect(Collectors.toList());
+      List<PolicyPO> policyPOsToRemove =
+          policyNamesToRemove.isEmpty()
+              ? Collections.emptyList()
+              : getPolicyPOsByMetalakeAndNames(metalake, policyNamesToRemove);
+
+      // Both steps are handed to doMultipleWithCommit together: inserts first, removals second.
+      SessionUtils.doMultipleWithCommit(
+          () -> {
+            // Insert the policy metadata object relations.
+            if (policyPOsToAdd.isEmpty()) {
+              return;
+            }
+
+            List<PolicyMetadataObjectRelPO> policyRelsToAdd =
+                policyPOsToAdd.stream()
+                    .map(
+                        policyPO ->
+                            
POConverters.initializePolicyMetadataObjectRelPOWithVersion(
+                                policyPO.getPolicyId(),
+                                metadataObjectId,
+                                metadataObject.type().toString()))
+                    .collect(Collectors.toList());
+            SessionUtils.doWithoutCommit(
+                PolicyMetadataObjectRelMapper.class,
+                mapper -> 
mapper.batchInsertPolicyMetadataObjectRels(policyRelsToAdd));
+          },
+          () -> {
+            // Remove the policy metadata object relations.
+            if (policyPOsToRemove.isEmpty()) {
+              return;
+            }
+
+            List<Long> policyIdsToRemove =
+                
policyPOsToRemove.stream().map(PolicyPO::getPolicyId).collect(Collectors.toList());
+            SessionUtils.doWithoutCommit(
+                PolicyMetadataObjectRelMapper.class,
+                mapper ->
+                    
mapper.batchDeletePolicyMetadataObjectRelsByPolicyIdsAndMetadataObject(
+                        metadataObjectId, metadataObject.type().toString(), 
policyIdsToRemove));
+          });
+
+      // Fetch all the policies associated with the metadata object after the 
operation.
+      List<PolicyPO> policyPOs =
+          SessionUtils.doWithoutCommitAndFetchResult(
+              PolicyMetadataObjectRelMapper.class,
+              mapper ->
+                  mapper.listPolicyPOsByMetadataObjectIdAndType(
+                      metadataObjectId, metadataObject.type().toString()));
+
+      return policyPOs.stream()
+          .map(policyPO -> POConverters.fromPolicyPO(policyPO, 
NamespaceUtil.ofPolicy(metalake)))
+          .collect(Collectors.toList());
+
+    } catch (RuntimeException e) {
+      // Give ExceptionUtils the chance to translate SQL failures, then rethrow as is.
+      ExceptionUtils.checkSQLException(e, Entity.EntityType.POLICY, 
objectIdent.toString());
+      throw e;
+    }
   }
 
   public int deletePolicyAndVersionMetasByLegacyTimeline(Long legacyTimeline, 
int limit) {
@@ -261,7 +438,7 @@ public class PolicyMetaService {
     PolicyPO policyPO =
         SessionUtils.getWithoutCommit(
             PolicyMetaMapper.class,
-            mapper -> mapper.selectTagMetaByMetalakeAndName(metalakeName, 
policyName));
+            mapper -> mapper.selectPolicyMetaByMetalakeAndName(metalakeName, 
policyName));
 
     if (policyPO == null) {
       throw new NoSuchEntityException(
@@ -271,4 +448,11 @@ public class PolicyMetaService {
     }
     return policyPO;
   }
+
+  /**
+   * Looks up the policy POs matching the given policy names within one metalake.
+   *
+   * @param metalakeName name of the metalake to search in
+   * @param policyNames names of the policies to look up
+   * @return the result of {@code PolicyMetaMapper#listPolicyPOsByMetalakeAndPolicyNames} for the
+   *     given arguments
+   */
+  private List<PolicyPO> getPolicyPOsByMetalakeAndNames(
+      String metalakeName, List<String> policyNames) {
+    List<PolicyPO> matchedPolicyPOs =
+        SessionUtils.getWithoutCommit(
+            PolicyMetaMapper.class,
+            policyMapper ->
+                policyMapper.listPolicyPOsByMetalakeAndPolicyNames(metalakeName, policyNames));
+    return matchedPolicyPOs;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
index 447f3405c6..e3f954239d 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
@@ -41,6 +41,7 @@ import 
org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
 import org.apache.gravitino.storage.relational.mapper.ModelVersionMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
 import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
@@ -241,6 +242,10 @@ public class SchemaMetaService {
                 SessionUtils.doWithoutCommit(
                     TagMetadataObjectRelMapper.class,
                     mapper -> 
mapper.softDeleteTagMetadataObjectRelsBySchemaId(schemaId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    PolicyMetadataObjectRelMapper.class,
+                    mapper -> 
mapper.softDeletePolicyMetadataObjectRelsBySchemaId(schemaId)),
             () ->
                 SessionUtils.doWithoutCommit(
                     ModelVersionAliasRelMapper.class,
@@ -310,6 +315,12 @@ public class SchemaMetaService {
                     TagMetadataObjectRelMapper.class,
                     mapper ->
                         mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
+                            schemaId, MetadataObject.Type.SCHEMA.name())),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    PolicyMetadataObjectRelMapper.class,
+                    mapper ->
+                        
mapper.softDeletePolicyMetadataObjectRelsByMetadataObject(
                             schemaId, MetadataObject.Type.SCHEMA.name())));
       }
     }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
index bc44ac43a9..d4a93c1a9b 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
@@ -33,6 +33,7 @@ import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
 import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
@@ -239,6 +240,9 @@ public class TableMetaService {
             SessionUtils.doWithoutCommit(
                 TagMetadataObjectRelMapper.class,
                 mapper -> 
mapper.softDeleteTagMetadataObjectRelsByTableId(tableId));
+            SessionUtils.doWithoutCommit(
+                PolicyMetadataObjectRelMapper.class,
+                mapper -> 
mapper.softDeletePolicyMetadataObjectRelsByTableId(tableId));
           }
         });
 
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
index 66a12aa9de..8f3e7eedd5 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
@@ -31,6 +31,7 @@ import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.meta.TopicEntity;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
 import 
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.mapper.TopicMetaMapper;
@@ -203,6 +204,12 @@ public class TopicMetaService {
                 TagMetadataObjectRelMapper.class,
                 mapper ->
                     mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
+                        topicId, MetadataObject.Type.TOPIC.name())),
+        () ->
+            SessionUtils.doWithoutCommit(
+                PolicyMetadataObjectRelMapper.class,
+                mapper ->
+                    mapper.softDeletePolicyMetadataObjectRelsByMetadataObject(
                         topicId, MetadataObject.Type.TOPIC.name())));
 
     return true;
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
index 5763f96d37..b295ddf110 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
@@ -77,6 +77,7 @@ import org.apache.gravitino.storage.relational.po.ModelPO;
 import org.apache.gravitino.storage.relational.po.ModelVersionAliasRelPO;
 import org.apache.gravitino.storage.relational.po.ModelVersionPO;
 import org.apache.gravitino.storage.relational.po.OwnerRelPO;
+import org.apache.gravitino.storage.relational.po.PolicyMetadataObjectRelPO;
 import org.apache.gravitino.storage.relational.po.PolicyPO;
 import org.apache.gravitino.storage.relational.po.PolicyVersionPO;
 import org.apache.gravitino.storage.relational.po.RolePO;
@@ -1453,6 +1454,29 @@ public class POConverters {
     }
   }
 
+  /**
+   * Creates the initial {@link PolicyMetadataObjectRelPO} that links a policy to a metadata
+   * object.
+   *
+   * <p>The audit info records the current principal as creator and the current instant as create
+   * time, and is stored as a JSON string. Both version fields are set to {@code INIT_VERSION} and
+   * the deleted-at marker to {@code DEFAULT_DELETED_AT}.
+   *
+   * @param policyId id of the policy
+   * @param metadataObjectId id of the metadata object
+   * @param metadataObjectType type of the metadata object, as a string
+   * @return the newly built relation PO
+   * @throws RuntimeException if the audit info cannot be serialized to JSON
+   */
+  public static PolicyMetadataObjectRelPO initializePolicyMetadataObjectRelPOWithVersion(
+      Long policyId, Long metadataObjectId, String metadataObjectType) {
+    try {
+      AuditInfo creationAudit =
+          AuditInfo.builder()
+              .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+              .withCreateTime(Instant.now())
+              .build();
+      String creationAuditJson = JsonUtils.anyFieldMapper().writeValueAsString(creationAudit);
+
+      return PolicyMetadataObjectRelPO.builder()
+          .withPolicyId(policyId)
+          .withMetadataObjectId(metadataObjectId)
+          .withMetadataObjectType(metadataObjectType)
+          .withAuditInfo(creationAuditJson)
+          .withCurrentVersion(INIT_VERSION)
+          .withLastVersion(INIT_VERSION)
+          .withDeletedAt(DEFAULT_DELETED_AT)
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to serialize json object:", e);
+    }
+  }
+
   public static OwnerRelPO initializeOwnerRelPOsWithVersion(
       Long metalakeId,
       String ownerType,
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPolicyMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPolicyMetaService.java
index 5f89469fb9..5735ef1728 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPolicyMetaService.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPolicyMetaService.java
@@ -20,21 +20,38 @@ package org.apache.gravitino.storage.relational.service;
 
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.time.Instant;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.CatalogEntity;
+import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.ModelEntity;
 import org.apache.gravitino.meta.PolicyEntity;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.meta.TopicEntity;
 import org.apache.gravitino.policy.PolicyContent;
 import org.apache.gravitino.policy.PolicyContents;
 import org.apache.gravitino.storage.RandomIdGenerator;
 import org.apache.gravitino.storage.relational.TestJDBCBackend;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
+import org.apache.ibatis.session.SqlSession;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -352,4 +369,593 @@ public class TestPolicyMetaService extends 
TestJDBCBackend {
             policyMetaService.getPolicyByIdentifier(
                 NameIdentifierUtil.ofPolicy(metalakeName + "1", "policy2")));
   }
+
+  /**
+   * Exercises {@code PolicyMetaService#associatePoliciesWithMetadataObject} against a catalog, a
+   * schema and a table: adding, removing, no-op calls, the same policy in both lists, unknown
+   * policy names, re-adding already associated policies and an unknown metadata object.
+   *
+   * <p>Other tests in this class call this method directly to build their fixture. When it
+   * returns, the assertions have established that policy2 and policy3 are associated with
+   * catalog1, schema1 and table1, and that policy1 is associated with none of them.
+   */
+  @Test
+  public void testAssociateAndDisassociatePoliciesWithMetadataObject() throws 
IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
+    backend.insert(metalake, false);
+
+    CatalogEntity catalog =
+        createCatalog(
+            RandomIdGenerator.INSTANCE.nextId(), Namespace.of(metalakeName), 
"catalog1", auditInfo);
+    backend.insert(catalog, false);
+
+    SchemaEntity schema =
+        createSchemaEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of(metalakeName, catalog.name()),
+            "schema1",
+            auditInfo);
+    backend.insert(schema, false);
+
+    TableEntity table =
+        createTableEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of(metalakeName, catalog.name(), schema.name()),
+            "table1",
+            auditInfo);
+    backend.insert(table, false);
+
+    // Create policies to associate
+    PolicyMetaService policyMetaService = PolicyMetaService.getInstance();
+    PolicyEntity policyEntity1 =
+        PolicyEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("policy1")
+            .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+            .withComment("comment")
+            .withPolicyType("test")
+            .withContent(content)
+            .withSupportedObjectTypes(supportedObjectTypes)
+            .withAuditInfo(auditInfo)
+            .build();
+    policyMetaService.insertPolicy(policyEntity1, false);
+
+    PolicyEntity policyEntity2 =
+        PolicyEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("policy2")
+            .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+            .withComment("comment")
+            .withPolicyType("test")
+            .withContent(content)
+            .withSupportedObjectTypes(supportedObjectTypes)
+            .withAuditInfo(auditInfo)
+            .build();
+    policyMetaService.insertPolicy(policyEntity2, false);
+
+    PolicyEntity policyEntity3 =
+        PolicyEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("policy3")
+            .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+            .withComment("comment")
+            .withPolicyType("test")
+            .withContent(content)
+            .withSupportedObjectTypes(supportedObjectTypes)
+            .withAuditInfo(auditInfo)
+            .build();
+    policyMetaService.insertPolicy(policyEntity3, false);
+
+    // Test associate policies with metadata object
+    NameIdentifier[] policiesToAdd =
+        new NameIdentifier[] {
+          NameIdentifierUtil.ofPolicy(metalakeName, "policy1"),
+          NameIdentifierUtil.ofPolicy(metalakeName, "policy2"),
+          NameIdentifierUtil.ofPolicy(metalakeName, "policy3")
+        };
+
+    List<PolicyEntity> policyEntities =
+        policyMetaService.associatePoliciesWithMetadataObject(
+            catalog.nameIdentifier(), catalog.type(), policiesToAdd, new 
NameIdentifier[0]);
+    Assertions.assertEquals(3, policyEntities.size());
+    Assertions.assertTrue(policyEntities.contains(policyEntity1));
+    Assertions.assertTrue(policyEntities.contains(policyEntity2));
+    Assertions.assertTrue(policyEntities.contains(policyEntity3));
+
+    // Test disassociate policies with metadata object
+    NameIdentifier[] policiesToRemove =
+        new NameIdentifier[] {NameIdentifierUtil.ofPolicy(metalakeName, 
"policy1")};
+
+    List<PolicyEntity> policyEntities1 =
+        policyMetaService.associatePoliciesWithMetadataObject(
+            catalog.nameIdentifier(), catalog.type(), new NameIdentifier[0], 
policiesToRemove);
+
+    Assertions.assertEquals(2, policyEntities1.size());
+    Assertions.assertFalse(policyEntities1.contains(policyEntity1));
+    Assertions.assertTrue(policyEntities1.contains(policyEntity2));
+    Assertions.assertTrue(policyEntities1.contains(policyEntity3));
+
+    // Test no policies to associate and disassociate
+    List<PolicyEntity> policyEntities2 =
+        policyMetaService.associatePoliciesWithMetadataObject(
+            catalog.nameIdentifier(), catalog.type(), new NameIdentifier[0], 
new NameIdentifier[0]);
+    Assertions.assertEquals(2, policyEntities2.size());
+    Assertions.assertFalse(policyEntities2.contains(policyEntity1));
+    Assertions.assertTrue(policyEntities2.contains(policyEntity2));
+    Assertions.assertTrue(policyEntities2.contains(policyEntity3));
+
+    // Test associate and disassociate same policies with metadata object
+    // (policy1 is passed as both "to add" and "to remove"; the assertions expect it to stay
+    // unassociated).
+    List<PolicyEntity> policyEntities3 =
+        policyMetaService.associatePoliciesWithMetadataObject(
+            catalog.nameIdentifier(), catalog.type(), policiesToRemove, 
policiesToRemove);
+
+    Assertions.assertEquals(2, policyEntities3.size());
+    Assertions.assertFalse(policyEntities3.contains(policyEntity1));
+    Assertions.assertTrue(policyEntities3.contains(policyEntity2));
+    Assertions.assertTrue(policyEntities3.contains(policyEntity3));
+
+    // Test associate and disassociate non-existent policies with metadata object
+    // (policy4..policy7 were never inserted; the call is expected to succeed and leave the
+    // existing associations unchanged).
+    NameIdentifier[] policiesToAdd1 =
+        new NameIdentifier[] {
+          NameIdentifierUtil.ofPolicy(metalakeName, "policy4"),
+          NameIdentifierUtil.ofPolicy(metalakeName, "policy5")
+        };
+
+    NameIdentifier[] policiesToRemove1 =
+        new NameIdentifier[] {
+          NameIdentifierUtil.ofPolicy(metalakeName, "policy6"),
+          NameIdentifierUtil.ofPolicy(metalakeName, "policy7")
+        };
+
+    List<PolicyEntity> policyEntities4 =
+        policyMetaService.associatePoliciesWithMetadataObject(
+            catalog.nameIdentifier(), catalog.type(), policiesToAdd1, 
policiesToRemove1);
+
+    Assertions.assertEquals(2, policyEntities4.size());
+    Assertions.assertTrue(policyEntities4.contains(policyEntity2));
+    Assertions.assertTrue(policyEntities4.contains(policyEntity3));
+
+    // Test associate already associated policies with metadata object
+    // (policiesToAdd includes policy2 and policy3, which are still associated with the catalog).
+    Assertions.assertThrows(
+        EntityAlreadyExistsException.class,
+        () ->
+            policyMetaService.associatePoliciesWithMetadataObject(
+                catalog.nameIdentifier(), catalog.type(), policiesToAdd, new 
NameIdentifier[0]));
+
+    // Test disassociate already disassociated policies with metadata object
+    List<PolicyEntity> policyEntities5 =
+        policyMetaService.associatePoliciesWithMetadataObject(
+            catalog.nameIdentifier(), catalog.type(), new NameIdentifier[0], 
policiesToRemove);
+
+    Assertions.assertEquals(2, policyEntities5.size());
+    Assertions.assertTrue(policyEntities5.contains(policyEntity2));
+    Assertions.assertTrue(policyEntities5.contains(policyEntity3));
+
+    // Test associate and disassociate with invalid metadata object
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () ->
+            policyMetaService.associatePoliciesWithMetadataObject(
+                NameIdentifier.of(metalakeName, "non-existent-catalog"),
+                catalog.type(),
+                policiesToAdd,
+                policiesToRemove));
+
+    // Test associate and disassociate to a schema
+    // (policy1 appears in both lists, so only policy2 and policy3 are expected afterwards).
+    List<PolicyEntity> policyEntities6 =
+        policyMetaService.associatePoliciesWithMetadataObject(
+            schema.nameIdentifier(), schema.type(), policiesToAdd, 
policiesToRemove);
+
+    Assertions.assertEquals(2, policyEntities6.size());
+    Assertions.assertTrue(policyEntities6.contains(policyEntity2));
+    Assertions.assertTrue(policyEntities6.contains(policyEntity3));
+
+    // Test associate and disassociate to a table
+    // (same add/remove lists as for the schema, same expected result).
+    List<PolicyEntity> policyEntities7 =
+        policyMetaService.associatePoliciesWithMetadataObject(
+            table.nameIdentifier(), table.type(), policiesToAdd, 
policiesToRemove);
+
+    Assertions.assertEquals(2, policyEntities7.size());
+    Assertions.assertTrue(policyEntities7.contains(policyEntity2));
+    Assertions.assertTrue(policyEntities7.contains(policyEntity3));
+  }
+
+  /**
+   * Checks {@code PolicyMetaService#listPoliciesForMetadataObject} for a catalog, a schema and a
+   * table that each carry policy2 and policy3, and for a table that does not exist.
+   */
+  @Test
+  public void testListPoliciesForMetadataObject() throws IOException {
+    // Build the fixture by running the association test first.
+    testAssociateAndDisassociatePoliciesWithMetadataObject();
+
+    PolicyMetaService service = PolicyMetaService.getInstance();
+
+    // Catalog level
+    List<PolicyEntity> catalogPolicies =
+        service.listPoliciesForMetadataObject(
+            NameIdentifier.of(metalakeName, "catalog1"), Entity.EntityType.CATALOG);
+    Assertions.assertEquals(2, catalogPolicies.size());
+    Assertions.assertTrue(
+        catalogPolicies.stream().map(PolicyEntity::name).anyMatch(n -> n.equals("policy2")));
+    Assertions.assertTrue(
+        catalogPolicies.stream().map(PolicyEntity::name).anyMatch(n -> n.equals("policy3")));
+
+    // Schema level
+    List<PolicyEntity> schemaPolicies =
+        service.listPoliciesForMetadataObject(
+            NameIdentifier.of(metalakeName, "catalog1", "schema1"), Entity.EntityType.SCHEMA);
+    Assertions.assertEquals(2, schemaPolicies.size());
+    Assertions.assertTrue(
+        schemaPolicies.stream().map(PolicyEntity::name).anyMatch(n -> n.equals("policy2")));
+    Assertions.assertTrue(
+        schemaPolicies.stream().map(PolicyEntity::name).anyMatch(n -> n.equals("policy3")));
+
+    // Table level
+    List<PolicyEntity> tablePolicies =
+        service.listPoliciesForMetadataObject(
+            NameIdentifier.of(metalakeName, "catalog1", "schema1", "table1"),
+            Entity.EntityType.TABLE);
+    Assertions.assertEquals(2, tablePolicies.size());
+    Assertions.assertTrue(
+        tablePolicies.stream().map(PolicyEntity::name).anyMatch(n -> n.equals("policy2")));
+    Assertions.assertTrue(
+        tablePolicies.stream().map(PolicyEntity::name).anyMatch(n -> n.equals("policy3")));
+
+    // Listing for a metadata object that does not exist must fail.
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () ->
+            service.listPoliciesForMetadataObject(
+                NameIdentifier.of(metalakeName, "catalog1", "schema1", "table2"),
+                Entity.EntityType.TABLE));
+  }
+
+  /**
+   * Checks {@code PolicyMetaService#getPolicyForMetadataObject} for associated policies on a
+   * catalog, a schema and a table, and for the two failure cases: unknown metadata object and
+   * unknown policy.
+   */
+  @Test
+  public void testGetPolicyForMetadataObject() throws IOException {
+    // Build the fixture by running the association test first.
+    testAssociateAndDisassociatePoliciesWithMetadataObject();
+
+    PolicyMetaService service = PolicyMetaService.getInstance();
+
+    // Catalog level
+    PolicyEntity catalogPolicy =
+        service.getPolicyForMetadataObject(
+            NameIdentifier.of(metalakeName, "catalog1"),
+            Entity.EntityType.CATALOG,
+            NameIdentifierUtil.ofPolicy(metalakeName, "policy2"));
+    Assertions.assertEquals("policy2", catalogPolicy.name());
+
+    // Schema level
+    PolicyEntity schemaPolicy =
+        service.getPolicyForMetadataObject(
+            NameIdentifier.of(metalakeName, "catalog1", "schema1"),
+            Entity.EntityType.SCHEMA,
+            NameIdentifierUtil.ofPolicy(metalakeName, "policy3"));
+    Assertions.assertEquals("policy3", schemaPolicy.name());
+
+    // Table level
+    PolicyEntity tablePolicy =
+        service.getPolicyForMetadataObject(
+            NameIdentifier.of(metalakeName, "catalog1", "schema1", "table1"),
+            Entity.EntityType.TABLE,
+            NameIdentifierUtil.ofPolicy(metalakeName, "policy2"));
+    Assertions.assertEquals("policy2", tablePolicy.name());
+
+    // Unknown metadata object
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () ->
+            service.getPolicyForMetadataObject(
+                NameIdentifier.of(metalakeName, "catalog1", "schema1", "table2"),
+                Entity.EntityType.TABLE,
+                NameIdentifierUtil.ofPolicy(metalakeName, "policy2")));
+
+    // Unknown policy: the error message must name the missing policy.
+    NoSuchEntityException missingPolicyError =
+        Assertions.assertThrows(
+            NoSuchEntityException.class,
+            () ->
+                service.getPolicyForMetadataObject(
+                    NameIdentifier.of(metalakeName, "catalog1", "schema1", "table1"),
+                    Entity.EntityType.TABLE,
+                    NameIdentifierUtil.ofPolicy(metalakeName, "policy4")));
+    String errorMessage = missingPolicyError.getMessage();
+    Assertions.assertTrue(errorMessage.contains("No such policy entity: policy4"));
+  }
+
+  /**
+   * Checks {@code PolicyMetaService#listAssociatedMetadataObjectsForPolicy}: first for policies
+   * associated with a catalog, schema and table, then for an unknown policy, and finally while
+   * the table, schema and catalog are deleted one after another.
+   */
+  @Test
+  public void testListAssociatedMetadataObjectsForPolicy() throws IOException {
+    // Build the fixture by running the association test first.
+    testAssociateAndDisassociatePoliciesWithMetadataObject();
+
+    PolicyMetaService service = PolicyMetaService.getInstance();
+    NameIdentifier policy2Ident = NameIdentifierUtil.ofPolicy(metalakeName, "policy2");
+    MetadataObject catalogObject = MetadataObjects.parse("catalog1", MetadataObject.Type.CATALOG);
+    MetadataObject schemaObject =
+        MetadataObjects.parse("catalog1.schema1", MetadataObject.Type.SCHEMA);
+    MetadataObject tableObject =
+        MetadataObjects.parse("catalog1.schema1.table1", MetadataObject.Type.TABLE);
+
+    // policy2 is attached to the catalog, the schema and the table.
+    List<MetadataObject> objectsOfPolicy2 =
+        service.listAssociatedMetadataObjectsForPolicy(policy2Ident);
+    Assertions.assertEquals(3, objectsOfPolicy2.size());
+    Assertions.assertTrue(objectsOfPolicy2.contains(catalogObject));
+    Assertions.assertTrue(objectsOfPolicy2.contains(schemaObject));
+    Assertions.assertTrue(objectsOfPolicy2.contains(tableObject));
+
+    // policy3 is attached to the same three objects.
+    List<MetadataObject> objectsOfPolicy3 =
+        service.listAssociatedMetadataObjectsForPolicy(
+            NameIdentifierUtil.ofPolicy(metalakeName, "policy3"));
+    Assertions.assertEquals(3, objectsOfPolicy3.size());
+    Assertions.assertTrue(objectsOfPolicy3.contains(catalogObject));
+    Assertions.assertTrue(objectsOfPolicy3.contains(schemaObject));
+    Assertions.assertTrue(objectsOfPolicy3.contains(tableObject));
+
+    // A policy that was never created yields an empty list.
+    List<MetadataObject> objectsOfUnknownPolicy =
+        service.listAssociatedMetadataObjectsForPolicy(
+            NameIdentifierUtil.ofPolicy(metalakeName, "policy4"));
+    Assertions.assertEquals(0, objectsOfUnknownPolicy.size());
+
+    // Deleting the table removes it from the listing.
+    backend.delete(
+        NameIdentifier.of(metalakeName, "catalog1", "schema1", "table1"),
+        Entity.EntityType.TABLE,
+        false);
+    List<MetadataObject> afterTableDelete =
+        service.listAssociatedMetadataObjectsForPolicy(policy2Ident);
+    Assertions.assertEquals(2, afterTableDelete.size());
+    Assertions.assertTrue(afterTableDelete.contains(catalogObject));
+    Assertions.assertTrue(afterTableDelete.contains(schemaObject));
+
+    // Deleting the schema leaves only the catalog.
+    backend.delete(
+        NameIdentifier.of(metalakeName, "catalog1", "schema1"), Entity.EntityType.SCHEMA, false);
+    List<MetadataObject> afterSchemaDelete =
+        service.listAssociatedMetadataObjectsForPolicy(policy2Ident);
+    Assertions.assertEquals(1, afterSchemaDelete.size());
+    Assertions.assertTrue(afterSchemaDelete.contains(catalogObject));
+
+    // Deleting the catalog leaves nothing.
+    backend.delete(NameIdentifier.of(metalakeName, "catalog1"), Entity.EntityType.CATALOG, false);
+    List<MetadataObject> afterCatalogDelete =
+        service.listAssociatedMetadataObjectsForPolicy(policy2Ident);
+    Assertions.assertEquals(0, afterCatalogDelete.size());
+  }
+
+  /**
+   * Checks how the policy relation counts change when the metadata objects a policy is attached
+   * to are deleted, both one by one and through cascading catalog and schema deletion.
+   *
+   * <p>After each deletion the count from {@code countActivePolicyRel} is expected to drop while
+   * the count from {@code countAllPolicyRel} stays the same.
+   */
+  @Test
+  public void testDeleteMetadataObjectForPolicy() throws IOException {
+    BaseMetalake baseMetalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, auditInfo);
+    backend.insert(baseMetalake, false);
+
+    PolicyMetaService service = PolicyMetaService.getInstance();
+    PolicyEntity policy =
+        PolicyEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("policy1")
+            .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+            .withComment("comment")
+            .withPolicyType("test")
+            .withContent(content)
+            .withSupportedObjectTypes(supportedObjectTypes)
+            .withAuditInfo(auditInfo)
+            .build();
+    service.insertPolicy(policy, false);
+    Long policyId = policy.id();
+
+    // 1. Non-cascade deletion: six objects are created, each with one relation to the policy.
+    EntitiesToTest created = createAndAssociateEntities("catalog1", "schema1", policy);
+    Assertions.assertEquals(6, countActivePolicyRel(policyId));
+    Assertions.assertEquals(6, countAllPolicyRel(policyId));
+
+    // Delete the model
+    ModelMetaService.getInstance().deleteModel(created.model.nameIdentifier());
+    Assertions.assertEquals(5, countActivePolicyRel(policyId));
+    Assertions.assertEquals(6, countAllPolicyRel(policyId));
+
+    // Drop the table
+    TableMetaService.getInstance().deleteTable(created.table.nameIdentifier());
+    Assertions.assertEquals(4, countActivePolicyRel(policyId));
+    Assertions.assertEquals(6, countAllPolicyRel(policyId));
+
+    // Drop the topic
+    TopicMetaService.getInstance().deleteTopic(created.topic.nameIdentifier());
+    Assertions.assertEquals(3, countActivePolicyRel(policyId));
+    Assertions.assertEquals(6, countAllPolicyRel(policyId));
+
+    // Drop the fileset
+    FilesetMetaService.getInstance().deleteFileset(created.fileset.nameIdentifier());
+    Assertions.assertEquals(2, countActivePolicyRel(policyId));
+    Assertions.assertEquals(6, countAllPolicyRel(policyId));
+
+    // Drop the schema
+    SchemaMetaService.getInstance().deleteSchema(created.schema.nameIdentifier(), false);
+    Assertions.assertEquals(1, countActivePolicyRel(policyId));
+    Assertions.assertEquals(6, countAllPolicyRel(policyId));
+
+    // Drop the catalog
+    CatalogMetaService.getInstance().deleteCatalog(created.catalog.nameIdentifier(), false);
+    Assertions.assertEquals(0, countActivePolicyRel(policyId));
+    Assertions.assertEquals(6, countAllPolicyRel(policyId));
+
+    // 2. Cascade deletion of a catalog: all six new relations become inactive at once.
+    EntitiesToTest underCascadedCatalog = createAndAssociateEntities("catalog2", "schema2", policy);
+    CatalogMetaService.getInstance()
+        .deleteCatalog(underCascadedCatalog.catalog.nameIdentifier(), true);
+    Assertions.assertEquals(0, countActivePolicyRel(policyId));
+    // 6 from previous test + 6 from this test
+    Assertions.assertEquals(12, countAllPolicyRel(policyId));
+
+    // 3. Cascade deletion of a schema: only the relation of the surviving catalog stays active.
+    EntitiesToTest underCascadedSchema = createAndAssociateEntities("catalog3", "schema3", policy);
+    SchemaMetaService.getInstance().deleteSchema(underCascadedSchema.schema.nameIdentifier(), true);
+    Assertions.assertEquals(1, countActivePolicyRel(policyId));
+    // 12 from previous tests + 6 from this test
+    Assertions.assertEquals(18, countAllPolicyRel(policyId));
+  }
+
+  /** Immutable bundle of one catalog, schema, table, topic, fileset and model entity. */
+  private static class EntitiesToTest {
+    final CatalogEntity catalog;
+    final SchemaEntity schema;
+    final TableEntity table;
+    final TopicEntity topic;
+    final FilesetEntity fileset;
+    final ModelEntity model;
+
+    /** Stores the given entities as-is; no copying or validation is performed. */
+    EntitiesToTest(
+        CatalogEntity catalogEntity,
+        SchemaEntity schemaEntity,
+        TableEntity tableEntity,
+        TopicEntity topicEntity,
+        FilesetEntity filesetEntity,
+        ModelEntity modelEntity) {
+      catalog = catalogEntity;
+      schema = schemaEntity;
+      table = tableEntity;
+      topic = topicEntity;
+      fileset = filesetEntity;
+      model = modelEntity;
+    }
+  }
+
+  /**
+   * Inserts a catalog, a schema and, under that schema, a table, topic, fileset and model, then
+   * associates the given policy with each of the six objects.
+   *
+   * @param catalogName name of the catalog to create
+   * @param schemaName name of the schema to create under that catalog
+   * @param policyEntity policy to associate with every created object
+   * @return the six created entities
+   * @throws IOException declared because the backend inserts may throw it
+   */
+  private EntitiesToTest createAndAssociateEntities(
+      String catalogName, String schemaName, PolicyEntity policyEntity) throws IOException {
+    PolicyMetaService service = PolicyMetaService.getInstance();
+    NameIdentifier policyIdent = policyEntity.nameIdentifier();
+
+    // Create the catalog and the schema.
+    CatalogEntity newCatalog =
+        createCatalog(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of(metalakeName),
+            catalogName,
+            auditInfo);
+    backend.insert(newCatalog, false);
+
+    SchemaEntity newSchema =
+        createSchemaEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of(metalakeName, newCatalog.name()),
+            schemaName,
+            auditInfo);
+    backend.insert(newSchema, false);
+
+    // Every remaining object lives directly under the schema.
+    Namespace schemaLevel = Namespace.of(metalakeName, newCatalog.name(), newSchema.name());
+
+    TableEntity newTable =
+        createTableEntity(RandomIdGenerator.INSTANCE.nextId(), schemaLevel, "table1", auditInfo);
+    backend.insert(newTable, false);
+
+    TopicEntity newTopic =
+        createTopicEntity(RandomIdGenerator.INSTANCE.nextId(), schemaLevel, "topic1", auditInfo);
+    backend.insert(newTopic, false);
+
+    FilesetEntity newFileset =
+        createFilesetEntity(
+            RandomIdGenerator.INSTANCE.nextId(), schemaLevel, "fileset1", auditInfo);
+    backend.insert(newFileset, false);
+
+    ModelEntity newModel =
+        createModelEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            schemaLevel,
+            "model1",
+            "comment",
+            1,
+            null,
+            auditInfo);
+    backend.insert(newModel, false);
+
+    // Associate the policy with each object; nothing is removed.
+    service.associatePoliciesWithMetadataObject(
+        newCatalog.nameIdentifier(),
+        newCatalog.type(),
+        new NameIdentifier[] {policyIdent},
+        new NameIdentifier[0]);
+    service.associatePoliciesWithMetadataObject(
+        newSchema.nameIdentifier(),
+        newSchema.type(),
+        new NameIdentifier[] {policyIdent},
+        new NameIdentifier[0]);
+    service.associatePoliciesWithMetadataObject(
+        newTable.nameIdentifier(),
+        newTable.type(),
+        new NameIdentifier[] {policyIdent},
+        new NameIdentifier[0]);
+    service.associatePoliciesWithMetadataObject(
+        newTopic.nameIdentifier(),
+        newTopic.type(),
+        new NameIdentifier[] {policyIdent},
+        new NameIdentifier[0]);
+    service.associatePoliciesWithMetadataObject(
+        newFileset.nameIdentifier(),
+        newFileset.type(),
+        new NameIdentifier[] {policyIdent},
+        new NameIdentifier[0]);
+    service.associatePoliciesWithMetadataObject(
+        newModel.nameIdentifier(),
+        newModel.type(),
+        new NameIdentifier[] {policyIdent},
+        new NameIdentifier[0]);
+
+    return new EntitiesToTest(newCatalog, newSchema, newTable, newTopic, newFileset, newModel);
+  }
+
+  /**
+   * Counts the rows of {@code policy_relation_meta} that belong to the given policy and whose
+   * {@code deleted_at} column is 0.
+   *
+   * @param policyId id of the policy whose relation rows are counted
+   * @return the {@code count(*)} value returned by the query
+   * @throws RuntimeException if the query fails or yields no result row
+   */
+  private Integer countActivePolicyRel(Long policyId) {
+    String sql =
+        String.format(
+            "SELECT count(*) FROM policy_relation_meta WHERE policy_id = %d AND deleted_at = 0",
+            policyId);
+    // Resources are closed in reverse order: result set, statement, connection, session.
+    try (SqlSession sqlSession =
+            SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
+        Connection connection = sqlSession.getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      if (!resultSet.next()) {
+        throw new RuntimeException("Doesn't contain data");
+      }
+      return resultSet.getInt(1);
+    } catch (SQLException se) {
+      throw new RuntimeException("SQL execution failed", se);
+    }
+  }
+
+  /**
+   * Counts every row of {@code policy_relation_meta} that belongs to the given policy, without
+   * filtering on {@code deleted_at}.
+   *
+   * @param policyId id of the policy whose relation rows are counted
+   * @return the {@code count(*)} value returned by the query
+   * @throws RuntimeException if the query fails or yields no result row
+   */
+  private Integer countAllPolicyRel(Long policyId) {
+    String sql =
+        String.format("SELECT count(*) FROM policy_relation_meta WHERE policy_id = %d", policyId);
+    // Resources are closed in reverse order: result set, statement, connection, session.
+    try (SqlSession sqlSession =
+            SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
+        Connection connection = sqlSession.getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      if (!resultSet.next()) {
+        throw new RuntimeException("Doesn't contain data");
+      }
+      return resultSet.getInt(1);
+    } catch (SQLException se) {
+      throw new RuntimeException("SQL execution failed", se);
+    }
+  }
 }

Reply via email to