This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 60f4bebbc [#4101] feat(core): Support PostgreSQL storage backend 
(#4611)
60f4bebbc is described below

commit 60f4bebbc6eec4ef16ed10ad7530af09d7c34985
Author: Qi Yu <[email protected]>
AuthorDate: Sun Sep 8 11:38:24 2024 +0800

    [#4101] feat(core): Support PostgreSQL storage backend (#4611)
    
    ### What changes were proposed in this pull request?
    
    - Support PostgreSQL JDBC backend.
    
    ### Why are the changes needed?
    
    User need.
    
    Fix: #4101
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A
    
    ### How was this patch tested?
    
    Existing ITs
    
    ---------
    
    Co-authored-by: Jerry Shao <[email protected]>
---
 .github/workflows/backend-integration-test.yml     |  18 +-
 catalogs/catalog-hadoop/build.gradle.kts           |   1 +
 catalogs/catalog-hive/build.gradle.kts             |   1 +
 catalogs/catalog-jdbc-doris/build.gradle.kts       |   1 +
 catalogs/catalog-jdbc-mysql/build.gradle.kts       |   1 +
 catalogs/catalog-kafka/build.gradle.kts            |   1 +
 catalogs/catalog-lakehouse-paimon/build.gradle.kts |   1 +
 .../converters/PostgreSQLExceptionConverter.java   |  47 +++
 .../converters/SQLExceptionConverterFactory.java   |   2 +
 .../mapper/CatalogMetaSQLProviderFactory.java      |   4 +-
 .../mapper/FilesetMetaSQLProviderFactory.java      |   4 +-
 .../mapper/FilesetVersionSQLProviderFactory.java   |   4 +-
 .../mapper/GroupMetaSQLProviderFactory.java        |   4 +-
 .../mapper/GroupRoleRelSQLProviderFactory.java     |   4 +-
 .../mapper/MetalakeMetaSQLProviderFactory.java     |   4 +-
 .../mapper/OwnerMetaSQLProviderFactory.java        |   4 +-
 .../mapper/RoleMetaSQLProviderFactory.java         |   4 +-
 .../mapper/SchemaMetaSQLProviderFactory.java       |   4 +-
 .../mapper/SecurableObjectSQLProviderFactory.java  |   4 +-
 .../mapper/TableMetaSQLProviderFactory.java        |   4 +-
 .../mapper/TagMetaSQLProviderFactory.java          |   4 +-
 .../TagMetadataObjectRelSQLProviderFactory.java    |   4 +-
 .../mapper/TopicMetaSQLProviderFactory.java        |   4 +-
 .../mapper/UserMetaSQLProviderFactory.java         |   4 +-
 .../mapper/UserRoleRelSQLProviderFactory.java      |   4 +-
 .../postgresql/CatalogMetaPostgreSQLProvider.java  | 108 +++++
 .../postgresql/FilesetMetaPostgreSQLProvider.java  |  93 +++++
 .../FilesetVersionPostgreSQLProvider.java          | 102 +++++
 .../postgresql/GroupMetaPostgreSQLProvider.java    |  69 ++++
 .../postgresql/GroupRoleRelPostgreSQLProvider.java |  73 ++++
 .../postgresql/MetalakeMetaPostgreSQLProvider.java |  89 ++++
 .../postgresql/OwnerMetaPostgreSQLProvider.java    | 120 ++++++
 .../postgresql/RoleMetaPostgreSQLProvider.java     |  70 ++++
 .../postgresql/SchemaMetaPostgreSQLProvider.java   |  84 ++++
 .../SecurableObjectPostgreSQLProvider.java         |  47 +++
 .../postgresql/TableMetaPostgreSQLProvider.java    |  91 ++++
 .../postgresql/TagMetaPostgreSQLProvider.java      | 102 +++++
 .../TagMetadataObjectRelPostgreSQLProvider.java    |  89 ++++
 .../postgresql/TopicMetaPostgreSQLProvider.java    |  96 +++++
 .../postgresql/UserMetaPostgreSQLProvider.java     |  69 ++++
 .../postgresql/UserRoleRelPostgreSQLProvider.java  | 102 +++++
 .../test/container/PostgreSQLContainer.java        |   2 +-
 .../integration/test/util/AbstractIT.java          |  70 ++++
 .../integration/test/util/TestDatabaseName.java    |   2 +
 scripts/postgresql/schema-0.7.0-postgresql.sql     | 458 +++++++++++++++++++++
 45 files changed, 2050 insertions(+), 23 deletions(-)

diff --git a/.github/workflows/backend-integration-test.yml 
b/.github/workflows/backend-integration-test.yml
index 1c6596f77..c4c0a4467 100644
--- a/.github/workflows/backend-integration-test.yml
+++ b/.github/workflows/backend-integration-test.yml
@@ -61,12 +61,15 @@ jobs:
         # Integration test for AMD64 architecture
         architecture: [linux/amd64]
         java-version: [ 8, 11, 17 ]
+        backend: [ h2, mysql, postgresql ]
         test-mode: [ embedded, deploy ]
-        include:
+        exclude:
           - test-mode: 'embedded'
-            backend: 'h2'
-          - test-mode: 'deploy'
             backend: 'mysql'
+          - test-mode: 'embedded'
+            backend: 'postgresql'
+          - test-mode: 'deploy'
+            backend: 'h2'
 
     env:
       PLATFORM: ${{ matrix.architecture }}
@@ -132,11 +135,14 @@ jobs:
         architecture: [ linux/amd64 ]
         java-version: [ 17 ]
         test-mode: [ embedded, deploy ]
-        include:
+        backend: [ h2, mysql, postgresql ]
+        exclude:
           - test-mode: 'embedded'
-            backend: 'h2'
-          - test-mode: 'deploy'
             backend: 'mysql'
+          - test-mode: 'embedded'
+            backend: 'postgresql'
+          - test-mode: 'deploy'
+            backend: 'h2'
 
     env:
       PLATFORM: ${{ matrix.architecture }}
diff --git a/catalogs/catalog-hadoop/build.gradle.kts 
b/catalogs/catalog-hadoop/build.gradle.kts
index ea38a895f..429cd4052 100644
--- a/catalogs/catalog-hadoop/build.gradle.kts
+++ b/catalogs/catalog-hadoop/build.gradle.kts
@@ -79,6 +79,7 @@ dependencies {
   testImplementation(libs.mockito.core)
   testImplementation(libs.mockito.inline)
   testImplementation(libs.mysql.driver)
+  testImplementation(libs.postgresql.driver)
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
   testImplementation(libs.testcontainers)
diff --git a/catalogs/catalog-hive/build.gradle.kts 
b/catalogs/catalog-hive/build.gradle.kts
index 776e9bf39..081bfcbeb 100644
--- a/catalogs/catalog-hive/build.gradle.kts
+++ b/catalogs/catalog-hive/build.gradle.kts
@@ -132,6 +132,7 @@ dependencies {
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.mockito.core)
   testImplementation(libs.mysql.driver)
+  testImplementation(libs.postgresql.driver)
 
   
testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
     exclude("org.apache.hadoop")
diff --git a/catalogs/catalog-jdbc-doris/build.gradle.kts 
b/catalogs/catalog-jdbc-doris/build.gradle.kts
index b7cf6dfc8..ccfaaddfb 100644
--- a/catalogs/catalog-jdbc-doris/build.gradle.kts
+++ b/catalogs/catalog-jdbc-doris/build.gradle.kts
@@ -52,6 +52,7 @@ dependencies {
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
   testImplementation(libs.mysql.driver)
+  testImplementation(libs.postgresql.driver)
   testImplementation(libs.testcontainers)
   testImplementation(libs.testcontainers.mysql)
 
diff --git a/catalogs/catalog-jdbc-mysql/build.gradle.kts 
b/catalogs/catalog-jdbc-mysql/build.gradle.kts
index 9ad6c739a..95f0578d3 100644
--- a/catalogs/catalog-jdbc-mysql/build.gradle.kts
+++ b/catalogs/catalog-jdbc-mysql/build.gradle.kts
@@ -55,6 +55,7 @@ dependencies {
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
   testImplementation(libs.mysql.driver)
+  testImplementation(libs.postgresql.driver)
   testImplementation(libs.testcontainers)
   testImplementation(libs.testcontainers.mysql)
 
diff --git a/catalogs/catalog-kafka/build.gradle.kts 
b/catalogs/catalog-kafka/build.gradle.kts
index a2493fa77..3165758a1 100644
--- a/catalogs/catalog-kafka/build.gradle.kts
+++ b/catalogs/catalog-kafka/build.gradle.kts
@@ -51,6 +51,7 @@ dependencies {
   testImplementation(libs.mockito.core)
   testImplementation(libs.mockito.inline)
   testImplementation(libs.mysql.driver)
+  testImplementation(libs.postgresql.driver)
   testImplementation(libs.testcontainers)
   testImplementation(libs.testcontainers.mysql)
 
diff --git a/catalogs/catalog-lakehouse-paimon/build.gradle.kts 
b/catalogs/catalog-lakehouse-paimon/build.gradle.kts
index a6adf999d..930e3e335 100644
--- a/catalogs/catalog-lakehouse-paimon/build.gradle.kts
+++ b/catalogs/catalog-lakehouse-paimon/build.gradle.kts
@@ -78,6 +78,7 @@ dependencies {
   testImplementation(libs.slf4j.api)
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.mysql.driver)
+  testImplementation(libs.postgresql.driver)
   testImplementation(libs.bundles.log4j)
   testImplementation(libs.junit.jupiter.params)
   testImplementation(libs.testcontainers)
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/converters/PostgreSQLExceptionConverter.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/converters/PostgreSQLExceptionConverter.java
new file mode 100644
index 000000000..741476093
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/converters/PostgreSQLExceptionConverter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.converters;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityAlreadyExistsException;
+
+/**
+ * Exception converter to Apache Gravitino exception for PostgreSQL. The 
definition of error codes
+ * can be found in the document: <a
+ * href="https://www.postgresql.org/docs/8.4/errcodes-appendix.html";>error 
code of PostgreSQL</a>
+ */
+public class PostgreSQLExceptionConverter implements SQLExceptionConverter {
+  private static final int DUPLICATED_ENTRY_ERROR_CODE = 23505;
+
+  @Override
+  @SuppressWarnings("FormatStringAnnotation")
+  public void toGravitinoException(SQLException sqlException, 
Entity.EntityType type, String name)
+      throws IOException {
+    int errorCode = Integer.valueOf(sqlException.getSQLState());
+    switch (errorCode) {
+      case DUPLICATED_ENTRY_ERROR_CODE:
+        throw new EntityAlreadyExistsException(
+            sqlException, "The %s entity: %s already exists.", type.name(), 
name);
+      default:
+        throw new IOException(sqlException);
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/converters/SQLExceptionConverterFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/converters/SQLExceptionConverterFactory.java
index 4954910c9..feb0fb0d7 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/converters/SQLExceptionConverterFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/converters/SQLExceptionConverterFactory.java
@@ -40,6 +40,8 @@ public class SQLExceptionConverterFactory {
           converter = new MySQLExceptionConverter();
         } else if (jdbcType.equalsIgnoreCase("h2")) {
           converter = new H2ExceptionConverter();
+        } else if (jdbcType.equalsIgnoreCase("postgresql")) {
+          converter = new PostgreSQLExceptionConverter();
         } else {
           throw new IllegalArgumentException(String.format("Unsupported jdbc 
type: %s", jdbcType));
         }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaSQLProviderFactory.java
index 5c0e63f53..632681c5f 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaSQLProviderFactory.java
@@ -22,6 +22,7 @@ package org.apache.gravitino.storage.relational.mapper;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.postgresql.CatalogMetaPostgreSQLProvider;
 import org.apache.gravitino.storage.relational.po.CatalogPO;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import org.apache.ibatis.annotations.Param;
@@ -32,7 +33,8 @@ public class CatalogMetaSQLProviderFactory {
       METALAKE_META_SQL_PROVIDER_MAP =
           ImmutableMap.of(
               JDBCBackendType.MYSQL, new CatalogMetaMySQLProvider(),
-              JDBCBackendType.H2, new CatalogMetaH2Provider());
+              JDBCBackendType.H2, new CatalogMetaH2Provider(),
+              JDBCBackendType.POSTGRESQL, new CatalogMetaPostgreSQLProvider());
 
   public static CatalogMetaBaseSQLProvider getProvider() {
     String databaseId =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java
index 36ea94d58..b41237e2a 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java
@@ -22,6 +22,7 @@ package org.apache.gravitino.storage.relational.mapper;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.postgresql.FilesetMetaPostgreSQLProvider;
 import org.apache.gravitino.storage.relational.po.FilesetPO;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import org.apache.ibatis.annotations.Param;
@@ -31,7 +32,8 @@ public class FilesetMetaSQLProviderFactory {
       METALAKE_META_SQL_PROVIDER_MAP =
           ImmutableMap.of(
               JDBCBackendType.MYSQL, new FilesetMetaMySQLProvider(),
-              JDBCBackendType.H2, new FilesetMetaH2Provider());
+              JDBCBackendType.H2, new FilesetMetaH2Provider(),
+              JDBCBackendType.POSTGRESQL, new FilesetMetaPostgreSQLProvider());
 
   public static FilesetMetaBaseSQLProvider getProvider() {
     String databaseId =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionSQLProviderFactory.java
index 163f2c882..4029c2cb0 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionSQLProviderFactory.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.storage.relational.mapper;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.postgresql.FilesetVersionPostgreSQLProvider;
 import org.apache.gravitino.storage.relational.po.FilesetVersionPO;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import org.apache.ibatis.annotations.Param;
@@ -30,7 +31,8 @@ public class FilesetVersionSQLProviderFactory {
       METALAKE_META_SQL_PROVIDER_MAP =
           ImmutableMap.of(
               JDBCBackendType.MYSQL, new FilesetVersionMySQLProvider(),
-              JDBCBackendType.H2, new FilesetVersionH2Provider());
+              JDBCBackendType.H2, new FilesetVersionH2Provider(),
+              JDBCBackendType.POSTGRESQL, new 
FilesetVersionPostgreSQLProvider());
 
   public static FilesetVersionBaseSQLProvider getProvider() {
     String databaseId =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/GroupMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/GroupMetaSQLProviderFactory.java
index 59e45a6dc..2769bf1d9 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/GroupMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/GroupMetaSQLProviderFactory.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.storage.relational.mapper;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.postgresql.GroupMetaPostgreSQLProvider;
 import org.apache.gravitino.storage.relational.po.GroupPO;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import org.apache.ibatis.annotations.Param;
@@ -30,7 +31,8 @@ public class GroupMetaSQLProviderFactory {
       METALAKE_META_SQL_PROVIDER_MAP =
           ImmutableMap.of(
               JDBCBackendType.MYSQL, new GroupMetaMySQLProvider(),
-              JDBCBackendType.H2, new GroupMetaH2Provider());
+              JDBCBackendType.H2, new GroupMetaH2Provider(),
+              JDBCBackendType.POSTGRESQL, new GroupMetaPostgreSQLProvider());
 
   public static GroupMetaBaseSQLProvider getProvider() {
     String databaseId =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/GroupRoleRelSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/GroupRoleRelSQLProviderFactory.java
index 4fd047abe..6d2ff176b 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/GroupRoleRelSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/GroupRoleRelSQLProviderFactory.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.postgresql.GroupRoleRelPostgreSQLProvider;
 import org.apache.gravitino.storage.relational.po.GroupRoleRelPO;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import org.apache.ibatis.annotations.Param;
@@ -32,7 +33,8 @@ public class GroupRoleRelSQLProviderFactory {
       METALAKE_META_SQL_PROVIDER_MAP =
           ImmutableMap.of(
               JDBCBackendType.MYSQL, new GroupRoleRelMySQLProvider(),
-              JDBCBackendType.H2, new GroupRoleRelH2Provider());
+              JDBCBackendType.H2, new GroupRoleRelH2Provider(),
+              JDBCBackendType.POSTGRESQL, new 
GroupRoleRelPostgreSQLProvider());
 
   public static GroupRoleRelBaseSQLProvider getProvider() {
     String databaseId =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java
index e28cbc9d7..675894596 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java
@@ -22,6 +22,7 @@ package org.apache.gravitino.storage.relational.mapper;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.postgresql.MetalakeMetaPostgreSQLProvider;
 import org.apache.gravitino.storage.relational.po.MetalakePO;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import org.apache.ibatis.annotations.Param;
@@ -32,7 +33,8 @@ public class MetalakeMetaSQLProviderFactory {
       METALAKE_META_SQL_PROVIDER_MAP =
           ImmutableMap.of(
               JDBCBackendType.MYSQL, new MetalakeMetaMySQLProvider(),
-              JDBCBackendType.H2, new MetalakeMetaH2Provider());
+              JDBCBackendType.H2, new MetalakeMetaH2Provider(),
+              JDBCBackendType.POSTGRESQL, new 
MetalakeMetaPostgreSQLProvider());
 
   public static MetalakeMetaBaseSQLProvider getProvider() {
     String databaseId =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaSQLProviderFactory.java
index e07fd269b..8ddf53d5d 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaSQLProviderFactory.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.storage.relational.mapper;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.postgresql.OwnerMetaPostgreSQLProvider;
 import org.apache.gravitino.storage.relational.po.OwnerRelPO;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import org.apache.ibatis.annotations.Param;
@@ -31,7 +32,8 @@ public class OwnerMetaSQLProviderFactory {
       METALAKE_META_SQL_PROVIDER_MAP =
           ImmutableMap.of(
               JDBCBackendType.MYSQL, new OwnerMetaMySQLProvider(),
-              JDBCBackendType.H2, new OwnerMetaH2Provider());
+              JDBCBackendType.H2, new OwnerMetaH2Provider(),
+              JDBCBackendType.POSTGRESQL, new OwnerMetaPostgreSQLProvider());
 
   public static OwnerMetaBaseSQLProvider getProvider() {
     String databaseId =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/RoleMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/RoleMetaSQLProviderFactory.java
index bdcb45749..415993860 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/RoleMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/RoleMetaSQLProviderFactory.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.storage.relational.mapper;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.postgresql.RoleMetaPostgreSQLProvider;
 import org.apache.gravitino.storage.relational.po.RolePO;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import org.apache.ibatis.annotations.Param;
@@ -30,7 +31,8 @@ public class RoleMetaSQLProviderFactory {
       METALAKE_META_SQL_PROVIDER_MAP =
           ImmutableMap.of(
               JDBCBackendType.MYSQL, new RoleMetaMySQLProvider(),
-              JDBCBackendType.H2, new RoleMetaH2Provider());
+              JDBCBackendType.H2, new RoleMetaH2Provider(),
+              JDBCBackendType.POSTGRESQL, new RoleMetaPostgreSQLProvider());
 
   public static RoleMetaBaseSQLProvider getProvider() {
     String databaseId =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java
index 5fa6252d5..87f636e61 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.storage.relational.mapper;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.postgresql.SchemaMetaPostgreSQLProvider;
 import org.apache.gravitino.storage.relational.po.SchemaPO;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import org.apache.ibatis.annotations.Param;
@@ -30,7 +31,8 @@ public class SchemaMetaSQLProviderFactory {
       METALAKE_META_SQL_PROVIDER_MAP =
           ImmutableMap.of(
               JDBCBackendType.MYSQL, new SchemaMetaMySQLProvider(),
-              JDBCBackendType.H2, new SchemaMetaH2Provider());
+              JDBCBackendType.H2, new SchemaMetaH2Provider(),
+              JDBCBackendType.POSTGRESQL, new SchemaMetaPostgreSQLProvider());
 
   public static SchemaMetaBaseSQLProvider getProvider() {
     String databaseId =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SecurableObjectSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SecurableObjectSQLProviderFactory.java
index 6508d7db3..da4ddc730 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SecurableObjectSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SecurableObjectSQLProviderFactory.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.postgresql.SecurableObjectPostgreSQLProvider;
 import org.apache.gravitino.storage.relational.po.SecurableObjectPO;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import org.apache.ibatis.annotations.Param;
@@ -32,7 +33,8 @@ public class SecurableObjectSQLProviderFactory {
       METALAKE_META_SQL_PROVIDER_MAP =
           ImmutableMap.of(
               JDBCBackendType.MYSQL, new SecurableObjectMySQLProvider(),
-              JDBCBackendType.H2, new SecurableObjectH2Provider());
+              JDBCBackendType.H2, new SecurableObjectH2Provider(),
+              JDBCBackendType.POSTGRESQL, new 
SecurableObjectPostgreSQLProvider());
 
   public static SecurableObjectBaseSQLProvider getProvider() {
     String databaseId =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java
index 833ba9a05..619648c1f 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.storage.relational.mapper;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.postgresql.TableMetaPostgreSQLProvider;
 import org.apache.gravitino.storage.relational.po.TablePO;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import org.apache.ibatis.annotations.Param;
@@ -31,7 +32,8 @@ public class TableMetaSQLProviderFactory {
       METALAKE_META_SQL_PROVIDER_MAP =
           ImmutableMap.of(
               JDBCBackendType.MYSQL, new TableMetaMySQLProvider(),
-              JDBCBackendType.H2, new TableMetaH2Provider());
+              JDBCBackendType.H2, new TableMetaH2Provider(),
+              JDBCBackendType.POSTGRESQL, new TableMetaPostgreSQLProvider());
 
   public static TableMetaBaseSQLProvider getProvider() {
     String databaseId =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaSQLProviderFactory.java
index aaa92b038..c9a018c5f 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaSQLProviderFactory.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.postgresql.TagMetaPostgreSQLProvider;
 import org.apache.gravitino.storage.relational.po.TagPO;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import org.apache.ibatis.annotations.Param;
@@ -31,7 +32,8 @@ public class TagMetaSQLProviderFactory {
   private static final Map<JDBCBackendType, TagMetaBaseSQLProvider> 
METALAKE_META_SQL_PROVIDER_MAP =
       ImmutableMap.of(
           JDBCBackendType.MYSQL, new TagMetaMySQLProvider(),
-          JDBCBackendType.H2, new TagMetaH2Provider());
+          JDBCBackendType.H2, new TagMetaH2Provider(),
+          JDBCBackendType.POSTGRESQL, new TagMetaPostgreSQLProvider());
 
   public static TagMetaBaseSQLProvider getProvider() {
     String databaseId =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetadataObjectRelSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetadataObjectRelSQLProviderFactory.java
index b074349e8..af4522f9d 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetadataObjectRelSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetadataObjectRelSQLProviderFactory.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.postgresql.TagMetadataObjectRelPostgreSQLProvider;
 import org.apache.gravitino.storage.relational.po.TagMetadataObjectRelPO;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import org.apache.ibatis.annotations.Param;
@@ -32,7 +33,8 @@ public class TagMetadataObjectRelSQLProviderFactory {
       METALAKE_META_SQL_PROVIDER_MAP =
           ImmutableMap.of(
               JDBCBackendType.MYSQL, new TagMetadataObjectRelMySQLProvider(),
-              JDBCBackendType.H2, new TagMetadataObjectRelH2Provider());
+              JDBCBackendType.H2, new TagMetadataObjectRelH2Provider(),
+              JDBCBackendType.POSTGRESQL, new 
TagMetadataObjectRelPostgreSQLProvider());
 
   public static TagMetadataObjectRelBaseSQLProvider getProvider() {
     String databaseId =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java
index 9a417e011..c1c6e2e23 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java
@@ -22,6 +22,7 @@ package org.apache.gravitino.storage.relational.mapper;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.postgresql.TopicMetaPostgreSQLProvider;
 import org.apache.gravitino.storage.relational.po.TopicPO;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import org.apache.ibatis.annotations.Param;
@@ -32,7 +33,8 @@ public class TopicMetaSQLProviderFactory {
       METALAKE_META_SQL_PROVIDER_MAP =
           ImmutableMap.of(
               JDBCBackendType.MYSQL, new TopicMetaMySQLProvider(),
-              JDBCBackendType.H2, new TopicMetaH2Provider());
+              JDBCBackendType.H2, new TopicMetaH2Provider(),
+              JDBCBackendType.POSTGRESQL, new TopicMetaPostgreSQLProvider());
 
   public static TopicMetaBaseSQLProvider getProvider() {
     String databaseId =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/UserMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/UserMetaSQLProviderFactory.java
index 3c64f510c..2c322db86 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/UserMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/UserMetaSQLProviderFactory.java
@@ -22,6 +22,7 @@ package org.apache.gravitino.storage.relational.mapper;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.postgresql.UserMetaPostgreSQLProvider;
 import org.apache.gravitino.storage.relational.po.UserPO;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import org.apache.ibatis.annotations.Param;
@@ -32,7 +33,8 @@ public class UserMetaSQLProviderFactory {
       METALAKE_META_SQL_PROVIDER_MAP =
           ImmutableMap.of(
               JDBCBackendType.MYSQL, new UserMetaMySQLProvider(),
-              JDBCBackendType.H2, new UserMetaH2Provider());
+              JDBCBackendType.H2, new UserMetaH2Provider(),
+              JDBCBackendType.POSTGRESQL, new UserMetaPostgreSQLProvider());
 
   public static UserMetaBaseSQLProvider getProvider() {
     String databaseId =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/UserRoleRelSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/UserRoleRelSQLProviderFactory.java
index 3d52c6aa0..f98f509f8 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/UserRoleRelSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/UserRoleRelSQLProviderFactory.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.postgresql.UserRoleRelPostgreSQLProvider;
 import org.apache.gravitino.storage.relational.po.UserRoleRelPO;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import org.apache.ibatis.annotations.Param;
@@ -33,7 +34,8 @@ public class UserRoleRelSQLProviderFactory {
       METALAKE_META_SQL_PROVIDER_MAP =
           ImmutableMap.of(
               JDBCBackendType.MYSQL, new UserRoleRelMySQLProvider(),
-              JDBCBackendType.H2, new UserRoleRelH2Provider());
+              JDBCBackendType.H2, new UserRoleRelH2Provider(),
+              JDBCBackendType.POSTGRESQL, new UserRoleRelPostgreSQLProvider());
 
   public static UserRoleRelBaseSQLProvider getProvider() {
     String databaseId =
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/CatalogMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/CatalogMetaPostgreSQLProvider.java
new file mode 100644
index 000000000..a4646fde8
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/CatalogMetaPostgreSQLProvider.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper.TABLE_NAME;
+
+import 
org.apache.gravitino.storage.relational.mapper.CatalogMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.CatalogPO;
+import org.apache.ibatis.annotations.Param;
+
+public class CatalogMetaPostgreSQLProvider extends CatalogMetaBaseSQLProvider {
+  @Override
+  public String softDeleteCatalogMetasByCatalogId(Long catalogId) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteCatalogMetasByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String insertCatalogMetaOnDuplicateKeyUpdate(CatalogPO catalogPO) {
+    return "INSERT INTO "
+        + TABLE_NAME
+        + "(catalog_id, catalog_name, metalake_id,"
+        + " type, provider, catalog_comment, properties, audit_info,"
+        + " current_version, last_version, deleted_at)"
+        + " VALUES("
+        + " #{catalogMeta.catalogId},"
+        + " #{catalogMeta.catalogName},"
+        + " #{catalogMeta.metalakeId},"
+        + " #{catalogMeta.type},"
+        + " #{catalogMeta.provider},"
+        + " #{catalogMeta.catalogComment},"
+        + " #{catalogMeta.properties},"
+        + " #{catalogMeta.auditInfo},"
+        + " #{catalogMeta.currentVersion},"
+        + " #{catalogMeta.lastVersion},"
+        + " #{catalogMeta.deletedAt}"
+        + " )"
+        + " ON CONFLICT(catalog_id) DO UPDATE SET"
+        + " catalog_name = #{catalogMeta.catalogName},"
+        + " metalake_id = #{catalogMeta.metalakeId},"
+        + " type = #{catalogMeta.type},"
+        + " provider = #{catalogMeta.provider},"
+        + " catalog_comment = #{catalogMeta.catalogComment},"
+        + " properties = #{catalogMeta.properties},"
+        + " audit_info = #{catalogMeta.auditInfo},"
+        + " current_version = #{catalogMeta.currentVersion},"
+        + " last_version = #{catalogMeta.lastVersion},"
+        + " deleted_at = #{catalogMeta.deletedAt}";
+  }
+
+  public String updateCatalogMeta(
+      @Param("newCatalogMeta") CatalogPO newCatalogPO,
+      @Param("oldCatalogMeta") CatalogPO oldCatalogPO) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET catalog_name = #{newCatalogMeta.catalogName},"
+        + " metalake_id = #{newCatalogMeta.metalakeId},"
+        + " type = #{newCatalogMeta.type},"
+        + " provider = #{newCatalogMeta.provider},"
+        + " catalog_comment = #{newCatalogMeta.catalogComment},"
+        + " properties = #{newCatalogMeta.properties},"
+        + " audit_info = #{newCatalogMeta.auditInfo},"
+        + " current_version = #{newCatalogMeta.currentVersion},"
+        + " last_version = #{newCatalogMeta.lastVersion},"
+        + " deleted_at = #{newCatalogMeta.deletedAt}"
+        + " WHERE catalog_id = #{oldCatalogMeta.catalogId}"
+        + " AND catalog_name = #{oldCatalogMeta.catalogName}"
+        + " AND metalake_id = #{oldCatalogMeta.metalakeId}"
+        + " AND type = #{oldCatalogMeta.type}"
+        + " AND provider = #{oldCatalogMeta.provider}"
+        + " AND (catalog_comment = #{oldCatalogMeta.catalogComment} "
+        + "   OR (CAST(catalog_comment AS VARCHAR) IS NULL AND "
+        + "   CAST(#{oldCatalogMeta.catalogComment} AS VARCHAR) IS NULL))"
+        + " AND properties = #{oldCatalogMeta.properties}"
+        + " AND audit_info = #{oldCatalogMeta.auditInfo}"
+        + " AND current_version = #{oldCatalogMeta.currentVersion}"
+        + " AND last_version = #{oldCatalogMeta.lastVersion}"
+        + " AND deleted_at = 0";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/FilesetMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/FilesetMetaPostgreSQLProvider.java
new file mode 100644
index 000000000..b63314088
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/FilesetMetaPostgreSQLProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper.META_TABLE_NAME;
+
+import 
org.apache.gravitino.storage.relational.mapper.FilesetMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.FilesetPO;
+
+public class FilesetMetaPostgreSQLProvider extends FilesetMetaBaseSQLProvider {
+  @Override
+  public String softDeleteFilesetMetasByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + META_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteFilesetMetasByCatalogId(Long catalogId) {
+    return "UPDATE "
+        + META_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteFilesetMetasBySchemaId(Long schemaId) {
+    return "UPDATE "
+        + META_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteFilesetMetasByFilesetId(Long filesetId) {
+    return "UPDATE "
+        + META_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE fileset_id = #{filesetId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String insertFilesetMetaOnDuplicateKeyUpdate(FilesetPO filesetPO) {
+    return "INSERT INTO "
+        + META_TABLE_NAME
+        + "(fileset_id, fileset_name, metalake_id,"
+        + " catalog_id, schema_id, type, audit_info,"
+        + " current_version, last_version, deleted_at)"
+        + " VALUES("
+        + " #{filesetMeta.filesetId},"
+        + " #{filesetMeta.filesetName},"
+        + " #{filesetMeta.metalakeId},"
+        + " #{filesetMeta.catalogId},"
+        + " #{filesetMeta.schemaId},"
+        + " #{filesetMeta.type},"
+        + " #{filesetMeta.auditInfo},"
+        + " #{filesetMeta.currentVersion},"
+        + " #{filesetMeta.lastVersion},"
+        + " #{filesetMeta.deletedAt}"
+        + " )"
+        + " ON CONFLICT(fileset_id) DO UPDATE SET"
+        + " fileset_name = #{filesetMeta.filesetName},"
+        + " metalake_id = #{filesetMeta.metalakeId},"
+        + " catalog_id = #{filesetMeta.catalogId},"
+        + " schema_id = #{filesetMeta.schemaId},"
+        + " type = #{filesetMeta.type},"
+        + " audit_info = #{filesetMeta.auditInfo},"
+        + " current_version = #{filesetMeta.currentVersion},"
+        + " last_version = #{filesetMeta.lastVersion},"
+        + " deleted_at = #{filesetMeta.deletedAt}";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/FilesetVersionPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/FilesetVersionPostgreSQLProvider.java
new file mode 100644
index 000000000..55317c0c7
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/FilesetVersionPostgreSQLProvider.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper.VERSION_TABLE_NAME;
+
+import 
org.apache.gravitino.storage.relational.mapper.FilesetVersionBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.FilesetVersionPO;
+
+public class FilesetVersionPostgreSQLProvider extends 
FilesetVersionBaseSQLProvider {
+  @Override
+  public String softDeleteFilesetVersionsByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + VERSION_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteFilesetVersionsByCatalogId(Long catalogId) {
+    return "UPDATE "
+        + VERSION_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteFilesetVersionsBySchemaId(Long schemaId) {
+    return "UPDATE "
+        + VERSION_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteFilesetVersionsByFilesetId(Long filesetId) {
+    return "UPDATE "
+        + VERSION_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE fileset_id = #{filesetId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteFilesetVersionsByRetentionLine(
+      Long filesetId, long versionRetentionLine, int limit) {
+    return "UPDATE "
+        + VERSION_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE fileset_id = #{filesetId} AND version <= 
#{versionRetentionLine} AND deleted_at = 0 LIMIT #{limit}";
+  }
+
+  @Override
+  public String insertFilesetVersionOnDuplicateKeyUpdate(FilesetVersionPO 
filesetVersionPO) {
+    return "INSERT INTO "
+        + VERSION_TABLE_NAME
+        + "(metalake_id, catalog_id, schema_id, fileset_id,"
+        + " version, fileset_comment, properties, storage_location,"
+        + " deleted_at)"
+        + " VALUES("
+        + " #{filesetVersion.metalakeId},"
+        + " #{filesetVersion.catalogId},"
+        + " #{filesetVersion.schemaId},"
+        + " #{filesetVersion.filesetId},"
+        + " #{filesetVersion.version},"
+        + " #{filesetVersion.filesetComment},"
+        + " #{filesetVersion.properties},"
+        + " #{filesetVersion.storageLocation},"
+        + " #{filesetVersion.deletedAt}"
+        + " )"
+        + " ON CONFLICT(fileset_id, version, deleted_at) DO UPDATE SET"
+        + " metalake_id = #{filesetVersion.metalakeId},"
+        + " catalog_id = #{filesetVersion.catalogId},"
+        + " schema_id = #{filesetVersion.schemaId},"
+        + " fileset_id = #{filesetVersion.filesetId},"
+        + " version = #{filesetVersion.version},"
+        + " fileset_comment = #{filesetVersion.filesetComment},"
+        + " properties = #{filesetVersion.properties},"
+        + " storage_location = #{filesetVersion.storageLocation},"
+        + " deleted_at = #{filesetVersion.deletedAt}";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/GroupMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/GroupMetaPostgreSQLProvider.java
new file mode 100644
index 000000000..0e24b319e
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/GroupMetaPostgreSQLProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.GroupMetaMapper.GROUP_TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.mapper.GroupMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.GroupPO;
+
+public class GroupMetaPostgreSQLProvider extends GroupMetaBaseSQLProvider {
+  @Override
+  public String softDeleteGroupMetaByGroupId(Long groupId) {
+    return "UPDATE "
+        + GROUP_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE group_id = #{groupId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteGroupMetasByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + GROUP_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String insertGroupMetaOnDuplicateKeyUpdate(GroupPO groupPO) {
+    return "INSERT INTO "
+        + GROUP_TABLE_NAME
+        + "(group_id, group_name,"
+        + "metalake_id, audit_info,"
+        + " current_version, last_version, deleted_at)"
+        + " VALUES("
+        + " #{groupMeta.groupId},"
+        + " #{groupMeta.groupName},"
+        + " #{groupMeta.metalakeId},"
+        + " #{groupMeta.auditInfo},"
+        + " #{groupMeta.currentVersion},"
+        + " #{groupMeta.lastVersion},"
+        + " #{groupMeta.deletedAt}"
+        + " )"
+        + " ON CONFLICT(group_id) DO UPDATE SET"
+        + " group_name = #{groupMeta.groupName},"
+        + " metalake_id = #{groupMeta.metalakeId},"
+        + " audit_info = #{groupMeta.auditInfo},"
+        + " current_version = #{groupMeta.currentVersion},"
+        + " last_version = #{groupMeta.lastVersion},"
+        + " deleted_at = #{groupMeta.deletedAt}";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/GroupRoleRelPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/GroupRoleRelPostgreSQLProvider.java
new file mode 100644
index 000000000..02f81bc73
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/GroupRoleRelPostgreSQLProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.GroupRoleRelMapper.GROUP_ROLE_RELATION_TABLE_NAME;
+import static 
org.apache.gravitino.storage.relational.mapper.GroupRoleRelMapper.GROUP_TABLE_NAME;
+
+import java.util.List;
+import 
org.apache.gravitino.storage.relational.mapper.GroupRoleRelBaseSQLProvider;
+
+public class GroupRoleRelPostgreSQLProvider extends 
GroupRoleRelBaseSQLProvider {
+  @Override
+  public String softDeleteGroupRoleRelByGroupId(Long groupId) {
+    return "UPDATE "
+        + GROUP_ROLE_RELATION_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE group_id = #{groupId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteGroupRoleRelByGroupAndRoles(Long groupId, List<Long> 
roleIds) {
+    return "<script>"
+        + "UPDATE "
+        + GROUP_ROLE_RELATION_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE group_id = #{groupId} AND role_id in ("
+        + "<foreach collection='roleIds' item='roleId' separator=','>"
+        + "#{roleId}"
+        + "</foreach>"
+        + ") "
+        + "AND deleted_at = 0"
+        + "</script>";
+  }
+
+  @Override
+  public String softDeleteGroupRoleRelByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + GROUP_ROLE_RELATION_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE group_id IN (SELECT group_id FROM "
+        + GROUP_TABLE_NAME
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0)"
+        + " AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteGroupRoleRelByRoleId(Long roleId) {
+    return "UPDATE "
+        + GROUP_ROLE_RELATION_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE role_id = #{roleId} AND deleted_at = 0";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/MetalakeMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/MetalakeMetaPostgreSQLProvider.java
new file mode 100644
index 000000000..5375f74c0
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/MetalakeMetaPostgreSQLProvider.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper.TABLE_NAME;
+
+import 
org.apache.gravitino.storage.relational.mapper.MetalakeMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.MetalakePO;
+import org.apache.ibatis.annotations.Param;
+
+public class MetalakeMetaPostgreSQLProvider extends 
MetalakeMetaBaseSQLProvider {
+  @Override
+  public String softDeleteMetalakeMetaByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String insertMetalakeMetaOnDuplicateKeyUpdate(MetalakePO metalakePO) {
+    return "INSERT INTO "
+        + TABLE_NAME
+        + "(metalake_id, metalake_name, metalake_comment, properties, 
audit_info,"
+        + " schema_version, current_version, last_version, deleted_at)"
+        + " VALUES("
+        + " #{metalakeMeta.metalakeId},"
+        + " #{metalakeMeta.metalakeName},"
+        + " #{metalakeMeta.metalakeComment},"
+        + " #{metalakeMeta.properties},"
+        + " #{metalakeMeta.auditInfo},"
+        + " #{metalakeMeta.schemaVersion},"
+        + " #{metalakeMeta.currentVersion},"
+        + " #{metalakeMeta.lastVersion},"
+        + " #{metalakeMeta.deletedAt}"
+        + " )"
+        + " ON CONFLICT(metalake_id) DO UPDATE SET"
+        + " metalake_name = #{metalakeMeta.metalakeName},"
+        + " metalake_comment = #{metalakeMeta.metalakeComment},"
+        + " properties = #{metalakeMeta.properties},"
+        + " audit_info = #{metalakeMeta.auditInfo},"
+        + " schema_version = #{metalakeMeta.schemaVersion},"
+        + " current_version = #{metalakeMeta.currentVersion},"
+        + " last_version = #{metalakeMeta.lastVersion},"
+        + " deleted_at = #{metalakeMeta.deletedAt}";
+  }
+
+  public String updateMetalakeMeta(
+      @Param("newMetalakeMeta") MetalakePO newMetalakePO,
+      @Param("oldMetalakeMeta") MetalakePO oldMetalakePO) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET metalake_name = #{newMetalakeMeta.metalakeName},"
+        + " metalake_comment = #{newMetalakeMeta.metalakeComment},"
+        + " properties = #{newMetalakeMeta.properties},"
+        + " audit_info = #{newMetalakeMeta.auditInfo},"
+        + " schema_version = #{newMetalakeMeta.schemaVersion},"
+        + " current_version = #{newMetalakeMeta.currentVersion},"
+        + " last_version = #{newMetalakeMeta.lastVersion}"
+        + " WHERE metalake_id = #{oldMetalakeMeta.metalakeId}"
+        + " AND metalake_name = #{oldMetalakeMeta.metalakeName}"
+        + " AND (metalake_comment = #{oldMetalakeMeta.metalakeComment} "
+        + "  OR (CAST(metalake_comment AS VARCHAR) IS NULL AND "
+        + "  CAST(#{oldMetalakeMeta.metalakeComment} AS VARCHAR) IS NULL))"
+        + " AND properties = #{oldMetalakeMeta.properties}"
+        + " AND audit_info = #{oldMetalakeMeta.auditInfo}"
+        + " AND schema_version = #{oldMetalakeMeta.schemaVersion}"
+        + " AND current_version = #{oldMetalakeMeta.currentVersion}"
+        + " AND last_version = #{oldMetalakeMeta.lastVersion}"
+        + " AND deleted_at = 0";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/OwnerMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/OwnerMetaPostgreSQLProvider.java
new file mode 100644
index 000000000..6374a3fbd
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/OwnerMetaPostgreSQLProvider.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper.OWNER_TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.OwnerMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TopicMetaMapper;
+
+public class OwnerMetaPostgreSQLProvider extends OwnerMetaBaseSQLProvider {
+  @Override
+  public String softDeleteOwnerRelByMetadataObjectIdAndType(
+      Long metadataObjectId, String metadataObjectType) {
+    return "UPDATE "
+        + OWNER_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE metadata_object_id = #{metadataObjectId} AND 
metadata_object_type = #{metadataObjectType} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteOwnerRelByOwnerIdAndType(Long ownerId, String 
ownerType) {
+    return "UPDATE "
+        + OWNER_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE owner_id = #{ownerId} AND owner_type = #{ownerType} AND 
deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteOwnerRelByMetalakeId(Long metalakeId) {
+    return "UPDATE  "
+        + OWNER_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at =0";
+  }
+
+  @Override
+  public String softDeleteOwnerRelByCatalogId(Long catalogId) {
+    return "UPDATE  "
+        + OWNER_TABLE_NAME
+        + " ot SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE EXISTS ("
+        + " SELECT ct.catalog_id FROM "
+        + CatalogMetaMapper.TABLE_NAME
+        + " ct WHERE ct.catalog_id = #{catalogId}  AND ct.deleted_at = 0 AND 
ot.deleted_at = 0 AND "
+        + "ct.catalog_id = ot.metadata_object_id AND ot.metadata_object_type = 
'CATALOG'"
+        + " UNION "
+        + " SELECT st.catalog_id FROM "
+        + SchemaMetaMapper.TABLE_NAME
+        + " st WHERE st.catalog_id = #{catalogId} AND st.deleted_at = 0 AND 
ot.deleted_at = 0 AND "
+        + "st.schema_id = ot.metadata_object_id AND ot.metadata_object_type = 
'SCHEMA'"
+        + " UNION "
+        + " SELECT tt.catalog_id FROM "
+        + TopicMetaMapper.TABLE_NAME
+        + " tt WHERE tt.catalog_id = #{catalogId} AND tt.deleted_at = 0 AND 
ot.deleted_at = 0 AND "
+        + "tt.topic_id = ot.metadata_object_id AND ot.metadata_object_type = 
'TOPIC'"
+        + " UNION "
+        + " SELECT tat.catalog_id FROM "
+        + TableMetaMapper.TABLE_NAME
+        + " tat WHERE tat.catalog_id = #{catalogId} AND tat.deleted_at = 0 AND 
ot.deleted_at = 0 AND "
+        + "tat.table_id = ot.metadata_object_id AND ot.metadata_object_type = 
'TABLE'"
+        + " UNION "
+        + " SELECT ft.catalog_id FROM "
+        + FilesetMetaMapper.META_TABLE_NAME
+        + " ft WHERE ft.catalog_id = #{catalogId} AND ft.deleted_at = 0 AND 
ot.deleted_at = 0 AND"
+        + " ft.fileset_id = ot.metadata_object_id AND ot.metadata_object_type 
= 'FILESET'"
+        + ")";
+  }
+
+  @Override
+  public String sotDeleteOwnerRelBySchemaId(Long schemaId) {
+    return "UPDATE  "
+        + OWNER_TABLE_NAME
+        + " ot SET deleted_at = floor(extract(epoch from((current_timestamp - 
timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE EXISTS ("
+        + " SELECT st.schema_id FROM "
+        + SchemaMetaMapper.TABLE_NAME
+        + " st WHERE st.schema_id = #{schemaId} AND st.deleted_at = 0 AND 
ot.deleted_at = 0 "
+        + "AND st.schema_id = ot.metadata_object_id AND 
ot.metadata_object_type = 'SCHEMA'"
+        + " UNION "
+        + " SELECT tt.schema_id FROM "
+        + TopicMetaMapper.TABLE_NAME
+        + " tt WHERE tt.schema_id = #{schemaId} AND tt.deleted_at = 0 AND 
ot.deleted_at = 0 AND "
+        + "tt.topic_id = ot.metadata_object_id AND ot.metadata_object_type = 
'TOPIC'"
+        + " UNION "
+        + " SELECT tat.schema_id FROM "
+        + TableMetaMapper.TABLE_NAME
+        + " tat WHERE tat.schema_id = #{schemaId} AND tat.deleted_at = 0 AND 
ot.deleted_at = 0 AND "
+        + "tat.table_id = ot.metadata_object_id AND ot.metadata_object_type = 
'TABLE'"
+        + " UNION "
+        + " SELECT ft.schema_id FROM "
+        + FilesetMetaMapper.META_TABLE_NAME
+        + " ft WHERE ft.schema_id = #{schemaId} AND ft.deleted_at = 0 AND 
ot.deleted_at = 0 AND "
+        + "ft.fileset_id = ot.metadata_object_id AND ot.metadata_object_type = 
'FILESET'"
+        + ")";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/RoleMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/RoleMetaPostgreSQLProvider.java
new file mode 100644
index 000000000..72e5808e3
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/RoleMetaPostgreSQLProvider.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.RoleMetaMapper.ROLE_TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.mapper.RoleMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.RolePO;
+
+public class RoleMetaPostgreSQLProvider extends RoleMetaBaseSQLProvider {
+  @Override
+  public String softDeleteRoleMetaByRoleId(Long roleId) {
+    return "UPDATE "
+        + ROLE_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE role_id = #{roleId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteRoleMetasByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + ROLE_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String insertRoleMetaOnDuplicateKeyUpdate(RolePO rolePO) {
+    return "INSERT INTO "
+        + ROLE_TABLE_NAME
+        + "(role_id, role_name,"
+        + " metalake_id, properties,"
+        + " audit_info, current_version, last_version, deleted_at)"
+        + " VALUES("
+        + " #{roleMeta.roleId},"
+        + " #{roleMeta.roleName},"
+        + " #{roleMeta.metalakeId},"
+        + " #{roleMeta.properties},"
+        + " #{roleMeta.auditInfo},"
+        + " #{roleMeta.currentVersion},"
+        + " #{roleMeta.lastVersion},"
+        + " #{roleMeta.deletedAt}"
+        + " ) ON CONFLICT (role_id) DO UPDATE SET"
+        + " role_name = #{roleMeta.roleName},"
+        + " metalake_id = #{roleMeta.metalakeId},"
+        + " properties = #{roleMeta.properties},"
+        + " audit_info = #{roleMeta.auditInfo},"
+        + " current_version = #{roleMeta.currentVersion},"
+        + " last_version = #{roleMeta.lastVersion},"
+        + " deleted_at = #{roleMeta.deletedAt}";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/SchemaMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/SchemaMetaPostgreSQLProvider.java
new file mode 100644
index 000000000..c2a565ddd
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/SchemaMetaPostgreSQLProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper.TABLE_NAME;
+
+import 
org.apache.gravitino.storage.relational.mapper.SchemaMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.SchemaPO;
+
+public class SchemaMetaPostgreSQLProvider extends SchemaMetaBaseSQLProvider {
+  @Override
+  public String insertSchemaMetaOnDuplicateKeyUpdate(SchemaPO schemaPO) {
+    return "INSERT INTO "
+        + TABLE_NAME
+        + "(schema_id, schema_name, metalake_id,"
+        + " catalog_id, schema_comment, properties, audit_info,"
+        + " current_version, last_version, deleted_at)"
+        + " VALUES("
+        + " #{schemaMeta.schemaId},"
+        + " #{schemaMeta.schemaName},"
+        + " #{schemaMeta.metalakeId},"
+        + " #{schemaMeta.catalogId},"
+        + " #{schemaMeta.schemaComment},"
+        + " #{schemaMeta.properties},"
+        + " #{schemaMeta.auditInfo},"
+        + " #{schemaMeta.currentVersion},"
+        + " #{schemaMeta.lastVersion},"
+        + " #{schemaMeta.deletedAt}"
+        + " )"
+        + " ON CONFLICT(schema_id) DO UPDATE SET "
+        + " schema_name = #{schemaMeta.schemaName},"
+        + " metalake_id = #{schemaMeta.metalakeId},"
+        + " catalog_id = #{schemaMeta.catalogId},"
+        + " schema_comment = #{schemaMeta.schemaComment},"
+        + " properties = #{schemaMeta.properties},"
+        + " audit_info = #{schemaMeta.auditInfo},"
+        + " current_version = #{schemaMeta.currentVersion},"
+        + " last_version = #{schemaMeta.lastVersion},"
+        + " deleted_at = #{schemaMeta.deletedAt}";
+  }
+
+  @Override
+  public String softDeleteSchemaMetasBySchemaId(Long schemaId) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteSchemaMetasByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteSchemaMetasByCatalogId(Long catalogId) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/SecurableObjectPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/SecurableObjectPostgreSQLProvider.java
new file mode 100644
index 000000000..ff1340e6c
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/SecurableObjectPostgreSQLProvider.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper.ROLE_TABLE_NAME;
+import static 
org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper.SECURABLE_OBJECT_TABLE_NAME;
+
+import 
org.apache.gravitino.storage.relational.mapper.SecurableObjectBaseSQLProvider;
+
+public class SecurableObjectPostgreSQLProvider extends 
SecurableObjectBaseSQLProvider {
+  @Override
+  public String softDeleteSecurableObjectsByRoleId(Long roleId) {
+    return "UPDATE "
+        + SECURABLE_OBJECT_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE role_id = #{roleId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteRoleMetasByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + SECURABLE_OBJECT_TABLE_NAME
+        + " ob SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE exists (SELECT * from "
+        + ROLE_TABLE_NAME
+        + " ro WHERE ro.metalake_id = #{metalakeId} AND ro.role_id = 
ob.role_id"
+        + " AND ro.deleted_at = 0) AND ob.deleted_at = 0";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/TableMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/TableMetaPostgreSQLProvider.java
new file mode 100644
index 000000000..d1f5d3985
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/TableMetaPostgreSQLProvider.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.TableMetaMapper.TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.mapper.TableMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.TablePO;
+
+public class TableMetaPostgreSQLProvider extends TableMetaBaseSQLProvider {
+  @Override
+  public String insertTableMetaOnDuplicateKeyUpdate(TablePO tablePO) {
+    return "INSERT INTO "
+        + TABLE_NAME
+        + "(table_id, table_name, metalake_id,"
+        + " catalog_id, schema_id, audit_info,"
+        + " current_version, last_version, deleted_at)"
+        + " VALUES("
+        + " #{tableMeta.tableId},"
+        + " #{tableMeta.tableName},"
+        + " #{tableMeta.metalakeId},"
+        + " #{tableMeta.catalogId},"
+        + " #{tableMeta.schemaId},"
+        + " #{tableMeta.auditInfo},"
+        + " #{tableMeta.currentVersion},"
+        + " #{tableMeta.lastVersion},"
+        + " #{tableMeta.deletedAt}"
+        + " )"
+        + " ON CONFLICT (table_id) DO UPDATE SET "
+        + " table_name = #{tableMeta.tableName},"
+        + " metalake_id = #{tableMeta.metalakeId},"
+        + " catalog_id = #{tableMeta.catalogId},"
+        + " schema_id = #{tableMeta.schemaId},"
+        + " audit_info = #{tableMeta.auditInfo},"
+        + " current_version = #{tableMeta.currentVersion},"
+        + " last_version = #{tableMeta.lastVersion},"
+        + " deleted_at = #{tableMeta.deletedAt}";
+  }
+
+  @Override
+  public String softDeleteTableMetasByTableId(Long tableId) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE table_id = #{tableId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteTableMetasByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteTableMetasByCatalogId(Long catalogId) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteTableMetasBySchemaId(Long schemaId) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/TagMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/TagMetaPostgreSQLProvider.java
new file mode 100644
index 000000000..75a3863f7
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/TagMetaPostgreSQLProvider.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.TagMetaMapper.TAG_TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TagMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.TagPO;
+import org.apache.ibatis.annotations.Param;
+
+public class TagMetaPostgreSQLProvider extends TagMetaBaseSQLProvider {
+  @Override
+  public String softDeleteTagMetaByMetalakeAndTagName(String metalakeName, 
String tagName) {
+    return "UPDATE "
+        + TAG_TABLE_NAME
+        + " tm SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE tm.metalake_id IN ("
+        + " SELECT mm.metalake_id FROM "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm WHERE mm.metalake_name = #{metalakeName} AND mm.deleted_at = 0)"
+        + " AND tm.tag_name = #{tagName} AND tm.deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteTagMetasByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + TAG_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String insertTagMetaOnDuplicateKeyUpdate(TagPO tagPO) {
+    return "INSERT INTO "
+        + TAG_TABLE_NAME
+        + "(tag_id, tag_name,"
+        + " metalake_id, tag_comment, properties, audit_info,"
+        + " current_version, last_version, deleted_at)"
+        + " VALUES("
+        + " #{tagMeta.tagId},"
+        + " #{tagMeta.tagName},"
+        + " #{tagMeta.metalakeId},"
+        + " #{tagMeta.comment},"
+        + " #{tagMeta.properties},"
+        + " #{tagMeta.auditInfo},"
+        + " #{tagMeta.currentVersion},"
+        + " #{tagMeta.lastVersion},"
+        + " #{tagMeta.deletedAt}"
+        + " )"
+        + " ON CONFLICT(tag_id) DO UPDATE SET"
+        + " tag_name = #{tagMeta.tagName},"
+        + " metalake_id = #{tagMeta.metalakeId},"
+        + " tag_comment = #{tagMeta.comment},"
+        + " properties = #{tagMeta.properties},"
+        + " audit_info = #{tagMeta.auditInfo},"
+        + " current_version = #{tagMeta.currentVersion},"
+        + " last_version = #{tagMeta.lastVersion},"
+        + " deleted_at = #{tagMeta.deletedAt}";
+  }
+
+  public String updateTagMeta(
+      @Param("newTagMeta") TagPO newTagPO, @Param("oldTagMeta") TagPO 
oldTagPO) {
+    return "UPDATE "
+        + TAG_TABLE_NAME
+        + " SET tag_name = #{newTagMeta.tagName},"
+        + " tag_comment = #{newTagMeta.comment},"
+        + " properties = #{newTagMeta.properties},"
+        + " audit_info = #{newTagMeta.auditInfo},"
+        + " current_version = #{newTagMeta.currentVersion},"
+        + " last_version = #{newTagMeta.lastVersion},"
+        + " deleted_at = #{newTagMeta.deletedAt}"
+        + " WHERE tag_id = #{oldTagMeta.tagId}"
+        + " AND metalake_id = #{oldTagMeta.metalakeId}"
+        + " AND tag_name = #{oldTagMeta.tagName}"
+        + " AND (tag_comment = #{oldTagMeta.comment} "
+        + "   OR (CAST(tag_comment AS VARCHAR) IS NULL AND 
CAST(#{oldTagMeta.comment} AS VARCHAR) IS NULL))"
+        + " AND properties = #{oldTagMeta.properties}"
+        + " AND audit_info = #{oldTagMeta.auditInfo}"
+        + " AND current_version = #{oldTagMeta.currentVersion}"
+        + " AND last_version = #{oldTagMeta.lastVersion}"
+        + " AND deleted_at = 0";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/TagMetadataObjectRelPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/TagMetadataObjectRelPostgreSQLProvider.java
new file mode 100644
index 000000000..1bb716248
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/TagMetadataObjectRelPostgreSQLProvider.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper.TAG_METADATA_OBJECT_RELATION_TABLE_NAME;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TagMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelBaseSQLProvider;
+
+public class TagMetadataObjectRelPostgreSQLProvider extends 
TagMetadataObjectRelBaseSQLProvider {
+  @Override
+  public String softDeleteTagMetadataObjectRelsByMetalakeAndTagName(
+      String metalakeName, String tagName) {
+    return "UPDATE "
+        + TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " te SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE te.tag_id IN (SELECT tm.tag_id FROM "
+        + TagMetaMapper.TAG_TABLE_NAME
+        + " tm WHERE tm.metalake_id IN (SELECT mm.metalake_id FROM "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm WHERE mm.metalake_name = #{metalakeName} AND mm.deleted_at = 0)"
+        + " AND tm.deleted_at = 0) AND te.deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteTagMetadataObjectRelsByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " te SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE EXISTS (SELECT * FROM "
+        + TagMetaMapper.TAG_TABLE_NAME
+        + " tm WHERE tm.metalake_id = #{metalakeId} AND tm.tag_id = te.tag_id"
+        + " AND tm.deleted_at = 0) AND te.deleted_at = 0";
+  }
+
+  @Override
+  public String batchDeleteTagMetadataObjectRelsByTagIdsAndMetadataObject(
+      Long metadataObjectId, String metadataObjectType, List<Long> tagIds) {
+    return "<script>"
+        + "UPDATE "
+        + TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE tag_id IN "
+        + "<foreach item='tagId' collection='tagIds' open='(' separator=',' 
close=')'>"
+        + "#{tagId}"
+        + "</foreach>"
+        + " And metadata_object_id = #{metadataObjectId}"
+        + " AND metadata_object_type = #{metadataObjectType} AND deleted_at = 
0"
+        + "</script>";
+  }
+
+  @Override
+  public String listTagMetadataObjectRelsByMetalakeAndTagName(String 
metalakeName, String tagName) {
+    return "SELECT te.tag_id as tagId, te.metadata_object_id as 
metadataObjectId,"
+        + " te.metadata_object_type as metadataObjectType, te.audit_info as 
auditInfo,"
+        + " te.current_version as currentVersion, te.last_version as 
lastVersion,"
+        + " te.deleted_at as deletedAt"
+        + " FROM "
+        + TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " te JOIN "
+        + TagMetaMapper.TAG_TABLE_NAME
+        + " tm ON te.tag_id = tm.tag_id JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON tm.metalake_id = mm.metalake_id"
+        + " WHERE mm.metalake_name = #{metalakeName} AND tm.tag_name = 
#{tagName}"
+        + " AND te.deleted_at = 0 AND tm.deleted_at = 0 AND mm.deleted_at = 0";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/TopicMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/TopicMetaPostgreSQLProvider.java
new file mode 100644
index 000000000..ef7e45cbc
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/TopicMetaPostgreSQLProvider.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.TopicMetaMapper.TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.mapper.TopicMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.TopicPO;
+
+public class TopicMetaPostgreSQLProvider extends TopicMetaBaseSQLProvider {
+
+  @Override
+  public String softDeleteTopicMetasByTopicId(Long topicId) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE topic_id = #{topicId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteTopicMetasByCatalogId(Long catalogId) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteTopicMetasByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteTopicMetasBySchemaId(Long schemaId) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String insertTopicMetaOnDuplicateKeyUpdate(TopicPO topicPO) {
+    return "INSERT INTO "
+        + TABLE_NAME
+        + "(topic_id, topic_name, metalake_id, catalog_id, schema_id,"
+        + " comment, properties, audit_info, current_version, last_version,"
+        + " deleted_at)"
+        + " VALUES("
+        + " #{topicMeta.topicId},"
+        + " #{topicMeta.topicName},"
+        + " #{topicMeta.metalakeId},"
+        + " #{topicMeta.catalogId},"
+        + " #{topicMeta.schemaId},"
+        + " #{topicMeta.comment},"
+        + " #{topicMeta.properties},"
+        + " #{topicMeta.auditInfo},"
+        + " #{topicMeta.currentVersion},"
+        + " #{topicMeta.lastVersion},"
+        + " #{topicMeta.deletedAt}"
+        + " )"
+        + " ON CONFLICT (topic_id) DO UPDATE SET"
+        + " topic_name = #{topicMeta.topicName},"
+        + " metalake_id = #{topicMeta.metalakeId},"
+        + " catalog_id = #{topicMeta.catalogId},"
+        + " schema_id = #{topicMeta.schemaId},"
+        + " comment = #{topicMeta.comment},"
+        + " properties = #{topicMeta.properties},"
+        + " audit_info = #{topicMeta.auditInfo},"
+        + " current_version = #{topicMeta.currentVersion},"
+        + " last_version = #{topicMeta.lastVersion},"
+        + " deleted_at = #{topicMeta.deletedAt}";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/UserMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/UserMetaPostgreSQLProvider.java
new file mode 100644
index 000000000..af7d65d2a
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/UserMetaPostgreSQLProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.UserRoleRelMapper.USER_TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.mapper.UserMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.UserPO;
+
+public class UserMetaPostgreSQLProvider extends UserMetaBaseSQLProvider {
+  @Override
+  public String softDeleteUserMetaByUserId(Long userId) {
+    return "UPDATE "
+        + USER_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE user_id = #{userId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteUserMetasByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + USER_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String insertUserMetaOnDuplicateKeyUpdate(UserPO userPO) {
+    return "INSERT INTO "
+        + USER_TABLE_NAME
+        + "(user_id, user_name,"
+        + "metalake_id, audit_info,"
+        + " current_version, last_version, deleted_at)"
+        + " VALUES("
+        + " #{userMeta.userId},"
+        + " #{userMeta.userName},"
+        + " #{userMeta.metalakeId},"
+        + " #{userMeta.auditInfo},"
+        + " #{userMeta.currentVersion},"
+        + " #{userMeta.lastVersion},"
+        + " #{userMeta.deletedAt}"
+        + " )"
+        + " ON CONFLICT(user_id) DO UPDATE SET"
+        + " user_name = #{userMeta.userName},"
+        + " metalake_id = #{userMeta.metalakeId},"
+        + " audit_info = #{userMeta.auditInfo},"
+        + " current_version = #{userMeta.currentVersion},"
+        + " last_version = #{userMeta.lastVersion},"
+        + " deleted_at = #{userMeta.deletedAt}";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/UserRoleRelPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/UserRoleRelPostgreSQLProvider.java
new file mode 100644
index 000000000..dddca1efc
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/postgresql/UserRoleRelPostgreSQLProvider.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.storage.relational.mapper.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.UserRoleRelMapper.USER_ROLE_RELATION_TABLE_NAME;
+import static 
org.apache.gravitino.storage.relational.mapper.UserRoleRelMapper.USER_TABLE_NAME;
+
+import java.util.List;
+import 
org.apache.gravitino.storage.relational.mapper.UserRoleRelBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.UserRoleRelPO;
+
+public class UserRoleRelPostgreSQLProvider extends UserRoleRelBaseSQLProvider {
+  @Override
+  public String softDeleteUserRoleRelByUserId(Long userId) {
+    return "UPDATE "
+        + USER_ROLE_RELATION_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE user_id = #{userId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteUserRoleRelByUserAndRoles(Long userId, List<Long> 
roleIds) {
+    return "<script>"
+        + "UPDATE "
+        + USER_ROLE_RELATION_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE user_id = #{userId} AND role_id in ("
+        + "<foreach collection='roleIds' item='roleId' separator=','>"
+        + "#{roleId}"
+        + "</foreach>"
+        + ") "
+        + "AND deleted_at = 0"
+        + "</script>";
+  }
+
+  @Override
+  public String softDeleteUserRoleRelByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + USER_ROLE_RELATION_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE user_id IN (SELECT user_id FROM "
+        + USER_TABLE_NAME
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0)"
+        + " AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteUserRoleRelByRoleId(Long roleId) {
+    return "UPDATE "
+        + USER_ROLE_RELATION_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + "  timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE role_id = #{roleId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String batchInsertUserRoleRelOnDuplicateKeyUpdate(List<UserRoleRelPO> 
userRoleRelPOs) {
+    return "<script>"
+        + "INSERT INTO "
+        + USER_ROLE_RELATION_TABLE_NAME
+        + "(user_id, role_id,"
+        + " audit_info,"
+        + " current_version, last_version, deleted_at)"
+        + " VALUES "
+        + "<foreach collection='userRoleRels' item='item' separator=','>"
+        + "(#{item.userId},"
+        + " #{item.roleId},"
+        + " #{item.auditInfo},"
+        + " #{item.currentVersion},"
+        + " #{item.lastVersion},"
+        + " #{item.deletedAt})"
+        + "</foreach>"
+        + " ON CONFLICT (user_id, role_id, deleted_at) DO UPDATE SET"
+        + " user_id = VALUES(user_id),"
+        + " role_id = VALUES(role_id),"
+        + " audit_info = VALUES(audit_info),"
+        + " current_version = VALUES(current_version),"
+        + " last_version = VALUES(last_version),"
+        + " deleted_at = VALUES(deleted_at)"
+        + "</script>";
+  }
+}
diff --git 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/PostgreSQLContainer.java
 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/PostgreSQLContainer.java
index b0ff97ea6..6b2b147b1 100644
--- 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/PostgreSQLContainer.java
+++ 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/PostgreSQLContainer.java
@@ -116,7 +116,7 @@ public class PostgreSQLContainer extends BaseContainer {
             DriverManager.getConnection(getJdbcUrl(), getUsername(), 
getPassword());
         Statement statement = connection.createStatement()) {
 
-      String query = format("CREATE DATABASE %s;", testDatabaseName);
+      String query = format("CREATE DATABASE \"%s\"", testDatabaseName);
       statement.execute(query);
       LOG.info(format("PostgreSQL container database %s has been created", 
testDatabaseName));
     } catch (SQLException e) {
diff --git 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/AbstractIT.java
 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/AbstractIT.java
index 968eeeef8..3e264652b 100644
--- 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/AbstractIT.java
+++ 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/AbstractIT.java
@@ -19,6 +19,8 @@
 package org.apache.gravitino.integration.test.util;
 
 import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PATH;
+import static 
org.apache.gravitino.integration.test.util.TestDatabaseName.PG_CATALOG_POSTGRESQL_IT;
+import static 
org.apache.gravitino.integration.test.util.TestDatabaseName.PG_JDBC_BACKEND;
 import static 
org.apache.gravitino.server.GravitinoServer.WEBSERVER_CONF_PREFIX;
 
 import com.google.common.base.Splitter;
@@ -30,6 +32,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -39,6 +42,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
@@ -49,6 +53,7 @@ import org.apache.gravitino.integration.test.MiniGravitino;
 import org.apache.gravitino.integration.test.MiniGravitinoContext;
 import org.apache.gravitino.integration.test.container.ContainerSuite;
 import org.apache.gravitino.integration.test.container.MySQLContainer;
+import org.apache.gravitino.integration.test.container.PostgreSQLContainer;
 import org.apache.gravitino.server.GravitinoServer;
 import org.apache.gravitino.server.ServerConfig;
 import org.apache.gravitino.server.web.JettyServerConfig;
@@ -92,6 +97,7 @@ public class AbstractIT {
 
   private static TestDatabaseName META_DATA;
   private static MySQLContainer MYSQL_CONTAINER;
+  private static PostgreSQLContainer POSTGRESQL_CONTAINER;
 
   protected static String serverUri;
 
@@ -155,6 +161,63 @@ public class AbstractIT {
     }
   }
 
+  protected static void setPGBackend() throws SQLException {
+    String pgUrlWithoutSchema = POSTGRESQL_CONTAINER.getJdbcUrl(META_DATA);
+    customConfigs.put(Configs.ENTITY_STORE_KEY, "relational");
+    customConfigs.put(Configs.ENTITY_RELATIONAL_STORE_KEY, "JDBCBackend");
+    customConfigs.put(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL_KEY, 
pgUrlWithoutSchema);
+    customConfigs.put(
+        Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER_KEY,
+        POSTGRESQL_CONTAINER.getDriverClassName(META_DATA));
+    customConfigs.put(
+        Configs.ENTITY_RELATIONAL_JDBC_BACKEND_USER_KEY, 
POSTGRESQL_CONTAINER.getUsername());
+    customConfigs.put(
+        Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD_KEY, 
POSTGRESQL_CONTAINER.getPassword());
+
+    LOG.info("PG URL: {}", pgUrlWithoutSchema);
+
+    String randomSchemaName = RandomStringUtils.random(10, true, false);
+    // Connect to the PostgreSQL docker and create a schema
+    String currentExecuteSql = "";
+    try (Connection connection =
+        DriverManager.getConnection(
+            pgUrlWithoutSchema,
+            POSTGRESQL_CONTAINER.getUsername(),
+            POSTGRESQL_CONTAINER.getPassword())) {
+      connection.setCatalog(PG_CATALOG_POSTGRESQL_IT.toString());
+      final Statement statement = connection.createStatement();
+      statement.execute("drop schema if exists " + randomSchemaName);
+      statement.execute("create schema " + randomSchemaName);
+      statement.execute("set search_path to " + randomSchemaName);
+      String gravitinoHome = System.getenv("GRAVITINO_ROOT_DIR");
+      String mysqlContent =
+          FileUtils.readFileToString(
+              new File(
+                  gravitinoHome
+                      + String.format(
+                          "/scripts/postgresql/schema-%s-postgresql.sql",
+                          ConfigConstants.VERSION_0_7_0)),
+              "UTF-8");
+
+      String[] initPGBackendSqls =
+          Arrays.stream(mysqlContent.split(";"))
+              .map(String::trim)
+              .filter(s -> !s.isEmpty())
+              .toArray(String[]::new);
+
+      for (String sql : initPGBackendSqls) {
+        currentExecuteSql = sql;
+        statement.execute(sql);
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to create database in pg, sql:\n{}", 
currentExecuteSql, e);
+      throw new RuntimeException(e);
+    }
+
+    pgUrlWithoutSchema = pgUrlWithoutSchema + "?currentSchema=" + 
randomSchemaName;
+    customConfigs.put(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL_KEY, 
pgUrlWithoutSchema);
+  }
+
   private static void setMySQLBackend() {
     String mysqlUrl = MYSQL_CONTAINER.getJdbcUrl(META_DATA);
     customConfigs.put(Configs.ENTITY_STORE_KEY, "relational");
@@ -221,6 +284,13 @@ public class AbstractIT {
       MYSQL_CONTAINER = containerSuite.getMySQLContainer();
 
       setMySQLBackend();
+    } else if ("PostgreSQL".equalsIgnoreCase(System.getenv("jdbcBackend"))) {
+      // Start PostgreSQL docker instance.
+      META_DATA = PG_JDBC_BACKEND;
+      containerSuite.startPostgreSQLContainer(META_DATA);
+      POSTGRESQL_CONTAINER = containerSuite.getPostgreSQLContainer();
+
+      setPGBackend();
     }
 
     File baseDir = new File(System.getProperty("java.io.tmpdir"));
diff --git 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
index 98fb959ae..4e81b992c 100644
--- 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
+++ 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
@@ -58,6 +58,8 @@ public enum TestDatabaseName {
   /** Represents the MySQL database used for testing the catalog integration 
with MySQL. */
   MYSQL_CATALOG_MYSQL_IT,
 
+  PG_JDBC_BACKEND,
+
   /** Represents the PostgreSQL database for CatalogPostgreSqlIT. */
   PG_CATALOG_POSTGRESQL_IT {
     /** PostgreSQL only accept lowercase database name */
diff --git a/scripts/postgresql/schema-0.7.0-postgresql.sql 
b/scripts/postgresql/schema-0.7.0-postgresql.sql
new file mode 100644
index 000000000..0ce7a9685
--- /dev/null
+++ b/scripts/postgresql/schema-0.7.0-postgresql.sql
@@ -0,0 +1,458 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file--
+--  distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"). You may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--  http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+
+-- Note: Database and schema creation is not included in this script. Please 
create the database and
+-- schema before running this script. for example in psql:
+-- CREATE DATABASE example_db;
+-- \c example_db
+-- CREATE SCHEMA example_schema;
+-- set search_path to example_schema;
+
+
+CREATE TABLE IF NOT EXISTS metalake_meta (
+    metalake_id BIGINT NOT NULL,
+    metalake_name VARCHAR(128) NOT NULL,
+    metalake_comment VARCHAR(256) DEFAULT '',
+    properties TEXT DEFAULT NULL,
+    audit_info TEXT NOT NULL,
+    schema_version TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (metalake_id),
+    UNIQUE (metalake_name, deleted_at)
+    );
+COMMENT ON TABLE metalake_meta IS 'metalake metadata';
+
+COMMENT ON COLUMN metalake_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN metalake_meta.metalake_name IS 'metalake name';
+COMMENT ON COLUMN metalake_meta.metalake_comment IS 'metalake comment';
+COMMENT ON COLUMN metalake_meta.properties IS 'metalake properties';
+COMMENT ON COLUMN metalake_meta.audit_info IS 'metalake audit info';
+COMMENT ON COLUMN metalake_meta.schema_version IS 'metalake schema version 
info';
+COMMENT ON COLUMN metalake_meta.current_version IS 'metalake current version';
+COMMENT ON COLUMN metalake_meta.last_version IS 'metalake last version';
+COMMENT ON COLUMN metalake_meta.deleted_at IS 'metalake deleted at';
+
+
+CREATE TABLE IF NOT EXISTS catalog_meta (
+    catalog_id BIGINT NOT NULL,
+    catalog_name VARCHAR(128) NOT NULL,
+    metalake_id BIGINT NOT NULL,
+    type VARCHAR(64) NOT NULL,
+    provider VARCHAR(64) NOT NULL,
+    catalog_comment VARCHAR(256) DEFAULT '',
+    properties TEXT DEFAULT NULL,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (catalog_id),
+    UNIQUE (metalake_id, catalog_name, deleted_at)
+    );
+
+COMMENT ON TABLE catalog_meta IS 'catalog metadata';
+
+COMMENT ON COLUMN catalog_meta.catalog_id IS 'catalog id';
+COMMENT ON COLUMN catalog_meta.catalog_name IS 'catalog name';
+COMMENT ON COLUMN catalog_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN catalog_meta.type IS 'catalog type';
+COMMENT ON COLUMN catalog_meta.provider IS 'catalog provider';
+COMMENT ON COLUMN catalog_meta.catalog_comment IS 'catalog comment';
+COMMENT ON COLUMN catalog_meta.properties IS 'catalog properties';
+COMMENT ON COLUMN catalog_meta.audit_info IS 'catalog audit info';
+COMMENT ON COLUMN catalog_meta.current_version IS 'catalog current version';
+COMMENT ON COLUMN catalog_meta.last_version IS 'catalog last version';
+COMMENT ON COLUMN catalog_meta.deleted_at IS 'catalog deleted at';
+
+
+CREATE TABLE IF NOT EXISTS schema_meta (
+    schema_id BIGINT NOT NULL,
+    schema_name VARCHAR(128) NOT NULL,
+    metalake_id BIGINT NOT NULL,
+    catalog_id BIGINT NOT NULL,
+    schema_comment VARCHAR(256) DEFAULT '',
+    properties TEXT DEFAULT NULL,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (schema_id),
+    UNIQUE (catalog_id, schema_name, deleted_at)
+    );
+
+CREATE INDEX IF NOT EXISTS idx_metalake_id ON schema_meta (metalake_id);
+COMMENT ON TABLE schema_meta IS 'schema metadata';
+
+COMMENT ON COLUMN schema_meta.schema_id IS 'schema id';
+COMMENT ON COLUMN schema_meta.schema_name IS 'schema name';
+COMMENT ON COLUMN schema_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN schema_meta.catalog_id IS 'catalog id';
+COMMENT ON COLUMN schema_meta.schema_comment IS 'schema comment';
+COMMENT ON COLUMN schema_meta.properties IS 'schema properties';
+COMMENT ON COLUMN schema_meta.audit_info IS 'schema audit info';
+COMMENT ON COLUMN schema_meta.current_version IS 'schema current version';
+COMMENT ON COLUMN schema_meta.last_version IS 'schema last version';
+COMMENT ON COLUMN schema_meta.deleted_at IS 'schema deleted at';
+
+
+CREATE TABLE IF NOT EXISTS table_meta (
+    table_id BIGINT NOT NULL,
+    table_name VARCHAR(128) NOT NULL,
+    metalake_id BIGINT NOT NULL,
+    catalog_id BIGINT NOT NULL,
+    schema_id BIGINT NOT NULL,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (table_id),
+    UNIQUE (schema_id, table_name, deleted_at)
+    );
+
+CREATE INDEX IF NOT EXISTS idx_metalake_id ON table_meta (metalake_id);
+CREATE INDEX IF NOT EXISTS idx_catalog_id ON table_meta (catalog_id);
+COMMENT ON TABLE table_meta IS 'table metadata';
+
+COMMENT ON COLUMN table_meta.table_id IS 'table id';
+COMMENT ON COLUMN table_meta.table_name IS 'table name';
+COMMENT ON COLUMN table_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN table_meta.catalog_id IS 'catalog id';
+COMMENT ON COLUMN table_meta.schema_id IS 'schema id';
+COMMENT ON COLUMN table_meta.audit_info IS 'table audit info';
+COMMENT ON COLUMN table_meta.current_version IS 'table current version';
+COMMENT ON COLUMN table_meta.last_version IS 'table last version';
+COMMENT ON COLUMN table_meta.deleted_at IS 'table deleted at';
+
+
+CREATE TABLE IF NOT EXISTS fileset_meta (
+    fileset_id BIGINT NOT NULL,
+    fileset_name VARCHAR(128) NOT NULL,
+    metalake_id BIGINT NOT NULL,
+    catalog_id BIGINT NOT NULL,
+    schema_id BIGINT NOT NULL,
+    type VARCHAR(64) NOT NULL,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (fileset_id),
+    UNIQUE (schema_id, fileset_name, deleted_at)
+    );
+
+CREATE INDEX IF NOT EXISTS idx_metalake_id ON fileset_meta (metalake_id);
+CREATE INDEX IF NOT EXISTS idx_catalog_id ON fileset_meta (catalog_id);
+COMMENT ON TABLE fileset_meta IS 'fileset metadata';
+
+COMMENT ON COLUMN fileset_meta.fileset_id IS 'fileset id';
+COMMENT ON COLUMN fileset_meta.fileset_name IS 'fileset name';
+COMMENT ON COLUMN fileset_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN fileset_meta.catalog_id IS 'catalog id';
+COMMENT ON COLUMN fileset_meta.schema_id IS 'schema id';
+COMMENT ON COLUMN fileset_meta.type IS 'fileset type';
+COMMENT ON COLUMN fileset_meta.audit_info IS 'fileset audit info';
+COMMENT ON COLUMN fileset_meta.current_version IS 'fileset current version';
+COMMENT ON COLUMN fileset_meta.last_version IS 'fileset last version';
+COMMENT ON COLUMN fileset_meta.deleted_at IS 'fileset deleted at';
+
+
+CREATE TABLE IF NOT EXISTS fileset_version_info (
+    id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY,
+    metalake_id BIGINT NOT NULL,
+    catalog_id BIGINT NOT NULL,
+    schema_id BIGINT NOT NULL,
+    fileset_id BIGINT NOT NULL,
+    version INT NOT NULL,
+    fileset_comment VARCHAR(256) DEFAULT '',
+    properties TEXT DEFAULT NULL,
+    storage_location TEXT NOT NULL,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (id),
+    UNIQUE (fileset_id, version, deleted_at)
+    );
+
+CREATE INDEX IF NOT EXISTS idx_metalake_id ON fileset_version_info 
(metalake_id);
+CREATE INDEX IF NOT EXISTS idx_catalog_id ON fileset_version_info (catalog_id);
+CREATE INDEX IF NOT EXISTS idx_schema_id ON fileset_version_info (schema_id);
+COMMENT ON TABLE fileset_version_info IS 'fileset version information';
+
+COMMENT ON COLUMN fileset_version_info.id IS 'auto increment id';
+COMMENT ON COLUMN fileset_version_info.metalake_id IS 'metalake id';
+COMMENT ON COLUMN fileset_version_info.catalog_id IS 'catalog id';
+COMMENT ON COLUMN fileset_version_info.schema_id IS 'schema id';
+COMMENT ON COLUMN fileset_version_info.fileset_id IS 'fileset id';
+COMMENT ON COLUMN fileset_version_info.version IS 'fileset info version';
+COMMENT ON COLUMN fileset_version_info.fileset_comment IS 'fileset comment';
+COMMENT ON COLUMN fileset_version_info.properties IS 'fileset properties';
+COMMENT ON COLUMN fileset_version_info.storage_location IS 'fileset storage 
location';
+COMMENT ON COLUMN fileset_version_info.deleted_at IS 'fileset deleted at';
+
+
+CREATE TABLE IF NOT EXISTS topic_meta (
+    topic_id BIGINT NOT NULL,
+    topic_name VARCHAR(128) NOT NULL,
+    metalake_id BIGINT NOT NULL,
+    catalog_id BIGINT NOT NULL,
+    schema_id BIGINT NOT NULL,
+    comment VARCHAR(256) DEFAULT '',
+    properties TEXT DEFAULT NULL,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (topic_id),
+    UNIQUE (schema_id, topic_name, deleted_at)
+    );
+
+CREATE INDEX IF NOT EXISTS idx_metalake_id ON topic_meta (metalake_id);
+CREATE INDEX IF NOT EXISTS idx_catalog_id ON topic_meta (catalog_id);
+COMMENT ON TABLE topic_meta IS 'topic metadata';
+
+COMMENT ON COLUMN topic_meta.topic_id IS 'topic id';
+COMMENT ON COLUMN topic_meta.topic_name IS 'topic name';
+COMMENT ON COLUMN topic_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN topic_meta.catalog_id IS 'catalog id';
+COMMENT ON COLUMN topic_meta.schema_id IS 'schema id';
+COMMENT ON COLUMN topic_meta.comment IS 'topic comment';
+COMMENT ON COLUMN topic_meta.properties IS 'topic properties';
+COMMENT ON COLUMN topic_meta.audit_info IS 'topic audit info';
+COMMENT ON COLUMN topic_meta.current_version IS 'topic current version';
+COMMENT ON COLUMN topic_meta.last_version IS 'topic last version';
+COMMENT ON COLUMN topic_meta.deleted_at IS 'topic deleted at';
+
+
+CREATE TABLE IF NOT EXISTS user_meta (
+    user_id BIGINT NOT NULL,
+    user_name VARCHAR(128) NOT NULL,
+    metalake_id BIGINT NOT NULL,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (user_id),
+    UNIQUE (metalake_id, user_name, deleted_at)
+    );
+COMMENT ON TABLE user_meta IS 'user metadata';
+
+COMMENT ON COLUMN user_meta.user_id IS 'user id';
+COMMENT ON COLUMN user_meta.user_name IS 'username';
+COMMENT ON COLUMN user_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN user_meta.audit_info IS 'user audit info';
+COMMENT ON COLUMN user_meta.current_version IS 'user current version';
+COMMENT ON COLUMN user_meta.last_version IS 'user last version';
+COMMENT ON COLUMN user_meta.deleted_at IS 'user deleted at';
+
+CREATE TABLE IF NOT EXISTS role_meta (
+    role_id BIGINT NOT NULL,
+    role_name VARCHAR(128) NOT NULL,
+    metalake_id BIGINT NOT NULL,
+    properties TEXT DEFAULT NULL,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (role_id),
+    UNIQUE (metalake_id, role_name, deleted_at)
+    );
+
+COMMENT ON TABLE role_meta IS 'role metadata';
+
+COMMENT ON COLUMN role_meta.role_id IS 'role id';
+COMMENT ON COLUMN role_meta.role_name IS 'role name';
+COMMENT ON COLUMN role_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN role_meta.properties IS 'role properties';
+COMMENT ON COLUMN role_meta.audit_info IS 'role audit info';
+COMMENT ON COLUMN role_meta.current_version IS 'role current version';
+COMMENT ON COLUMN role_meta.last_version IS 'role last version';
+COMMENT ON COLUMN role_meta.deleted_at IS 'role deleted at';
+
+
+CREATE TABLE IF NOT EXISTS role_meta_securable_object (
+    id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY,
+    role_id BIGINT NOT NULL,
+    metadata_object_id BIGINT NOT NULL,
+    type  VARCHAR(128) NOT NULL,
+    privilege_names VARCHAR(256) NOT NULL,
+    privilege_conditions VARCHAR(256) NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (id)
+    );
+
+CREATE INDEX IF NOT EXISTS idx_role_id ON role_meta_securable_object (role_id);
+COMMENT ON TABLE role_meta_securable_object IS 'role to securable object 
relation metadata';
+
+COMMENT ON COLUMN role_meta_securable_object.id IS 'auto increment id';
+COMMENT ON COLUMN role_meta_securable_object.role_id IS 'role id';
+COMMENT ON COLUMN role_meta_securable_object.metadata_object_id IS 'The entity 
id of securable object';
+COMMENT ON COLUMN role_meta_securable_object.type IS 'securable object type';
+COMMENT ON COLUMN role_meta_securable_object.privilege_names IS 'securable 
object privilege names';
+COMMENT ON COLUMN role_meta_securable_object.privilege_conditions IS 
'securable object privilege conditions';
+COMMENT ON COLUMN role_meta_securable_object.current_version IS 'securable 
object current version';
+COMMENT ON COLUMN role_meta_securable_object.last_version IS 'securable object 
last version';
+COMMENT ON COLUMN role_meta_securable_object.deleted_at IS 'securable object 
deleted at';
+
+
+CREATE TABLE IF NOT EXISTS user_role_rel (
+    id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY,
+    user_id BIGINT NOT NULL,
+    role_id BIGINT NOT NULL,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (id),
+    UNIQUE (user_id, role_id, deleted_at)
+    );
+
+CREATE INDEX IF NOT EXISTS idx_user_id ON user_role_rel (user_id);
+COMMENT ON TABLE user_role_rel IS 'user role relation metadata';
+
+COMMENT ON COLUMN user_role_rel.id IS 'auto increment id';
+COMMENT ON COLUMN user_role_rel.user_id IS 'user id';
+COMMENT ON COLUMN user_role_rel.role_id IS 'role id';
+COMMENT ON COLUMN user_role_rel.audit_info IS 'relation audit info';
+COMMENT ON COLUMN user_role_rel.current_version IS 'relation current version';
+COMMENT ON COLUMN user_role_rel.last_version IS 'relation last version';
+COMMENT ON COLUMN user_role_rel.deleted_at IS 'relation deleted at';
+
+
+CREATE TABLE IF NOT EXISTS group_meta (
+    group_id BIGINT NOT NULL,
+    group_name VARCHAR(128) NOT NULL,
+    metalake_id BIGINT NOT NULL,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (group_id),
+    UNIQUE (metalake_id, group_name, deleted_at)
+    );
+COMMENT ON TABLE group_meta IS 'group metadata';
+
+COMMENT ON COLUMN group_meta.group_id IS 'group id';
+COMMENT ON COLUMN group_meta.group_name IS 'group name';
+COMMENT ON COLUMN group_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN group_meta.audit_info IS 'group audit info';
+COMMENT ON COLUMN group_meta.current_version IS 'group current version';
+COMMENT ON COLUMN group_meta.last_version IS 'group last version';
+COMMENT ON COLUMN group_meta.deleted_at IS 'group deleted at';
+
+
+CREATE TABLE IF NOT EXISTS group_role_rel (
+    id BIGSERIAL NOT NULL,
+    group_id BIGINT NOT NULL,
+    role_id BIGINT NOT NULL,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (id),
+    UNIQUE (group_id, role_id, deleted_at)
+    );
+
+CREATE INDEX IF NOT EXISTS idx_group_id ON group_role_rel (group_id);
+COMMENT ON TABLE group_role_rel IS 'relation between group and role';
+COMMENT ON COLUMN group_role_rel.id IS 'auto increment id';
+COMMENT ON COLUMN group_role_rel.group_id IS 'group id';
+COMMENT ON COLUMN group_role_rel.role_id IS 'role id';
+COMMENT ON COLUMN group_role_rel.audit_info IS 'relation audit info';
+COMMENT ON COLUMN group_role_rel.current_version IS 'relation current version';
+COMMENT ON COLUMN group_role_rel.last_version IS 'relation last version';
+COMMENT ON COLUMN group_role_rel.deleted_at IS 'relation deleted at';
+
+CREATE TABLE IF NOT EXISTS tag_meta (
+    tag_id BIGINT NOT NULL,
+    tag_name VARCHAR(128) NOT NULL,
+    metalake_id BIGINT NOT NULL,
+    tag_comment VARCHAR(256) DEFAULT '',
+    properties TEXT DEFAULT NULL,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (tag_id),
+    UNIQUE (metalake_id, tag_name, deleted_at)
+    );
+
+COMMENT ON TABLE tag_meta IS 'tag metadata';
+
+COMMENT ON COLUMN tag_meta.tag_id IS 'tag id';
+COMMENT ON COLUMN tag_meta.tag_name IS 'tag name';
+COMMENT ON COLUMN tag_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN tag_meta.tag_comment IS 'tag comment';
+COMMENT ON COLUMN tag_meta.properties IS 'tag properties';
+COMMENT ON COLUMN tag_meta.audit_info IS 'tag audit info';
+
+
+CREATE TABLE IF NOT EXISTS tag_relation_meta (
+    id BIGINT GENERATED BY DEFAULT AS IDENTITY,
+    tag_id BIGINT NOT NULL,
+    metadata_object_id BIGINT NOT NULL,
+    metadata_object_type VARCHAR(64) NOT NULL,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (id),
+    UNIQUE (tag_id, metadata_object_id, metadata_object_type, deleted_at)
+    );
+
+CREATE INDEX IF NOT EXISTS idx_tag_id ON tag_relation_meta (tag_id);
+CREATE INDEX IF NOT EXISTS idx_metadata_object_id ON tag_relation_meta 
(metadata_object_id);
+COMMENT ON TABLE tag_relation_meta IS 'tag metadata object relation';
+COMMENT ON COLUMN tag_relation_meta.id IS 'auto increment id';
+COMMENT ON COLUMN tag_relation_meta.tag_id IS 'tag id';
+COMMENT ON COLUMN tag_relation_meta.metadata_object_id IS 'metadata object id';
+COMMENT ON COLUMN tag_relation_meta.metadata_object_type IS 'metadata object 
type';
+COMMENT ON COLUMN tag_relation_meta.audit_info IS 'tag relation audit info';
+COMMENT ON COLUMN tag_relation_meta.current_version IS 'tag relation current 
version';
+COMMENT ON COLUMN tag_relation_meta.last_version IS 'tag relation last 
version';
+COMMENT ON COLUMN tag_relation_meta.deleted_at IS 'tag relation deleted at';
+
+CREATE TABLE IF NOT EXISTS owner_meta (
+    id BIGINT GENERATED BY DEFAULT AS IDENTITY,
+    metalake_id BIGINT NOT NULL,
+    owner_id BIGINT NOT NULL,
+    owner_type VARCHAR(64) NOT NULL,
+    metadata_object_id BIGINT NOT NULL,
+    metadata_object_type VARCHAR(64) NOT NULL,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (id),
+    UNIQUE (owner_id, metadata_object_id, metadata_object_type, deleted_at)
+    );
+
+CREATE INDEX IF NOT EXISTS idx_owner_id ON owner_meta (owner_id);
+CREATE INDEX IF NOT EXISTS idx_metadata_object_id ON owner_meta 
(metadata_object_id);
+COMMENT ON TABLE owner_meta IS 'owner relation';
+COMMENT ON COLUMN owner_meta.id IS 'auto increment id';
+COMMENT ON COLUMN owner_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN owner_meta.owner_id IS 'owner id';
+COMMENT ON COLUMN owner_meta.owner_type IS 'owner type';
+COMMENT ON COLUMN owner_meta.metadata_object_id IS 'metadata object id';
+COMMENT ON COLUMN owner_meta.metadata_object_type IS 'metadata object type';
+COMMENT ON COLUMN owner_meta.audit_info IS 'owner relation audit info';
+COMMENT ON COLUMN owner_meta.current_version IS 'owner relation current 
version';
+COMMENT ON COLUMN owner_meta.last_version IS 'owner relation last version';
+COMMENT ON COLUMN owner_meta.deleted_at IS 'owner relation deleted at';
+

Reply via email to