This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 2fab0df70f [#9528] feat(storage): support function management
(Delete/Update)(part-4) (#9864)
2fab0df70f is described below
commit 2fab0df70fdcc053e9423d83e4a51388acd8a1df
Author: mchades <[email protected]>
AuthorDate: Wed Feb 4 12:07:00 2026 +0800
[#9528] feat(storage): support function management (Delete/Update)(part-4)
(#9864)
### What changes were proposed in this pull request?
Add Delete and Update support for Function management in
FunctionMetaService.
- Implemented deleteFunction with cascading delete support (versions).
- Implemented updateFunction to handle version increments.
- Added soft delete support for cascading deletion from
Catalog/Schema/Metalake.
### Why are the changes needed?
To complete the lifecycle management of functions, allowing users to
update logic or remove functions when no longer needed.
Fix: #9528
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- Added TestFunctionMetaService covering update and delete scenarios.
- Verified cascading deletes.
---------
Co-authored-by: Copilot <[email protected]>
---
.../gravitino/storage/relational/JDBCBackend.java | 10 +-
.../relational/mapper/FunctionMetaMapper.java | 33 +++
.../mapper/FunctionVersionMetaMapper.java | 45 ++++
.../storage/relational/po/FunctionPO.java | 7 +-
.../relational/service/CatalogMetaService.java | 10 +
.../relational/service/FunctionMetaService.java | 132 ++++++++++
.../relational/service/MetalakeMetaService.java | 10 +
.../relational/service/SchemaMetaService.java | 10 +
.../service/TestFunctionMetaService.java | 279 +++++++++++++++++++++
9 files changed, 531 insertions(+), 5 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index e6f7d46870..1a1110143a 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -428,8 +428,9 @@ public class JDBCBackend implements RelationalBackend {
.deleteModelVersionMetasByLegacyTimeline(
legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
case FUNCTION:
- // TODO: implement deleteFunctionMetasByLegacyTimeline
- return 0;
+ return FunctionMetaService.getInstance()
+ .deleteFunctionMetasByLegacyTimeline(
+ legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
case TABLE_STATISTIC:
return StatisticMetaService.getInstance()
.deleteStatisticsByLegacyTimeline(
@@ -490,8 +491,9 @@ public class JDBCBackend implements RelationalBackend {
versionRetentionCount,
GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
case FUNCTION:
- // TODO: implement deleteFunctionVersionsByRetentionCount
- return 0;
+ return FunctionMetaService.getInstance()
+ .deleteFunctionVersionsByRetentionCount(
+ versionRetentionCount,
GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
default:
throw new IllegalArgumentException(
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaMapper.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaMapper.java
index 2314069fce..a57b40cc8a 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaMapper.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaMapper.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.storage.relational.mapper;
import java.util.List;
import org.apache.gravitino.storage.relational.po.FunctionPO;
import org.apache.gravitino.storage.relational.po.FunctionVersionPO;
+import org.apache.ibatis.annotations.DeleteProvider;
import org.apache.ibatis.annotations.InsertProvider;
import org.apache.ibatis.annotations.One;
import org.apache.ibatis.annotations.Param;
@@ -29,6 +30,7 @@ import org.apache.ibatis.annotations.ResultMap;
import org.apache.ibatis.annotations.Results;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.SelectProvider;
+import org.apache.ibatis.annotations.UpdateProvider;
/** A MyBatis Mapper for function metadata operation SQLs. */
public interface FunctionMetaMapper {
@@ -131,4 +133,35 @@ public interface FunctionMetaMapper {
method = "selectFunctionMetaBySchemaIdAndName")
FunctionPO selectFunctionMetaBySchemaIdAndName(
@Param("schemaId") Long schemaId, @Param("functionName") String
functionName);
+
+ @UpdateProvider(
+ type = FunctionMetaSQLProviderFactory.class,
+ method = "softDeleteFunctionMetaByFunctionId")
+ Integer softDeleteFunctionMetaByFunctionId(@Param("functionId") Long
functionId);
+
+ @UpdateProvider(
+ type = FunctionMetaSQLProviderFactory.class,
+ method = "softDeleteFunctionMetasByCatalogId")
+ Integer softDeleteFunctionMetasByCatalogId(@Param("catalogId") Long
catalogId);
+
+ @UpdateProvider(
+ type = FunctionMetaSQLProviderFactory.class,
+ method = "softDeleteFunctionMetasByMetalakeId")
+ Integer softDeleteFunctionMetasByMetalakeId(@Param("metalakeId") Long
metalakeId);
+
+ @UpdateProvider(
+ type = FunctionMetaSQLProviderFactory.class,
+ method = "softDeleteFunctionMetasBySchemaId")
+ Integer softDeleteFunctionMetasBySchemaId(@Param("schemaId") Long schemaId);
+
+ @DeleteProvider(
+ type = FunctionMetaSQLProviderFactory.class,
+ method = "deleteFunctionMetasByLegacyTimeline")
+ Integer deleteFunctionMetasByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+
+ @UpdateProvider(type = FunctionMetaSQLProviderFactory.class, method =
"updateFunctionMeta")
+ Integer updateFunctionMeta(
+ @Param("newFunctionMeta") FunctionPO newFunctionPO,
+ @Param("oldFunctionMeta") FunctionPO oldFunctionPO);
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionVersionMetaMapper.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionVersionMetaMapper.java
index d258494eea..e088736094 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionVersionMetaMapper.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionVersionMetaMapper.java
@@ -18,9 +18,14 @@
*/
package org.apache.gravitino.storage.relational.mapper;
+import java.util.List;
+import org.apache.gravitino.storage.relational.po.FunctionMaxVersionPO;
import org.apache.gravitino.storage.relational.po.FunctionVersionPO;
+import org.apache.ibatis.annotations.DeleteProvider;
import org.apache.ibatis.annotations.InsertProvider;
import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.SelectProvider;
+import org.apache.ibatis.annotations.UpdateProvider;
/** A MyBatis Mapper for function version metadata operation SQLs. */
public interface FunctionVersionMetaMapper {
@@ -37,4 +42,44 @@ public interface FunctionVersionMetaMapper {
method = "insertFunctionVersionMetaOnDuplicateKeyUpdate")
void insertFunctionVersionMetaOnDuplicateKeyUpdate(
@Param("functionVersionMeta") FunctionVersionPO functionVersionPO);
+
+ @UpdateProvider(
+ type = FunctionVersionMetaSQLProviderFactory.class,
+ method = "softDeleteFunctionVersionMetasBySchemaId")
+ Integer softDeleteFunctionVersionMetasBySchemaId(@Param("schemaId") Long
schemaId);
+
+ @UpdateProvider(
+ type = FunctionVersionMetaSQLProviderFactory.class,
+ method = "softDeleteFunctionVersionMetasByCatalogId")
+ Integer softDeleteFunctionVersionMetasByCatalogId(@Param("catalogId") Long
catalogId);
+
+ @UpdateProvider(
+ type = FunctionVersionMetaSQLProviderFactory.class,
+ method = "softDeleteFunctionVersionMetasByMetalakeId")
+ Integer softDeleteFunctionVersionMetasByMetalakeId(@Param("metalakeId") Long
metalakeId);
+
+ @DeleteProvider(
+ type = FunctionVersionMetaSQLProviderFactory.class,
+ method = "deleteFunctionVersionMetasByLegacyTimeline")
+ Integer deleteFunctionVersionMetasByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+
+ @SelectProvider(
+ type = FunctionVersionMetaSQLProviderFactory.class,
+ method = "selectFunctionVersionsByRetentionCount")
+ List<FunctionMaxVersionPO> selectFunctionVersionsByRetentionCount(
+ @Param("versionRetentionCount") Long versionRetentionCount);
+
+ @UpdateProvider(
+ type = FunctionVersionMetaSQLProviderFactory.class,
+ method = "softDeleteFunctionVersionsByRetentionLine")
+ Integer softDeleteFunctionVersionsByRetentionLine(
+ @Param("functionId") Long functionId,
+ @Param("versionRetentionLine") long versionRetentionLine,
+ @Param("limit") int limit);
+
+ @UpdateProvider(
+ type = FunctionVersionMetaSQLProviderFactory.class,
+ method = "softDeleteFunctionVersionsByFunctionId")
+ Integer softDeleteFunctionVersionsByFunctionId(@Param("functionId") Long
functionId);
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/po/FunctionPO.java
b/core/src/main/java/org/apache/gravitino/storage/relational/po/FunctionPO.java
index 5fc23337b8..4cc3c47e09 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/po/FunctionPO.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/po/FunctionPO.java
@@ -192,12 +192,17 @@ public class FunctionPO {
public static FunctionPO buildFunctionPO(
FunctionEntity functionEntity, FunctionPO.FunctionPOBuilder builder) {
+ return buildFunctionPO(functionEntity, builder, INITIAL_VERSION);
+ }
+
+ public static FunctionPO buildFunctionPO(
+ FunctionEntity functionEntity, FunctionPO.FunctionPOBuilder builder,
Integer version) {
try {
NamespacedEntityId namespacedEntityId =
EntityIdService.getEntityIds(
NameIdentifier.of(functionEntity.namespace().levels()),
Entity.EntityType.SCHEMA);
FunctionVersionPO versionPO =
- initializeFunctionVersionPO(functionEntity, namespacedEntityId,
INITIAL_VERSION);
+ initializeFunctionVersionPO(functionEntity, namespacedEntityId,
version);
return builder
.withFunctionId(functionEntity.id())
.withFunctionName(functionEntity.name())
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
index 99f6ddefe6..f5315b5c59 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
@@ -39,6 +39,8 @@ import
org.apache.gravitino.storage.relational.helper.CatalogIds;
import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
import org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper;
+import org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper;
+import
org.apache.gravitino.storage.relational.mapper.FunctionVersionMetaMapper;
import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
import org.apache.gravitino.storage.relational.mapper.ModelVersionMetaMapper;
@@ -278,6 +280,14 @@ public class CatalogMetaService {
SessionUtils.doWithoutCommit(
TopicMetaMapper.class,
mapper -> mapper.softDeleteTopicMetasByCatalogId(catalogId)),
+ () ->
+ SessionUtils.doWithoutCommit(
+ FunctionMetaMapper.class,
+ mapper ->
mapper.softDeleteFunctionMetasByCatalogId(catalogId)),
+ () ->
+ SessionUtils.doWithoutCommit(
+ FunctionVersionMetaMapper.class,
+ mapper ->
mapper.softDeleteFunctionVersionMetasByCatalogId(catalogId)),
() ->
SessionUtils.doWithoutCommit(
OwnerMetaMapper.class, mapper ->
mapper.softDeleteOwnerRelByCatalogId(catalogId)),
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java
index 4a5bfe36d8..c6ca471079 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java
@@ -20,16 +20,21 @@
package org.apache.gravitino.storage.relational.service;
import static
org.apache.gravitino.metrics.source.MetricsSource.GRAVITINO_RELATIONAL_STORE_METRIC_NAME;
+import static
org.apache.gravitino.storage.relational.po.FunctionPO.buildFunctionPO;
import static
org.apache.gravitino.storage.relational.po.FunctionPO.fromFunctionPO;
import static
org.apache.gravitino.storage.relational.po.FunctionPO.initializeFunctionPO;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.gravitino.Entity;
import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.HasIdentifier;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.exceptions.NoSuchEntityException;
@@ -38,17 +43,21 @@ import org.apache.gravitino.meta.NamespacedEntityId;
import org.apache.gravitino.metrics.Monitored;
import org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.FunctionVersionMetaMapper;
+import org.apache.gravitino.storage.relational.po.FunctionMaxVersionPO;
import org.apache.gravitino.storage.relational.po.FunctionPO;
import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
import org.apache.gravitino.storage.relational.utils.SessionUtils;
import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.gravitino.utils.NamespaceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class FunctionMetaService {
public static FunctionMetaService getInstance() {
return INSTANCE;
}
+ private static final Logger LOG =
LoggerFactory.getLogger(FunctionMetaService.class);
private static final FunctionMetaService INSTANCE = new
FunctionMetaService();
private FunctionMetaService() {}
@@ -110,6 +119,82 @@ public class FunctionMetaService {
}
}
+ @Monitored(
+ metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+ baseMetricName = "deleteFunction")
+ public boolean deleteFunction(NameIdentifier ident) {
+ FunctionPO functionPO = getFunctionPOByIdentifier(ident);
+ Long functionId = functionPO.functionId();
+
+ AtomicInteger functionDeletedCount = new AtomicInteger();
+ SessionUtils.doMultipleWithCommit(
+ // delete function meta
+ () ->
+ functionDeletedCount.set(
+ SessionUtils.getWithoutCommit(
+ FunctionMetaMapper.class,
+ mapper ->
mapper.softDeleteFunctionMetaByFunctionId(functionId))),
+
+ // delete function versions after meta deletion
+ () -> {
+ if (functionDeletedCount.get() > 0) {
+ SessionUtils.doWithoutCommit(
+ FunctionVersionMetaMapper.class,
+ mapper ->
mapper.softDeleteFunctionVersionsByFunctionId(functionId));
+ }
+ });
+
+ return functionDeletedCount.get() > 0;
+ }
+
+ @Monitored(
+ metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+ baseMetricName = "deleteFunctionMetasByLegacyTimeline")
+ public int deleteFunctionMetasByLegacyTimeline(Long legacyTimeline, int
limit) {
+ int functionVersionDeletedCount =
+ SessionUtils.doWithCommitAndFetchResult(
+ FunctionVersionMetaMapper.class,
+ mapper ->
mapper.deleteFunctionVersionMetasByLegacyTimeline(legacyTimeline, limit));
+
+ int functionMetaDeletedCount =
+ SessionUtils.doWithCommitAndFetchResult(
+ FunctionMetaMapper.class,
+ mapper ->
mapper.deleteFunctionMetasByLegacyTimeline(legacyTimeline, limit));
+
+ return functionVersionDeletedCount + functionMetaDeletedCount;
+ }
+
+ @Monitored(
+ metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+ baseMetricName = "deleteFunctionVersionsByRetentionCount")
+ public int deleteFunctionVersionsByRetentionCount(Long
versionRetentionCount, int limit) {
+ List<FunctionMaxVersionPO> functionCurVersions =
+ SessionUtils.getWithoutCommit(
+ FunctionVersionMetaMapper.class,
+ mapper ->
mapper.selectFunctionVersionsByRetentionCount(versionRetentionCount));
+
+ int totalDeletedCount = 0;
+ for (FunctionMaxVersionPO functionCurVersion : functionCurVersions) {
+ long versionRetentionLine = functionCurVersion.version() -
versionRetentionCount;
+ int deletedCount =
+ SessionUtils.doWithCommitAndFetchResult(
+ FunctionVersionMetaMapper.class,
+ mapper ->
+ mapper.softDeleteFunctionVersionsByRetentionLine(
+ functionCurVersion.functionId(), versionRetentionLine,
limit));
+ totalDeletedCount += deletedCount;
+
+ LOG.info(
+ "Soft delete functionVersions count: {} which versions are older
than or equal to"
+ + " versionRetentionLine: {}, the current functionId and version
is: <{}, {}>.",
+ deletedCount,
+ versionRetentionLine,
+ functionCurVersion.functionId(),
+ functionCurVersion.version());
+ }
+ return totalDeletedCount;
+ }
+
@Monitored(
metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
baseMetricName = "getFunctionPOByIdentifier")
@@ -119,6 +204,41 @@ public class FunctionMetaService {
return functionPOFetcher().apply(ident);
}
+ @Monitored(
+ metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+ baseMetricName = "updateFunction")
+ public <E extends Entity & HasIdentifier> FunctionEntity updateFunction(
+ NameIdentifier identifier, Function<E, E> updater) throws IOException {
+ FunctionPO oldFunctionPO = getFunctionPOByIdentifier(identifier);
+ FunctionEntity oldFunctionEntity = fromFunctionPO(oldFunctionPO,
identifier.namespace());
+ FunctionEntity newEntity = (FunctionEntity) updater.apply((E)
oldFunctionEntity);
+ Preconditions.checkArgument(
+ Objects.equals(oldFunctionEntity.id(), newEntity.id()),
+ "The updated function entity id: %s should be same with the entity id
before: %s",
+ newEntity.id(),
+ oldFunctionEntity.id());
+
+ try {
+ FunctionPO newFunctionPO = updateFunctionPO(oldFunctionPO, newEntity);
+ // Insert a new version and update function meta
+ SessionUtils.doMultipleWithCommit(
+ () ->
+ SessionUtils.doWithoutCommit(
+ FunctionVersionMetaMapper.class,
+ mapper ->
mapper.insertFunctionVersionMeta(newFunctionPO.functionVersionPO())),
+ () ->
+ SessionUtils.doWithoutCommit(
+ FunctionMetaMapper.class,
+ mapper -> mapper.updateFunctionMeta(newFunctionPO,
oldFunctionPO)));
+
+ return newEntity;
+ } catch (RuntimeException re) {
+ ExceptionUtils.checkSQLException(
+ re, Entity.EntityType.FUNCTION,
newEntity.nameIdentifier().toString());
+ throw re;
+ }
+ }
+
private Function<NameIdentifier, FunctionPO> functionPOFetcher() {
return GravitinoEnv.getInstance().cacheEnabled()
? this::getFunctionPOBySchemaId
@@ -213,4 +333,16 @@ public class FunctionMetaService {
builder.withCatalogId(namespacedEntityId.namespaceIds()[1]);
builder.withSchemaId(namespacedEntityId.entityId());
}
+
+ private FunctionPO updateFunctionPO(FunctionPO oldFunctionPO, FunctionEntity
newFunction) {
+ Integer newVersion = oldFunctionPO.functionLatestVersion() + 1;
+ FunctionPO.FunctionPOBuilder builder =
+ FunctionPO.builder()
+ .withMetalakeId(oldFunctionPO.metalakeId())
+ .withCatalogId(oldFunctionPO.catalogId())
+ .withSchemaId(oldFunctionPO.schemaId())
+ .withFunctionLatestVersion(newVersion)
+ .withFunctionCurrentVersion(newVersion);
+ return buildFunctionPO(newFunction, builder, newVersion);
+ }
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
index e19ad76aef..7ea85374c9 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
@@ -37,6 +37,8 @@ import org.apache.gravitino.metrics.Monitored;
import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
import org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper;
+import org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper;
+import
org.apache.gravitino.storage.relational.mapper.FunctionVersionMetaMapper;
import org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
import org.apache.gravitino.storage.relational.mapper.GroupRoleRelMapper;
import org.apache.gravitino.storage.relational.mapper.JobMetaMapper;
@@ -230,6 +232,14 @@ public class MetalakeMetaService {
SessionUtils.doWithoutCommit(
TopicMetaMapper.class,
mapper ->
mapper.softDeleteTopicMetasByMetalakeId(metalakeId)),
+ () ->
+ SessionUtils.doWithoutCommit(
+ FunctionMetaMapper.class,
+ mapper ->
mapper.softDeleteFunctionMetasByMetalakeId(metalakeId)),
+ () ->
+ SessionUtils.doWithoutCommit(
+ FunctionVersionMetaMapper.class,
+ mapper ->
mapper.softDeleteFunctionVersionMetasByMetalakeId(metalakeId)),
() ->
SessionUtils.doWithoutCommit(
UserRoleRelMapper.class,
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
index fd150ec294..62a5e5e990 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
@@ -45,6 +45,8 @@ import org.apache.gravitino.metrics.Monitored;
import org.apache.gravitino.storage.relational.helper.SchemaIds;
import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
import org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper;
+import org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper;
+import
org.apache.gravitino.storage.relational.mapper.FunctionVersionMetaMapper;
import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
import org.apache.gravitino.storage.relational.mapper.ModelVersionMetaMapper;
@@ -247,6 +249,14 @@ public class SchemaMetaService {
() ->
SessionUtils.doWithoutCommit(
TopicMetaMapper.class, mapper ->
mapper.softDeleteTopicMetasBySchemaId(schemaId)),
+ () ->
+ SessionUtils.doWithoutCommit(
+ FunctionMetaMapper.class,
+ mapper ->
mapper.softDeleteFunctionMetasBySchemaId(schemaId)),
+ () ->
+ SessionUtils.doWithoutCommit(
+ FunctionVersionMetaMapper.class,
+ mapper ->
mapper.softDeleteFunctionVersionMetasBySchemaId(schemaId)),
() ->
SessionUtils.doWithoutCommit(
OwnerMetaMapper.class, mapper ->
mapper.softDeleteOwnerRelBySchemaId(schemaId)),
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestFunctionMetaService.java
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestFunctionMetaService.java
index 63f6d15989..5cc4c225ac 100644
---
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestFunctionMetaService.java
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestFunctionMetaService.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.storage.relational.service;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -28,9 +29,11 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.gravitino.Entity;
import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
@@ -49,6 +52,7 @@ import org.apache.gravitino.rel.types.Types;
import org.apache.gravitino.storage.RandomIdGenerator;
import org.apache.gravitino.storage.relational.TestJDBCBackend;
import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.gravitino.utils.NamespaceUtil;
import org.apache.ibatis.session.SqlSession;
import org.junit.jupiter.api.BeforeEach;
@@ -110,6 +114,47 @@ public class TestFunctionMetaService extends
TestJDBCBackend {
assertEquals(function.deterministic(), loadedFunction.deterministic());
}
+ @TestTemplate
+ public void testMultipleVersionsInStorage() throws IOException {
+ // This test verifies that multiple versions are created in storage layer
+ // even though the API always returns the latest version
+ String functionName = GravitinoITUtils.genRandomName("test_function");
+ Namespace ns = NamespaceUtil.ofFunction(metalakeName, catalogName,
schemaName);
+ FunctionEntity function =
+ createFunctionEntity(RandomIdGenerator.INSTANCE.nextId(), ns,
functionName, AUDIT_INFO);
+
+ FunctionMetaService.getInstance().insertFunction(function, false);
+
+ // Update function to create version 2 in storage layer
+ NameIdentifier functionIdent =
+ NameIdentifier.of(metalakeName, catalogName, schemaName, functionName);
+ FunctionEntity updatedFunction =
+ FunctionEntity.builder()
+ .withId(function.id())
+ .withName(function.name())
+ .withNamespace(ns)
+ .withComment("updated comment")
+ .withFunctionType(function.functionType())
+ .withDeterministic(function.deterministic())
+ .withReturnType(function.returnType())
+ .withDefinitions(function.definitions())
+ .withAuditInfo(AUDIT_INFO)
+ .build();
+
+ FunctionMetaService.getInstance().updateFunction(functionIdent, e ->
updatedFunction);
+
+ // Get function always returns latest version
+ FunctionEntity loadedLatest =
+
FunctionMetaService.getInstance().getFunctionByIdentifier(functionIdent);
+ assertEquals("updated comment", loadedLatest.comment());
+
+ // Verify both versions exist in storage
+ Map<Integer, Long> versions = listFunctionVersions(function.id());
+ assertEquals(2, versions.size());
+ assertTrue(versions.containsKey(1));
+ assertTrue(versions.containsKey(2));
+ }
+
@TestTemplate
public void testListFunctions() throws IOException {
Namespace ns = NamespaceUtil.ofFunction(metalakeName, catalogName,
schemaName);
@@ -132,6 +177,240 @@ public class TestFunctionMetaService extends
TestJDBCBackend {
assertTrue(functions.stream().anyMatch(f ->
f.name().equals(functionName2)));
}
+ @TestTemplate
+ public void testUpdateFunction() throws IOException {
+ String functionName = GravitinoITUtils.genRandomName("test_function");
+ Namespace ns = NamespaceUtil.ofFunction(metalakeName, catalogName,
schemaName);
+ FunctionEntity function =
+ createFunctionEntity(RandomIdGenerator.INSTANCE.nextId(), ns,
functionName, AUDIT_INFO);
+
+ FunctionMetaService.getInstance().insertFunction(function, false);
+
+ // Update function (new version in storage layer)
+ NameIdentifier functionIdent =
+ NameIdentifier.of(metalakeName, catalogName, schemaName, functionName);
+ FunctionEntity updatedFunction =
+ FunctionEntity.builder()
+ .withId(function.id())
+ .withName(function.name())
+ .withNamespace(ns)
+ .withComment("updated comment")
+ .withFunctionType(function.functionType())
+ .withDeterministic(true)
+ .withReturnType(function.returnType())
+ .withDefinitions(function.definitions())
+ .withAuditInfo(AUDIT_INFO)
+ .build();
+
+ FunctionEntity result =
+ FunctionMetaService.getInstance().updateFunction(functionIdent, e ->
updatedFunction);
+
+ assertEquals("updated comment", result.comment());
+ assertTrue(result.deterministic());
+
+ // Verify both versions exist in DB
+ Map<Integer, Long> versions = listFunctionVersions(function.id());
+ assertEquals(2, versions.size());
+ assertTrue(versions.containsKey(1));
+ assertTrue(versions.containsKey(2));
+ }
+
+ @TestTemplate
+ public void testDeleteFunction() throws IOException {
+ String functionName = GravitinoITUtils.genRandomName("test_function");
+ Namespace ns = NamespaceUtil.ofFunction(metalakeName, catalogName,
schemaName);
+ FunctionEntity function =
+ createFunctionEntity(RandomIdGenerator.INSTANCE.nextId(), ns,
functionName, AUDIT_INFO);
+
+ FunctionMetaService.getInstance().insertFunction(function, false);
+
+ NameIdentifier functionIdent =
+ NameIdentifier.of(metalakeName, catalogName, schemaName, functionName);
+
assertTrue(FunctionMetaService.getInstance().deleteFunction(functionIdent));
+
+ // Verify function is soft deleted
+ assertThrows(
+ NoSuchEntityException.class,
+ () ->
FunctionMetaService.getInstance().getFunctionByIdentifier(functionIdent));
+ }
+
+ @TestTemplate
+ public void testDeleteNonExistentFunction() {
+ NameIdentifier functionIdent =
+ NameIdentifier.of(metalakeName, catalogName, schemaName,
"non_existent_function");
+ assertThrows(
+ NoSuchEntityException.class,
+ () -> FunctionMetaService.getInstance().deleteFunction(functionIdent));
+ }
+
+ @TestTemplate
+ public void testMetaLifeCycleFromCreationToDeletion() throws IOException {
+ String functionName = GravitinoITUtils.genRandomName("test_function");
+ Namespace ns = NamespaceUtil.ofFunction(metalakeName, catalogName,
schemaName);
+ FunctionEntity function =
+ createFunctionEntity(RandomIdGenerator.INSTANCE.nextId(), ns,
functionName, AUDIT_INFO);
+
+ FunctionMetaService.getInstance().insertFunction(function, false);
+
+ // Update function to create version 2 in storage layer
+ NameIdentifier functionIdent =
+ NameIdentifier.of(metalakeName, catalogName, schemaName, functionName);
+ FunctionEntity functionV2 =
+ FunctionEntity.builder()
+ .withId(function.id())
+ .withName(function.name())
+ .withNamespace(ns)
+ .withComment("version 2 comment")
+ .withFunctionType(function.functionType())
+ .withDeterministic(function.deterministic())
+ .withReturnType(function.returnType())
+ .withDefinitions(function.definitions())
+ .withAuditInfo(AUDIT_INFO)
+ .build();
+
+ FunctionMetaService.getInstance().updateFunction(functionIdent, e ->
functionV2);
+
+ // Create another function in a different schema
+ String anotherMetalakeName =
GravitinoITUtils.genRandomName("another-metalake");
+ String anotherCatalogName =
GravitinoITUtils.genRandomName("another-catalog");
+ String anotherSchemaName =
GravitinoITUtils.genRandomName("another-schema");
+ createAndInsertMakeLake(anotherMetalakeName);
+ createAndInsertCatalog(anotherMetalakeName, anotherCatalogName);
+ createAndInsertSchema(anotherMetalakeName, anotherCatalogName,
anotherSchemaName);
+
+ FunctionEntity anotherFunction =
+ createFunctionEntity(
+ RandomIdGenerator.INSTANCE.nextId(),
+ NamespaceUtil.ofFunction(anotherMetalakeName, anotherCatalogName,
anotherSchemaName),
+ "another_function",
+ AUDIT_INFO);
+ FunctionMetaService.getInstance().insertFunction(anotherFunction, false);
+
+ // Update another function to version 2 and 3
+ NameIdentifier anotherFunctionIdent =
+ NameIdentifier.of(
+ anotherMetalakeName, anotherCatalogName, anotherSchemaName,
"another_function");
+ Namespace anotherNs =
+ NamespaceUtil.ofFunction(anotherMetalakeName, anotherCatalogName,
anotherSchemaName);
+ FunctionEntity anotherFunctionV2 =
+ FunctionEntity.builder()
+ .withId(anotherFunction.id())
+ .withName(anotherFunction.name())
+ .withNamespace(anotherNs)
+ .withComment("another function v2")
+ .withFunctionType(anotherFunction.functionType())
+ .withDeterministic(anotherFunction.deterministic())
+ .withReturnType(anotherFunction.returnType())
+ .withDefinitions(anotherFunction.definitions())
+ .withAuditInfo(AUDIT_INFO)
+ .build();
+ FunctionMetaService.getInstance().updateFunction(anotherFunctionIdent, e
-> anotherFunctionV2);
+
+ FunctionEntity anotherFunctionV3 =
+ FunctionEntity.builder()
+ .withId(anotherFunction.id())
+ .withName(anotherFunction.name())
+ .withNamespace(anotherNs)
+ .withComment("another function v3")
+ .withFunctionType(anotherFunction.functionType())
+ .withDeterministic(anotherFunction.deterministic())
+ .withReturnType(anotherFunction.returnType())
+ .withDefinitions(anotherFunction.definitions())
+ .withAuditInfo(AUDIT_INFO)
+ .build();
+ FunctionMetaService.getInstance().updateFunction(anotherFunctionIdent, e
-> anotherFunctionV3);
+
+ // Verify list functions
+ List<FunctionEntity> functions =
FunctionMetaService.getInstance().listFunctionsByNamespace(ns);
+ assertEquals(1, functions.size());
+ assertEquals(functionV2.name(), functions.get(0).name());
+
+ // Soft delete metalake (cascading delete)
+ backend.delete(NameIdentifierUtil.ofMetalake(metalakeName),
Entity.EntityType.METALAKE, true);
+
+ // Verify function is deleted in the deleted metalake
+ assertThrows(
+ NoSuchEntityException.class,
+ () ->
FunctionMetaService.getInstance().getFunctionByIdentifier(functionIdent));
+
+ // Verify another function still exists
+ NameIdentifier anotherFunctionIdentForVerify =
+ NameIdentifier.of(
+ anotherMetalakeName, anotherCatalogName, anotherSchemaName,
"another_function");
+ FunctionEntity loadedAnotherFunction =
+
FunctionMetaService.getInstance().getFunctionByIdentifier(anotherFunctionIdentForVerify);
+ assertNotNull(loadedAnotherFunction);
+
+ // Check legacy record after soft delete
+ assertTrue(legacyRecordExistsInDB(function.id(),
Entity.EntityType.FUNCTION));
+ assertEquals(2, listFunctionVersions(function.id()).size());
+ assertEquals(3, listFunctionVersions(anotherFunction.id()).size());
+
+ // Hard delete legacy data
+ for (Entity.EntityType entityType : Entity.EntityType.values()) {
+ backend.hardDeleteLegacyData(entityType, Instant.now().toEpochMilli() +
1000);
+ }
+ assertFalse(legacyRecordExistsInDB(function.id(),
Entity.EntityType.FUNCTION));
+ assertEquals(0, listFunctionVersions(function.id()).size());
+ assertEquals(3, listFunctionVersions(anotherFunction.id()).size());
+
+ // Soft delete old versions
+ for (Entity.EntityType entityType : Entity.EntityType.values()) {
+ backend.deleteOldVersionData(entityType, 1);
+ }
+ Map<Integer, Long> versionDeletedMap =
listFunctionVersions(anotherFunction.id());
+ assertEquals(3, versionDeletedMap.size());
+ assertEquals(1, versionDeletedMap.values().stream().filter(value -> value
== 0L).count());
+ assertEquals(2, versionDeletedMap.values().stream().filter(value -> value
!= 0L).count());
+
+ // Hard delete old versions
+ backend.hardDeleteLegacyData(Entity.EntityType.FUNCTION,
Instant.now().toEpochMilli() + 1000);
+ assertEquals(1, listFunctionVersions(anotherFunction.id()).size());
+ }
+
+ @TestTemplate
+ public void testDeleteFunctionVersionsByRetentionCount() throws IOException {
+ String functionName = GravitinoITUtils.genRandomName("test_function");
+ Namespace ns = NamespaceUtil.ofFunction(metalakeName, catalogName,
schemaName);
+ FunctionEntity function =
+ createFunctionEntity(RandomIdGenerator.INSTANCE.nextId(), ns,
functionName, AUDIT_INFO);
+
+ FunctionMetaService.getInstance().insertFunction(function, false);
+
+ // Create multiple versions
+ NameIdentifier functionIdent =
+ NameIdentifier.of(metalakeName, catalogName, schemaName, functionName);
+
+ for (int v = 2; v <= 5; v++) {
+ final int version = v;
+ FunctionEntity updatedFunction =
+ FunctionEntity.builder()
+ .withId(function.id())
+ .withName(function.name())
+ .withNamespace(ns)
+ .withComment("version " + version)
+ .withFunctionType(function.functionType())
+ .withDeterministic(function.deterministic())
+ .withReturnType(function.returnType())
+ .withDefinitions(function.definitions())
+ .withAuditInfo(AUDIT_INFO)
+ .build();
+ FunctionMetaService.getInstance().updateFunction(functionIdent, e ->
updatedFunction);
+ }
+
+ // Verify all 5 versions exist
+ assertEquals(5, listFunctionVersions(function.id()).size());
+
+ // Soft delete versions by retention count (keep only 2)
+
FunctionMetaService.getInstance().deleteFunctionVersionsByRetentionCount(2L,
100);
+
+ // Verify 3 versions are soft deleted
+ Map<Integer, Long> versionDeletedMap = listFunctionVersions(function.id());
+ assertEquals(5, versionDeletedMap.size());
+ assertEquals(2, versionDeletedMap.values().stream().filter(value -> value
== 0L).count());
+ assertEquals(3, versionDeletedMap.values().stream().filter(value -> value
!= 0L).count());
+ }
+
@TestTemplate
public void testGetNonExistentFunction() {
NameIdentifier functionIdent =