This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 4fe9d847ec [#9528] feat(storage): support function management 
(Storage) (part-2) (#9824)
4fe9d847ec is described below

commit 4fe9d847ecfd57e3f552b9bf37cad5fd09a0cd76
Author: mchades <[email protected]>
AuthorDate: Mon Feb 2 14:05:25 2026 +0800

    [#9528] feat(storage): support function management (Storage) (part-2) 
(#9824)
    
    ### What changes were proposed in this pull request?
    
    Implement the storage layer for Function management.
    - Added FunctionMetaMapper, FunctionVersionMetaMapper.
    - Added SQL Provider Factories and Base implementations.
    - Added PostgreSQL specific providers.
    - Updated DefaultMapperPackageProvider to include function mappers.
    
    ### Why are the changes needed?
    
    To provide the persistence layer capability for storing and retrieving
    function metadata in relational databases (PostgreSQL/MySQL/H2).
    
    Fix: #9528
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Verified via Unit Tests in the Service layer (subsequent commit).
---
 .../relational/mapper/FunctionMetaMapper.java      | 134 ++++++++++
 .../mapper/FunctionMetaSQLProviderFactory.java     | 114 ++++++++
 .../mapper/FunctionVersionMetaMapper.java          |  40 +++
 .../FunctionVersionMetaSQLProviderFactory.java     | 100 +++++++
 .../provider/DefaultMapperPackageProvider.java     |   4 +
 .../provider/base/FunctionMetaBaseSQLProvider.java | 291 +++++++++++++++++++++
 .../base/FunctionVersionMetaBaseSQLProvider.java   | 118 +++++++++
 .../provider/base/TableMetaBaseSQLProvider.java    |  12 +-
 .../postgresql/FunctionMetaPostgreSQLProvider.java | 173 ++++++++++++
 .../FunctionVersionMetaPostgreSQLProvider.java     | 102 ++++++++
 10 files changed, 1084 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaMapper.java
new file mode 100644
index 0000000000..2314069fce
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaMapper.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.po.FunctionPO;
+import org.apache.gravitino.storage.relational.po.FunctionVersionPO;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.One;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Result;
+import org.apache.ibatis.annotations.ResultMap;
+import org.apache.ibatis.annotations.Results;
+import org.apache.ibatis.annotations.Select;
+import org.apache.ibatis.annotations.SelectProvider;
+
+/** A MyBatis mapper for function metadata SQL operations; the SQL text is supplied by {@link FunctionMetaSQLProviderFactory}. */
+public interface FunctionMetaMapper {
+  String TABLE_NAME = "function_meta"; // function table, aliased "fm" in the provider SQL
+  String VERSION_TABLE_NAME = "function_version_info"; // function-version table, aliased "vi" in the provider SQL
+
+  @Results(
+      id = "mapToFunctionVersionPO", // referenced by the nested @One mappings further down
+      value = {
+        @Result(property = "id", column = "id", id = true),
+        @Result(property = "metalakeId", column = "version_metalake_id"), // version_* names are aliases assigned in the provider SELECTs
+        @Result(property = "catalogId", column = "version_catalog_id"),
+        @Result(property = "schemaId", column = "version_schema_id"),
+        @Result(property = "functionId", column = "version_function_id"),
+        @Result(property = "functionVersion", column = "version"),
+        @Result(property = "functionComment", column = "function_comment"),
+        @Result(property = "definitions", column = "definitions"),
+        @Result(property = "auditInfo", column = "version_audit_info"),
+        @Result(property = "deletedAt", column = "version_deleted_at")
+      })
+  @Select("SELECT 1") // Dummy SQL to avoid a MyBatis error; it is never executed
+  FunctionVersionPO mapToFunctionVersionPO(); // declared only to carry the result map above
+
+  @InsertProvider(type = FunctionMetaSQLProviderFactory.class, method = 
"insertFunctionMeta")
+  void insertFunctionMeta(@Param("functionMeta") FunctionPO functionPO); // plain insert of one function row
+
+  @InsertProvider( // insert variant that overwrites the stored row on a key conflict (base provider: ON DUPLICATE KEY UPDATE)
+      type = FunctionMetaSQLProviderFactory.class,
+      method = "insertFunctionMetaOnDuplicateKeyUpdate")
+  void insertFunctionMetaOnDuplicateKeyUpdate(@Param("functionMeta") 
FunctionPO functionPO);
+
+  @Results({ // same column mapping as "functionPOResultMap" below, declared here without an id
+    @Result(property = "functionId", column = "function_id", id = true),
+    @Result(property = "functionName", column = "function_name"),
+    @Result(property = "metalakeId", column = "metalake_id"),
+    @Result(property = "catalogId", column = "catalog_id"),
+    @Result(property = "schemaId", column = "schema_id"),
+    @Result(property = "functionType", column = "function_type"),
+    @Result(property = "deterministic", column = "deterministic"),
+    @Result(property = "returnType", column = "return_type"),
+    @Result(property = "functionCurrentVersion", column = 
"function_current_version"),
+    @Result(property = "functionLatestVersion", column = 
"function_latest_version"),
+    @Result(property = "auditInfo", column = "audit_info"),
+    @Result(property = "deletedAt", column = "deleted_at"),
+    @Result( // nested version object built from the joined version columns of the same row
+        property = "functionVersionPO",
+        javaType = FunctionVersionPO.class,
+        column =
+            
"{id,version_metalake_id,version_catalog_id,version_schema_id,version_function_id,"
+                + 
"version,function_comment,definitions,version_audit_info,version_deleted_at}",
+        one = @One(resultMap = "mapToFunctionVersionPO"))
+  })
+  @SelectProvider(type = FunctionMetaSQLProviderFactory.class, method = 
"listFunctionPOsBySchemaId")
+  List<FunctionPO> listFunctionPOsBySchemaId(@Param("schemaId") Long schemaId); // functions of one schema, each joined with its current version row
+
+  @Results(
+      id = "functionPOResultMap", // reused through @ResultMap by the two single-function selects below
+      value = {
+        @Result(property = "functionId", column = "function_id", id = true),
+        @Result(property = "functionName", column = "function_name"),
+        @Result(property = "metalakeId", column = "metalake_id"),
+        @Result(property = "catalogId", column = "catalog_id"),
+        @Result(property = "schemaId", column = "schema_id"),
+        @Result(property = "functionType", column = "function_type"),
+        @Result(property = "deterministic", column = "deterministic"),
+        @Result(property = "returnType", column = "return_type"),
+        @Result(property = "functionCurrentVersion", column = 
"function_current_version"),
+        @Result(property = "functionLatestVersion", column = 
"function_latest_version"),
+        @Result(property = "auditInfo", column = "audit_info"),
+        @Result(property = "deletedAt", column = "deleted_at"),
+        @Result( // nested version object built from the joined version columns of the same row
+            property = "functionVersionPO",
+            javaType = FunctionVersionPO.class,
+            column =
+                
"{id,version_metalake_id,version_catalog_id,version_schema_id,version_function_id,"
+                    + 
"version,function_comment,definitions,version_audit_info,version_deleted_at}",
+            one = @One(resultMap = "mapToFunctionVersionPO"))
+      })
+  @SelectProvider(
+      type = FunctionMetaSQLProviderFactory.class,
+      method = "listFunctionPOsByFullQualifiedName")
+  List<FunctionPO> listFunctionPOsByFullQualifiedName( // list addressed by metalake/catalog/schema names instead of a schema id
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName);
+
+  @ResultMap("functionPOResultMap")
+  @SelectProvider(
+      type = FunctionMetaSQLProviderFactory.class,
+      method = "selectFunctionMetaByFullQualifiedName")
+  FunctionPO selectFunctionMetaByFullQualifiedName( // single function addressed by metalake/catalog/schema/function names
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("functionName") String functionName);
+
+  @ResultMap("functionPOResultMap")
+  @SelectProvider(
+      type = FunctionMetaSQLProviderFactory.class,
+      method = "selectFunctionMetaBySchemaIdAndName")
+  FunctionPO selectFunctionMetaBySchemaIdAndName( // single function addressed by schema id and function name
+      @Param("schemaId") Long schemaId, @Param("functionName") String 
functionName);
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaSQLProviderFactory.java
new file mode 100644
index 0000000000..239a19c044
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaSQLProviderFactory.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.FunctionMetaBaseSQLProvider;
+import 
org.apache.gravitino.storage.relational.mapper.provider.postgresql.FunctionMetaPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.FunctionPO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class FunctionMetaSQLProviderFactory { // static SQL entry points named by the mapper's *Provider annotations; each delegates to the backend's provider
+
+  static class FunctionMetaMySQLProvider extends FunctionMetaBaseSQLProvider {} // MySQL: base SQL, nothing overridden
+
+  static class FunctionMetaH2Provider extends FunctionMetaBaseSQLProvider {} // H2: base SQL, nothing overridden
+
+  private static final Map<JDBCBackendType, FunctionMetaBaseSQLProvider> // one shared provider instance per backend type
+      FUNCTION_META_SQL_PROVIDER_MAP =
+          ImmutableMap.of(
+              JDBCBackendType.MYSQL, new FunctionMetaMySQLProvider(),
+              JDBCBackendType.H2, new FunctionMetaH2Provider(),
+              JDBCBackendType.POSTGRESQL, new 
FunctionMetaPostgreSQLProvider());
+
+  public static FunctionMetaBaseSQLProvider getProvider() { // picks the provider matching the configured databaseId
+    String databaseId =
+        SqlSessionFactoryHelper.getInstance()
+            .getSqlSessionFactory()
+            .getConfiguration()
+            .getDatabaseId();
+
+    JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId);
+    return FUNCTION_META_SQL_PROVIDER_MAP.get(jdbcBackendType); // Map.get yields null for a backend type without an entry
+  }
+
+  public static String insertFunctionMeta(@Param("functionMeta") FunctionPO 
functionPO) {
+    return getProvider().insertFunctionMeta(functionPO);
+  }
+
+  public static String insertFunctionMetaOnDuplicateKeyUpdate(
+      @Param("functionMeta") FunctionPO functionPO) {
+    return getProvider().insertFunctionMetaOnDuplicateKeyUpdate(functionPO);
+  }
+
+  public static String listFunctionPOsBySchemaId(@Param("schemaId") Long 
schemaId) {
+    return getProvider().listFunctionPOsBySchemaId(schemaId);
+  }
+
+  public static String listFunctionPOsByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName) {
+    return getProvider().listFunctionPOsByFullQualifiedName(metalakeName, 
catalogName, schemaName);
+  }
+
+  public static String selectFunctionMetaByFullQualifiedName(
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("functionName") String functionName) {
+    return getProvider()
+        .selectFunctionMetaByFullQualifiedName(metalakeName, catalogName, 
schemaName, functionName);
+  }
+
+  public static String selectFunctionMetaBySchemaIdAndName(
+      @Param("schemaId") Long schemaId, @Param("functionName") String 
functionName) {
+    return getProvider().selectFunctionMetaBySchemaIdAndName(schemaId, 
functionName);
+  }
+
+  public static String softDeleteFunctionMetaByFunctionId(@Param("functionId") 
Long functionId) {
+    return getProvider().softDeleteFunctionMetaByFunctionId(functionId); // NOTE(review): FunctionMetaMapper in this patch binds none of the soft-delete/delete/update entry points from here down — confirm they get wired up
+  }
+
+  public static String softDeleteFunctionMetasByCatalogId(@Param("catalogId") 
Long catalogId) {
+    return getProvider().softDeleteFunctionMetasByCatalogId(catalogId);
+  }
+
+  public static String 
softDeleteFunctionMetasByMetalakeId(@Param("metalakeId") Long metalakeId) {
+    return getProvider().softDeleteFunctionMetasByMetalakeId(metalakeId);
+  }
+
+  public static String softDeleteFunctionMetasBySchemaId(@Param("schemaId") 
Long schemaId) {
+    return getProvider().softDeleteFunctionMetasBySchemaId(schemaId);
+  }
+
+  public static String deleteFunctionMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return getProvider().deleteFunctionMetasByLegacyTimeline(legacyTimeline, 
limit);
+  }
+
+  public static String updateFunctionMeta(
+      @Param("newFunctionMeta") FunctionPO newFunctionPO,
+      @Param("oldFunctionMeta") FunctionPO oldFunctionPO) {
+    return getProvider().updateFunctionMeta(newFunctionPO, oldFunctionPO);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionVersionMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionVersionMetaMapper.java
new file mode 100644
index 0000000000..d258494eea
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionVersionMetaMapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import org.apache.gravitino.storage.relational.po.FunctionVersionPO;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.Param;
+
+/** A MyBatis mapper for function version metadata SQL operations; the SQL text is supplied by {@link FunctionVersionMetaSQLProviderFactory}. */
+public interface FunctionVersionMetaMapper {
+
+  String TABLE_NAME = "function_version_info"; // same table name as FunctionMetaMapper.VERSION_TABLE_NAME
+
+  @InsertProvider( // plain insert of one function-version row
+      type = FunctionVersionMetaSQLProviderFactory.class,
+      method = "insertFunctionVersionMeta")
+  void insertFunctionVersionMeta(@Param("functionVersionMeta") 
FunctionVersionPO functionVersionPO);
+
+  @InsertProvider( // insert variant routed to the provider's "on duplicate key update" SQL
+      type = FunctionVersionMetaSQLProviderFactory.class,
+      method = "insertFunctionVersionMetaOnDuplicateKeyUpdate")
+  void insertFunctionVersionMetaOnDuplicateKeyUpdate(
+      @Param("functionVersionMeta") FunctionVersionPO functionVersionPO);
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionVersionMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionVersionMetaSQLProviderFactory.java
new file mode 100644
index 0000000000..6e8de28a78
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionVersionMetaSQLProviderFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.FunctionVersionMetaBaseSQLProvider;
+import 
org.apache.gravitino.storage.relational.mapper.provider.postgresql.FunctionVersionMetaPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.FunctionVersionPO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class FunctionVersionMetaSQLProviderFactory { // static SQL entry points named by the mapper's *Provider annotations; each delegates to the backend's provider
+
+  static class FunctionVersionMetaMySQLProvider extends 
FunctionVersionMetaBaseSQLProvider {} // MySQL: base SQL, nothing overridden
+
+  static class FunctionVersionMetaH2Provider extends 
FunctionVersionMetaBaseSQLProvider {} // H2: base SQL, nothing overridden
+
+  private static final Map<JDBCBackendType, FunctionVersionMetaBaseSQLProvider> // one shared provider instance per backend type
+      FUNCTION_VERSION_META_SQL_PROVIDER_MAP =
+          ImmutableMap.of(
+              JDBCBackendType.MYSQL, new FunctionVersionMetaMySQLProvider(),
+              JDBCBackendType.H2, new FunctionVersionMetaH2Provider(),
+              JDBCBackendType.POSTGRESQL, new 
FunctionVersionMetaPostgreSQLProvider());
+
+  public static FunctionVersionMetaBaseSQLProvider getProvider() { // picks the provider matching the configured databaseId
+    String databaseId =
+        SqlSessionFactoryHelper.getInstance()
+            .getSqlSessionFactory()
+            .getConfiguration()
+            .getDatabaseId();
+
+    JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId);
+    return FUNCTION_VERSION_META_SQL_PROVIDER_MAP.get(jdbcBackendType); // Map.get yields null for a backend type without an entry
+  }
+
+  public static String insertFunctionVersionMeta(
+      @Param("functionVersionMeta") FunctionVersionPO functionVersionPO) {
+    return getProvider().insertFunctionVersionMeta(functionVersionPO);
+  }
+
+  public static String insertFunctionVersionMetaOnDuplicateKeyUpdate(
+      @Param("functionVersionMeta") FunctionVersionPO functionVersionPO) {
+    return 
getProvider().insertFunctionVersionMetaOnDuplicateKeyUpdate(functionVersionPO);
+  }
+
+  public static String 
softDeleteFunctionVersionMetasBySchemaId(@Param("schemaId") Long schemaId) {
+    return getProvider().softDeleteFunctionVersionMetasBySchemaId(schemaId); // NOTE(review): FunctionVersionMetaMapper in this patch binds only the two inserts; the entry points from here down have no mapper method yet — confirm they get wired up
+  }
+
+  public static String softDeleteFunctionVersionMetasByCatalogId(
+      @Param("catalogId") Long catalogId) {
+    return getProvider().softDeleteFunctionVersionMetasByCatalogId(catalogId);
+  }
+
+  public static String softDeleteFunctionVersionMetasByMetalakeId(
+      @Param("metalakeId") Long metalakeId) {
+    return 
getProvider().softDeleteFunctionVersionMetasByMetalakeId(metalakeId);
+  }
+
+  public static String deleteFunctionVersionMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return 
getProvider().deleteFunctionVersionMetasByLegacyTimeline(legacyTimeline, limit);
+  }
+
+  public static String selectFunctionVersionsByRetentionCount(
+      @Param("versionRetentionCount") Long versionRetentionCount) {
+    return 
getProvider().selectFunctionVersionsByRetentionCount(versionRetentionCount);
+  }
+
+  public static String softDeleteFunctionVersionsByRetentionLine(
+      @Param("functionId") Long functionId,
+      @Param("versionRetentionLine") long versionRetentionLine,
+      @Param("limit") int limit) {
+    return getProvider()
+        .softDeleteFunctionVersionsByRetentionLine(functionId, 
versionRetentionLine, limit);
+  }
+
+  public static String softDeleteFunctionVersionsByFunctionId(
+      @Param("functionId") Long functionId) {
+    return getProvider().softDeleteFunctionVersionsByFunctionId(functionId);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
index aaf22ccda8..497f751165 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
@@ -23,6 +23,8 @@ import java.util.List;
 import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper;
+import org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.FunctionVersionMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.GroupRoleRelMapper;
 import org.apache.gravitino.storage.relational.mapper.JobMetaMapper;
@@ -57,6 +59,8 @@ public class DefaultMapperPackageProvider implements 
MapperPackageProvider {
         CatalogMetaMapper.class,
         FilesetMetaMapper.class,
         FilesetVersionMapper.class,
+        FunctionMetaMapper.class,
+        FunctionVersionMetaMapper.class,
         GroupMetaMapper.class,
         GroupRoleRelMapper.class,
         JobMetaMapper.class,
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FunctionMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FunctionMetaBaseSQLProvider.java
new file mode 100644
index 0000000000..772b76cdc5
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FunctionMetaBaseSQLProvider.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import static 
org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper.TABLE_NAME;
+import static 
org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper.VERSION_TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
+import org.apache.gravitino.storage.relational.po.FunctionPO;
+import org.apache.ibatis.annotations.Param;
+
+public class FunctionMetaBaseSQLProvider {
+
+  public String listFunctionPOsByFullQualifiedName( // builds the SELECT that lists a schema's functions, addressed by metalake/catalog/schema name
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName) { // arguments are not read in Java; the SQL refers to them through #{...} placeholders
+    return """
+        SELECT
+            mm.metalake_id,
+            cm.catalog_id,
+            sm.schema_id,
+            fm.function_id,
+            fm.function_name,
+            fm.function_type,
+            fm.deterministic,
+            fm.return_type,
+            fm.function_current_version,
+            fm.function_latest_version,
+            fm.audit_info,
+            fm.deleted_at,
+            vi.id,
+            vi.metalake_id as version_metalake_id,
+            vi.catalog_id as version_catalog_id,
+            vi.schema_id as version_schema_id,
+            vi.function_id as version_function_id,
+            vi.version,
+            vi.function_comment,
+            vi.definitions,
+            vi.audit_info as version_audit_info,
+            vi.deleted_at as version_deleted_at
+        FROM
+            %s mm
+        INNER JOIN
+            %s cm ON mm.metalake_id = cm.metalake_id
+            AND cm.catalog_name = #{catalogName}
+            AND cm.deleted_at = 0
+        LEFT JOIN
+            %s sm ON cm.catalog_id = sm.catalog_id
+            AND sm.schema_name = #{schemaName}
+            AND sm.deleted_at = 0
+        LEFT JOIN
+            %s fm ON sm.schema_id = fm.schema_id
+            AND fm.deleted_at = 0
+        LEFT JOIN
+            %s vi ON fm.function_id = vi.function_id
+            AND fm.function_current_version = vi.version
+            AND vi.deleted_at = 0
+        WHERE
+            mm.metalake_name = #{metalakeName}
+            AND mm.deleted_at = 0
+        """
+        .formatted( // fills the five %s table-name slots in FROM/JOIN order
+            MetalakeMetaMapper.TABLE_NAME,
+            CatalogMetaMapper.TABLE_NAME,
+            SchemaMetaMapper.TABLE_NAME, // LEFT JOINed: a metalake/catalog match still yields a row (NULL schema/function/version columns) when no schema matches
+            TABLE_NAME, // LEFT JOINed: a schema without live functions yields one row with NULL function and version columns
+            VERSION_TABLE_NAME); // LEFT JOINed on the function's current version only
+  }
+
+  public String insertFunctionMeta(@Param("functionMeta") FunctionPO 
functionPO) {
+    return "INSERT INTO " // plain INSERT listing all twelve columns of the function table
+        + TABLE_NAME
+        + " (function_id, function_name, metalake_id, catalog_id, schema_id,"
+        + " function_type, `deterministic`, return_type, 
function_current_version,"
+        + " function_latest_version, audit_info, deleted_at)" // NOTE(review): `deterministic` is backtick-quoted above, presumably because it is a reserved word in MySQL — confirm
+        + " VALUES (#{functionMeta.functionId}, #{functionMeta.functionName}," // values are #{functionMeta.*} placeholders; functionPO itself is not read here
+        + " #{functionMeta.metalakeId}, #{functionMeta.catalogId}, 
#{functionMeta.schemaId},"
+        + " #{functionMeta.functionType}, #{functionMeta.deterministic},"
+        + " #{functionMeta.returnType}, 
#{functionMeta.functionCurrentVersion},"
+        + " #{functionMeta.functionLatestVersion}, #{functionMeta.auditInfo},"
+        + " #{functionMeta.deletedAt})";
+  }
+
+  public String insertFunctionMetaOnDuplicateKeyUpdate( // same INSERT as insertFunctionMeta plus an overwrite clause for key conflicts
+      @Param("functionMeta") FunctionPO functionPO) { // functionPO is not read here; the SQL refers to it through #{functionMeta.*} placeholders
+    return "INSERT INTO "
+        + TABLE_NAME
+        + " (function_id, function_name, metalake_id, catalog_id, schema_id,"
+        + " function_type, `deterministic`, return_type,"
+        + " function_current_version, function_latest_version, audit_info, 
deleted_at)"
+        + " VALUES (#{functionMeta.functionId}, #{functionMeta.functionName},"
+        + " #{functionMeta.metalakeId}, #{functionMeta.catalogId}, 
#{functionMeta.schemaId},"
+        + " #{functionMeta.functionType}, #{functionMeta.deterministic},"
+        + " #{functionMeta.returnType}, 
#{functionMeta.functionCurrentVersion},"
+        + " #{functionMeta.functionLatestVersion}, #{functionMeta.auditInfo},"
+        + " #{functionMeta.deletedAt})"
+        + " ON DUPLICATE KEY UPDATE" // on conflict, every column except function_id is overwritten with the new values
+        + " function_name = #{functionMeta.functionName},"
+        + " metalake_id = #{functionMeta.metalakeId},"
+        + " catalog_id = #{functionMeta.catalogId},"
+        + " schema_id = #{functionMeta.schemaId},"
+        + " function_type = #{functionMeta.functionType},"
+        + " `deterministic` = #{functionMeta.deterministic},"
+        + " return_type = #{functionMeta.returnType},"
+        + " function_current_version = #{functionMeta.functionCurrentVersion},"
+        + " function_latest_version = #{functionMeta.functionLatestVersion},"
+        + " audit_info = #{functionMeta.auditInfo},"
+        + " deleted_at = #{functionMeta.deletedAt}";
+  }
+
+  public String selectFunctionMetaByFullQualifiedName( // builds the SELECT for one function addressed by metalake/catalog/schema/function name
+      @Param("metalakeName") String metalakeName,
+      @Param("catalogName") String catalogName,
+      @Param("schemaName") String schemaName,
+      @Param("functionName") String functionName) { // arguments are not read in Java; the SQL refers to them through #{...} placeholders
+    return """
+        SELECT
+            mm.metalake_id,
+            cm.catalog_id,
+            sm.schema_id,
+            fm.function_id,
+            fm.function_name,
+            fm.function_type,
+            fm.deterministic,
+            fm.return_type,
+            fm.function_current_version,
+            fm.function_latest_version,
+            fm.audit_info,
+            fm.deleted_at,
+            vi.id,
+            vi.metalake_id as version_metalake_id,
+            vi.catalog_id as version_catalog_id,
+            vi.schema_id as version_schema_id,
+            vi.function_id as version_function_id,
+            vi.version,
+            vi.function_comment,
+            vi.definitions,
+            vi.audit_info as version_audit_info,
+            vi.deleted_at as version_deleted_at
+        FROM
+            %s mm
+        INNER JOIN
+            %s cm ON mm.metalake_id = cm.metalake_id
+            AND cm.catalog_name = #{catalogName}
+            AND cm.deleted_at = 0
+        LEFT JOIN
+            %s sm ON cm.catalog_id = sm.catalog_id
+            AND sm.schema_name = #{schemaName}
+            AND sm.deleted_at = 0
+        LEFT JOIN
+            %s fm ON sm.schema_id = fm.schema_id
+            AND fm.function_name = #{functionName}
+            AND fm.deleted_at = 0
+        INNER JOIN
+            %s vi ON fm.function_id = vi.function_id
+            AND fm.function_current_version = vi.version
+            AND vi.deleted_at = 0
+        WHERE
+            mm.metalake_name = #{metalakeName}
+            AND mm.deleted_at = 0
+        """
+        .formatted( // fills the five %s table-name slots in FROM/JOIN order
+            MetalakeMetaMapper.TABLE_NAME,
+            CatalogMetaMapper.TABLE_NAME,
+            SchemaMetaMapper.TABLE_NAME,
+            TABLE_NAME,
+            VERSION_TABLE_NAME); // NOTE(review): INNER JOINed here but LEFT JOINed in listFunctionPOsByFullQualifiedName; the inner join drops rows where the schema/function LEFT JOINs matched nothing — confirm this is intended
+  }
+
+  public String listFunctionPOsBySchemaId(@Param("schemaId") Long schemaId) { // builds the SELECT listing a schema's functions by schema id; schemaId is only referenced as #{schemaId}
+    return "SELECT fm.function_id, fm.function_name, fm.metalake_id, 
fm.catalog_id, fm.schema_id,"
+        + " fm.function_type, fm.`deterministic`, fm.return_type,"
+        + " fm.function_current_version, fm.function_latest_version,"
+        + " fm.audit_info, fm.deleted_at,"
+        + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as 
version_catalog_id,"
+        + " vi.schema_id as version_schema_id, vi.function_id as 
version_function_id,"
+        + " vi.version, vi.function_comment, vi.definitions," // version columns that clash with function columns carry a version_ alias
+        + " vi.audit_info as version_audit_info, vi.deleted_at as 
version_deleted_at"
+        + " FROM "
+        + TABLE_NAME
+        + " fm INNER JOIN " // inner join: a function without a matching current version row is omitted
+        + VERSION_TABLE_NAME
+        + " vi ON fm.function_id = vi.function_id AND 
fm.function_current_version = vi.version"
+        + " WHERE fm.schema_id = #{schemaId} AND fm.deleted_at = 0 AND 
vi.deleted_at = 0";
+  }
+
+  public String selectFunctionMetaBySchemaIdAndName(
+      @Param("schemaId") Long schemaId, @Param("functionName") String functionName) {
+    return "SELECT fm.function_id, fm.function_name, fm.metalake_id, fm.catalog_id, fm.schema_id,"
+        + " fm.function_type, fm.`deterministic`, fm.return_type,"
+        + " fm.function_current_version, fm.function_latest_version,"
+        + " fm.audit_info, fm.deleted_at,"
+        + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as version_catalog_id,"
+        + " vi.schema_id as version_schema_id, vi.function_id as version_function_id,"
+        + " vi.version, vi.function_comment, vi.definitions,"
+        + " vi.audit_info as version_audit_info, vi.deleted_at as version_deleted_at"
+        + " FROM "
+        + TABLE_NAME
+        + " fm INNER JOIN "
+        + VERSION_TABLE_NAME
+        + " vi ON fm.function_id = vi.function_id AND fm.function_current_version = vi.version"
+        + " WHERE fm.schema_id = #{schemaId} AND fm.function_name = #{functionName}"
+        + " AND fm.deleted_at = 0 AND vi.deleted_at = 0";
+  }
+
+  public String softDeleteFunctionMetaByFunctionId(@Param("functionId") Long functionId) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE function_id = #{functionId} AND deleted_at = 0";
+  }
+
+  public String softDeleteFunctionMetasByCatalogId(@Param("catalogId") Long catalogId) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
+  }
+
+  public String softDeleteFunctionMetasByMetalakeId(@Param("metalakeId") Long metalakeId) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  public String softDeleteFunctionMetasBySchemaId(@Param("schemaId") Long schemaId) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+  }
+
+  public String deleteFunctionMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
+    return "DELETE FROM "
+        + TABLE_NAME
+        + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}";
+  }
+
+  public String updateFunctionMeta(
+      @Param("newFunctionMeta") FunctionPO newFunctionPO,
+      @Param("oldFunctionMeta") FunctionPO oldFunctionPO) {
+    return "UPDATE "
+        + TABLE_NAME
+        + " SET function_name = #{newFunctionMeta.functionName},"
+        + " metalake_id = #{newFunctionMeta.metalakeId},"
+        + " catalog_id = #{newFunctionMeta.catalogId},"
+        + " schema_id = #{newFunctionMeta.schemaId},"
+        + " function_type = #{newFunctionMeta.functionType},"
+        + " `deterministic` = #{newFunctionMeta.deterministic},"
+        + " return_type = #{newFunctionMeta.returnType},"
+        + " function_current_version = #{newFunctionMeta.functionCurrentVersion},"
+        + " function_latest_version = #{newFunctionMeta.functionLatestVersion},"
+        + " audit_info = #{newFunctionMeta.auditInfo},"
+        + " deleted_at = #{newFunctionMeta.deletedAt}"
+        + " WHERE function_id = #{oldFunctionMeta.functionId}"
+        + " AND function_name = #{oldFunctionMeta.functionName}"
+        + " AND metalake_id = #{oldFunctionMeta.metalakeId}"
+        + " AND catalog_id = #{oldFunctionMeta.catalogId}"
+        + " AND schema_id = #{oldFunctionMeta.schemaId}"
+        + " AND function_type = #{oldFunctionMeta.functionType}"
+        + " AND function_current_version = #{oldFunctionMeta.functionCurrentVersion}"
+        + " AND function_latest_version = #{oldFunctionMeta.functionLatestVersion}"
+        + " AND audit_info = #{oldFunctionMeta.auditInfo}"
+        + " AND deleted_at = 0";
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FunctionVersionMetaBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FunctionVersionMetaBaseSQLProvider.java
new file mode 100644
index 0000000000..0e6ed91813
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FunctionVersionMetaBaseSQLProvider.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import org.apache.gravitino.storage.relational.mapper.FunctionVersionMetaMapper;
+import org.apache.gravitino.storage.relational.po.FunctionVersionPO;
+import org.apache.ibatis.annotations.Param;
+
+public class FunctionVersionMetaBaseSQLProvider {
+
+  public String insertFunctionVersionMeta(
+      @Param("functionVersionMeta") FunctionVersionPO functionVersionPO) {
+    return "INSERT INTO "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " (metalake_id, catalog_id, schema_id, function_id, version,"
+        + " function_comment, definitions, audit_info, deleted_at)"
+        + " VALUES (#{functionVersionMeta.metalakeId}, #{functionVersionMeta.catalogId},"
+        + " #{functionVersionMeta.schemaId}, #{functionVersionMeta.functionId},"
+        + " #{functionVersionMeta.functionVersion}, #{functionVersionMeta.functionComment},"
+        + " #{functionVersionMeta.definitions}, #{functionVersionMeta.auditInfo},"
+        + " #{functionVersionMeta.deletedAt})";
+  }
+
+  public String insertFunctionVersionMetaOnDuplicateKeyUpdate(
+      @Param("functionVersionMeta") FunctionVersionPO functionVersionPO) {
+    return "INSERT INTO "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " (metalake_id, catalog_id, schema_id, function_id, version,"
+        + " function_comment, definitions, audit_info, deleted_at)"
+        + " VALUES (#{functionVersionMeta.metalakeId}, #{functionVersionMeta.catalogId},"
+        + " #{functionVersionMeta.schemaId}, #{functionVersionMeta.functionId},"
+        + " #{functionVersionMeta.functionVersion}, #{functionVersionMeta.functionComment},"
+        + " #{functionVersionMeta.definitions}, #{functionVersionMeta.auditInfo},"
+        + " #{functionVersionMeta.deletedAt})"
+        + " ON DUPLICATE KEY UPDATE"
+        + " function_comment = #{functionVersionMeta.functionComment},"
+        + " definitions = #{functionVersionMeta.definitions},"
+        + " audit_info = #{functionVersionMeta.auditInfo},"
+        + " deleted_at = #{functionVersionMeta.deletedAt}";
+  }
+
+  public String softDeleteFunctionVersionMetasBySchemaId(@Param("schemaId") Long schemaId) {
+    return "UPDATE "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+  }
+
+  public String softDeleteFunctionVersionMetasByCatalogId(@Param("catalogId") Long catalogId) {
+    return "UPDATE "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
+  }
+
+  public String softDeleteFunctionVersionMetasByMetalakeId(@Param("metalakeId") Long metalakeId) {
+    return "UPDATE "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  public String deleteFunctionVersionMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
+    return "DELETE FROM "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}";
+  }
+
+  public String selectFunctionVersionsByRetentionCount(
+      @Param("versionRetentionCount") Long versionRetentionCount) {
+    return "SELECT function_id as functionId,"
+        + " MAX(version) as version"
+        + " FROM "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " WHERE version > #{versionRetentionCount} AND deleted_at = 0"
+        + " GROUP BY function_id";
+  }
+
+  public String softDeleteFunctionVersionsByRetentionLine(
+      @Param("functionId") Long functionId,
+      @Param("versionRetentionLine") long versionRetentionLine,
+      @Param("limit") int limit) {
+    return "UPDATE "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE function_id = #{functionId} AND version <= #{versionRetentionLine}"
+        + " AND deleted_at = 0 LIMIT #{limit}";
+  }
+
+  public String softDeleteFunctionVersionsByFunctionId(@Param("functionId") Long functionId) {
+    return "UPDATE "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE function_id = #{functionId} AND deleted_at = 0";
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
index 4316f87561..6770ebbc2e 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
@@ -45,7 +45,8 @@ public class TableMetaBaseSQLProvider {
         + TABLE_NAME
         + " tm LEFT JOIN "
         + TableVersionMapper.TABLE_NAME
-        + " tv ON tm.table_id = tv.table_id AND tm.current_version = tv.version AND tv.deleted_at = 0"
+        + " tv ON tm.table_id = tv.table_id AND tm.current_version = tv.version"
+        + " AND tv.deleted_at = 0"
         + " WHERE tm.schema_id = #{schemaId} AND tm.deleted_at = 0";
   }
 
@@ -65,7 +66,8 @@ public class TableMetaBaseSQLProvider {
         + TABLE_NAME
         + " tm LEFT JOIN "
         + TableVersionMapper.TABLE_NAME
-        + " tv ON tm.table_id = tv.table_id AND tm.current_version = tv.version AND tv.deleted_at = 0"
+        + " tv ON tm.table_id = tv.table_id AND tm.current_version = tv.version"
+        + " AND tv.deleted_at = 0"
         + " WHERE tm.deleted_at = 0"
         + " AND tm.table_id IN ("
         + "<foreach collection='tableIds' item='tableId' separator=','>"
@@ -150,8 +152,10 @@ public class TableMetaBaseSQLProvider {
         + TABLE_NAME
         + " tm LEFT JOIN "
         + TableVersionMapper.TABLE_NAME
-        + " tv ON tm.table_id = tv.table_id AND tm.current_version = tv.version AND tv.deleted_at = 0"
-        + " WHERE tm.schema_id = #{schemaId} AND tm.table_name = #{tableName} AND tm.deleted_at = 0";
+        + " tv ON tm.table_id = tv.table_id AND tm.current_version = tv.version"
+        + " AND tv.deleted_at = 0"
+        + " WHERE tm.schema_id = #{schemaId} AND tm.table_name = #{tableName}"
+        + " AND tm.deleted_at = 0";
   }
 
   public String selectTableMetaById(@Param("tableId") Long tableId) {
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/FunctionMetaPostgreSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/FunctionMetaPostgreSQLProvider.java
new file mode 100644
index 0000000000..e112cd0e31
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/FunctionMetaPostgreSQLProvider.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.provider.base.FunctionMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.FunctionPO;
+import org.apache.ibatis.annotations.Param;
+
+public class FunctionMetaPostgreSQLProvider extends FunctionMetaBaseSQLProvider {
+
+  @Override
+  public String insertFunctionMeta(@Param("functionMeta") FunctionPO functionPO) {
+    return "INSERT INTO "
+        + FunctionMetaMapper.TABLE_NAME
+        + " (function_id, function_name, metalake_id, catalog_id, schema_id,"
+        + " function_type, \"deterministic\", return_type, function_current_version, function_latest_version, audit_info, deleted_at)"
+        + " VALUES (#{functionMeta.functionId}, #{functionMeta.functionName}, #{functionMeta.metalakeId},"
+        + " #{functionMeta.catalogId}, #{functionMeta.schemaId}, #{functionMeta.functionType},"
+        + " #{functionMeta.deterministic}, #{functionMeta.returnType},"
+        + " #{functionMeta.functionCurrentVersion}, #{functionMeta.functionLatestVersion}, #{functionMeta.auditInfo},"
+        + " #{functionMeta.deletedAt})";
+  }
+
+  @Override
+  public String insertFunctionMetaOnDuplicateKeyUpdate(
+      @Param("functionMeta") FunctionPO functionPO) {
+    return "INSERT INTO "
+        + FunctionMetaMapper.TABLE_NAME
+        + " (function_id, function_name, metalake_id, catalog_id, schema_id,"
+        + " function_type, \"deterministic\", return_type, function_current_version, function_latest_version, audit_info, deleted_at)"
+        + " VALUES (#{functionMeta.functionId}, #{functionMeta.functionName}, #{functionMeta.metalakeId},"
+        + " #{functionMeta.catalogId}, #{functionMeta.schemaId}, #{functionMeta.functionType},"
+        + " #{functionMeta.deterministic}, #{functionMeta.returnType},"
+        + " #{functionMeta.functionCurrentVersion}, #{functionMeta.functionLatestVersion}, #{functionMeta.auditInfo},"
+        + " #{functionMeta.deletedAt})"
+        + " ON CONFLICT (function_id) DO UPDATE SET"
+        + " function_name = #{functionMeta.functionName},"
+        + " metalake_id = #{functionMeta.metalakeId},"
+        + " catalog_id = #{functionMeta.catalogId},"
+        + " schema_id = #{functionMeta.schemaId},"
+        + " function_type = #{functionMeta.functionType},"
+        + " \"deterministic\" = #{functionMeta.deterministic},"
+        + " return_type = #{functionMeta.returnType},"
+        + " function_current_version = #{functionMeta.functionCurrentVersion},"
+        + " function_latest_version = #{functionMeta.functionLatestVersion},"
+        + " audit_info = #{functionMeta.auditInfo},"
+        + " deleted_at = #{functionMeta.deletedAt}";
+  }
+
+  @Override
+  public String listFunctionPOsBySchemaId(@Param("schemaId") Long schemaId) {
+    return "SELECT fm.function_id, fm.function_name, fm.metalake_id, fm.catalog_id, fm.schema_id,"
+        + " fm.function_type, fm.\"deterministic\", fm.return_type, fm.function_current_version, fm.function_latest_version,"
+        + " fm.audit_info, fm.deleted_at,"
+        + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as version_catalog_id,"
+        + " vi.schema_id as version_schema_id, vi.function_id as version_function_id,"
+        + " vi.version, vi.function_comment, vi.definitions,"
+        + " vi.audit_info as version_audit_info, vi.deleted_at as version_deleted_at"
+        + " FROM "
+        + FunctionMetaMapper.TABLE_NAME
+        + " fm INNER JOIN "
+        + FunctionMetaMapper.VERSION_TABLE_NAME
+        + " vi ON fm.function_id = vi.function_id AND fm.function_current_version = vi.version"
+        + " WHERE fm.schema_id = #{schemaId} AND fm.deleted_at = 0 AND vi.deleted_at = 0";
+  }
+
+  @Override
+  public String selectFunctionMetaBySchemaIdAndName(
+      @Param("schemaId") Long schemaId, @Param("functionName") String functionName) {
+    return "SELECT fm.function_id, fm.function_name, fm.metalake_id, fm.catalog_id, fm.schema_id,"
+        + " fm.function_type, fm.\"deterministic\", fm.return_type, fm.function_current_version, fm.function_latest_version,"
+        + " fm.audit_info, fm.deleted_at,"
+        + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as version_catalog_id,"
+        + " vi.schema_id as version_schema_id, vi.function_id as version_function_id,"
+        + " vi.version, vi.function_comment, vi.definitions,"
+        + " vi.audit_info as version_audit_info, vi.deleted_at as version_deleted_at"
+        + " FROM "
+        + FunctionMetaMapper.TABLE_NAME
+        + " fm INNER JOIN "
+        + FunctionMetaMapper.VERSION_TABLE_NAME
+        + " vi ON fm.function_id = vi.function_id AND fm.function_current_version = vi.version"
+        + " WHERE fm.schema_id = #{schemaId} AND fm.function_name = #{functionName}"
+        + " AND fm.deleted_at = 0 AND vi.deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteFunctionMetaByFunctionId(@Param("functionId") Long functionId) {
+    return "UPDATE "
+        + FunctionMetaMapper.TABLE_NAME
+        + " SET deleted_at = CAST(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 AS BIGINT)"
+        + " WHERE function_id = #{functionId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteFunctionMetasByCatalogId(@Param("catalogId") Long catalogId) {
+    return "UPDATE "
+        + FunctionMetaMapper.TABLE_NAME
+        + " SET deleted_at = CAST(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 AS BIGINT)"
+        + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteFunctionMetasByMetalakeId(@Param("metalakeId") Long metalakeId) {
+    return "UPDATE "
+        + FunctionMetaMapper.TABLE_NAME
+        + " SET deleted_at = CAST(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 AS BIGINT)"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteFunctionMetasBySchemaId(@Param("schemaId") Long schemaId) {
+    return "UPDATE "
+        + FunctionMetaMapper.TABLE_NAME
+        + " SET deleted_at = CAST(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 AS BIGINT)"
+        + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String deleteFunctionMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
+    return "DELETE FROM "
+        + FunctionMetaMapper.TABLE_NAME
+        + " WHERE function_id IN (SELECT function_id FROM "
+        + FunctionMetaMapper.TABLE_NAME
+        + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
+  }
+
+  @Override
+  public String updateFunctionMeta(
+      @Param("newFunctionMeta") FunctionPO newFunctionPO,
+      @Param("oldFunctionMeta") FunctionPO oldFunctionPO) {
+    return "UPDATE "
+        + FunctionMetaMapper.TABLE_NAME
+        + " SET function_name = #{newFunctionMeta.functionName},"
+        + " metalake_id = #{newFunctionMeta.metalakeId},"
+        + " catalog_id = #{newFunctionMeta.catalogId},"
+        + " schema_id = #{newFunctionMeta.schemaId},"
+        + " function_type = #{newFunctionMeta.functionType},"
+        + " \"deterministic\" = #{newFunctionMeta.deterministic},"
+        + " return_type = #{newFunctionMeta.returnType},"
+        + " function_current_version = #{newFunctionMeta.functionCurrentVersion},"
+        + " function_latest_version = #{newFunctionMeta.functionLatestVersion},"
+        + " audit_info = #{newFunctionMeta.auditInfo},"
+        + " deleted_at = #{newFunctionMeta.deletedAt}"
+        + " WHERE function_id = #{oldFunctionMeta.functionId}"
+        + " AND function_name = #{oldFunctionMeta.functionName}"
+        + " AND metalake_id = #{oldFunctionMeta.metalakeId}"
+        + " AND catalog_id = #{oldFunctionMeta.catalogId}"
+        + " AND schema_id = #{oldFunctionMeta.schemaId}"
+        + " AND function_type = #{oldFunctionMeta.functionType}"
+        + " AND function_current_version = #{oldFunctionMeta.functionCurrentVersion}"
+        + " AND function_latest_version = #{oldFunctionMeta.functionLatestVersion}"
+        + " AND audit_info = #{oldFunctionMeta.auditInfo}"
+        + " AND deleted_at = 0";
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/FunctionVersionMetaPostgreSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/FunctionVersionMetaPostgreSQLProvider.java
new file mode 100644
index 0000000000..ce2022cd41
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/FunctionVersionMetaPostgreSQLProvider.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import org.apache.gravitino.storage.relational.mapper.FunctionVersionMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.provider.base.FunctionVersionMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.FunctionVersionPO;
+import org.apache.ibatis.annotations.Param;
+
+public class FunctionVersionMetaPostgreSQLProvider extends FunctionVersionMetaBaseSQLProvider {
+
+  @Override
+  public String insertFunctionVersionMetaOnDuplicateKeyUpdate(
+      @Param("functionVersionMeta") FunctionVersionPO functionVersionPO) {
+    return "INSERT INTO "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " (metalake_id, catalog_id, schema_id, function_id, version,"
+        + " function_comment, definitions, audit_info, deleted_at)"
+        + " VALUES (#{functionVersionMeta.metalakeId}, #{functionVersionMeta.catalogId},"
+        + " #{functionVersionMeta.schemaId}, #{functionVersionMeta.functionId},"
+        + " #{functionVersionMeta.functionVersion}, #{functionVersionMeta.functionComment},"
+        + " #{functionVersionMeta.definitions}, #{functionVersionMeta.auditInfo},"
+        + " #{functionVersionMeta.deletedAt})"
+        + " ON CONFLICT (function_id, version, deleted_at) DO UPDATE SET"
+        + " function_comment = #{functionVersionMeta.functionComment},"
+        + " definitions = #{functionVersionMeta.definitions},"
+        + " audit_info = #{functionVersionMeta.auditInfo},"
+        + " deleted_at = #{functionVersionMeta.deletedAt}";
+  }
+
+  @Override
+  public String softDeleteFunctionVersionMetasBySchemaId(@Param("schemaId") Long schemaId) {
+    return "UPDATE "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = CAST(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 AS BIGINT)"
+        + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteFunctionVersionMetasByCatalogId(@Param("catalogId") Long catalogId) {
+    return "UPDATE "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = CAST(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 AS BIGINT)"
+        + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteFunctionVersionMetasByMetalakeId(@Param("metalakeId") Long metalakeId) {
+    return "UPDATE "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = CAST(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 AS BIGINT)"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String deleteFunctionVersionMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
+    return "DELETE FROM "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " WHERE id IN (SELECT id FROM "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
+  }
+
+  @Override
+  public String softDeleteFunctionVersionsByRetentionLine(
+      @Param("functionId") Long functionId,
+      @Param("versionRetentionLine") long versionRetentionLine,
+      @Param("limit") int limit) {
+    return "UPDATE "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = CAST(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 AS BIGINT)"
+        + " WHERE id IN (SELECT id FROM "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " WHERE function_id = #{functionId} AND version <= #{versionRetentionLine}"
+        + " AND deleted_at = 0 LIMIT #{limit})";
+  }
+
+  @Override
+  public String softDeleteFunctionVersionsByFunctionId(@Param("functionId") Long functionId) {
+    return "UPDATE "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = CAST(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 AS BIGINT)"
+        + " WHERE function_id = #{functionId} AND deleted_at = 0";
+  }
+}

Reply via email to