This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 4fe9d847ec [#9528] feat(storage): support function management
(Storage) (part-2) (#9824)
4fe9d847ec is described below
commit 4fe9d847ecfd57e3f552b9bf37cad5fd09a0cd76
Author: mchades <[email protected]>
AuthorDate: Mon Feb 2 14:05:25 2026 +0800
[#9528] feat(storage): support function management (Storage) (part-2)
(#9824)
### What changes were proposed in this pull request?
Implement the storage layer for Function management.
- Added FunctionMetaMapper, FunctionVersionMetaMapper.
- Added SQL Provider Factories and Base implementations.
- Added PostgreSQL specific providers.
- Updated DefaultMapperPackageProvider to include function mappers.
### Why are the changes needed?
To provide the persistence layer capability for storing and retrieving
function metadata in relational databases (PostgreSQL/MySQL/H2).
Fix: #9528
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Verified via Unit Tests in the Service layer (subsequent commit).
---
.../relational/mapper/FunctionMetaMapper.java | 134 ++++++++++
.../mapper/FunctionMetaSQLProviderFactory.java | 114 ++++++++
.../mapper/FunctionVersionMetaMapper.java | 40 +++
.../FunctionVersionMetaSQLProviderFactory.java | 100 +++++++
.../provider/DefaultMapperPackageProvider.java | 4 +
.../provider/base/FunctionMetaBaseSQLProvider.java | 291 +++++++++++++++++++++
.../base/FunctionVersionMetaBaseSQLProvider.java | 118 +++++++++
.../provider/base/TableMetaBaseSQLProvider.java | 12 +-
.../postgresql/FunctionMetaPostgreSQLProvider.java | 173 ++++++++++++
.../FunctionVersionMetaPostgreSQLProvider.java | 102 ++++++++
10 files changed, 1084 insertions(+), 4 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaMapper.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaMapper.java
new file mode 100644
index 0000000000..2314069fce
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaMapper.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.po.FunctionPO;
+import org.apache.gravitino.storage.relational.po.FunctionVersionPO;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.One;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Result;
+import org.apache.ibatis.annotations.ResultMap;
+import org.apache.ibatis.annotations.Results;
+import org.apache.ibatis.annotations.Select;
+import org.apache.ibatis.annotations.SelectProvider;
+
+/** A MyBatis Mapper for function metadata operation SQLs. */
+public interface FunctionMetaMapper {
+ String TABLE_NAME = "function_meta";
+ String VERSION_TABLE_NAME = "function_version_info";
+
+ @Results(
+ id = "mapToFunctionVersionPO",
+ value = {
+ @Result(property = "id", column = "id", id = true),
+ @Result(property = "metalakeId", column = "version_metalake_id"),
+ @Result(property = "catalogId", column = "version_catalog_id"),
+ @Result(property = "schemaId", column = "version_schema_id"),
+ @Result(property = "functionId", column = "version_function_id"),
+ @Result(property = "functionVersion", column = "version"),
+ @Result(property = "functionComment", column = "function_comment"),
+ @Result(property = "definitions", column = "definitions"),
+ @Result(property = "auditInfo", column = "version_audit_info"),
+ @Result(property = "deletedAt", column = "version_deleted_at")
+ })
+ @Select("SELECT 1") // Dummy SQL to avoid a MyBatis error; it is never executed. The result map above is the one referenced by @One(resultMap = "mapToFunctionVersionPO").
+ FunctionVersionPO mapToFunctionVersionPO();
+
+ @InsertProvider(type = FunctionMetaSQLProviderFactory.class, method =
"insertFunctionMeta")
+ void insertFunctionMeta(@Param("functionMeta") FunctionPO functionPO);
+
+ @InsertProvider(
+ type = FunctionMetaSQLProviderFactory.class,
+ method = "insertFunctionMetaOnDuplicateKeyUpdate")
+ void insertFunctionMetaOnDuplicateKeyUpdate(@Param("functionMeta")
FunctionPO functionPO);
+
+ @Results({
+ @Result(property = "functionId", column = "function_id", id = true),
+ @Result(property = "functionName", column = "function_name"),
+ @Result(property = "metalakeId", column = "metalake_id"),
+ @Result(property = "catalogId", column = "catalog_id"),
+ @Result(property = "schemaId", column = "schema_id"),
+ @Result(property = "functionType", column = "function_type"),
+ @Result(property = "deterministic", column = "deterministic"),
+ @Result(property = "returnType", column = "return_type"),
+ @Result(property = "functionCurrentVersion", column =
"function_current_version"),
+ @Result(property = "functionLatestVersion", column =
"function_latest_version"),
+ @Result(property = "auditInfo", column = "audit_info"),
+ @Result(property = "deletedAt", column = "deleted_at"),
+ @Result(
+ property = "functionVersionPO",
+ javaType = FunctionVersionPO.class,
+ column =
+
"{id,version_metalake_id,version_catalog_id,version_schema_id,version_function_id,"
+ +
"version,function_comment,definitions,version_audit_info,version_deleted_at}",
+ one = @One(resultMap = "mapToFunctionVersionPO"))
+ })
+ @SelectProvider(type = FunctionMetaSQLProviderFactory.class, method =
"listFunctionPOsBySchemaId")
+ List<FunctionPO> listFunctionPOsBySchemaId(@Param("schemaId") Long schemaId);
+
+ @Results(
+ id = "functionPOResultMap",
+ value = {
+ @Result(property = "functionId", column = "function_id", id = true),
+ @Result(property = "functionName", column = "function_name"),
+ @Result(property = "metalakeId", column = "metalake_id"),
+ @Result(property = "catalogId", column = "catalog_id"),
+ @Result(property = "schemaId", column = "schema_id"),
+ @Result(property = "functionType", column = "function_type"),
+ @Result(property = "deterministic", column = "deterministic"),
+ @Result(property = "returnType", column = "return_type"),
+ @Result(property = "functionCurrentVersion", column =
"function_current_version"),
+ @Result(property = "functionLatestVersion", column =
"function_latest_version"),
+ @Result(property = "auditInfo", column = "audit_info"),
+ @Result(property = "deletedAt", column = "deleted_at"),
+ @Result(
+ property = "functionVersionPO",
+ javaType = FunctionVersionPO.class,
+ column =
+
"{id,version_metalake_id,version_catalog_id,version_schema_id,version_function_id,"
+ +
"version,function_comment,definitions,version_audit_info,version_deleted_at}",
+ one = @One(resultMap = "mapToFunctionVersionPO"))
+ })
+ @SelectProvider(
+ type = FunctionMetaSQLProviderFactory.class,
+ method = "listFunctionPOsByFullQualifiedName")
+ List<FunctionPO> listFunctionPOsByFullQualifiedName(
+ @Param("metalakeName") String metalakeName,
+ @Param("catalogName") String catalogName,
+ @Param("schemaName") String schemaName);
+
+ @ResultMap("functionPOResultMap")
+ @SelectProvider(
+ type = FunctionMetaSQLProviderFactory.class,
+ method = "selectFunctionMetaByFullQualifiedName")
+ FunctionPO selectFunctionMetaByFullQualifiedName(
+ @Param("metalakeName") String metalakeName,
+ @Param("catalogName") String catalogName,
+ @Param("schemaName") String schemaName,
+ @Param("functionName") String functionName);
+
+ @ResultMap("functionPOResultMap")
+ @SelectProvider(
+ type = FunctionMetaSQLProviderFactory.class,
+ method = "selectFunctionMetaBySchemaIdAndName")
+ FunctionPO selectFunctionMetaBySchemaIdAndName(
+ @Param("schemaId") Long schemaId, @Param("functionName") String
functionName);
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaSQLProviderFactory.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaSQLProviderFactory.java
new file mode 100644
index 0000000000..239a19c044
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaSQLProviderFactory.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import
org.apache.gravitino.storage.relational.mapper.provider.base.FunctionMetaBaseSQLProvider;
+import
org.apache.gravitino.storage.relational.mapper.provider.postgresql.FunctionMetaPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.FunctionPO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class FunctionMetaSQLProviderFactory {
+
+ static class FunctionMetaMySQLProvider extends FunctionMetaBaseSQLProvider {}
+
+ static class FunctionMetaH2Provider extends FunctionMetaBaseSQLProvider {}
+
+ private static final Map<JDBCBackendType, FunctionMetaBaseSQLProvider>
+ FUNCTION_META_SQL_PROVIDER_MAP =
+ ImmutableMap.of(
+ JDBCBackendType.MYSQL, new FunctionMetaMySQLProvider(),
+ JDBCBackendType.H2, new FunctionMetaH2Provider(),
+ JDBCBackendType.POSTGRESQL, new
FunctionMetaPostgreSQLProvider());
+
+ public static FunctionMetaBaseSQLProvider getProvider() {
+ String databaseId =
+ SqlSessionFactoryHelper.getInstance()
+ .getSqlSessionFactory()
+ .getConfiguration()
+ .getDatabaseId();
+
+ JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId);
+ return FUNCTION_META_SQL_PROVIDER_MAP.get(jdbcBackendType);
+ }
+
+ public static String insertFunctionMeta(@Param("functionMeta") FunctionPO
functionPO) {
+ return getProvider().insertFunctionMeta(functionPO);
+ }
+
+ public static String insertFunctionMetaOnDuplicateKeyUpdate(
+ @Param("functionMeta") FunctionPO functionPO) {
+ return getProvider().insertFunctionMetaOnDuplicateKeyUpdate(functionPO);
+ }
+
+ public static String listFunctionPOsBySchemaId(@Param("schemaId") Long
schemaId) {
+ return getProvider().listFunctionPOsBySchemaId(schemaId);
+ }
+
+ public static String listFunctionPOsByFullQualifiedName(
+ @Param("metalakeName") String metalakeName,
+ @Param("catalogName") String catalogName,
+ @Param("schemaName") String schemaName) {
+ return getProvider().listFunctionPOsByFullQualifiedName(metalakeName,
catalogName, schemaName);
+ }
+
+ public static String selectFunctionMetaByFullQualifiedName(
+ @Param("metalakeName") String metalakeName,
+ @Param("catalogName") String catalogName,
+ @Param("schemaName") String schemaName,
+ @Param("functionName") String functionName) {
+ return getProvider()
+ .selectFunctionMetaByFullQualifiedName(metalakeName, catalogName,
schemaName, functionName);
+ }
+
+ public static String selectFunctionMetaBySchemaIdAndName(
+ @Param("schemaId") Long schemaId, @Param("functionName") String
functionName) {
+ return getProvider().selectFunctionMetaBySchemaIdAndName(schemaId,
functionName);
+ }
+
+ public static String softDeleteFunctionMetaByFunctionId(@Param("functionId")
Long functionId) {
+ return getProvider().softDeleteFunctionMetaByFunctionId(functionId);
+ }
+
+ public static String softDeleteFunctionMetasByCatalogId(@Param("catalogId")
Long catalogId) {
+ return getProvider().softDeleteFunctionMetasByCatalogId(catalogId);
+ }
+
+ public static String
softDeleteFunctionMetasByMetalakeId(@Param("metalakeId") Long metalakeId) {
+ return getProvider().softDeleteFunctionMetasByMetalakeId(metalakeId);
+ }
+
+ public static String softDeleteFunctionMetasBySchemaId(@Param("schemaId")
Long schemaId) {
+ return getProvider().softDeleteFunctionMetasBySchemaId(schemaId);
+ }
+
+ public static String deleteFunctionMetasByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit)
{
+ return getProvider().deleteFunctionMetasByLegacyTimeline(legacyTimeline,
limit);
+ }
+
+ public static String updateFunctionMeta(
+ @Param("newFunctionMeta") FunctionPO newFunctionPO,
+ @Param("oldFunctionMeta") FunctionPO oldFunctionPO) {
+ return getProvider().updateFunctionMeta(newFunctionPO, oldFunctionPO);
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionVersionMetaMapper.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionVersionMetaMapper.java
new file mode 100644
index 0000000000..d258494eea
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionVersionMetaMapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import org.apache.gravitino.storage.relational.po.FunctionVersionPO;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.Param;
+
+/** A MyBatis Mapper for function version metadata operation SQLs. */
+public interface FunctionVersionMetaMapper {
+
+ String TABLE_NAME = "function_version_info";
+
+ @InsertProvider(
+ type = FunctionVersionMetaSQLProviderFactory.class,
+ method = "insertFunctionVersionMeta")
+ void insertFunctionVersionMeta(@Param("functionVersionMeta")
FunctionVersionPO functionVersionPO);
+
+ @InsertProvider(
+ type = FunctionVersionMetaSQLProviderFactory.class,
+ method = "insertFunctionVersionMetaOnDuplicateKeyUpdate")
+ void insertFunctionVersionMetaOnDuplicateKeyUpdate(
+ @Param("functionVersionMeta") FunctionVersionPO functionVersionPO);
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionVersionMetaSQLProviderFactory.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionVersionMetaSQLProviderFactory.java
new file mode 100644
index 0000000000..6e8de28a78
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionVersionMetaSQLProviderFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import
org.apache.gravitino.storage.relational.mapper.provider.base.FunctionVersionMetaBaseSQLProvider;
+import
org.apache.gravitino.storage.relational.mapper.provider.postgresql.FunctionVersionMetaPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.FunctionVersionPO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class FunctionVersionMetaSQLProviderFactory {
+
+ static class FunctionVersionMetaMySQLProvider extends
FunctionVersionMetaBaseSQLProvider {}
+
+ static class FunctionVersionMetaH2Provider extends
FunctionVersionMetaBaseSQLProvider {}
+
+ private static final Map<JDBCBackendType, FunctionVersionMetaBaseSQLProvider>
+ FUNCTION_VERSION_META_SQL_PROVIDER_MAP =
+ ImmutableMap.of(
+ JDBCBackendType.MYSQL, new FunctionVersionMetaMySQLProvider(),
+ JDBCBackendType.H2, new FunctionVersionMetaH2Provider(),
+ JDBCBackendType.POSTGRESQL, new
FunctionVersionMetaPostgreSQLProvider());
+
+ public static FunctionVersionMetaBaseSQLProvider getProvider() {
+ String databaseId =
+ SqlSessionFactoryHelper.getInstance()
+ .getSqlSessionFactory()
+ .getConfiguration()
+ .getDatabaseId();
+
+ JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId);
+ return FUNCTION_VERSION_META_SQL_PROVIDER_MAP.get(jdbcBackendType);
+ }
+
+ public static String insertFunctionVersionMeta(
+ @Param("functionVersionMeta") FunctionVersionPO functionVersionPO) {
+ return getProvider().insertFunctionVersionMeta(functionVersionPO);
+ }
+
+ public static String insertFunctionVersionMetaOnDuplicateKeyUpdate(
+ @Param("functionVersionMeta") FunctionVersionPO functionVersionPO) {
+ return
getProvider().insertFunctionVersionMetaOnDuplicateKeyUpdate(functionVersionPO);
+ }
+
+ public static String
softDeleteFunctionVersionMetasBySchemaId(@Param("schemaId") Long schemaId) {
+ return getProvider().softDeleteFunctionVersionMetasBySchemaId(schemaId);
+ }
+
+ public static String softDeleteFunctionVersionMetasByCatalogId(
+ @Param("catalogId") Long catalogId) {
+ return getProvider().softDeleteFunctionVersionMetasByCatalogId(catalogId);
+ }
+
+ public static String softDeleteFunctionVersionMetasByMetalakeId(
+ @Param("metalakeId") Long metalakeId) {
+ return
getProvider().softDeleteFunctionVersionMetasByMetalakeId(metalakeId);
+ }
+
+ public static String deleteFunctionVersionMetasByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit)
{
+ return
getProvider().deleteFunctionVersionMetasByLegacyTimeline(legacyTimeline, limit);
+ }
+
+ public static String selectFunctionVersionsByRetentionCount(
+ @Param("versionRetentionCount") Long versionRetentionCount) {
+ return
getProvider().selectFunctionVersionsByRetentionCount(versionRetentionCount);
+ }
+
+ public static String softDeleteFunctionVersionsByRetentionLine(
+ @Param("functionId") Long functionId,
+ @Param("versionRetentionLine") long versionRetentionLine,
+ @Param("limit") int limit) {
+ return getProvider()
+ .softDeleteFunctionVersionsByRetentionLine(functionId,
versionRetentionLine, limit);
+ }
+
+ public static String softDeleteFunctionVersionsByFunctionId(
+ @Param("functionId") Long functionId) {
+ return getProvider().softDeleteFunctionVersionsByFunctionId(functionId);
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
index aaf22ccda8..497f751165 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
@@ -23,6 +23,8 @@ import java.util.List;
import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
import org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper;
+import org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper;
+import
org.apache.gravitino.storage.relational.mapper.FunctionVersionMetaMapper;
import org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
import org.apache.gravitino.storage.relational.mapper.GroupRoleRelMapper;
import org.apache.gravitino.storage.relational.mapper.JobMetaMapper;
@@ -57,6 +59,8 @@ public class DefaultMapperPackageProvider implements
MapperPackageProvider {
CatalogMetaMapper.class,
FilesetMetaMapper.class,
FilesetVersionMapper.class,
+ FunctionMetaMapper.class,
+ FunctionVersionMetaMapper.class,
GroupMetaMapper.class,
GroupRoleRelMapper.class,
JobMetaMapper.class,
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FunctionMetaBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FunctionMetaBaseSQLProvider.java
new file mode 100644
index 0000000000..772b76cdc5
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FunctionMetaBaseSQLProvider.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import static
org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper.TABLE_NAME;
+import static
org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper.VERSION_TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
+import org.apache.gravitino.storage.relational.po.FunctionPO;
+import org.apache.ibatis.annotations.Param;
+
+public class FunctionMetaBaseSQLProvider {
+
+ public String listFunctionPOsByFullQualifiedName(
+ @Param("metalakeName") String metalakeName,
+ @Param("catalogName") String catalogName,
+ @Param("schemaName") String schemaName) {
+ return """
+ SELECT
+ mm.metalake_id,
+ cm.catalog_id,
+ sm.schema_id,
+ fm.function_id,
+ fm.function_name,
+ fm.function_type,
+ fm.deterministic,
+ fm.return_type,
+ fm.function_current_version,
+ fm.function_latest_version,
+ fm.audit_info,
+ fm.deleted_at,
+ vi.id,
+ vi.metalake_id as version_metalake_id,
+ vi.catalog_id as version_catalog_id,
+ vi.schema_id as version_schema_id,
+ vi.function_id as version_function_id,
+ vi.version,
+ vi.function_comment,
+ vi.definitions,
+ vi.audit_info as version_audit_info,
+ vi.deleted_at as version_deleted_at
+ FROM
+ %s mm
+ INNER JOIN
+ %s cm ON mm.metalake_id = cm.metalake_id
+ AND cm.catalog_name = #{catalogName}
+ AND cm.deleted_at = 0
+ LEFT JOIN
+ %s sm ON cm.catalog_id = sm.catalog_id
+ AND sm.schema_name = #{schemaName}
+ AND sm.deleted_at = 0
+ LEFT JOIN
+ %s fm ON sm.schema_id = fm.schema_id
+ AND fm.deleted_at = 0
+ LEFT JOIN
+ %s vi ON fm.function_id = vi.function_id
+ AND fm.function_current_version = vi.version
+ AND vi.deleted_at = 0
+ WHERE
+ mm.metalake_name = #{metalakeName}
+ AND mm.deleted_at = 0
+ """
+ .formatted(
+ MetalakeMetaMapper.TABLE_NAME,
+ CatalogMetaMapper.TABLE_NAME,
+ SchemaMetaMapper.TABLE_NAME,
+ TABLE_NAME,
+ VERSION_TABLE_NAME);
+ }
+
+ public String insertFunctionMeta(@Param("functionMeta") FunctionPO
functionPO) {
+ return "INSERT INTO "
+ + TABLE_NAME
+ + " (function_id, function_name, metalake_id, catalog_id, schema_id,"
+ + " function_type, `deterministic`, return_type,
function_current_version,"
+ + " function_latest_version, audit_info, deleted_at)"
+ + " VALUES (#{functionMeta.functionId}, #{functionMeta.functionName},"
+ + " #{functionMeta.metalakeId}, #{functionMeta.catalogId},
#{functionMeta.schemaId},"
+ + " #{functionMeta.functionType}, #{functionMeta.deterministic},"
+ + " #{functionMeta.returnType},
#{functionMeta.functionCurrentVersion},"
+ + " #{functionMeta.functionLatestVersion}, #{functionMeta.auditInfo},"
+ + " #{functionMeta.deletedAt})";
+ }
+
+ public String insertFunctionMetaOnDuplicateKeyUpdate(
+ @Param("functionMeta") FunctionPO functionPO) {
+ return "INSERT INTO "
+ + TABLE_NAME
+ + " (function_id, function_name, metalake_id, catalog_id, schema_id,"
+ + " function_type, `deterministic`, return_type,"
+ + " function_current_version, function_latest_version, audit_info,
deleted_at)"
+ + " VALUES (#{functionMeta.functionId}, #{functionMeta.functionName},"
+ + " #{functionMeta.metalakeId}, #{functionMeta.catalogId},
#{functionMeta.schemaId},"
+ + " #{functionMeta.functionType}, #{functionMeta.deterministic},"
+ + " #{functionMeta.returnType},
#{functionMeta.functionCurrentVersion},"
+ + " #{functionMeta.functionLatestVersion}, #{functionMeta.auditInfo},"
+ + " #{functionMeta.deletedAt})"
+ + " ON DUPLICATE KEY UPDATE"
+ + " function_name = #{functionMeta.functionName},"
+ + " metalake_id = #{functionMeta.metalakeId},"
+ + " catalog_id = #{functionMeta.catalogId},"
+ + " schema_id = #{functionMeta.schemaId},"
+ + " function_type = #{functionMeta.functionType},"
+ + " `deterministic` = #{functionMeta.deterministic},"
+ + " return_type = #{functionMeta.returnType},"
+ + " function_current_version = #{functionMeta.functionCurrentVersion},"
+ + " function_latest_version = #{functionMeta.functionLatestVersion},"
+ + " audit_info = #{functionMeta.auditInfo},"
+ + " deleted_at = #{functionMeta.deletedAt}";
+ }
+
+ public String selectFunctionMetaByFullQualifiedName(
+ @Param("metalakeName") String metalakeName,
+ @Param("catalogName") String catalogName,
+ @Param("schemaName") String schemaName,
+ @Param("functionName") String functionName) {
+ return """
+ SELECT
+ mm.metalake_id,
+ cm.catalog_id,
+ sm.schema_id,
+ fm.function_id,
+ fm.function_name,
+ fm.function_type,
+ fm.deterministic,
+ fm.return_type,
+ fm.function_current_version,
+ fm.function_latest_version,
+ fm.audit_info,
+ fm.deleted_at,
+ vi.id,
+ vi.metalake_id as version_metalake_id,
+ vi.catalog_id as version_catalog_id,
+ vi.schema_id as version_schema_id,
+ vi.function_id as version_function_id,
+ vi.version,
+ vi.function_comment,
+ vi.definitions,
+ vi.audit_info as version_audit_info,
+ vi.deleted_at as version_deleted_at
+ FROM
+ %s mm
+ INNER JOIN
+ %s cm ON mm.metalake_id = cm.metalake_id
+ AND cm.catalog_name = #{catalogName}
+ AND cm.deleted_at = 0
+ LEFT JOIN
+ %s sm ON cm.catalog_id = sm.catalog_id
+ AND sm.schema_name = #{schemaName}
+ AND sm.deleted_at = 0
+ LEFT JOIN
+ %s fm ON sm.schema_id = fm.schema_id
+ AND fm.function_name = #{functionName}
+ AND fm.deleted_at = 0
+ INNER JOIN
+ %s vi ON fm.function_id = vi.function_id
+ AND fm.function_current_version = vi.version
+ AND vi.deleted_at = 0
+ WHERE
+ mm.metalake_name = #{metalakeName}
+ AND mm.deleted_at = 0
+ """
+ .formatted(
+ MetalakeMetaMapper.TABLE_NAME,
+ CatalogMetaMapper.TABLE_NAME,
+ SchemaMetaMapper.TABLE_NAME,
+ TABLE_NAME,
+ VERSION_TABLE_NAME);
+ }
+
+ public String listFunctionPOsBySchemaId(@Param("schemaId") Long schemaId) {
+ return "SELECT fm.function_id, fm.function_name, fm.metalake_id,
fm.catalog_id, fm.schema_id,"
+ + " fm.function_type, fm.`deterministic`, fm.return_type,"
+ + " fm.function_current_version, fm.function_latest_version,"
+ + " fm.audit_info, fm.deleted_at,"
+ + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as
version_catalog_id,"
+ + " vi.schema_id as version_schema_id, vi.function_id as
version_function_id,"
+ + " vi.version, vi.function_comment, vi.definitions,"
+ + " vi.audit_info as version_audit_info, vi.deleted_at as
version_deleted_at"
+ + " FROM "
+ + TABLE_NAME
+ + " fm INNER JOIN "
+ + VERSION_TABLE_NAME
+ + " vi ON fm.function_id = vi.function_id AND
fm.function_current_version = vi.version"
+ + " WHERE fm.schema_id = #{schemaId} AND fm.deleted_at = 0 AND
vi.deleted_at = 0";
+ }
+
+ public String selectFunctionMetaBySchemaIdAndName(
+ @Param("schemaId") Long schemaId, @Param("functionName") String
functionName) {
+ return "SELECT fm.function_id, fm.function_name, fm.metalake_id,
fm.catalog_id, fm.schema_id,"
+ + " fm.function_type, fm.`deterministic`, fm.return_type,"
+ + " fm.function_current_version, fm.function_latest_version,"
+ + " fm.audit_info, fm.deleted_at,"
+ + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as
version_catalog_id,"
+ + " vi.schema_id as version_schema_id, vi.function_id as
version_function_id,"
+ + " vi.version, vi.function_comment, vi.definitions,"
+ + " vi.audit_info as version_audit_info, vi.deleted_at as
version_deleted_at"
+ + " FROM "
+ + TABLE_NAME
+ + " fm INNER JOIN "
+ + VERSION_TABLE_NAME
+ + " vi ON fm.function_id = vi.function_id AND
fm.function_current_version = vi.version"
+ + " WHERE fm.schema_id = #{schemaId} AND fm.function_name =
#{functionName}"
+ + " AND fm.deleted_at = 0 AND vi.deleted_at = 0";
+ }
+
+ public String softDeleteFunctionMetaByFunctionId(@Param("functionId") Long
functionId) {
+ return "UPDATE "
+ + TABLE_NAME
+ + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ + " WHERE function_id = #{functionId} AND deleted_at = 0";
+ }
+
+ public String softDeleteFunctionMetasByCatalogId(@Param("catalogId") Long
catalogId) {
+ return "UPDATE "
+ + TABLE_NAME
+ + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
+ }
+
+ public String softDeleteFunctionMetasByMetalakeId(@Param("metalakeId") Long
metalakeId) {
+ return "UPDATE "
+ + TABLE_NAME
+ + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+ }
+
+ public String softDeleteFunctionMetasBySchemaId(@Param("schemaId") Long
schemaId) {
+ return "UPDATE "
+ + TABLE_NAME
+ + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+ }
+
+ public String deleteFunctionMetasByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit)
{
+ return "DELETE FROM "
+ + TABLE_NAME
+ + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT
#{limit}";
+ }
+
+ public String updateFunctionMeta(
+ @Param("newFunctionMeta") FunctionPO newFunctionPO,
+ @Param("oldFunctionMeta") FunctionPO oldFunctionPO) {
+ return "UPDATE "
+ + TABLE_NAME
+ + " SET function_name = #{newFunctionMeta.functionName},"
+ + " metalake_id = #{newFunctionMeta.metalakeId},"
+ + " catalog_id = #{newFunctionMeta.catalogId},"
+ + " schema_id = #{newFunctionMeta.schemaId},"
+ + " function_type = #{newFunctionMeta.functionType},"
+ + " `deterministic` = #{newFunctionMeta.deterministic},"
+ + " return_type = #{newFunctionMeta.returnType},"
+ + " function_current_version =
#{newFunctionMeta.functionCurrentVersion},"
+ + " function_latest_version =
#{newFunctionMeta.functionLatestVersion},"
+ + " audit_info = #{newFunctionMeta.auditInfo},"
+ + " deleted_at = #{newFunctionMeta.deletedAt}"
+ + " WHERE function_id = #{oldFunctionMeta.functionId}"
+ + " AND function_name = #{oldFunctionMeta.functionName}"
+ + " AND metalake_id = #{oldFunctionMeta.metalakeId}"
+ + " AND catalog_id = #{oldFunctionMeta.catalogId}"
+ + " AND schema_id = #{oldFunctionMeta.schemaId}"
+ + " AND function_type = #{oldFunctionMeta.functionType}"
+ + " AND function_current_version =
#{oldFunctionMeta.functionCurrentVersion}"
+ + " AND function_latest_version =
#{oldFunctionMeta.functionLatestVersion}"
+ + " AND audit_info = #{oldFunctionMeta.auditInfo}"
+ + " AND deleted_at = 0";
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FunctionVersionMetaBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FunctionVersionMetaBaseSQLProvider.java
new file mode 100644
index 0000000000..0e6ed91813
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FunctionVersionMetaBaseSQLProvider.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import
org.apache.gravitino.storage.relational.mapper.FunctionVersionMetaMapper;
+import org.apache.gravitino.storage.relational.po.FunctionVersionPO;
+import org.apache.ibatis.annotations.Param;
+
+public class FunctionVersionMetaBaseSQLProvider {
+
+ public String insertFunctionVersionMeta(
+ @Param("functionVersionMeta") FunctionVersionPO functionVersionPO) {
+ return "INSERT INTO "
+ + FunctionVersionMetaMapper.TABLE_NAME
+ + " (metalake_id, catalog_id, schema_id, function_id, version,"
+ + " function_comment, definitions, audit_info, deleted_at)"
+ + " VALUES (#{functionVersionMeta.metalakeId},
#{functionVersionMeta.catalogId},"
+ + " #{functionVersionMeta.schemaId},
#{functionVersionMeta.functionId},"
+ + " #{functionVersionMeta.functionVersion},
#{functionVersionMeta.functionComment},"
+ + " #{functionVersionMeta.definitions},
#{functionVersionMeta.auditInfo},"
+ + " #{functionVersionMeta.deletedAt})";
+ }
+
+ public String insertFunctionVersionMetaOnDuplicateKeyUpdate(
+ @Param("functionVersionMeta") FunctionVersionPO functionVersionPO) {
+ return "INSERT INTO "
+ + FunctionVersionMetaMapper.TABLE_NAME
+ + " (metalake_id, catalog_id, schema_id, function_id, version,"
+ + " function_comment, definitions, audit_info, deleted_at)"
+ + " VALUES (#{functionVersionMeta.metalakeId},
#{functionVersionMeta.catalogId},"
+ + " #{functionVersionMeta.schemaId},
#{functionVersionMeta.functionId},"
+ + " #{functionVersionMeta.functionVersion},
#{functionVersionMeta.functionComment},"
+ + " #{functionVersionMeta.definitions},
#{functionVersionMeta.auditInfo},"
+ + " #{functionVersionMeta.deletedAt})"
+ + " ON DUPLICATE KEY UPDATE"
+ + " function_comment = #{functionVersionMeta.functionComment},"
+ + " definitions = #{functionVersionMeta.definitions},"
+ + " audit_info = #{functionVersionMeta.auditInfo},"
+ + " deleted_at = #{functionVersionMeta.deletedAt}";
+ }
+
+ public String softDeleteFunctionVersionMetasBySchemaId(@Param("schemaId")
Long schemaId) {
+ return "UPDATE "
+ + FunctionVersionMetaMapper.TABLE_NAME
+ + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+ }
+
+ public String softDeleteFunctionVersionMetasByCatalogId(@Param("catalogId")
Long catalogId) {
+ return "UPDATE "
+ + FunctionVersionMetaMapper.TABLE_NAME
+ + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
+ }
+
+ public String
softDeleteFunctionVersionMetasByMetalakeId(@Param("metalakeId") Long
metalakeId) {
+ return "UPDATE "
+ + FunctionVersionMetaMapper.TABLE_NAME
+ + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+ }
+
+ public String deleteFunctionVersionMetasByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit)
{
+ return "DELETE FROM "
+ + FunctionVersionMetaMapper.TABLE_NAME
+ + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT
#{limit}";
+ }
+
+ public String selectFunctionVersionsByRetentionCount(
+ @Param("versionRetentionCount") Long versionRetentionCount) {
+ return "SELECT function_id as functionId,"
+ + " MAX(version) as version"
+ + " FROM "
+ + FunctionVersionMetaMapper.TABLE_NAME
+ + " WHERE version > #{versionRetentionCount} AND deleted_at = 0"
+ + " GROUP BY function_id";
+ }
+
+ public String softDeleteFunctionVersionsByRetentionLine(
+ @Param("functionId") Long functionId,
+ @Param("versionRetentionLine") long versionRetentionLine,
+ @Param("limit") int limit) {
+ return "UPDATE "
+ + FunctionVersionMetaMapper.TABLE_NAME
+ + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ + " WHERE function_id = #{functionId} AND version <=
#{versionRetentionLine}"
+ + " AND deleted_at = 0 LIMIT #{limit}";
+ }
+
+ public String softDeleteFunctionVersionsByFunctionId(@Param("functionId")
Long functionId) {
+ return "UPDATE "
+ + FunctionVersionMetaMapper.TABLE_NAME
+ + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ + " WHERE function_id = #{functionId} AND deleted_at = 0";
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
index 4316f87561..6770ebbc2e 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
@@ -45,7 +45,8 @@ public class TableMetaBaseSQLProvider {
+ TABLE_NAME
+ " tm LEFT JOIN "
+ TableVersionMapper.TABLE_NAME
- + " tv ON tm.table_id = tv.table_id AND tm.current_version =
tv.version AND tv.deleted_at = 0"
+ + " tv ON tm.table_id = tv.table_id AND tm.current_version =
tv.version"
+ + " AND tv.deleted_at = 0"
+ " WHERE tm.schema_id = #{schemaId} AND tm.deleted_at = 0";
}
@@ -65,7 +66,8 @@ public class TableMetaBaseSQLProvider {
+ TABLE_NAME
+ " tm LEFT JOIN "
+ TableVersionMapper.TABLE_NAME
- + " tv ON tm.table_id = tv.table_id AND tm.current_version =
tv.version AND tv.deleted_at = 0"
+ + " tv ON tm.table_id = tv.table_id AND tm.current_version =
tv.version"
+ + " AND tv.deleted_at = 0"
+ " WHERE tm.deleted_at = 0"
+ " AND tm.table_id IN ("
+ "<foreach collection='tableIds' item='tableId' separator=','>"
@@ -150,8 +152,10 @@ public class TableMetaBaseSQLProvider {
+ TABLE_NAME
+ " tm LEFT JOIN "
+ TableVersionMapper.TABLE_NAME
- + " tv ON tm.table_id = tv.table_id AND tm.current_version =
tv.version AND tv.deleted_at = 0"
- + " WHERE tm.schema_id = #{schemaId} AND tm.table_name = #{tableName}
AND tm.deleted_at = 0";
+ + " tv ON tm.table_id = tv.table_id AND tm.current_version =
tv.version"
+ + " AND tv.deleted_at = 0"
+ + " WHERE tm.schema_id = #{schemaId} AND tm.table_name = #{tableName}"
+ + " AND tm.deleted_at = 0";
}
public String selectTableMetaById(@Param("tableId") Long tableId) {
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/FunctionMetaPostgreSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/FunctionMetaPostgreSQLProvider.java
new file mode 100644
index 0000000000..e112cd0e31
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/FunctionMetaPostgreSQLProvider.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper;
+import
org.apache.gravitino.storage.relational.mapper.provider.base.FunctionMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.FunctionPO;
+import org.apache.ibatis.annotations.Param;
+
+public class FunctionMetaPostgreSQLProvider extends
FunctionMetaBaseSQLProvider {
+
+ @Override
+ public String insertFunctionMeta(@Param("functionMeta") FunctionPO
functionPO) {
+ return "INSERT INTO "
+ + FunctionMetaMapper.TABLE_NAME
+ + " (function_id, function_name, metalake_id, catalog_id, schema_id,"
+ + " function_type, \"deterministic\", return_type,
function_current_version, function_latest_version, audit_info, deleted_at)"
+ + " VALUES (#{functionMeta.functionId}, #{functionMeta.functionName},
#{functionMeta.metalakeId},"
+ + " #{functionMeta.catalogId}, #{functionMeta.schemaId},
#{functionMeta.functionType},"
+ + " #{functionMeta.deterministic}, #{functionMeta.returnType},"
+ + " #{functionMeta.functionCurrentVersion},
#{functionMeta.functionLatestVersion}, #{functionMeta.auditInfo},"
+ + " #{functionMeta.deletedAt})";
+ }
+
+ @Override
+ public String insertFunctionMetaOnDuplicateKeyUpdate(
+ @Param("functionMeta") FunctionPO functionPO) {
+ return "INSERT INTO "
+ + FunctionMetaMapper.TABLE_NAME
+ + " (function_id, function_name, metalake_id, catalog_id, schema_id,"
+ + " function_type, \"deterministic\", return_type,
function_current_version, function_latest_version, audit_info, deleted_at)"
+ + " VALUES (#{functionMeta.functionId}, #{functionMeta.functionName},
#{functionMeta.metalakeId},"
+ + " #{functionMeta.catalogId}, #{functionMeta.schemaId},
#{functionMeta.functionType},"
+ + " #{functionMeta.deterministic}, #{functionMeta.returnType},"
+ + " #{functionMeta.functionCurrentVersion},
#{functionMeta.functionLatestVersion}, #{functionMeta.auditInfo},"
+ + " #{functionMeta.deletedAt})"
+ + " ON CONFLICT (function_id) DO UPDATE SET"
+ + " function_name = #{functionMeta.functionName},"
+ + " metalake_id = #{functionMeta.metalakeId},"
+ + " catalog_id = #{functionMeta.catalogId},"
+ + " schema_id = #{functionMeta.schemaId},"
+ + " function_type = #{functionMeta.functionType},"
+ + " \"deterministic\" = #{functionMeta.deterministic},"
+ + " return_type = #{functionMeta.returnType},"
+ + " function_current_version = #{functionMeta.functionCurrentVersion},"
+ + " function_latest_version = #{functionMeta.functionLatestVersion},"
+ + " audit_info = #{functionMeta.auditInfo},"
+ + " deleted_at = #{functionMeta.deletedAt}";
+ }
+
+ @Override
+ public String listFunctionPOsBySchemaId(@Param("schemaId") Long schemaId) {
+ return "SELECT fm.function_id, fm.function_name, fm.metalake_id,
fm.catalog_id, fm.schema_id,"
+ + " fm.function_type, fm.\"deterministic\", fm.return_type,
fm.function_current_version, fm.function_latest_version,"
+ + " fm.audit_info, fm.deleted_at,"
+ + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as
version_catalog_id,"
+ + " vi.schema_id as version_schema_id, vi.function_id as
version_function_id,"
+ + " vi.version, vi.function_comment, vi.definitions,"
+ + " vi.audit_info as version_audit_info, vi.deleted_at as
version_deleted_at"
+ + " FROM "
+ + FunctionMetaMapper.TABLE_NAME
+ + " fm INNER JOIN "
+ + FunctionMetaMapper.VERSION_TABLE_NAME
+ + " vi ON fm.function_id = vi.function_id AND
fm.function_current_version = vi.version"
+ + " WHERE fm.schema_id = #{schemaId} AND fm.deleted_at = 0 AND
vi.deleted_at = 0";
+ }
+
+ @Override
+ public String selectFunctionMetaBySchemaIdAndName(
+ @Param("schemaId") Long schemaId, @Param("functionName") String
functionName) {
+ return "SELECT fm.function_id, fm.function_name, fm.metalake_id,
fm.catalog_id, fm.schema_id,"
+ + " fm.function_type, fm.\"deterministic\", fm.return_type,
fm.function_current_version, fm.function_latest_version,"
+ + " fm.audit_info, fm.deleted_at,"
+ + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as
version_catalog_id,"
+ + " vi.schema_id as version_schema_id, vi.function_id as
version_function_id,"
+ + " vi.version, vi.function_comment, vi.definitions,"
+ + " vi.audit_info as version_audit_info, vi.deleted_at as
version_deleted_at"
+ + " FROM "
+ + FunctionMetaMapper.TABLE_NAME
+ + " fm INNER JOIN "
+ + FunctionMetaMapper.VERSION_TABLE_NAME
+ + " vi ON fm.function_id = vi.function_id AND
fm.function_current_version = vi.version"
+ + " WHERE fm.schema_id = #{schemaId} AND fm.function_name =
#{functionName}"
+ + " AND fm.deleted_at = 0 AND vi.deleted_at = 0";
+ }
+
+ @Override
+ public String softDeleteFunctionMetaByFunctionId(@Param("functionId") Long
functionId) {
+ return "UPDATE "
+ + FunctionMetaMapper.TABLE_NAME
+ + " SET deleted_at = CAST(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000
AS BIGINT)"
+ + " WHERE function_id = #{functionId} AND deleted_at = 0";
+ }
+
+ @Override
+ public String softDeleteFunctionMetasByCatalogId(@Param("catalogId") Long
catalogId) {
+ return "UPDATE "
+ + FunctionMetaMapper.TABLE_NAME
+ + " SET deleted_at = CAST(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000
AS BIGINT)"
+ + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
+ }
+
+ @Override
+ public String softDeleteFunctionMetasByMetalakeId(@Param("metalakeId") Long
metalakeId) {
+ return "UPDATE "
+ + FunctionMetaMapper.TABLE_NAME
+ + " SET deleted_at = CAST(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000
AS BIGINT)"
+ + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+ }
+
+ @Override
+ public String softDeleteFunctionMetasBySchemaId(@Param("schemaId") Long
schemaId) {
+ return "UPDATE "
+ + FunctionMetaMapper.TABLE_NAME
+ + " SET deleted_at = CAST(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000
AS BIGINT)"
+ + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+ }
+
+ @Override
+ public String deleteFunctionMetasByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit)
{
+ return "DELETE FROM "
+ + FunctionMetaMapper.TABLE_NAME
+ + " WHERE function_id IN (SELECT function_id FROM "
+ + FunctionMetaMapper.TABLE_NAME
+ + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT
#{limit})";
+ }
+
+ @Override
+ public String updateFunctionMeta(
+ @Param("newFunctionMeta") FunctionPO newFunctionPO,
+ @Param("oldFunctionMeta") FunctionPO oldFunctionPO) {
+ return "UPDATE "
+ + FunctionMetaMapper.TABLE_NAME
+ + " SET function_name = #{newFunctionMeta.functionName},"
+ + " metalake_id = #{newFunctionMeta.metalakeId},"
+ + " catalog_id = #{newFunctionMeta.catalogId},"
+ + " schema_id = #{newFunctionMeta.schemaId},"
+ + " function_type = #{newFunctionMeta.functionType},"
+ + " \"deterministic\" = #{newFunctionMeta.deterministic},"
+ + " return_type = #{newFunctionMeta.returnType},"
+ + " function_current_version =
#{newFunctionMeta.functionCurrentVersion},"
+ + " function_latest_version =
#{newFunctionMeta.functionLatestVersion},"
+ + " audit_info = #{newFunctionMeta.auditInfo},"
+ + " deleted_at = #{newFunctionMeta.deletedAt}"
+ + " WHERE function_id = #{oldFunctionMeta.functionId}"
+ + " AND function_name = #{oldFunctionMeta.functionName}"
+ + " AND metalake_id = #{oldFunctionMeta.metalakeId}"
+ + " AND catalog_id = #{oldFunctionMeta.catalogId}"
+ + " AND schema_id = #{oldFunctionMeta.schemaId}"
+ + " AND function_type = #{oldFunctionMeta.functionType}"
+ + " AND function_current_version =
#{oldFunctionMeta.functionCurrentVersion}"
+ + " AND function_latest_version =
#{oldFunctionMeta.functionLatestVersion}"
+ + " AND audit_info = #{oldFunctionMeta.auditInfo}"
+ + " AND deleted_at = 0";
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/FunctionVersionMetaPostgreSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/FunctionVersionMetaPostgreSQLProvider.java
new file mode 100644
index 0000000000..ce2022cd41
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/FunctionVersionMetaPostgreSQLProvider.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import
org.apache.gravitino.storage.relational.mapper.FunctionVersionMetaMapper;
+import
org.apache.gravitino.storage.relational.mapper.provider.base.FunctionVersionMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.FunctionVersionPO;
+import org.apache.ibatis.annotations.Param;
+
+/**
+ * PostgreSQL variants of the function version statements.
+ *
+ * <p>Compared with the statements it overrides, this class uses {@code ON CONFLICT ... DO
+ * UPDATE} for the upsert, computes the soft-delete timestamp with {@code EXTRACT(EPOCH FROM
+ * CURRENT_TIMESTAMP)}, and applies row limits inside a sub-select on {@code id} instead of a
+ * trailing {@code LIMIT} on the UPDATE/DELETE itself. All values are bound through MyBatis
+ * {@code #{...}} placeholders.
+ */
+public class FunctionVersionMetaPostgreSQLProvider extends FunctionVersionMetaBaseSQLProvider {
+
+  /** Start of every soft-delete statement: sets {@code deleted_at} to the current epoch millis. */
+  private static final String SOFT_DELETE_PREFIX =
+      "UPDATE "
+          + FunctionVersionMetaMapper.TABLE_NAME
+          + " SET deleted_at = CAST(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 AS BIGINT)";
+
+  /**
+   * Builds an INSERT that, when {@code (function_id, version, deleted_at)} conflicts, updates
+   * the comment, definitions, audit info and {@code deleted_at} of the existing row.
+   *
+   * @param functionVersionPO row to insert or merge; referenced as {@code functionVersionMeta}
+   * @return the parameterized upsert statement
+   */
+  @Override
+  public String insertFunctionVersionMetaOnDuplicateKeyUpdate(
+      @Param("functionVersionMeta") FunctionVersionPO functionVersionPO) {
+    return "INSERT INTO "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " (metalake_id, catalog_id, schema_id, function_id, version,"
+        + " function_comment, definitions, audit_info, deleted_at)"
+        + " VALUES (#{functionVersionMeta.metalakeId}, #{functionVersionMeta.catalogId},"
+        + " #{functionVersionMeta.schemaId}, #{functionVersionMeta.functionId},"
+        + " #{functionVersionMeta.functionVersion}, #{functionVersionMeta.functionComment},"
+        + " #{functionVersionMeta.definitions}, #{functionVersionMeta.auditInfo},"
+        + " #{functionVersionMeta.deletedAt})"
+        + " ON CONFLICT (function_id, version, deleted_at) DO UPDATE SET"
+        + " function_comment = #{functionVersionMeta.functionComment},"
+        + " definitions = #{functionVersionMeta.definitions},"
+        + " audit_info = #{functionVersionMeta.auditInfo},"
+        + " deleted_at = #{functionVersionMeta.deletedAt}";
+  }
+
+  /**
+   * Builds the statement that soft-deletes all live version rows of a schema.
+   *
+   * @param schemaId bound to {@code #{schemaId}}
+   * @return the parameterized UPDATE statement
+   */
+  @Override
+  public String softDeleteFunctionVersionMetasBySchemaId(@Param("schemaId") Long schemaId) {
+    return SOFT_DELETE_PREFIX + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the statement that soft-deletes all live version rows of a catalog.
+   *
+   * @param catalogId bound to {@code #{catalogId}}
+   * @return the parameterized UPDATE statement
+   */
+  @Override
+  public String softDeleteFunctionVersionMetasByCatalogId(@Param("catalogId") Long catalogId) {
+    return SOFT_DELETE_PREFIX + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the statement that soft-deletes all live version rows of a metalake.
+   *
+   * @param metalakeId bound to {@code #{metalakeId}}
+   * @return the parameterized UPDATE statement
+   */
+  @Override
+  public String softDeleteFunctionVersionMetasByMetalakeId(@Param("metalakeId") Long metalakeId) {
+    return SOFT_DELETE_PREFIX + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the statement that physically removes rows soft-deleted before a cut-off. The row
+   * limit is applied in a sub-select over {@code id}.
+   *
+   * @param legacyTimeline exclusive upper bound on {@code deleted_at}
+   * @param limit maximum number of ids picked by the sub-select
+   * @return the parameterized DELETE statement
+   */
+  @Override
+  public String deleteFunctionVersionMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
+    return "DELETE FROM "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " WHERE id IN (SELECT id FROM "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
+  }
+
+  /**
+   * Builds the statement that soft-deletes live versions of one function at or below a version
+   * threshold. The row limit is applied in a sub-select over {@code id}.
+   *
+   * @param functionId function whose versions are affected
+   * @param versionRetentionLine inclusive upper bound on {@code version}
+   * @param limit maximum number of ids picked by the sub-select
+   * @return the parameterized UPDATE statement
+   */
+  @Override
+  public String softDeleteFunctionVersionsByRetentionLine(
+      @Param("functionId") Long functionId,
+      @Param("versionRetentionLine") long versionRetentionLine,
+      @Param("limit") int limit) {
+    return SOFT_DELETE_PREFIX
+        + " WHERE id IN (SELECT id FROM "
+        + FunctionVersionMetaMapper.TABLE_NAME
+        + " WHERE function_id = #{functionId} AND version <= #{versionRetentionLine}"
+        + " AND deleted_at = 0 LIMIT #{limit})";
+  }
+
+  /**
+   * Builds the statement that soft-deletes all live versions of one function.
+   *
+   * @param functionId bound to {@code #{functionId}}
+   * @return the parameterized UPDATE statement
+   */
+  @Override
+  public String softDeleteFunctionVersionsByFunctionId(@Param("functionId") Long functionId) {
+    return SOFT_DELETE_PREFIX + " WHERE function_id = #{functionId} AND deleted_at = 0";
+  }
+}