This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new f52cfd54fa [#7360] feat(policy): support CRUD ops for policy in 
backend storage (part-3) (#7560)
f52cfd54fa is described below

commit f52cfd54faa123c5e53a6904152464a1eaf2e09b
Author: mchades <[email protected]>
AuthorDate: Tue Jul 15 11:47:21 2025 +0800

    [#7360] feat(policy): support CRUD ops for policy in backend storage 
(part-3) (#7560)
    
    ### What changes were proposed in this pull request?
    
    Support CRUD operations for policies in the backend storage (part-3).
    
    ### Why are the changes needed?
    
    Fix: #7360
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    tests added
---
 .../gravitino/storage/relational/JDBCBackend.java  |  22 +-
 .../relational/mapper/PolicyMetaMapper.java        | 114 +++++++
 .../mapper/PolicyMetaSQLProviderFactory.java       |  91 ++++++
 .../relational/mapper/PolicyVersionMapper.java     |  71 +++++
 .../mapper/PolicyVersionSQLProviderFactory.java    |  90 ++++++
 .../provider/base/PolicyMetaBaseSQLProvider.java   | 158 +++++++++
 .../base/PolicyVersionBaseSQLProvider.java         | 113 +++++++
 .../postgresql/PolicyMetaPostgreSQLProvider.java   |  87 +++++
 .../PolicyVersionPostgreSQLProvider.java           |  94 ++++++
 .../storage/relational/po/PolicyMaxVersionPO.java  |  40 +++
 .../relational/service/MetalakeMetaService.java    |  10 +
 .../relational/service/PolicyMetaService.java      | 242 ++++++++++++++
 .../storage/relational/utils/POConverters.java     | 114 +++++++
 .../apache/gravitino/utils/NameIdentifierUtil.java |  11 +
 .../storage/relational/TestJDBCBackend.java        | 175 ++++++++++
 .../relational/service/TestPolicyMetaService.java  | 355 +++++++++++++++++++++
 16 files changed, 1783 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index 966265c79f..0911a3a5df 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -44,6 +44,7 @@ import org.apache.gravitino.meta.FilesetEntity;
 import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.ModelEntity;
 import org.apache.gravitino.meta.ModelVersionEntity;
+import org.apache.gravitino.meta.PolicyEntity;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.TableEntity;
@@ -59,6 +60,7 @@ import 
org.apache.gravitino.storage.relational.service.MetalakeMetaService;
 import org.apache.gravitino.storage.relational.service.ModelMetaService;
 import org.apache.gravitino.storage.relational.service.ModelVersionMetaService;
 import org.apache.gravitino.storage.relational.service.OwnerMetaService;
+import org.apache.gravitino.storage.relational.service.PolicyMetaService;
 import org.apache.gravitino.storage.relational.service.RoleMetaService;
 import org.apache.gravitino.storage.relational.service.SchemaMetaService;
 import org.apache.gravitino.storage.relational.service.TableColumnMetaService;
@@ -124,6 +126,8 @@ public class JDBCBackend implements RelationalBackend {
       case MODEL_VERSION:
         return (List<E>)
             
ModelVersionMetaService.getInstance().listModelVersionsByNamespace(namespace);
+      case POLICY:
+        return (List<E>) 
PolicyMetaService.getInstance().listPoliciesByNamespace(namespace);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for list operation", entityType);
@@ -172,6 +176,8 @@ public class JDBCBackend implements RelationalBackend {
                 + "inserting the new model version.");
       }
       
ModelVersionMetaService.getInstance().insertModelVersion((ModelVersionEntity) 
e);
+    } else if (e instanceof PolicyEntity) {
+      PolicyMetaService.getInstance().insertPolicy((PolicyEntity) e, 
overwritten);
     } else {
       throw new UnsupportedEntityTypeException(
           "Unsupported entity type: %s for insert operation", e.getClass());
@@ -207,6 +213,8 @@ public class JDBCBackend implements RelationalBackend {
         return (E) ModelMetaService.getInstance().updateModel(ident, updater);
       case MODEL_VERSION:
         return (E) 
ModelVersionMetaService.getInstance().updateModelVersion(ident, updater);
+      case POLICY:
+        return (E) PolicyMetaService.getInstance().updatePolicy(ident, 
updater);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for update operation", entityType);
@@ -242,6 +250,8 @@ public class JDBCBackend implements RelationalBackend {
         return (E) ModelMetaService.getInstance().getModelByIdentifier(ident);
       case MODEL_VERSION:
         return (E) 
ModelVersionMetaService.getInstance().getModelVersionByIdentifier(ident);
+      case POLICY:
+        return (E) 
PolicyMetaService.getInstance().getPolicyByIdentifier(ident);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for get operation", entityType);
@@ -276,6 +286,8 @@ public class JDBCBackend implements RelationalBackend {
         return ModelMetaService.getInstance().deleteModel(ident);
       case MODEL_VERSION:
         return ModelVersionMetaService.getInstance().deleteModelVersion(ident);
+      case POLICY:
+        return PolicyMetaService.getInstance().deletePolicy(ident);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for delete operation", entityType);
@@ -327,8 +339,9 @@ public class JDBCBackend implements RelationalBackend {
             .deleteTagMetasByLegacyTimeline(
                 legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
       case POLICY:
-        // todo: Implement hard delete logic for policies.
-        return 0;
+        return PolicyMetaService.getInstance()
+            .deletePolicyAndVersionMetasByLegacyTimeline(
+                legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
       case COLUMN:
         return TableColumnMetaService.getInstance()
             .deleteColumnsByLegacyTimeline(legacyTimeline, 
GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
@@ -376,8 +389,9 @@ public class JDBCBackend implements RelationalBackend {
                 versionRetentionCount, 
GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
 
       case POLICY:
-        // todo: Implement delete old version logic for policies.
-        return 0;
+        return PolicyMetaService.getInstance()
+            .deletePolicyVersionsByRetentionCount(
+                versionRetentionCount, 
GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
 
       default:
         throw new IllegalArgumentException(
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
new file mode 100644
index 0000000000..e2fd8058fe
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.po.PolicyPO;
+import org.apache.ibatis.annotations.DeleteProvider;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Result;
+import org.apache.ibatis.annotations.Results;
+import org.apache.ibatis.annotations.SelectProvider;
+import org.apache.ibatis.annotations.UpdateProvider;
+
+public interface PolicyMetaMapper {
+  String POLICY_META_TABLE_NAME = "policy_meta";
+
+  @Results({
+    @Result(property = "policyId", column = "policy_id"),
+    @Result(property = "policyName", column = "policy_name"),
+    @Result(property = "policyType", column = "policy_type"),
+    @Result(property = "metalakeId", column = "metalake_id"),
+    @Result(property = "inheritable", column = "inheritable"),
+    @Result(property = "exclusive", column = "exclusive"),
+    @Result(property = "supportedObjectTypes", column = 
"supported_object_types"),
+    @Result(property = "auditInfo", column = "audit_info"),
+    @Result(property = "currentVersion", column = "current_version"),
+    @Result(property = "lastVersion", column = "last_version"),
+    @Result(property = "deletedAt", column = "deleted_at"),
+    @Result(property = "policyVersionPO.id", column = "id"),
+    @Result(property = "policyVersionPO.metalakeId", column = 
"version_metalake_id"),
+    @Result(property = "policyVersionPO.policyId", column = 
"version_policy_id"),
+    @Result(property = "policyVersionPO.version", column = "version"),
+    @Result(property = "policyVersionPO.policyComment", column = 
"policy_comment"),
+    @Result(property = "policyVersionPO.enabled", column = "enabled"),
+    @Result(property = "policyVersionPO.content", column = "content"),
+    @Result(property = "policyVersionPO.deletedAt", column = 
"version_deleted_at")
+  })
+  @SelectProvider(type = PolicyMetaSQLProviderFactory.class, method = 
"listPolicyPOsByMetalake")
+  List<PolicyPO> listPolicyPOsByMetalake(@Param("metalakeName") String 
metalakeName);
+
+  @InsertProvider(
+      type = PolicyMetaSQLProviderFactory.class,
+      method = "insertPolicyMetaOnDuplicateKeyUpdate")
+  void insertPolicyMetaOnDuplicateKeyUpdate(@Param("policyMeta") PolicyPO 
policyPO);
+
+  @InsertProvider(type = PolicyMetaSQLProviderFactory.class, method = 
"insertPolicyMeta")
+  void insertPolicyMeta(@Param("policyMeta") PolicyPO policyPO);
+
+  @Results({
+    @Result(property = "policyId", column = "policy_id"),
+    @Result(property = "policyName", column = "policy_name"),
+    @Result(property = "policyType", column = "policy_type"),
+    @Result(property = "metalakeId", column = "metalake_id"),
+    @Result(property = "inheritable", column = "inheritable"),
+    @Result(property = "exclusive", column = "exclusive"),
+    @Result(property = "supportedObjectTypes", column = 
"supported_object_types"),
+    @Result(property = "auditInfo", column = "audit_info"),
+    @Result(property = "currentVersion", column = "current_version"),
+    @Result(property = "lastVersion", column = "last_version"),
+    @Result(property = "deletedAt", column = "deleted_at"),
+    @Result(property = "policyVersionPO.id", column = "id"),
+    @Result(property = "policyVersionPO.metalakeId", column = 
"version_metalake_id"),
+    @Result(property = "policyVersionPO.policyId", column = 
"version_policy_id"),
+    @Result(property = "policyVersionPO.version", column = "version"),
+    @Result(property = "policyVersionPO.policyComment", column = 
"policy_comment"),
+    @Result(property = "policyVersionPO.enabled", column = "enabled"),
+    @Result(property = "policyVersionPO.content", column = "content"),
+    @Result(property = "policyVersionPO.deletedAt", column = 
"version_deleted_at")
+  })
+  @SelectProvider(
+      type = PolicyMetaSQLProviderFactory.class,
+      method = "selectPolicyMetaByMetalakeAndName")
+  PolicyPO selectTagMetaByMetalakeAndName(
+      @Param("metalakeName") String metalakeName, @Param("policyName") String 
policyName);
+
+  @UpdateProvider(type = PolicyMetaSQLProviderFactory.class, method = 
"updatePolicyMeta")
+  Integer updatePolicyMeta(
+      @Param("newPolicyMeta") PolicyPO newPolicyMeta,
+      @Param("oldPolicyMeta") PolicyPO oldPolicyMeta);
+
+  @UpdateProvider(
+      type = PolicyMetaSQLProviderFactory.class,
+      method = "softDeletePolicyByMetalakeAndPolicyName")
+  Integer softDeletePolicyByMetalakeAndPolicyName(
+      @Param("metalakeName") String metalakeName, @Param("policyName") String 
policyName);
+
+  @UpdateProvider(
+      type = PolicyMetaSQLProviderFactory.class,
+      method = "softDeletePolicyMetasByMetalakeId")
+  void softDeletePolicyMetasByMetalakeId(@Param("metalakeId") Long metalakeId);
+
+  @DeleteProvider(
+      type = PolicyMetaSQLProviderFactory.class,
+      method = "deletePolicyMetasByLegacyTimeline")
+  Integer deletePolicyMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaSQLProviderFactory.java
new file mode 100644
index 0000000000..7a7243ca24
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaSQLProviderFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.PolicyMetaBaseSQLProvider;
+import 
org.apache.gravitino.storage.relational.mapper.provider.postgresql.PolicyMetaPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.PolicyPO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class PolicyMetaSQLProviderFactory {
+
+  private static final Map<JDBCBackendType, PolicyMetaBaseSQLProvider>
+      POLICY_META_SQL_PROVIDER_MAP =
+          ImmutableMap.of(
+              JDBCBackendType.MYSQL, new PolicyMetaMySQLProvider(),
+              JDBCBackendType.H2, new PolicyMetaH2Provider(),
+              JDBCBackendType.POSTGRESQL, new PolicyMetaPostgreSQLProvider());
+
+  public static PolicyMetaBaseSQLProvider getProvider() {
+    String databaseId =
+        SqlSessionFactoryHelper.getInstance()
+            .getSqlSessionFactory()
+            .getConfiguration()
+            .getDatabaseId();
+
+    JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId);
+    return POLICY_META_SQL_PROVIDER_MAP.get(jdbcBackendType);
+  }
+
+  public static String listPolicyPOsByMetalake(@Param("metalakeName") String 
metalakeName) {
+    return getProvider().listPolicyPOsByMetalake(metalakeName);
+  }
+
+  public static String insertPolicyMetaOnDuplicateKeyUpdate(
+      @Param("policyMeta") PolicyPO policyPO) {
+    return getProvider().insertPolicyMetaOnDuplicateKeyUpdate(policyPO);
+  }
+
+  public static String insertPolicyMeta(@Param("policyMeta") PolicyPO 
policyPO) {
+    return getProvider().insertPolicyMeta(policyPO);
+  }
+
+  public static String selectPolicyMetaByMetalakeAndName(
+      @Param("metalakeName") String metalakeName, @Param("policyName") String 
policyName) {
+    return getProvider().selectPolicyMetaByMetalakeAndName(metalakeName, 
policyName);
+  }
+
+  public static String updatePolicyMeta(
+      @Param("newPolicyMeta") PolicyPO newPolicyMeta,
+      @Param("oldPolicyMeta") PolicyPO oldPolicyMeta) {
+    return getProvider().updatePolicyMeta(newPolicyMeta, oldPolicyMeta);
+  }
+
+  public static String softDeletePolicyByMetalakeAndPolicyName(
+      @Param("metalakeName") String metalakeName, @Param("policyName") String 
policyName) {
+    return getProvider().softDeletePolicyByMetalakeAndPolicyName(metalakeName, 
policyName);
+  }
+
+  public static String deletePolicyMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return getProvider().deletePolicyMetasByLegacyTimeline(legacyTimeline, 
limit);
+  }
+
+  public static String softDeletePolicyMetasByMetalakeId(@Param("metalakeId") 
Long metalakeId) {
+    return getProvider().softDeletePolicyMetasByMetalakeId(metalakeId);
+  }
+
+  static class PolicyMetaMySQLProvider extends PolicyMetaBaseSQLProvider {}
+
+  static class PolicyMetaH2Provider extends PolicyMetaBaseSQLProvider {}
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyVersionMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyVersionMapper.java
new file mode 100644
index 0000000000..9bfbc62ea7
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyVersionMapper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.po.PolicyMaxVersionPO;
+import org.apache.gravitino.storage.relational.po.PolicyVersionPO;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.SelectProvider;
+import org.apache.ibatis.annotations.UpdateProvider;
+
+public interface PolicyVersionMapper {
+  String POLICY_VERSION_TABLE_NAME = "policy_version_info";
+
+  @InsertProvider(
+      type = PolicyVersionSQLProviderFactory.class,
+      method = "insertPolicyVersionOnDuplicateKeyUpdate")
+  void insertPolicyVersionOnDuplicateKeyUpdate(
+      @Param("policyVersion") PolicyVersionPO policyVersionPO);
+
+  @InsertProvider(type = PolicyVersionSQLProviderFactory.class, method = 
"insertPolicyVersion")
+  void insertPolicyVersion(@Param("policyVersion") PolicyVersionPO 
policyVersionPO);
+
+  @UpdateProvider(
+      type = PolicyVersionSQLProviderFactory.class,
+      method = "softDeletePolicyVersionByMetalakeAndPolicyName")
+  Integer softDeletePolicyVersionByMetalakeAndPolicyName(
+      @Param("metalakeName") String metalakeName, @Param("policyName") String 
policyName);
+
+  @UpdateProvider(
+      type = PolicyVersionSQLProviderFactory.class,
+      method = "deletePolicyVersionsByLegacyTimeline")
+  Integer deletePolicyVersionsByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+
+  @SelectProvider(
+      type = PolicyVersionSQLProviderFactory.class,
+      method = "selectPolicyVersionsByRetentionCount")
+  List<PolicyMaxVersionPO> selectPolicyVersionsByRetentionCount(
+      @Param("versionRetentionCount") Long versionRetentionCount);
+
+  @UpdateProvider(
+      type = PolicyVersionSQLProviderFactory.class,
+      method = "softDeletePolicyVersionsByRetentionLine")
+  Integer softDeletePolicyVersionsByRetentionLine(
+      @Param("policyId") Long policyId,
+      @Param("versionRetentionLine") long versionRetentionLine,
+      @Param("limit") int limit);
+
+  @UpdateProvider(
+      type = PolicyVersionSQLProviderFactory.class,
+      method = "softDeletePolicyVersionsByMetalakeId")
+  void softDeletePolicyVersionsByMetalakeId(@Param("metalakeId") Long 
metalakeId);
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyVersionSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyVersionSQLProviderFactory.java
new file mode 100644
index 0000000000..9fbe4ddc79
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyVersionSQLProviderFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.PolicyVersionBaseSQLProvider;
+import 
org.apache.gravitino.storage.relational.mapper.provider.postgresql.PolicyVersionPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.PolicyVersionPO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class PolicyVersionSQLProviderFactory {
+
+  private static final Map<JDBCBackendType, PolicyVersionBaseSQLProvider>
+      POLICY_VERSION_SQL_PROVIDER_MAP =
+          ImmutableMap.of(
+              JDBCBackendType.MYSQL, new PolicyVersionMySQLProvider(),
+              JDBCBackendType.H2, new PolicyVersionH2Provider(),
+              JDBCBackendType.POSTGRESQL, new 
PolicyVersionPostgreSQLProvider());
+
+  public static PolicyVersionBaseSQLProvider getProvider() {
+    String databaseId =
+        SqlSessionFactoryHelper.getInstance()
+            .getSqlSessionFactory()
+            .getConfiguration()
+            .getDatabaseId();
+
+    JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId);
+    return POLICY_VERSION_SQL_PROVIDER_MAP.get(jdbcBackendType);
+  }
+
+  public static String insertPolicyVersionOnDuplicateKeyUpdate(
+      @Param("policyVersion") PolicyVersionPO policyVersionPO) {
+    return 
getProvider().insertPolicyVersionOnDuplicateKeyUpdate(policyVersionPO);
+  }
+
+  public static String insertPolicyVersion(
+      @Param("policyVersion") PolicyVersionPO policyVersionPO) {
+    return getProvider().insertPolicyVersion(policyVersionPO);
+  }
+
+  public static String softDeletePolicyVersionByMetalakeAndPolicyName(
+      @Param("metalakeName") String metalakeName, @Param("policyName") String 
policyName) {
+    return 
getProvider().softDeletePolicyVersionByMetalakeAndPolicyName(metalakeName, 
policyName);
+  }
+
+  public static String deletePolicyVersionsByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return getProvider().deletePolicyVersionsByLegacyTimeline(legacyTimeline, 
limit);
+  }
+
+  public static String selectPolicyVersionsByRetentionCount(
+      @Param("versionRetentionCount") Long versionRetentionCount) {
+    return 
getProvider().selectPolicyVersionsByRetentionCount(versionRetentionCount);
+  }
+
+  public static String softDeletePolicyVersionsByRetentionLine(
+      @Param("policyId") Long policyId,
+      @Param("versionRetentionLine") long versionRetentionLine,
+      @Param("limit") int limit) {
+    return getProvider()
+        .softDeletePolicyVersionsByRetentionLine(policyId, 
versionRetentionLine, limit);
+  }
+
+  public static String 
softDeletePolicyVersionsByMetalakeId(@Param("metalakeId") Long metalakeId) {
+    return getProvider().softDeletePolicyVersionsByMetalakeId(metalakeId);
+  }
+
+  static class PolicyVersionMySQLProvider extends PolicyVersionBaseSQLProvider 
{}
+
+  static class PolicyVersionH2Provider extends PolicyVersionBaseSQLProvider {}
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetaBaseSQLProvider.java
new file mode 100644
index 0000000000..1f1e525071
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetaBaseSQLProvider.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import static 
org.apache.gravitino.storage.relational.mapper.PolicyMetaMapper.POLICY_META_TABLE_NAME;
+import static 
org.apache.gravitino.storage.relational.mapper.PolicyVersionMapper.POLICY_VERSION_TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.po.PolicyPO;
+import org.apache.ibatis.annotations.Param;
+
+public class PolicyMetaBaseSQLProvider {
+
+  public String listPolicyPOsByMetalake(@Param("metalakeName") String 
metalakeName) {
+    return "SELECT pm.policy_id, pm.policy_name, pm.policy_type, 
pm.metalake_id, pm.inheritable,"
+        + " pm.exclusive, pm.supported_object_types, pm.audit_info, 
pm.current_version, pm.last_version,"
+        + " pm.deleted_at, pvi.id, pvi.metalake_id as version_metalake_id, 
pvi.policy_id as version_policy_id,"
+        + " pvi.version, pvi.policy_comment, pvi.enabled, pvi.content, 
pvi.deleted_at as version_deleted_at"
+        + " FROM "
+        + POLICY_META_TABLE_NAME
+        + " pm JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON pm.metalake_id = mm.metalake_id"
+        + " JOIN "
+        + POLICY_VERSION_TABLE_NAME
+        + " pvi ON pm.policy_id = pvi.policy_id"
+        + " AND pm.current_version = pvi.version"
+        + " WHERE mm.metalake_name = #{metalakeName}"
+        + " AND pm.deleted_at = 0 AND mm.deleted_at = 0"
+        + " AND pvi.deleted_at = 0";
+  }
+
+  public String insertPolicyMetaOnDuplicateKeyUpdate(@Param("policyMeta") 
PolicyPO policyPO) {
+    return "INSERT INTO "
+        + POLICY_META_TABLE_NAME
+        + " (policy_id, policy_name, policy_type, metalake_id, inheritable, 
exclusive,"
+        + " supported_object_types, audit_info, current_version, last_version, 
deleted_at)"
+        + " VALUES (#{policyMeta.policyId}, #{policyMeta.policyName}, 
#{policyMeta.policyType},"
+        + " #{policyMeta.metalakeId}, #{policyMeta.inheritable}, 
#{policyMeta.exclusive},"
+        + " #{policyMeta.supportedObjectTypes}, #{policyMeta.auditInfo}, 
#{policyMeta.currentVersion},"
+        + " #{policyMeta.lastVersion}, #{policyMeta.deletedAt})"
+        + " ON DUPLICATE KEY UPDATE"
+        + " policy_name = #{policyMeta.policyName},"
+        + " policy_type = #{policyMeta.policyType},"
+        + " metalake_id = #{policyMeta.metalakeId},"
+        + " inheritable = #{policyMeta.inheritable},"
+        + " exclusive = #{policyMeta.exclusive},"
+        + " supported_object_types = #{policyMeta.supportedObjectTypes},"
+        + " audit_info = #{policyMeta.auditInfo},"
+        + " current_version = #{policyMeta.currentVersion},"
+        + " last_version = #{policyMeta.lastVersion},"
+        + " deleted_at = #{policyMeta.deletedAt}";
+  }
+
+  public String insertPolicyMeta(@Param("policyMeta") PolicyPO policyPO) {
+    return "INSERT INTO "
+        + POLICY_META_TABLE_NAME
+        + " (policy_id, policy_name, policy_type, metalake_id, inheritable, 
exclusive,"
+        + " supported_object_types, audit_info, current_version, last_version, 
deleted_at)"
+        + " VALUES (#{policyMeta.policyId}, #{policyMeta.policyName}, 
#{policyMeta.policyType},"
+        + " #{policyMeta.metalakeId}, #{policyMeta.inheritable}, 
#{policyMeta.exclusive},"
+        + " #{policyMeta.supportedObjectTypes}, #{policyMeta.auditInfo}, 
#{policyMeta.currentVersion},"
+        + " #{policyMeta.lastVersion}, #{policyMeta.deletedAt})";
+  }
+
+  public String selectPolicyMetaByMetalakeAndName(
+      @Param("metalakeName") String metalakeName, @Param("policyName") String 
policyName) {
+    return "SELECT pm.policy_id, pm.policy_name, pm.policy_type, 
pm.metalake_id, pm.inheritable,"
+        + " pm.exclusive, pm.supported_object_types, pm.audit_info, 
pm.current_version, pm.last_version,"
+        + " pm.deleted_at, pvi.id, pvi.metalake_id as version_metalake_id, 
pvi.policy_id as version_policy_id,"
+        + " pvi.version, pvi.policy_comment, pvi.enabled, pvi.content, 
pvi.deleted_at as version_deleted_at"
+        + " FROM "
+        + POLICY_META_TABLE_NAME
+        + " pm JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON pm.metalake_id = mm.metalake_id"
+        + " JOIN "
+        + POLICY_VERSION_TABLE_NAME
+        + " pvi ON pm.policy_id = pvi.policy_id"
+        + " AND pm.current_version = pvi.version"
+        + " WHERE mm.metalake_name = #{metalakeName}"
+        + " AND pm.policy_name = #{policyName}"
+        + " AND pm.deleted_at = 0 AND mm.deleted_at = 0"
+        + " AND pvi.deleted_at = 0";
+  }
+
+  public String updatePolicyMeta(
+      @Param("newPolicyMeta") PolicyPO newPolicyMeta,
+      @Param("oldPolicyMeta") PolicyPO oldPolicyMeta) {
+    return "UPDATE "
+        + POLICY_META_TABLE_NAME
+        + " SET policy_name = #{newPolicyMeta.policyName},"
+        + " policy_type = #{newPolicyMeta.policyType},"
+        + " metalake_id = #{newPolicyMeta.metalakeId},"
+        + " inheritable = #{newPolicyMeta.inheritable},"
+        + " exclusive = #{newPolicyMeta.exclusive},"
+        + " supported_object_types = #{newPolicyMeta.supportedObjectTypes},"
+        + " audit_info = #{newPolicyMeta.auditInfo},"
+        + " current_version = #{newPolicyMeta.currentVersion},"
+        + " last_version = #{newPolicyMeta.lastVersion},"
+        + " deleted_at = #{newPolicyMeta.deletedAt}"
+        + " WHERE policy_id = #{oldPolicyMeta.policyId}"
+        + " AND policy_name = #{oldPolicyMeta.policyName}"
+        + " AND policy_type = #{oldPolicyMeta.policyType}"
+        + " AND metalake_id = #{oldPolicyMeta.metalakeId}"
+        + " AND inheritable = #{oldPolicyMeta.inheritable}"
+        + " AND exclusive = #{oldPolicyMeta.exclusive}"
+        + " AND supported_object_types = #{oldPolicyMeta.supportedObjectTypes}"
+        + " AND audit_info = #{oldPolicyMeta.auditInfo}"
+        + " AND current_version = #{oldPolicyMeta.currentVersion}"
+        + " AND last_version = #{oldPolicyMeta.lastVersion}"
+        + " AND deleted_at = 0";
+  }
+
+  public String softDeletePolicyByMetalakeAndPolicyName(
+      @Param("metalakeName") String metalakeName, @Param("policyName") String 
policyName) {
+    return "UPDATE "
+        + POLICY_META_TABLE_NAME
+        + " pm SET pm.deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE pm.metalake_id IN ("
+        + " SELECT mm.metalake_id FROM "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm WHERE mm.metalake_name = #{metalakeName} AND mm.deleted_at = 0)"
+        + " AND pm.policy_name = #{policyName} AND pm.deleted_at = 0";
+  }
+
+  public String deletePolicyMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return "DELETE FROM "
+        + POLICY_META_TABLE_NAME
+        + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT 
#{limit}";
+  }
+
+  public String softDeletePolicyMetasByMetalakeId(@Param("metalakeId") Long 
metalakeId) {
+    return "UPDATE "
+        + POLICY_META_TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyVersionBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyVersionBaseSQLProvider.java
new file mode 100644
index 0000000000..2a4ceeca6f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyVersionBaseSQLProvider.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import static 
org.apache.gravitino.storage.relational.mapper.PolicyMetaMapper.POLICY_META_TABLE_NAME;
+import static 
org.apache.gravitino.storage.relational.mapper.PolicyVersionMapper.POLICY_VERSION_TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.po.PolicyVersionPO;
+import org.apache.ibatis.annotations.Param;
+
+/**
+ * Builds the SQL text for statements against the policy version table.
+ *
+ * <p>Each method returns a statement whose {@code #{...}} placeholders are named after the
+ * {@code @Param} annotations on its arguments. The statements use MySQL-style syntax ({@code ON
+ * DUPLICATE KEY UPDATE}, {@code UNIX_TIMESTAMP()}, {@code LIMIT} on UPDATE/DELETE); subclasses
+ * can override individual methods for other dialects.
+ */
+public class PolicyVersionBaseSQLProvider {
+
+  /** Timestamp expression written into {@code deleted_at} by the soft-delete statements. */
+  private static final String DELETION_TIMESTAMP =
+      "(UNIX_TIMESTAMP() * 1000.0) + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000";
+
+  /** Returns the plain INSERT statement shared by both insert variants. */
+  private static String insertStatement() {
+    String columns =
+        " (metalake_id, policy_id, version, policy_comment, enabled, content, deleted_at) ";
+    String values =
+        "VALUES (#{policyVersion.metalakeId}, #{policyVersion.policyId},"
+            + " #{policyVersion.version}, #{policyVersion.policyComment},"
+            + " #{policyVersion.enabled}, #{policyVersion.content},"
+            + " #{policyVersion.deletedAt})";
+    return "INSERT INTO " + POLICY_VERSION_TABLE_NAME + columns + values;
+  }
+
+  /** Returns the sub-select yielding the ids of live metalakes named {@code #{metalakeName}}. */
+  private static String liveMetalakeIdsByName() {
+    return " SELECT mm.metalake_id FROM "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm WHERE mm.metalake_name = #{metalakeName} AND mm.deleted_at = 0";
+  }
+
+  /**
+   * Builds an INSERT that overwrites every column of the existing row when a duplicate key is
+   * hit.
+   *
+   * @param policyVersion source of the {@code #{policyVersion.*}} placeholders
+   * @return the INSERT ... ON DUPLICATE KEY UPDATE statement text
+   */
+  public String insertPolicyVersionOnDuplicateKeyUpdate(
+      @Param("policyVersion") PolicyVersionPO policyVersion) {
+    String overwriteAllColumns =
+        "metalake_id = #{policyVersion.metalakeId}, "
+            + "policy_id = #{policyVersion.policyId}, "
+            + "version = #{policyVersion.version}, "
+            + "policy_comment = #{policyVersion.policyComment}, "
+            + "enabled = #{policyVersion.enabled}, "
+            + "content = #{policyVersion.content}, "
+            + "deleted_at = #{policyVersion.deletedAt}";
+    return insertStatement() + " ON DUPLICATE KEY UPDATE " + overwriteAllColumns;
+  }
+
+  /**
+   * Builds a plain INSERT of one policy version row.
+   *
+   * @param policyVersion source of the {@code #{policyVersion.*}} placeholders
+   * @return the INSERT statement text
+   */
+  public String insertPolicyVersion(@Param("policyVersion") PolicyVersionPO policyVersion) {
+    return insertStatement();
+  }
+
+  /**
+   * Builds the statement that soft-deletes all live versions of one policy, where the policy is
+   * looked up by name among the live policies of the live metalake named {@code metalakeName}.
+   *
+   * @param metalakeName bound to the {@code #{metalakeName}} placeholder
+   * @param policyName bound to the {@code #{policyName}} placeholder
+   * @return the UPDATE statement text
+   */
+  public String softDeletePolicyVersionByMetalakeAndPolicyName(
+      @Param("metalakeName") String metalakeName, @Param("policyName") String policyName) {
+    String metalakeIds = liveMetalakeIdsByName();
+    // Live policies with the requested name, restricted to the same metalake.
+    String policyIds =
+        " SELECT pm.policy_id FROM "
+            + POLICY_META_TABLE_NAME
+            + " pm WHERE pm.policy_name = #{policyName} AND pm.deleted_at = 0"
+            + " AND pm.metalake_id IN ("
+            + metalakeIds
+            + ")";
+    return "UPDATE "
+        + POLICY_VERSION_TABLE_NAME
+        + " pv SET pv.deleted_at = "
+        + DELETION_TIMESTAMP
+        + " WHERE pv.metalake_id IN ("
+        + metalakeIds
+        + ")"
+        + " AND pv.policy_id IN ("
+        + policyIds
+        + ")"
+        + " AND pv.deleted_at = 0";
+  }
+
+  /**
+   * Builds the statement that physically removes version rows which were soft-deleted
+   * (deleted_at &gt; 0) before the given timeline, at most {@code limit} rows per execution.
+   *
+   * @param legacyTimeline bound to the {@code #{legacyTimeline}} placeholder
+   * @param limit bound to the {@code #{limit}} placeholder
+   * @return the DELETE statement text
+   */
+  public String deletePolicyVersionsByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
+    return String.format(
+        "DELETE FROM %s WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}",
+        POLICY_VERSION_TABLE_NAME);
+  }
+
+  /**
+   * Builds the query returning, per policy, the highest live version among the versions that
+   * are greater than the retention count.
+   *
+   * @param versionRetentionCount bound to the {@code #{versionRetentionCount}} placeholder
+   * @return the SELECT statement text, with columns aliased to {@code policyId} and {@code
+   *     version}
+   */
+  public String selectPolicyVersionsByRetentionCount(
+      @Param("versionRetentionCount") Long versionRetentionCount) {
+    String projection = "SELECT policy_id as policyId, Max(version) as version FROM ";
+    String filter = " WHERE version > #{versionRetentionCount} AND deleted_at = 0 ";
+    return projection + POLICY_VERSION_TABLE_NAME + filter + "GROUP BY policy_id";
+  }
+
+  /**
+   * Builds the statement that soft-deletes the live versions of one policy whose version is at
+   * or below the retention line, at most {@code limit} rows per execution.
+   *
+   * @param policyId bound to the {@code #{policyId}} placeholder
+   * @param versionRetentionLine bound to the {@code #{versionRetentionLine}} placeholder
+   * @param limit bound to the {@code #{limit}} placeholder
+   * @return the UPDATE statement text
+   */
+  public String softDeletePolicyVersionsByRetentionLine(
+      @Param("policyId") Long policyId,
+      @Param("versionRetentionLine") long versionRetentionLine,
+      @Param("limit") int limit) {
+    String filter =
+        " WHERE policy_id = #{policyId} AND version <= #{versionRetentionLine}"
+            + " AND deleted_at = 0";
+    return "UPDATE "
+        + POLICY_VERSION_TABLE_NAME
+        + " SET deleted_at = "
+        + DELETION_TIMESTAMP
+        + filter
+        + " LIMIT #{limit}";
+  }
+
+  /**
+   * Builds the statement that soft-deletes every live policy version row of one metalake.
+   *
+   * @param metalakeId bound to the {@code #{metalakeId}} placeholder
+   * @return the UPDATE statement text
+   */
+  public String softDeletePolicyVersionsByMetalakeId(@Param("metalakeId") Long metalakeId) {
+    return "UPDATE "
+        + POLICY_VERSION_TABLE_NAME
+        + " SET deleted_at = "
+        + DELETION_TIMESTAMP
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/PolicyMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/PolicyMetaPostgreSQLProvider.java
new file mode 100644
index 0000000000..8a2274af9f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/PolicyMetaPostgreSQLProvider.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import static org.apache.gravitino.storage.relational.mapper.PolicyMetaMapper.POLICY_META_TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.mapper.provider.base.PolicyMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.PolicyPO;
+
+/**
+ * PostgreSQL variants of the policy meta statements that cannot reuse the base SQL: timestamp
+ * arithmetic, bounded DELETE and upsert syntax differ from the base provider.
+ *
+ * <p>All statements operate on {@code POLICY_META_TABLE_NAME}.
+ */
+public class PolicyMetaPostgreSQLProvider extends PolicyMetaBaseSQLProvider {
+
+  /**
+   * Builds the statement that soft-deletes one live policy, matched by name inside the live
+   * metalake named {@code metalakeName}.
+   *
+   * @param metalakeName bound to the {@code #{metalakeName}} placeholder
+   * @param policyName bound to the {@code #{policyName}} placeholder
+   * @return the UPDATE statement text
+   */
+  @Override
+  public String softDeletePolicyByMetalakeAndPolicyName(String metalakeName, String policyName) {
+    return "UPDATE "
+        + POLICY_META_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE metalake_id = (SELECT metalake_id FROM "
+        + " metalake_meta mm WHERE mm.metalake_name = #{metalakeName} AND mm.deleted_at = 0)"
+        + " AND policy_name = #{policyName} AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the statement that soft-deletes every live policy meta row of one metalake.
+   *
+   * @param metalakeId bound to the {@code #{metalakeId}} placeholder
+   * @return the UPDATE statement text
+   */
+  @Override
+  public String softDeletePolicyMetasByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + POLICY_META_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the statement that physically removes policy meta rows which were soft-deleted
+   * before the given timeline, at most {@code limit} rows per execution.
+   *
+   * <p>The row limit is applied through a sub-select on {@code policy_id}; the filter is the
+   * same as in the base provider: {@code deleted_at > 0 AND deleted_at < #{legacyTimeline}}, so
+   * live rows (deleted_at = 0) are never purged.
+   *
+   * @param legacyTimeline bound to the {@code #{legacyTimeline}} placeholder
+   * @param limit bound to the {@code #{limit}} placeholder
+   * @return the DELETE statement text
+   */
+  @Override
+  public String deletePolicyMetasByLegacyTimeline(Long legacyTimeline, int limit) {
+    return "DELETE FROM "
+        + POLICY_META_TABLE_NAME
+        + " WHERE policy_id IN (SELECT policy_id FROM "
+        + POLICY_META_TABLE_NAME
+        + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
+  }
+
+  /**
+   * Builds an INSERT that, on a {@code policy_id} conflict, overwrites the remaining columns of
+   * the existing row with the values of {@code policyPO}.
+   *
+   * @param policyPO source of the {@code #{policyPO.*}} placeholders
+   * @return the INSERT ... ON CONFLICT statement text
+   */
+  @Override
+  public String insertPolicyMetaOnDuplicateKeyUpdate(PolicyPO policyPO) {
+    return "INSERT INTO "
+        + POLICY_META_TABLE_NAME
+        + " (policy_id, policy_name, policy_type, metalake_id, inheritable, exclusive,"
+        + " supported_object_types, audit_info, current_version, last_version, deleted_at)"
+        + " VALUES ("
+        + " #{policyPO.policyId},"
+        + " #{policyPO.policyName},"
+        + " #{policyPO.policyType},"
+        + " #{policyPO.metalakeId},"
+        + " #{policyPO.inheritable},"
+        + " #{policyPO.exclusive},"
+        + " #{policyPO.supportedObjectTypes},"
+        + " #{policyPO.auditInfo},"
+        + " #{policyPO.currentVersion},"
+        + " #{policyPO.lastVersion},"
+        + " #{policyPO.deletedAt})"
+        + " ON CONFLICT (policy_id) DO UPDATE SET"
+        + " policy_name = #{policyPO.policyName},"
+        + " policy_type = #{policyPO.policyType},"
+        + " metalake_id = #{policyPO.metalakeId},"
+        + " inheritable = #{policyPO.inheritable},"
+        + " exclusive = #{policyPO.exclusive},"
+        + " supported_object_types = #{policyPO.supportedObjectTypes},"
+        + " audit_info = #{policyPO.auditInfo},"
+        + " current_version = #{policyPO.currentVersion},"
+        // Every assignment but the last must end with a comma, or the statement does not parse.
+        + " last_version = #{policyPO.lastVersion},"
+        + " deleted_at = #{policyPO.deletedAt}";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/PolicyVersionPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/PolicyVersionPostgreSQLProvider.java
new file mode 100644
index 0000000000..9e396937b8
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/PolicyVersionPostgreSQLProvider.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.PolicyMetaMapper.POLICY_META_TABLE_NAME;
+import static 
org.apache.gravitino.storage.relational.mapper.PolicyVersionMapper.POLICY_VERSION_TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.PolicyVersionBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.PolicyVersionPO;
+
+/**
+ * PostgreSQL variants of the policy version statements that cannot reuse the base SQL:
+ * timestamp arithmetic, bounded UPDATE/DELETE and upsert syntax differ from the base provider.
+ */
+public class PolicyVersionPostgreSQLProvider extends PolicyVersionBaseSQLProvider {
+
+  /**
+   * Builds the statement that soft-deletes all live versions of one policy.
+   *
+   * <p>As in the base provider, the policy is looked up by name among the live policies of the
+   * live metalake named {@code metalakeName}. Scoping the policy sub-select to the metalake and
+   * comparing with {@code IN} keeps the statement valid when the same policy name exists in
+   * more than one metalake.
+   *
+   * @param metalakeName bound to the {@code #{metalakeName}} placeholder
+   * @param policyName bound to the {@code #{policyName}} placeholder
+   * @return the UPDATE statement text
+   */
+  @Override
+  public String softDeletePolicyVersionByMetalakeAndPolicyName(
+      String metalakeName, String policyName) {
+    return "UPDATE "
+        + POLICY_VERSION_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE metalake_id IN (SELECT mm.metalake_id FROM "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm WHERE mm.metalake_name = #{metalakeName} AND mm.deleted_at = 0)"
+        + " AND policy_id IN (SELECT pm.policy_id FROM "
+        + POLICY_META_TABLE_NAME
+        + " pm WHERE pm.policy_name = #{policyName} AND pm.deleted_at = 0"
+        + " AND pm.metalake_id IN (SELECT mm.metalake_id FROM "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm WHERE mm.metalake_name = #{metalakeName} AND mm.deleted_at = 0))"
+        + " AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the statement that physically removes version rows which were soft-deleted before
+   * the given timeline, at most {@code limit} rows per execution.
+   *
+   * <p>The row limit is applied through a sub-select on {@code id}; the filter is the same as
+   * in the base provider: {@code deleted_at > 0 AND deleted_at < #{legacyTimeline}}, so live
+   * rows (deleted_at = 0) are never purged.
+   *
+   * @param legacyTimeline bound to the {@code #{legacyTimeline}} placeholder
+   * @param limit bound to the {@code #{limit}} placeholder
+   * @return the DELETE statement text
+   */
+  @Override
+  public String deletePolicyVersionsByLegacyTimeline(Long legacyTimeline, int limit) {
+    return "DELETE FROM "
+        + POLICY_VERSION_TABLE_NAME
+        + " WHERE id IN (SELECT id FROM "
+        + POLICY_VERSION_TABLE_NAME
+        + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
+  }
+
+  /**
+   * Builds the statement that soft-deletes the live versions of one policy whose version is at
+   * or below the retention line, at most {@code limit} rows per execution.
+   *
+   * <p>The comparison is {@code <=}, matching the base provider. PostgreSQL's UPDATE has no
+   * LIMIT clause, so the bound is applied in a sub-select on {@code id}, the same way the
+   * bounded DELETE in this class does it.
+   *
+   * @param policyId bound to the {@code #{policyId}} placeholder
+   * @param versionRetentionLine bound to the {@code #{versionRetentionLine}} placeholder
+   * @param limit bound to the {@code #{limit}} placeholder
+   * @return the UPDATE statement text
+   */
+  @Override
+  public String softDeletePolicyVersionsByRetentionLine(
+      Long policyId, long versionRetentionLine, int limit) {
+    return "UPDATE "
+        + POLICY_VERSION_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE id IN (SELECT id FROM "
+        + POLICY_VERSION_TABLE_NAME
+        + " WHERE policy_id = #{policyId} AND version <= #{versionRetentionLine}"
+        + " AND deleted_at = 0 LIMIT #{limit})";
+  }
+
+  /**
+   * Builds the statement that soft-deletes every live policy version row of one metalake.
+   *
+   * @param metalakeId bound to the {@code #{metalakeId}} placeholder
+   * @return the UPDATE statement text
+   */
+  @Override
+  public String softDeletePolicyVersionsByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + POLICY_VERSION_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  /**
+   * Builds an INSERT that, on a {@code (policy_id, version, deleted_at)} conflict, overwrites
+   * the remaining columns of the existing row with the values of {@code policyVersion}.
+   *
+   * @param policyVersion source of the {@code #{policyVersion.*}} placeholders
+   * @return the INSERT ... ON CONFLICT statement text
+   */
+  @Override
+  public String insertPolicyVersionOnDuplicateKeyUpdate(PolicyVersionPO policyVersion) {
+    return "INSERT INTO "
+        + POLICY_VERSION_TABLE_NAME
+        + " (metalake_id, policy_id, version, policy_comment, enabled,"
+        + " content, deleted_at)"
+        + " VALUES ("
+        + " #{policyVersion.metalakeId},"
+        + " #{policyVersion.policyId},"
+        + " #{policyVersion.version},"
+        + " #{policyVersion.policyComment},"
+        + " #{policyVersion.enabled},"
+        + " #{policyVersion.content},"
+        + " #{policyVersion.deletedAt})"
+        + " ON CONFLICT (policy_id, version, deleted_at) DO UPDATE SET"
+        // metalake_id is not part of the conflict target, so it is refreshed as well, as the
+        // base provider's ON DUPLICATE KEY UPDATE does.
+        + " metalake_id = #{policyVersion.metalakeId},"
+        + " policy_comment = #{policyVersion.policyComment},"
+        + " enabled = #{policyVersion.enabled},"
+        + " content = #{policyVersion.content},"
+        + " deleted_at = #{policyVersion.deletedAt}";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/PolicyMaxVersionPO.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/PolicyMaxVersionPO.java
new file mode 100644
index 0000000000..3f01b0e7b4
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/PolicyMaxVersionPO.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.po;
+
+import java.util.Objects;
+import lombok.Getter;
+
+@Getter
+public class PolicyMaxVersionPO {
+  private Long policyId;
+  private Long version;
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof PolicyMaxVersionPO)) return false;
+    PolicyMaxVersionPO that = (PolicyMaxVersionPO) o;
+    return Objects.equals(policyId, that.policyId) && Objects.equals(version, 
that.version);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(policyId, version);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
index 75e217279d..ac7b32306b 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
@@ -41,6 +41,8 @@ import 
org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
 import org.apache.gravitino.storage.relational.mapper.ModelVersionMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.PolicyMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.PolicyVersionMapper;
 import org.apache.gravitino.storage.relational.mapper.RoleMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
@@ -243,6 +245,14 @@ public class MetalakeMetaService {
                 SessionUtils.doWithoutCommit(
                     TagMetadataObjectRelMapper.class,
                     mapper -> 
mapper.softDeleteTagMetadataObjectRelsByMetalakeId(metalakeId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    PolicyMetaMapper.class,
+                    mapper -> 
mapper.softDeletePolicyMetasByMetalakeId(metalakeId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    PolicyVersionMapper.class,
+                    mapper -> 
mapper.softDeletePolicyVersionsByMetalakeId(metalakeId)),
             () ->
                 SessionUtils.doWithoutCommit(
                     OwnerMetaMapper.class,
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
new file mode 100644
index 0000000000..8176224622
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.meta.PolicyEntity;
+import org.apache.gravitino.storage.relational.mapper.PolicyMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.PolicyVersionMapper;
+import org.apache.gravitino.storage.relational.po.PolicyMaxVersionPO;
+import org.apache.gravitino.storage.relational.po.PolicyPO;
+import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
+import org.apache.gravitino.storage.relational.utils.POConverters;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service that reads and writes policy entities through the {@link PolicyMetaMapper} and {@link
+ * PolicyVersionMapper} mappers, converting between {@link PolicyEntity} and {@link PolicyPO}
+ * with {@link POConverters}.
+ */
+public class PolicyMetaService {
+  private static final PolicyMetaService INSTANCE = new PolicyMetaService();
+  private static final Logger LOG = LoggerFactory.getLogger(PolicyMetaService.class);
+
+  /** Returns the shared instance of this service. */
+  public static PolicyMetaService getInstance() {
+    return INSTANCE;
+  }
+
+  private PolicyMetaService() {}
+
+  /**
+   * Lists the policies of the metalake named by the first level of {@code namespace}.
+   *
+   * @param namespace the namespace of the policies; level 0 is used as the metalake name
+   * @return the policies converted to entities, in the order returned by the mapper
+   */
+  public List<PolicyEntity> listPoliciesByNamespace(Namespace namespace) {
+    String metalakeName = namespace.level(0);
+    List<PolicyPO> policyPOs =
+        SessionUtils.getWithoutCommit(
+            PolicyMetaMapper.class, mapper -> mapper.listPolicyPOsByMetalake(metalakeName));
+    return policyPOs.stream()
+        .map(policyPO -> POConverters.fromPolicyPO(policyPO, namespace))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Loads one policy by its identifier.
+   *
+   * @param ident the policy identifier; level 0 of its namespace is used as the metalake name
+   * @return the policy entity
+   * @throws NoSuchEntityException if the mapper returns no row for the identifier
+   */
+  public PolicyEntity getPolicyByIdentifier(NameIdentifier ident) {
+    String metalakeName = ident.namespace().level(0);
+    PolicyPO policyPO = getPolicyPOByMetalakeAndName(metalakeName, ident.name());
+    return POConverters.fromPolicyPO(policyPO, ident.namespace());
+  }
+
+  /**
+   * Inserts a policy: one row in the policy meta table and one in the policy version table.
+   *
+   * @param policyEntity the policy to store
+   * @param overwritten when {@code true} the "on duplicate key update" statements are used,
+   *     otherwise the plain inserts
+   * @throws IOException declared by the signature; not thrown directly by the code in this
+   *     method
+   */
+  public void insertPolicy(PolicyEntity policyEntity, boolean overwritten) throws IOException {
+    Namespace ns = policyEntity.namespace();
+    String metalakeName = ns.level(0);
+
+    try {
+      Long metalakeId = MetalakeMetaService.getInstance().getMetalakeIdByName(metalakeName);
+
+      PolicyPO.Builder builder = PolicyPO.builder().withMetalakeId(metalakeId);
+      PolicyPO policyPO = POConverters.initializePolicyPOWithVersion(policyEntity, builder);
+
+      // insert both policy meta table and policy version table
+      SessionUtils.doMultipleWithCommit(
+          () ->
+              SessionUtils.doWithoutCommit(
+                  PolicyMetaMapper.class,
+                  mapper -> {
+                    if (overwritten) {
+                      mapper.insertPolicyMetaOnDuplicateKeyUpdate(policyPO);
+                    } else {
+                      mapper.insertPolicyMeta(policyPO);
+                    }
+                  }),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  PolicyVersionMapper.class,
+                  mapper -> {
+                    if (overwritten) {
+                      mapper.insertPolicyVersionOnDuplicateKeyUpdate(policyPO.getPolicyVersionPO());
+                    } else {
+                      mapper.insertPolicyVersion(policyPO.getPolicyVersionPO());
+                    }
+                  }));
+    } catch (RuntimeException e) {
+      // Let ExceptionUtils inspect the failure first; if it does not throw, rethrow as is.
+      ExceptionUtils.checkSQLException(e, Entity.EntityType.POLICY, policyEntity.toString());
+      throw e;
+    }
+  }
+
+  /**
+   * Applies {@code updater} to the stored policy and persists the result.
+   *
+   * <p>When {@link POConverters#checkPolicyVersionNeedUpdate} reports a change, a new version
+   * row is inserted and the meta row is updated in one batch; otherwise only the meta row is
+   * updated.
+   *
+   * @param ident the identifier of the policy to update
+   * @param updater function producing the updated entity; the result must keep the entity id
+   * @param <E> the entity type accepted by the updater
+   * @return the updated policy entity
+   * @throws IOException if the meta-only update reports no affected row
+   * @throws NoSuchEntityException if the policy does not exist
+   * @throws IllegalArgumentException if the updater changed the entity id
+   */
+  public <E extends Entity & HasIdentifier> PolicyEntity updatePolicy(
+      NameIdentifier ident, Function<E, E> updater) throws IOException {
+    String metalakeName = ident.namespace().level(0);
+
+    PolicyPO oldPolicyPO = getPolicyPOByMetalakeAndName(metalakeName, ident.name());
+    PolicyEntity oldPolicyEntity = POConverters.fromPolicyPO(oldPolicyPO, ident.namespace());
+    PolicyEntity updatedPolicyEntity = (PolicyEntity) updater.apply((E) oldPolicyEntity);
+    Preconditions.checkArgument(
+        Objects.equals(oldPolicyEntity.id(), updatedPolicyEntity.id()),
+        "The updated policy entity id: %s must have the same id as the old entity id %s",
+        updatedPolicyEntity.id(),
+        oldPolicyEntity.id());
+
+    Integer updateResult;
+    try {
+      boolean checkNeedUpdateVersion =
+          POConverters.checkPolicyVersionNeedUpdate(
+              oldPolicyPO.getPolicyVersionPO(), updatedPolicyEntity);
+      PolicyPO newPolicyPO =
+          POConverters.updatePolicyPOWithVersion(
+              oldPolicyPO, updatedPolicyEntity, checkNeedUpdateVersion);
+      if (checkNeedUpdateVersion) {
+        SessionUtils.doMultipleWithCommit(
+            () ->
+                SessionUtils.doWithoutCommit(
+                    PolicyVersionMapper.class,
+                    mapper -> mapper.insertPolicyVersion(newPolicyPO.getPolicyVersionPO())),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    PolicyMetaMapper.class,
+                    mapper -> mapper.updatePolicyMeta(newPolicyPO, oldPolicyPO)));
+        // we set the updateResult to 1 to indicate that the update is successful
+        // NOTE(review): the row count of updatePolicyMeta is not inspected on this path.
+        updateResult = 1;
+      } else {
+        updateResult =
+            SessionUtils.doWithCommitAndFetchResult(
+                PolicyMetaMapper.class,
+                mapper -> mapper.updatePolicyMeta(newPolicyPO, oldPolicyPO));
+      }
+    } catch (RuntimeException re) {
+      ExceptionUtils.checkSQLException(
+          re, Entity.EntityType.POLICY, updatedPolicyEntity.nameIdentifier().toString());
+      throw re;
+    }
+
+    if (updateResult > 0) {
+      return updatedPolicyEntity;
+    } else {
+      throw new IOException("Failed to update the entity: " + updatedPolicyEntity);
+    }
+  }
+
+  /**
+   * Soft-deletes a policy together with its versions.
+   *
+   * @param ident the identifier of the policy to delete
+   * @return {@code true} if at least one meta or version row was soft-deleted
+   */
+  public boolean deletePolicy(NameIdentifier ident) {
+    String metalakeName = ident.namespace().level(0);
+    int[] policyMetaDeletedCount = new int[] {0};
+    int[] policyVersionDeletedCount = new int[] {0};
+
+    // Soft-delete the versions BEFORE the meta row. The version statement locates the policy
+    // through a sub-select on the policy meta table that only matches rows with
+    // deleted_at = 0, so it would match nothing if the meta row had already been soft-deleted
+    // earlier in this batch (presumably both operations share one session until the commit).
+    SessionUtils.doMultipleWithCommit(
+        () ->
+            policyVersionDeletedCount[0] =
+                SessionUtils.doWithoutCommitAndFetchResult(
+                    PolicyVersionMapper.class,
+                    mapper ->
+                        mapper.softDeletePolicyVersionByMetalakeAndPolicyName(
+                            metalakeName, ident.name())),
+        () ->
+            policyMetaDeletedCount[0] =
+                SessionUtils.doWithoutCommitAndFetchResult(
+                    PolicyMetaMapper.class,
+                    mapper ->
+                        mapper.softDeletePolicyByMetalakeAndPolicyName(
+                            metalakeName, ident.name())));
+    return policyMetaDeletedCount[0] + policyVersionDeletedCount[0] > 0;
+  }
+
+  /**
+   * Physically removes soft-deleted policy meta rows and policy version rows that are older
+   * than the given timeline. Each of the two tables is purged with its own statement and
+   * commit, and {@code limit} is passed to each statement separately.
+   *
+   * @param legacyTimeline rows soft-deleted before this value are removed
+   * @param limit maximum number of rows removed per statement
+   * @return the total number of removed rows from both tables
+   */
+  public int deletePolicyAndVersionMetasByLegacyTimeline(Long legacyTimeline, int limit) {
+    int policyDeletedCount =
+        SessionUtils.doWithCommitAndFetchResult(
+            PolicyMetaMapper.class,
+            mapper -> mapper.deletePolicyMetasByLegacyTimeline(legacyTimeline, limit));
+
+    int policyVersionDeletedCount =
+        SessionUtils.doWithCommitAndFetchResult(
+            PolicyVersionMapper.class,
+            mapper -> mapper.deletePolicyVersionsByLegacyTimeline(legacyTimeline, limit));
+
+    return policyDeletedCount + policyVersionDeletedCount;
+  }
+
+  /**
+   * Soft-deletes old policy versions so that roughly the newest {@code versionRetentionCount}
+   * versions of each policy remain.
+   *
+   * @param versionRetentionCount number of newest versions to keep per policy
+   * @param limit maximum number of rows soft-deleted per policy and call
+   * @return the total number of soft-deleted version rows
+   */
+  public int deletePolicyVersionsByRetentionCount(Long versionRetentionCount, int limit) {
+    // get the max live version of every policy that has a version above the retention count.
+    List<PolicyMaxVersionPO> policyMaxVersions =
+        SessionUtils.getWithoutCommit(
+            PolicyVersionMapper.class,
+            mapper -> mapper.selectPolicyVersionsByRetentionCount(versionRetentionCount));
+
+    // soft delete old versions that are smaller than or equal to (maxVersion -
+    // versionRetentionCount).
+    int totalDeletedCount = 0;
+    for (PolicyMaxVersionPO policyMaxVersion : policyMaxVersions) {
+      long versionRetentionLine = policyMaxVersion.getVersion() - versionRetentionCount;
+      int deletedCount =
+          SessionUtils.doWithCommitAndFetchResult(
+              PolicyVersionMapper.class,
+              mapper ->
+                  mapper.softDeletePolicyVersionsByRetentionLine(
+                      policyMaxVersion.getPolicyId(), versionRetentionLine, limit));
+      totalDeletedCount += deletedCount;
+
+      // log the deletion by max policy version.
+      LOG.info(
+          "Soft delete policyVersions count: {} which versions are smaller than or equal to"
+              + " versionRetentionLine: {}, the current policyId and maxVersion is: <{}, {}>.",
+          deletedCount,
+          versionRetentionLine,
+          policyMaxVersion.getPolicyId(),
+          policyMaxVersion.getVersion());
+    }
+    return totalDeletedCount;
+  }
+
+  /**
+   * Loads the stored policy row for a metalake name and policy name.
+   *
+   * @throws NoSuchEntityException if the mapper returns {@code null}
+   */
+  private PolicyPO getPolicyPOByMetalakeAndName(String metalakeName, String policyName) {
+    // NOTE(review): the mapper method is named "selectTagMeta..." although it is invoked on
+    // PolicyMetaMapper and returns a PolicyPO - confirm the name against PolicyMetaMapper.
+    PolicyPO policyPO =
+        SessionUtils.getWithoutCommit(
+            PolicyMetaMapper.class,
+            mapper -> mapper.selectTagMetaByMetalakeAndName(metalakeName, policyName));
+
+    if (policyPO == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.POLICY.name().toLowerCase(),
+          policyName);
+    }
+    return policyPO;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
index 2b299886ee..5763f96d37 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.Lists;
 import java.time.Instant;
 import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -59,6 +60,7 @@ import org.apache.gravitino.meta.TagEntity;
 import org.apache.gravitino.meta.TopicEntity;
 import org.apache.gravitino.meta.UserEntity;
 import org.apache.gravitino.policy.Policy;
+import org.apache.gravitino.policy.PolicyContent;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.expressions.Expression;
 import org.apache.gravitino.rel.types.Type;
@@ -76,6 +78,7 @@ import 
org.apache.gravitino.storage.relational.po.ModelVersionAliasRelPO;
 import org.apache.gravitino.storage.relational.po.ModelVersionPO;
 import org.apache.gravitino.storage.relational.po.OwnerRelPO;
 import org.apache.gravitino.storage.relational.po.PolicyPO;
+import org.apache.gravitino.storage.relational.po.PolicyVersionPO;
 import org.apache.gravitino.storage.relational.po.RolePO;
 import org.apache.gravitino.storage.relational.po.SchemaPO;
 import org.apache.gravitino.storage.relational.po.SecurableObjectPO;
@@ -677,6 +680,78 @@ public class POConverters {
     }
   }
 
+  /**
+   * Decides whether an update to a policy requires a new policy version.
+   *
+   * <p>A new version is needed when the comment, the enabled flag or the content of the updated
+   * entity differs from the stored version. The stored content JSON is deserialized with the
+   * content class of the updated entity's policy type before it is compared.
+   *
+   * @param oldPolicyVersionPO the currently stored policy version
+   * @param newPolicy the policy entity after the update
+   * @return {@code true} if a versioned field changed, {@code false} otherwise
+   * @throws RuntimeException if the stored content cannot be deserialized
+   */
+  public static boolean checkPolicyVersionNeedUpdate(
+      PolicyVersionPO oldPolicyVersionPO, PolicyEntity newPolicy) {
+    // Cheap scalar comparisons first; the content is only parsed when both are unchanged.
+    if (!StringUtils.equals(oldPolicyVersionPO.getPolicyComment(), newPolicy.comment())) {
+      return true;
+    }
+    if (oldPolicyVersionPO.isEnabled() != newPolicy.enabled()) {
+      return true;
+    }
+
+    PolicyContent storedContent;
+    try {
+      storedContent =
+          JsonUtils.anyFieldMapper()
+              .readValue(
+                  oldPolicyVersionPO.getContent(),
+                  Policy.BuiltInType.fromPolicyType(newPolicy.policyType()).contentClass());
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to deserialize json object:", e);
+    }
+
+    return storedContent == null
+        ? newPolicy.content() != null
+        : !storedContent.equals(newPolicy.content());
+  }
+
+  /**
+   * Builds the {@link PolicyPO} that replaces {@code oldPolicyPO} after an update.
+   *
+   * <p>When {@code needUpdateVersion} is set, the last version is incremented by one, becomes
+   * the current version, and a fresh {@link PolicyVersionPO} is built from {@code newPolicy}.
+   * Otherwise the version numbers and the version PO of {@code oldPolicyPO} are carried over.
+   * The metalake id always comes from {@code oldPolicyPO}.
+   *
+   * @param oldPolicyPO the stored policy
+   * @param newPolicy the policy entity after the update
+   * @param needUpdateVersion whether a new policy version has to be created
+   * @return the new policy PO, including its policy version PO
+   * @throws RuntimeException if a field cannot be serialized to JSON
+   */
+  public static PolicyPO updatePolicyPOWithVersion(
+      PolicyPO oldPolicyPO, PolicyEntity newPolicy, boolean needUpdateVersion) {
+    try {
+      Long lastVersion;
+      Long currentVersion;
+      PolicyVersionPO versionPO;
+      if (!needUpdateVersion) {
+        lastVersion = oldPolicyPO.getLastVersion();
+        currentVersion = oldPolicyPO.getCurrentVersion();
+        versionPO = oldPolicyPO.getPolicyVersionPO();
+      } else {
+        // Will set the version to the last version + 1
+        lastVersion = oldPolicyPO.getLastVersion() + 1;
+        currentVersion = lastVersion;
+        String contentJson = JsonUtils.anyFieldMapper().writeValueAsString(newPolicy.content());
+        versionPO =
+            PolicyVersionPO.builder()
+                .withMetalakeId(oldPolicyPO.getMetalakeId())
+                .withPolicyId(newPolicy.id())
+                .withVersion(currentVersion)
+                .withPolicyComment(newPolicy.comment())
+                .withEnabled(newPolicy.enabled())
+                .withContent(contentJson)
+                .withDeletedAt(DEFAULT_DELETED_AT)
+                .build();
+      }
+
+      // Sort the supported object types to ensure consistent ordering
+      LinkedHashSet<String> sortedTypeNames =
+          newPolicy.supportedObjectTypes().stream()
+              .map(Enum::name)
+              .sorted()
+              .collect(Collectors.toCollection(LinkedHashSet::new));
+      String supportedTypesJson = JsonUtils.anyFieldMapper().writeValueAsString(sortedTypeNames);
+      String auditInfoJson = JsonUtils.anyFieldMapper().writeValueAsString(newPolicy.auditInfo());
+
+      return PolicyPO.builder()
+          .withPolicyId(newPolicy.id())
+          .withPolicyName(newPolicy.name())
+          .withPolicyType(newPolicy.policyType())
+          .withMetalakeId(oldPolicyPO.getMetalakeId())
+          .withInheritable(newPolicy.inheritable())
+          .withExclusive(newPolicy.exclusive())
+          .withSupportedObjectTypes(supportedTypesJson)
+          .withAuditInfo(auditInfoJson)
+          .withCurrentVersion(currentVersion)
+          .withLastVersion(lastVersion)
+          .withDeletedAt(DEFAULT_DELETED_AT)
+          .withPolicyVersionPO(versionPO)
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to serialize json object:", e);
+    }
+  }
+
   /**
    * Convert {@link FilesetPO} to {@link FilesetEntity}
    *
@@ -1339,6 +1414,45 @@ public class POConverters {
     }
   }
 
+  /**
+   * Build the {@link PolicyPO} for a policy entity that is stored for the first time, together
+   * with its initial {@link PolicyVersionPO}.
+   *
+   * <p>The current and the last version of the returned PO are both {@code INIT_VERSION}, and the
+   * deleted-at field of both POs is set to {@code DEFAULT_DELETED_AT}.
+   *
+   * @param policyEntity the policy entity to convert
+   * @param builder the builder used to create the result; its metalake id is read through
+   *     {@code getMetalakeId()}, so the caller presumably has to set it beforehand (TODO confirm
+   *     against the callers)
+   * @return the built {@link PolicyPO}, carrying the initial version PO
+   * @throws RuntimeException if a field cannot be serialized to JSON
+   */
+  public static PolicyPO initializePolicyPOWithVersion(
+      PolicyEntity policyEntity, PolicyPO.Builder builder) {
+    try {
+      String content = 
JsonUtils.anyFieldMapper().writeValueAsString(policyEntity.content());
+      // The version PO takes its metalake id from the builder, not from the entity.
+      PolicyVersionPO policyVersionPO =
+          PolicyVersionPO.builder()
+              .withMetalakeId(builder.getMetalakeId())
+              .withPolicyId(policyEntity.id())
+              .withVersion(INIT_VERSION)
+              .withPolicyComment(policyEntity.comment())
+              .withEnabled(policyEntity.enabled())
+              .withContent(content)
+              .withDeletedAt(DEFAULT_DELETED_AT)
+              .build();
+      return builder
+          .withPolicyId(policyEntity.id())
+          .withPolicyName(policyEntity.name())
+          .withPolicyType(policyEntity.policyType())
+          .withInheritable(policyEntity.inheritable())
+          .withExclusive(policyEntity.exclusive())
+          .withSupportedObjectTypes(
+              JsonUtils.anyFieldMapper()
+                  .writeValueAsString(
+                      // Sort the supported object types to ensure consistent 
ordering
+                      policyEntity.supportedObjectTypes().stream()
+                          .map(Enum::name)
+                          .sorted()
+                          
.collect(Collectors.toCollection(LinkedHashSet::new))))
+          
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(policyEntity.auditInfo()))
+          .withCurrentVersion(INIT_VERSION)
+          .withLastVersion(INIT_VERSION)
+          .withDeletedAt(DEFAULT_DELETED_AT)
+          .withPolicyVersionPO(policyVersionPO)
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to serialize json object:", e);
+    }
+  }
+
   public static OwnerRelPO initializeOwnerRelPOsWithVersion(
       Long metalakeId,
       String ownerType,
diff --git a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
index e8d25169ce..e556cbce9f 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
@@ -101,6 +101,17 @@ public class NameIdentifierUtil {
     return NameIdentifier.of(NamespaceUtil.ofTag(metalake), tagName);
   }
 
+  /**
+   * Create the policy {@link NameIdentifier} with the given metalake and policy name.
+   *
+   * <p>The namespace of the identifier is obtained from {@code NamespaceUtil.ofPolicy(metalake)}.
+   *
+   * @param metalake The metalake name
+   * @param policyName The policy name
+   * @return the created policy {@link NameIdentifier}
+   */
+  public static NameIdentifier ofPolicy(String metalake, String policyName) {
+    return NameIdentifier.of(NamespaceUtil.ofPolicy(metalake), policyName);
+  }
+
   /**
    * Create the user {@link NameIdentifier} with the given metalake and username.
    *
diff --git a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
index 7c0169ce6c..56337a18aa 100644
--- a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
+++ b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
@@ -30,6 +30,7 @@ import static org.apache.gravitino.Configs.ENTITY_STORE;
 import static org.apache.gravitino.Configs.RELATIONAL_ENTITY_STORE;
 import static org.apache.gravitino.SupportsRelationOperations.Type.OWNER_REL;
 import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
+import static org.apache.gravitino.policy.Policy.SUPPORTS_ALL_OBJECT_TYPES;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -41,6 +42,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.sql.Connection;
@@ -72,6 +74,7 @@ import org.apache.gravitino.meta.FilesetEntity;
 import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.ModelEntity;
 import org.apache.gravitino.meta.ModelVersionEntity;
+import org.apache.gravitino.meta.PolicyEntity;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.SchemaVersion;
@@ -79,6 +82,7 @@ import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.meta.TagEntity;
 import org.apache.gravitino.meta.TopicEntity;
 import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.policy.PolicyContents;
 import org.apache.gravitino.storage.RandomIdGenerator;
 import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
@@ -96,9 +100,12 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.condition.EnabledIf;
 import org.mockito.Mockito;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
+// Avoid re-executing tests in subclasses
+@EnabledIf("isDirectlyRunningThisClass")
 public class TestJDBCBackend {
   private final String JDBC_STORE_PATH =
       "/tmp/gravitino_jdbc_entityStore_" + 
UUID.randomUUID().toString().replace("-", "");
@@ -110,6 +117,10 @@ public class TestJDBCBackend {
           Configs.DEFAULT_ENTITY_RELATIONAL_STORE, 
JDBCBackend.class.getCanonicalName());
   protected RelationalBackend backend;
 
+  /**
+   * Condition method referenced by the class-level {@code @EnabledIf} annotation.
+   *
+   * <p>NOTE(review): {@code MethodHandles.lookup()} returns a lookup whose lookup class is the
+   * class that calls it, which here is the class declaring this method. The comparison below
+   * therefore looks like it is always {@code true}, also while a subclass is the test class
+   * being run. Confirm that inherited tests are really skipped in subclasses as intended.
+   *
+   * @return {@code true} if the lookup class is {@code TestJDBCBackend}
+   */
+  static boolean isDirectlyRunningThisClass() {
+    return MethodHandles.lookup().lookupClass() == TestJDBCBackend.class;
+  }
+
   @BeforeAll
   public void setup() {
     File dir = new File(DB_DIR);
@@ -215,6 +226,21 @@ public class TestJDBCBackend {
     backend.insert(metalake, false);
     assertThrows(EntityAlreadyExistsException.class, () -> 
backend.insert(metalakeCopy, false));
 
+    PolicyEntity policy =
+        createPolicy(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofPolicy("metalake"),
+            "policy",
+            auditInfo);
+    PolicyEntity policyCopy =
+        createPolicy(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofPolicy("metalake"),
+            "policy",
+            auditInfo);
+    backend.insert(policy, false);
+    assertThrows(EntityAlreadyExistsException.class, () -> 
backend.insert(policyCopy, false));
+
     CatalogEntity catalog =
         createCatalog(
             RandomIdGenerator.INSTANCE.nextId(),
@@ -333,6 +359,28 @@ public class TestJDBCBackend {
                 Entity.EntityType.METALAKE,
                 e -> createBaseMakeLake(metalakeCopy.id(), "metalake", 
auditInfo)));
 
+    PolicyEntity policy =
+        createPolicy(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofPolicy("metalake"),
+            "policy",
+            auditInfo);
+    PolicyEntity policy1 =
+        createPolicy(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofPolicy("metalake"),
+            "policy1",
+            auditInfo);
+    backend.insert(policy, false);
+    backend.insert(policy1, false);
+    assertThrows(
+        EntityAlreadyExistsException.class,
+        () ->
+            backend.update(
+                policy1.nameIdentifier(),
+                Entity.EntityType.POLICY,
+                e -> createPolicy(policy1.id(), policy1.namespace(), "policy", 
auditInfo)));
+
     CatalogEntity catalog =
         createCatalog(
             RandomIdGenerator.INSTANCE.nextId(),
@@ -550,6 +598,30 @@ public class TestJDBCBackend {
         createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), "metalake", 
auditInfo);
     backend.insert(metalake, false);
 
+    PolicyEntity policy =
+        createPolicy(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofPolicy("metalake"),
+            "policy",
+            auditInfo);
+    backend.insert(policy, false);
+    // update policy enabled and version
+    PolicyEntity policyV2 =
+        PolicyEntity.builder()
+            .withId(policy.id())
+            .withNamespace(policy.namespace())
+            .withName(policy.name())
+            .withPolicyType(policy.policyType())
+            .withComment(policy.comment())
+            .withEnabled(!policy.enabled())
+            .withExclusive(policy.exclusive())
+            .withInheritable(policy.inheritable())
+            .withSupportedObjectTypes(policy.supportedObjectTypes())
+            .withContent(policy.content())
+            .withAuditInfo(auditInfo)
+            .build();
+    backend.update(policy.nameIdentifier(), Entity.EntityType.POLICY, e -> 
policyV2);
+
     CatalogEntity catalog =
         createCatalog(
             RandomIdGenerator.INSTANCE.nextId(),
@@ -655,6 +727,46 @@ public class TestJDBCBackend {
         createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), 
"another-metalake", auditInfo);
     backend.insert(anotherMetaLake, false);
 
+    PolicyEntity anotherPolicy =
+        createPolicy(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofPolicy("another-metalake"),
+            "another-policy",
+            auditInfo);
+    backend.insert(anotherPolicy, false);
+    // update another policy enabled and version
+    PolicyEntity anotherPolicyV2 =
+        PolicyEntity.builder()
+            .withId(anotherPolicy.id())
+            .withNamespace(anotherPolicy.namespace())
+            .withName(anotherPolicy.name())
+            .withPolicyType(anotherPolicy.policyType())
+            .withComment(anotherPolicy.comment())
+            .withEnabled(!anotherPolicy.enabled())
+            .withExclusive(anotherPolicy.exclusive())
+            .withInheritable(anotherPolicy.inheritable())
+            .withSupportedObjectTypes(anotherPolicy.supportedObjectTypes())
+            .withContent(anotherPolicy.content())
+            .withAuditInfo(auditInfo)
+            .build();
+    backend.update(anotherPolicy.nameIdentifier(), Entity.EntityType.POLICY, e 
-> anotherPolicyV2);
+    // update another policy comment and version
+    PolicyEntity anotherPolicyV3 =
+        PolicyEntity.builder()
+            .withId(anotherPolicy.id())
+            .withNamespace(anotherPolicy.namespace())
+            .withName(anotherPolicy.name())
+            .withPolicyType(anotherPolicy.policyType())
+            .withComment("v3")
+            .withEnabled(anotherPolicyV2.enabled())
+            .withExclusive(anotherPolicy.exclusive())
+            .withInheritable(anotherPolicy.inheritable())
+            .withSupportedObjectTypes(SUPPORTS_ALL_OBJECT_TYPES)
+            .withContent(anotherPolicy.content())
+            .withAuditInfo(auditInfo)
+            .build();
+    backend.update(anotherPolicy.nameIdentifier(), Entity.EntityType.POLICY, e 
-> anotherPolicyV3);
+
     CatalogEntity anotherCatalog =
         createCatalog(
             RandomIdGenerator.INSTANCE.nextId(),
@@ -743,6 +855,11 @@ public class TestJDBCBackend {
         backend.list(metalake.namespace(), Entity.EntityType.METALAKE, true);
     assertTrue(metaLakes.contains(metalake));
 
+    List<PolicyEntity> policies = backend.list(policy.namespace(), 
Entity.EntityType.POLICY, true);
+    assertFalse(policies.contains(policy));
+    assertTrue(policies.contains(policyV2));
+    assertEquals(policyV2.enabled(), 
policies.get(policies.indexOf(policyV2)).enabled());
+
     List<CatalogEntity> catalogs =
         backend.list(catalog.namespace(), Entity.EntityType.CATALOG, true);
     assertTrue(catalogs.contains(catalog));
@@ -872,6 +989,9 @@ public class TestJDBCBackend {
     assertFalse(backend.exists(metalake.nameIdentifier(), 
Entity.EntityType.METALAKE));
     assertTrue(backend.exists(anotherMetaLake.nameIdentifier(), 
Entity.EntityType.METALAKE));
 
+    assertFalse(backend.exists(policy.nameIdentifier(), 
Entity.EntityType.POLICY));
+    assertTrue(backend.exists(anotherPolicy.nameIdentifier(), 
Entity.EntityType.POLICY));
+
     assertFalse(backend.exists(catalog.nameIdentifier(), 
Entity.EntityType.CATALOG));
     assertTrue(backend.exists(anotherCatalog.nameIdentifier(), 
Entity.EntityType.CATALOG));
 
@@ -911,6 +1031,7 @@ public class TestJDBCBackend {
 
     // check legacy record after soft delete
     assertTrue(legacyRecordExistsInDB(metalake.id(), 
Entity.EntityType.METALAKE));
+    assertTrue(legacyRecordExistsInDB(policy.id(), Entity.EntityType.POLICY));
     assertTrue(legacyRecordExistsInDB(catalog.id(), 
Entity.EntityType.CATALOG));
     assertTrue(legacyRecordExistsInDB(schema.id(), Entity.EntityType.SCHEMA));
     assertTrue(legacyRecordExistsInDB(table.id(), Entity.EntityType.TABLE));
@@ -926,6 +1047,8 @@ public class TestJDBCBackend {
     assertEquals(2, countRoleRels(anotherRole.id()));
     assertEquals(2, listFilesetVersions(fileset.id()).size());
     assertEquals(3, listFilesetVersions(anotherFileset.id()).size());
+    assertEquals(2, listPolicyVersions(policy.id()).size());
+    assertEquals(3, listPolicyVersions(anotherPolicy.id()).size());
     assertTrue(legacyRecordExistsInDB(tag.id(), Entity.EntityType.TAG));
 
     // meta data hard delete
@@ -942,9 +1065,11 @@ public class TestJDBCBackend {
     assertFalse(legacyRecordExistsInDB(role.id(), Entity.EntityType.ROLE));
     assertFalse(legacyRecordExistsInDB(user.id(), Entity.EntityType.USER));
     assertFalse(legacyRecordExistsInDB(group.id(), Entity.EntityType.GROUP));
+    assertFalse(legacyRecordExistsInDB(policy.id(), Entity.EntityType.POLICY));
     assertEquals(0, countRoleRels(role.id()));
     assertEquals(2, countRoleRels(anotherRole.id()));
     assertEquals(0, listFilesetVersions(fileset.id()).size());
+    assertEquals(0, listPolicyVersions(policy.id()).size());
     assertFalse(legacyRecordExistsInDB(tag.id(), Entity.EntityType.TAG));
     assertEquals(0, countOwnerRel(metalake.id()));
     assertEquals(1, countOwnerRel(anotherMetaLake.id()));
@@ -962,6 +1087,16 @@ public class TestJDBCBackend {
     // hard delete for old version fileset
     backend.hardDeleteLegacyData(Entity.EntityType.FILESET, 
Instant.now().toEpochMilli() + 1000);
     assertEquals(1, listFilesetVersions(anotherFileset.id()).size());
+
+    // soft delete for old version policy
+    assertEquals(3, listPolicyVersions(anotherPolicy.id()).size());
+    for (Entity.EntityType entityType : Entity.EntityType.values()) {
+      backend.deleteOldVersionData(entityType, 1);
+    }
+    Map<Integer, Long> versionDeletedMap2 = 
listPolicyVersions(anotherPolicy.id());
+    assertEquals(3, versionDeletedMap2.size());
+    assertEquals(1, versionDeletedMap2.values().stream().filter(value -> value 
== 0L).count());
+    assertEquals(2, versionDeletedMap2.values().stream().filter(value -> value 
!= 0L).count());
   }
 
   @Test
@@ -1105,6 +1240,10 @@ public class TestJDBCBackend {
         tableName = "tag_meta";
         idColumnName = "tag_id";
         break;
+      case POLICY:
+        tableName = "policy_meta";
+        idColumnName = "policy_id";
+        break;
       default:
         throw new IllegalArgumentException("Unsupported entity type: " + 
entityType);
     }
@@ -1144,6 +1283,26 @@ public class TestJDBCBackend {
     return versionDeletedTime;
   }
 
+  /**
+   * Read all rows of {@code policy_version_info} for the given policy directly from the
+   * database. The query does not filter on {@code deleted_at}.
+   *
+   * @param policyId the id of the policy whose version rows are read
+   * @return a map from each {@code version} value to the {@code deleted_at} value of that row
+   * @throws RuntimeException if the query fails with a {@link SQLException}
+   */
+  private Map<Integer, Long> listPolicyVersions(Long policyId) {
+    Map<Integer, Long> versionDeletedTime = new HashMap<>();
+    // Session, connection, statement and result set are all closed by the try-with-resources.
+    try (SqlSession sqlSession =
+            
SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
+        Connection connection = sqlSession.getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet rs =
+            statement.executeQuery(
+                String.format(
+                    "SELECT version, deleted_at FROM policy_version_info WHERE 
policy_id = %d",
+                    policyId))) {
+      while (rs.next()) {
+        versionDeletedTime.put(rs.getInt("version"), rs.getLong("deleted_at"));
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException("SQL execution failed", e);
+    }
+    return versionDeletedTime;
+  }
+
   private Integer countRoleRels(Long roleId) {
     int count = 0;
     try (SqlSession sqlSession =
@@ -1329,6 +1488,22 @@ public class TestJDBCBackend {
         .build();
   }
 
+  /**
+   * Create a {@link PolicyEntity} for tests from the given id, namespace, name and audit info.
+   *
+   * <p>All other fields are fixed: policy type {@code "test"}, an empty comment, enabled,
+   * exclusive and inheritable set to {@code true}, {@code SUPPORTS_ALL_OBJECT_TYPES} as the
+   * supported object types, and a custom content built from the one-entry map {@code filed1 ->
+   * 123} with {@code null} as the second argument.
+   *
+   * @param id the policy id
+   * @param ns the namespace of the policy
+   * @param name the policy name
+   * @param auditInfo the audit info to attach
+   * @return the built policy entity
+   */
+  public static PolicyEntity createPolicy(Long id, Namespace ns, String name, 
AuditInfo auditInfo) {
+    return PolicyEntity.builder()
+        .withId(id)
+        .withNamespace(ns)
+        .withName(name)
+        .withPolicyType("test")
+        .withComment("")
+        .withEnabled(true)
+        .withExclusive(true)
+        .withInheritable(true)
+        .withSupportedObjectTypes(SUPPORTS_ALL_OBJECT_TYPES)
+        .withContent(PolicyContents.custom(ImmutableMap.of("filed1", 123), 
null))
+        .withAuditInfo(auditInfo)
+        .build();
+  }
+
   public static SchemaEntity createSchemaEntity(
       Long id, Namespace namespace, String name, AuditInfo auditInfo) {
     return SchemaEntity.builder()
diff --git a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPolicyMetaService.java b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPolicyMetaService.java
new file mode 100644
index 0000000000..5f89469fb9
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPolicyMetaService.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.PolicyEntity;
+import org.apache.gravitino.policy.PolicyContent;
+import org.apache.gravitino.policy.PolicyContents;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.storage.relational.TestJDBCBackend;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link PolicyMetaService}: inserting, loading, listing, updating and deleting policy
+ * entities, and the effect of deleting a metalake on the policies stored under it.
+ *
+ * <p>The class extends {@link TestJDBCBackend} and uses its {@code backend} field and the {@code
+ * createBaseMakeLake} helper to set up the metalake each test works in.
+ */
+public class TestPolicyMetaService extends TestJDBCBackend {
+  // Name of the metalake created at the start of every test.
+  private final String metalakeName = "metalake_for_policy_test";
+
+  // Audit info shared by all entities created in this class.
+  private final AuditInfo auditInfo =
+      
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+
+  // Policy content shared by all policies; the second argument of custom(...) is null.
+  private final PolicyContent content = 
PolicyContents.custom(ImmutableMap.of("filed1", 123), null);
+
+  // Supported object types shared by all policies: CATALOG and SCHEMA.
+  private final Set<MetadataObject.Type> supportedObjectTypes =
+      new HashSet<MetadataObject.Type>() {
+        {
+          add(MetadataObject.Type.CATALOG);
+          add(MetadataObject.Type.SCHEMA);
+        }
+      };
+
+  /**
+   * Covers the lookup of a missing policy, insert-then-load round trips with and without the
+   * optional builder fields, and insert with and without overwrite for an already used id.
+   */
+  @Test
+  public void testInsertAndGetPolicyByIdentifier() throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
+    backend.insert(metalake, false);
+
+    // Test no policy entity.
+    PolicyMetaService policyMetaService = PolicyMetaService.getInstance();
+    Exception excep =
+        Assertions.assertThrows(
+            NoSuchEntityException.class,
+            () ->
+                policyMetaService.getPolicyByIdentifier(
+                    NameIdentifierUtil.ofPolicy(metalakeName, "policy1")));
+    Assertions.assertEquals("No such policy entity: policy1", 
excep.getMessage());
+
+    // Test get policy entity
+    PolicyEntity policyEntity =
+        PolicyEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("policy1")
+            .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+            .withComment("comment")
+            .withPolicyType("test")
+            .withContent(content)
+            .withEnabled(true)
+            .withSupportedObjectTypes(supportedObjectTypes)
+            .withAuditInfo(auditInfo)
+            .build();
+    policyMetaService.insertPolicy(policyEntity, false);
+
+    PolicyEntity resultpolicyEntity =
+        policyMetaService.getPolicyByIdentifier(
+            NameIdentifierUtil.ofPolicy(metalakeName, "policy1"));
+    Assertions.assertEquals(policyEntity, resultpolicyEntity);
+
+    // Test with null comment and content properties.
+    // Neither comment nor enabled is set on this builder.
+    PolicyEntity PolicyEntity1 =
+        PolicyEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("policy2")
+            .withPolicyType("test")
+            .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+            .withAuditInfo(auditInfo)
+            .withSupportedObjectTypes(supportedObjectTypes)
+            .withContent(content)
+            .build();
+
+    policyMetaService.insertPolicy(PolicyEntity1, false);
+    PolicyEntity resultPolicyEntity1 =
+        policyMetaService.getPolicyByIdentifier(
+            NameIdentifierUtil.ofPolicy(metalakeName, "policy2"));
+    Assertions.assertEquals(PolicyEntity1, resultPolicyEntity1);
+    Assertions.assertTrue(resultPolicyEntity1.enabled());
+    Assertions.assertNull(resultPolicyEntity1.comment());
+    Assertions.assertEquals(supportedObjectTypes, 
resultPolicyEntity1.supportedObjectTypes());
+    Assertions.assertNull(resultPolicyEntity1.content().properties());
+
+    // Test insert with overwrite.
+    // PolicyEntity2 reuses the id of PolicyEntity1 under a different name.
+    PolicyEntity PolicyEntity2 =
+        PolicyEntity.builder()
+            .withId(PolicyEntity1.id())
+            .withName("policy3")
+            .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+            .withComment("comment")
+            .withPolicyType("test")
+            .withSupportedObjectTypes(supportedObjectTypes)
+            .withContent(content)
+            .withAuditInfo(auditInfo)
+            .build();
+
+    Assertions.assertThrows(
+        Exception.class, () -> policyMetaService.insertPolicy(PolicyEntity2, 
false));
+
+    policyMetaService.insertPolicy(PolicyEntity2, true);
+
+    PolicyEntity resultPolicyEntity2 =
+        policyMetaService.getPolicyByIdentifier(
+            NameIdentifierUtil.ofPolicy(metalakeName, "policy3"));
+    Assertions.assertEquals(PolicyEntity2, resultPolicyEntity2);
+  }
+
+  /**
+   * Inserts two policies into the same metalake and expects the namespace listing to return
+   * exactly these two entities.
+   */
+  @Test
+  public void testCreateAndListPolicies() throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
+    backend.insert(metalake, false);
+
+    PolicyMetaService policyMetaService = PolicyMetaService.getInstance();
+    PolicyEntity policyEntity1 =
+        PolicyEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("policy1")
+            .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+            .withComment("comment")
+            .withPolicyType("test")
+            .withContent(content)
+            .withSupportedObjectTypes(supportedObjectTypes)
+            .withAuditInfo(auditInfo)
+            .build();
+    policyMetaService.insertPolicy(policyEntity1, false);
+
+    PolicyEntity policyEntity2 =
+        PolicyEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("policy2")
+            .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+            .withComment("comment")
+            .withPolicyType("test")
+            .withContent(content)
+            .withSupportedObjectTypes(supportedObjectTypes)
+            .withAuditInfo(auditInfo)
+            .build();
+    policyMetaService.insertPolicy(policyEntity2, false);
+
+    List<PolicyEntity> policyEntities =
+        
policyMetaService.listPoliciesByNamespace(NamespaceUtil.ofPolicy(metalakeName));
+    Assertions.assertEquals(2, policyEntities.size());
+    Assertions.assertTrue(policyEntities.contains(policyEntity1));
+    Assertions.assertTrue(policyEntities.contains(policyEntity2));
+  }
+
+  /**
+   * Covers updating a missing policy, a successful update of the comment, and the rejection of an
+   * updater that returns an entity with a different id.
+   */
+  @Test
+  public void testUpdatePolicy() throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
+    backend.insert(metalake, false);
+
+    PolicyMetaService policyMetaService = PolicyMetaService.getInstance();
+    PolicyEntity policyEntity1 =
+        PolicyEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("policy1")
+            .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+            .withComment("comment")
+            .withPolicyType("test")
+            .withSupportedObjectTypes(supportedObjectTypes)
+            .withContent(content)
+            .withAuditInfo(auditInfo)
+            .build();
+    policyMetaService.insertPolicy(policyEntity1, false);
+
+    // Update with no policy entity.
+    Exception excep =
+        Assertions.assertThrows(
+            NoSuchEntityException.class,
+            () ->
+                policyMetaService.updatePolicy(
+                    NameIdentifierUtil.ofPolicy(metalakeName, "policy2"),
+                    policyEntity -> policyEntity));
+    Assertions.assertEquals("No such policy entity: policy2", 
excep.getMessage());
+
+    // Update policy entity.
+    // Only the comment differs from policyEntity1.
+    PolicyEntity policyEntity2 =
+        PolicyEntity.builder()
+            .withId(policyEntity1.id())
+            .withName("policy1")
+            .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+            .withComment("comment1")
+            .withPolicyType("test")
+            .withSupportedObjectTypes(supportedObjectTypes)
+            .withContent(content)
+            .withAuditInfo(auditInfo)
+            .build();
+    PolicyEntity updatedPolicyEntity =
+        policyMetaService.updatePolicy(
+            NameIdentifierUtil.ofPolicy(metalakeName, "policy1"), policyEntity 
-> policyEntity2);
+    Assertions.assertEquals(policyEntity2, updatedPolicyEntity);
+
+    PolicyEntity loadedPolicyEntity =
+        policyMetaService.getPolicyByIdentifier(
+            NameIdentifierUtil.ofPolicy(metalakeName, "policy1"));
+    Assertions.assertEquals(policyEntity2, loadedPolicyEntity);
+
+    // Update with different id.
+    PolicyEntity policyEntity3 =
+        PolicyEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("policy1")
+            .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+            .withComment("comment1")
+            .withPolicyType("test")
+            .withSupportedObjectTypes(supportedObjectTypes)
+            .withContent(content)
+            .withAuditInfo(auditInfo)
+            .build();
+
+    Exception excep1 =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                policyMetaService.updatePolicy(
+                    NameIdentifierUtil.ofPolicy(metalakeName, "policy1"),
+                    policyEntity -> policyEntity3));
+    Assertions.assertEquals(
+        "The updated policy entity id: "
+            + policyEntity3.id()
+            + " must have the same id as the old "
+            + "entity id "
+            + policyEntity2.id(),
+        excep1.getMessage());
+
+    // The rejected update must leave the stored entity unchanged.
+    PolicyEntity loadedPolicyEntity1 =
+        policyMetaService.getPolicyByIdentifier(
+            NameIdentifierUtil.ofPolicy(metalakeName, "policy1"));
+    Assertions.assertEquals(policyEntity2, loadedPolicyEntity1);
+  }
+
+  /**
+   * Expects the first delete of an existing policy to return {@code true}, a repeated delete to
+   * return {@code false}, and a later lookup to fail with {@link NoSuchEntityException}.
+   */
+  @Test
+  public void testDeletePolicy() throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
+    backend.insert(metalake, false);
+
+    PolicyMetaService policyMetaService = PolicyMetaService.getInstance();
+    PolicyEntity policyEntity1 =
+        PolicyEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("policy1")
+            .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+            .withComment("comment")
+            .withPolicyType("test")
+            .withSupportedObjectTypes(supportedObjectTypes)
+            .withContent(content)
+            .withAuditInfo(auditInfo)
+            .build();
+    policyMetaService.insertPolicy(policyEntity1, false);
+
+    boolean deleted =
+        
policyMetaService.deletePolicy(NameIdentifierUtil.ofPolicy(metalakeName, 
"policy1"));
+    Assertions.assertTrue(deleted);
+
+    deleted = 
policyMetaService.deletePolicy(NameIdentifierUtil.ofPolicy(metalakeName, 
"policy1"));
+    Assertions.assertFalse(deleted);
+
+    Exception excep =
+        Assertions.assertThrows(
+            NoSuchEntityException.class,
+            () ->
+                policyMetaService.getPolicyByIdentifier(
+                    NameIdentifierUtil.ofPolicy(metalakeName, "policy1")));
+    Assertions.assertEquals("No such policy entity: policy1", 
excep.getMessage());
+  }
+
+  /**
+   * Deletes a metalake that contains a policy, first with {@code false} and then (on a second
+   * metalake) with {@code true} as the second argument of {@code deleteMetalake}, and expects the
+   * policy lookup to fail with {@link NoSuchEntityException} in both cases.
+   */
+  @Test
+  public void testDeleteMetalake() throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
+    backend.insert(metalake, false);
+
+    PolicyMetaService policyMetaService = PolicyMetaService.getInstance();
+    PolicyEntity policyEntity1 =
+        PolicyEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("policy1")
+            .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+            .withComment("comment")
+            .withPolicyType("test")
+            .withSupportedObjectTypes(supportedObjectTypes)
+            .withContent(content)
+            .withAuditInfo(auditInfo)
+            .build();
+    policyMetaService.insertPolicy(policyEntity1, false);
+
+    Assertions.assertTrue(
+        
MetalakeMetaService.getInstance().deleteMetalake(metalake.nameIdentifier(), 
false));
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () ->
+            policyMetaService.getPolicyByIdentifier(
+                NameIdentifierUtil.ofPolicy(metalakeName, "policy1")));
+
+    // Test delete metalake with cascade.
+    BaseMetalake metalake1 =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName + 
"1", auditInfo);
+    backend.insert(metalake1, false);
+
+    PolicyEntity policyEntity2 =
+        PolicyEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("policy2")
+            .withNamespace(NamespaceUtil.ofPolicy(metalakeName + "1"))
+            .withComment("comment")
+            .withPolicyType("test")
+            .withSupportedObjectTypes(supportedObjectTypes)
+            .withContent(content)
+            .withAuditInfo(auditInfo)
+            .build();
+
+    policyMetaService.insertPolicy(policyEntity2, false);
+    Assertions.assertTrue(
+        
MetalakeMetaService.getInstance().deleteMetalake(metalake1.nameIdentifier(), 
true));
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () ->
+            policyMetaService.getPolicyByIdentifier(
+                NameIdentifierUtil.ofPolicy(metalakeName + "1", "policy2")));
+  }
+}

Reply via email to