This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 5f869a53b0 [#9561] feat(core): Implement alterFunction for managed UDF
operations (#9737)
5f869a53b0 is described below
commit 5f869a53b0f2a5ac1b3b8092c2cb6e97d9d65019
Author: mchades <[email protected]>
AuthorDate: Wed Jan 21 02:23:36 2026 +0800
[#9561] feat(core): Implement alterFunction for managed UDF operations
(#9737)
### What changes were proposed in this pull request?
Implement the `alterFunction` method in `ManagedFunctionOperations` to
support modifying registered UDFs. This includes:
- **UpdateComment**: Update the function's comment/description
- **AddDefinition**: Add a new function definition with arity overlap
validation
- **RemoveDefinition**: Remove an existing definition (with guard
against removing the only definition)
- **AddImpl**: Add a new implementation to a definition (with runtime
uniqueness check)
- **UpdateImpl**: Update an existing implementation for a specific
runtime
- **RemoveImpl**: Remove an implementation from a definition (with guard
against removing the only implementation)
### Why are the changes needed?
Fix: #9561
This is part of the UDF management feature. After registering a
function, users need the ability to modify it without dropping and
recreating. The `alterFunction` API enables incremental updates to
function metadata, definitions, and implementations.
### Does this PR introduce _any_ user-facing change?
Yes. Users can now use the `alterFunction` API to modify registered
functions through various `FunctionChange` operations.
### How was this patch tested?
Added comprehensive unit tests in `TestManagedFunctionOperations`:
- Test updating comment
- Test adding/removing definitions with arity validation
- Test adding/updating/removing implementations
- Test error cases (non-existing function, non-existing runtime,
removing only definition/impl)
- Test parameter order validation during alter operations
---
.../catalog/ManagedFunctionOperations.java | 287 +++++++++++++-
.../catalog/TestManagedFunctionOperations.java | 422 +++++++++++++++++++++
2 files changed, 706 insertions(+), 3 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java
b/core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java
index a2cefd583b..778cc4ed09 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.catalog;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -43,6 +44,8 @@ import org.apache.gravitino.function.FunctionCatalog;
import org.apache.gravitino.function.FunctionChange;
import org.apache.gravitino.function.FunctionColumn;
import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionDefinitions;
+import org.apache.gravitino.function.FunctionImpl;
import org.apache.gravitino.function.FunctionParam;
import org.apache.gravitino.function.FunctionType;
import org.apache.gravitino.meta.AuditInfo;
@@ -153,8 +156,20 @@ public class ManagedFunctionOperations implements
FunctionCatalog {
@Override
public Function alterFunction(NameIdentifier ident, FunctionChange...
changes)
throws NoSuchFunctionException, IllegalArgumentException {
- // TODO: Implement when FunctionEntity is available
- throw new UnsupportedOperationException("alterFunction: FunctionEntity not
yet implemented");
+ try {
+ return store.update(
+ ident,
+ FunctionEntity.class,
+ Entity.EntityType.FUNCTION,
+ oldEntity -> applyChanges(oldEntity, changes));
+
+ } catch (NoSuchEntityException e) {
+ throw new NoSuchFunctionException(e, "Function %s does not exist",
ident);
+ } catch (EntityAlreadyExistsException e) {
+ throw new IllegalArgumentException("Failed to alter function " + ident,
e);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to alter function " + ident, e);
+ }
}
@Override
@@ -245,9 +260,111 @@ public class ManagedFunctionOperations implements
FunctionCatalog {
}
}
+ private FunctionEntity applyChanges(FunctionEntity oldEntity,
FunctionChange... changes) {
+ String newComment = oldEntity.comment();
+ List<FunctionDefinition> newDefinitions =
+ new ArrayList<>(Arrays.asList(oldEntity.definitions()));
+
+ for (FunctionChange change : changes) {
+ if (change instanceof FunctionChange.UpdateComment) {
+ newComment = ((FunctionChange.UpdateComment) change).newComment();
+
+ } else if (change instanceof FunctionChange.AddDefinition) {
+ FunctionDefinition defToAdd = ((FunctionChange.AddDefinition)
change).definition();
+ validateNoArityOverlap(newDefinitions, defToAdd);
+ newDefinitions.add(defToAdd);
+
+ } else if (change instanceof FunctionChange.RemoveDefinition) {
+ FunctionParam[] paramsToRemove = ((FunctionChange.RemoveDefinition)
change).parameters();
+ validateRemoveDefinition(newDefinitions, paramsToRemove);
+ newDefinitions.removeIf(def -> parametersMatch(def.parameters(),
paramsToRemove));
+
+ } else if (change instanceof FunctionChange.AddImpl) {
+ FunctionChange.AddImpl addImpl = (FunctionChange.AddImpl) change;
+ FunctionParam[] targetParams = addImpl.parameters();
+ FunctionImpl implToAdd = addImpl.implementation();
+ newDefinitions = addImplToDefinition(newDefinitions, targetParams,
implToAdd);
+
+ } else if (change instanceof FunctionChange.UpdateImpl) {
+ FunctionChange.UpdateImpl updateImpl = (FunctionChange.UpdateImpl)
change;
+ FunctionParam[] targetParams = updateImpl.parameters();
+ FunctionImpl.RuntimeType runtime = updateImpl.runtime();
+ FunctionImpl newImpl = updateImpl.implementation();
+ newDefinitions = updateImplInDefinition(newDefinitions, targetParams,
runtime, newImpl);
+
+ } else if (change instanceof FunctionChange.RemoveImpl) {
+ FunctionChange.RemoveImpl removeImpl = (FunctionChange.RemoveImpl)
change;
+ FunctionParam[] targetParams = removeImpl.parameters();
+ FunctionImpl.RuntimeType runtime = removeImpl.runtime();
+ newDefinitions = removeImplFromDefinition(newDefinitions,
targetParams, runtime);
+
+ } else {
+ throw new IllegalArgumentException(
+ "Unknown function change: " + change.getClass().getSimpleName());
+ }
+ }
+
+ String currentUser = PrincipalUtils.getCurrentUserName();
+ Instant now = Instant.now();
+ AuditInfo newAuditInfo =
+ AuditInfo.builder()
+ .withCreator(oldEntity.auditInfo().creator())
+ .withCreateTime(oldEntity.auditInfo().createTime())
+ .withLastModifier(currentUser)
+ .withLastModifiedTime(now)
+ .build();
+
+ return FunctionEntity.builder()
+ .withId(oldEntity.id())
+ .withName(oldEntity.name())
+ .withNamespace(oldEntity.namespace())
+ .withComment(newComment)
+ .withFunctionType(oldEntity.functionType())
+ .withDeterministic(oldEntity.deterministic())
+ .withReturnType(oldEntity.returnType())
+ .withReturnColumns(oldEntity.returnColumns())
+ .withDefinitions(newDefinitions.toArray(new FunctionDefinition[0]))
+ .withAuditInfo(newAuditInfo)
+ .build();
+ }
+
+ /**
+ * Validates that a new definition does not create ambiguous function
arities with existing
+ * definitions. Each definition can support multiple arities based on
parameters with default
+ * values.
+ *
+ * <p>Gravitino enforces strict validation to prevent ambiguity. Operations
MUST fail if a new
+ * definition's invocation arities overlap with existing ones. For example,
if an existing
+ * definition {@code foo(int, float default 1.0)} supports arities {@code
(int)} and {@code (int,
+ * float)}, adding a new definition {@code foo(int, string default 'x')}
(which supports {@code
+ * (int)} and {@code (int, string)}) will be REJECTED because both support
the call {@code
+ * foo(1)}. This ensures every function invocation deterministically maps to
a single definition.
+ *
+ * @param existingDefinitions The current definitions.
+ * @param newDefinition The definition to add.
+ * @throws IllegalArgumentException If the new definition creates
overlapping arities.
+ */
+ private void validateNoArityOverlap(
+ List<FunctionDefinition> existingDefinitions, FunctionDefinition
newDefinition) {
+ Set<String> newArities = computeArities(newDefinition);
+
+ for (FunctionDefinition existing : existingDefinitions) {
+ Set<String> existingArities = computeArities(existing);
+ for (String arity : newArities) {
+ if (existingArities.contains(arity)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot add definition: arity '%s' overlaps with an existing
definition. "
+ + "This would create ambiguous function invocations.",
+ arity));
+ }
+ }
+ }
+ }
+
/**
* Computes all possible invocation arities for a function definition. A
definition with N
- * parameters where the last M have default values supports arities from
(N-M) to N parameters.
+ * parameters where the last M has default values supports arities from
(N-M) to N parameters.
*
* <p>For example:
*
@@ -313,4 +430,168 @@ public class ManagedFunctionOperations implements
FunctionCatalog {
Expression defaultValue = param.defaultValue();
return defaultValue != null && defaultValue !=
Column.DEFAULT_VALUE_NOT_SET;
}
+
+ /**
+ * Validates that a definition can be removed.
+ *
+ * @param definitions The current definitions.
+ * @param paramsToRemove The parameters identifying the definition to remove.
+ * @throws IllegalArgumentException If the definition doesn't exist or is
the only one.
+ */
+ private void validateRemoveDefinition(
+ List<FunctionDefinition> definitions, FunctionParam[] paramsToRemove) {
+ if (definitions.size() == 1) {
+ throw new IllegalArgumentException(
+ "Cannot remove the only definition. Use dropFunction to remove the
entire function.");
+ }
+
+ boolean found = false;
+ for (FunctionDefinition def : definitions) {
+ if (parametersMatch(def.parameters(), paramsToRemove)) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ throw new IllegalArgumentException(
+ "Cannot remove definition: no definition found with the specified
parameters");
+ }
+ }
+
+ private boolean parametersMatch(FunctionParam[] params1, FunctionParam[]
params2) {
+ if (params1.length != params2.length) {
+ return false;
+ }
+ for (int i = 0; i < params1.length; i++) {
+ if (!params1[i].name().equals(params2[i].name())
+ || !params1[i].dataType().equals(params2[i].dataType())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private List<FunctionDefinition> addImplToDefinition(
+ List<FunctionDefinition> definitions, FunctionParam[] targetParams,
FunctionImpl implToAdd) {
+ List<FunctionDefinition> result = new ArrayList<>();
+ boolean found = false;
+
+ for (FunctionDefinition def : definitions) {
+ if (parametersMatch(def.parameters(), targetParams)) {
+ found = true;
+ // Check if runtime already exists
+ for (FunctionImpl existingImpl : def.impls()) {
+ if (existingImpl.runtime() == implToAdd.runtime()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot add implementation: runtime '%s' already exists in
this definition. "
+ + "Use updateImpl to replace it.",
+ implToAdd.runtime()));
+ }
+ }
+ List<FunctionImpl> impls = new ArrayList<>(Arrays.asList(def.impls()));
+ impls.add(implToAdd);
+ result.add(FunctionDefinitions.of(def.parameters(), impls.toArray(new
FunctionImpl[0])));
+ } else {
+ result.add(def);
+ }
+ }
+
+ if (!found) {
+ throw new IllegalArgumentException(
+ "Cannot add implementation: no definition found with the specified
parameters");
+ }
+
+ return result;
+ }
+
+ private List<FunctionDefinition> updateImplInDefinition(
+ List<FunctionDefinition> definitions,
+ FunctionParam[] targetParams,
+ FunctionImpl.RuntimeType runtime,
+ FunctionImpl newImpl) {
+ List<FunctionDefinition> result = new ArrayList<>();
+ boolean definitionFound = false;
+ boolean runtimeFound = false;
+
+ for (FunctionDefinition def : definitions) {
+ if (parametersMatch(def.parameters(), targetParams)) {
+ definitionFound = true;
+ List<FunctionImpl> impls = new ArrayList<>();
+ for (FunctionImpl impl : def.impls()) {
+ if (impl.runtime() == runtime) {
+ runtimeFound = true;
+ impls.add(newImpl);
+ } else {
+ impls.add(impl);
+ }
+ }
+ result.add(FunctionDefinitions.of(def.parameters(), impls.toArray(new
FunctionImpl[0])));
+ } else {
+ result.add(def);
+ }
+ }
+
+ if (!definitionFound) {
+ throw new IllegalArgumentException(
+ "Cannot update implementation: no definition found with the
specified parameters");
+ }
+
+ if (!runtimeFound) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot update implementation: runtime '%s' not found in the
definition", runtime));
+ }
+
+ return result;
+ }
+
+ private List<FunctionDefinition> removeImplFromDefinition(
+ List<FunctionDefinition> definitions,
+ FunctionParam[] targetParams,
+ FunctionImpl.RuntimeType runtime) {
+ List<FunctionDefinition> result = new ArrayList<>();
+ boolean definitionFound = false;
+ boolean runtimeFound = false;
+
+ for (FunctionDefinition def : definitions) {
+ if (parametersMatch(def.parameters(), targetParams)) {
+ definitionFound = true;
+
+ // Check if this is the only implementation
+ if (def.impls().length == 1) {
+ if (def.impls()[0].runtime() == runtime) {
+ throw new IllegalArgumentException(
+ "Cannot remove the only implementation. Use removeDefinition
to remove the entire definition.");
+ }
+ }
+
+ List<FunctionImpl> impls = new ArrayList<>();
+ for (FunctionImpl impl : def.impls()) {
+ if (impl.runtime() == runtime) {
+ runtimeFound = true;
+ } else {
+ impls.add(impl);
+ }
+ }
+ result.add(FunctionDefinitions.of(def.parameters(), impls.toArray(new
FunctionImpl[0])));
+ } else {
+ result.add(def);
+ }
+ }
+
+ if (!definitionFound) {
+ throw new IllegalArgumentException(
+ "Cannot remove implementation: no definition found with the
specified parameters");
+ }
+
+ if (!runtimeFound) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot remove implementation: runtime '%s' not found in the
definition", runtime));
+ }
+
+ return result;
+ }
}
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestManagedFunctionOperations.java
b/core/src/test/java/org/apache/gravitino/catalog/TestManagedFunctionOperations.java
index 417eb1a361..9a759cb28b 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestManagedFunctionOperations.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestManagedFunctionOperations.java
@@ -38,6 +38,7 @@ import
org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.exceptions.NoSuchFunctionException;
import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionChange;
import org.apache.gravitino.function.FunctionDefinition;
import org.apache.gravitino.function.FunctionDefinitions;
import org.apache.gravitino.function.FunctionImpl;
@@ -45,6 +46,7 @@ import org.apache.gravitino.function.FunctionImpls;
import org.apache.gravitino.function.FunctionParam;
import org.apache.gravitino.function.FunctionParams;
import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.function.JavaImpl;
import org.apache.gravitino.meta.FunctionEntity;
import org.apache.gravitino.rel.expressions.literals.Literals;
import org.apache.gravitino.rel.types.Types;
@@ -184,6 +186,94 @@ public class TestManagedFunctionOperations {
Assertions.assertFalse(functionOperations.dropFunction(funcIdent));
}
+ @Test
+ public void testAlterFunctionUpdateComment() {
+ NameIdentifier funcIdent = getFunctionIdent("func_to_alter");
+ FunctionParam[] params = new FunctionParam[] {FunctionParams.of("a",
Types.IntegerType.get())};
+ FunctionDefinition[] definitions = new FunctionDefinition[]
{createSimpleDefinition(params)};
+
+ functionOperations.registerFunction(
+ funcIdent,
+ "Original comment",
+ FunctionType.SCALAR,
+ true,
+ Types.StringType.get(),
+ definitions);
+
+ // Update comment
+ String newComment = "Updated comment";
+ org.apache.gravitino.function.Function updatedFunc =
+ functionOperations.alterFunction(funcIdent,
FunctionChange.updateComment(newComment));
+
+ Assertions.assertEquals(newComment, updatedFunc.comment());
+
+ // Verify the change is persisted
+ org.apache.gravitino.function.Function loadedFunc =
functionOperations.getFunction(funcIdent);
+ Assertions.assertEquals(newComment, loadedFunc.comment());
+ }
+
+ @Test
+ public void testAlterFunctionAddDefinition() {
+ NameIdentifier funcIdent = getFunctionIdent("func_add_def");
+ FunctionParam[] params1 = new FunctionParam[] {FunctionParams.of("a",
Types.IntegerType.get())};
+ FunctionDefinition[] definitions1 = new FunctionDefinition[]
{createSimpleDefinition(params1)};
+
+ functionOperations.registerFunction(
+ funcIdent,
+ "Test function",
+ FunctionType.SCALAR,
+ true,
+ Types.StringType.get(),
+ definitions1);
+
+ // Add a new definition with different parameters
+ FunctionParam[] params2 =
+ new FunctionParam[] {
+ FunctionParams.of("a", Types.IntegerType.get()),
+ FunctionParams.of("b", Types.StringType.get())
+ };
+ FunctionDefinition newDef = createSimpleDefinition(params2);
+
+ org.apache.gravitino.function.Function updatedFunc =
+ functionOperations.alterFunction(funcIdent,
FunctionChange.addDefinition(newDef));
+
+ Assertions.assertEquals(2, updatedFunc.definitions().length);
+ }
+
+ @Test
+ public void testAlterFunctionAddDefinitionWithOverlappingArity() {
+ NameIdentifier funcIdent = getFunctionIdent("func_overlap");
+
+ // Create definition: foo(int, float default 1.0) supports arities (int)
and (int, float)
+ FunctionParam[] params1 =
+ new FunctionParam[] {
+ FunctionParams.of("a", Types.IntegerType.get()),
+ FunctionParams.of("b", Types.FloatType.get(), null,
Literals.floatLiteral(1.0f))
+ };
+ FunctionDefinition[] definitions1 = new FunctionDefinition[]
{createSimpleDefinition(params1)};
+
+ functionOperations.registerFunction(
+ funcIdent,
+ "Test function",
+ FunctionType.SCALAR,
+ true,
+ Types.StringType.get(),
+ definitions1);
+
+ // Try to add definition: foo(int, string default 'x') which supports
(int) and (int, string)
+ // This should fail because both support the call foo(int)
+ FunctionParam[] params2 =
+ new FunctionParam[] {
+ FunctionParams.of("a", Types.IntegerType.get()),
+ FunctionParams.of("c", Types.StringType.get(), null,
Literals.stringLiteral("x"))
+ };
+ FunctionDefinition newDef = createSimpleDefinition(params2);
+
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> functionOperations.alterFunction(funcIdent,
FunctionChange.addDefinition(newDef)));
+ }
+
@Test
public void testRegisterFunctionWithOverlappingDefinitions() {
NameIdentifier funcIdent = getFunctionIdent("func_overlap_register");
@@ -388,6 +478,310 @@ public class TestManagedFunctionOperations {
Assertions.assertEquals(2, func.definitions().length);
}
+ @Test
+ public void testAlterFunctionRemoveDefinition() {
+ NameIdentifier funcIdent = getFunctionIdent("func_remove_def");
+
+ // Create function with two definitions
+ FunctionParam[] params1 = new FunctionParam[] {FunctionParams.of("a",
Types.IntegerType.get())};
+ FunctionParam[] params2 = new FunctionParam[] {FunctionParams.of("b",
Types.StringType.get())};
+ FunctionDefinition[] definitions =
+ new FunctionDefinition[] {createSimpleDefinition(params1),
createSimpleDefinition(params2)};
+
+ functionOperations.registerFunction(
+ funcIdent, "Test function", FunctionType.SCALAR, true,
Types.StringType.get(), definitions);
+
+ // Remove one definition
+ Function updatedFunc =
+ functionOperations.alterFunction(funcIdent,
FunctionChange.removeDefinition(params1));
+
+ Assertions.assertEquals(1, updatedFunc.definitions().length);
+ }
+
+ @Test
+ public void testAlterFunctionRemoveOnlyDefinition() {
+ NameIdentifier funcIdent = getFunctionIdent("func_remove_only");
+ FunctionParam[] params = new FunctionParam[] {FunctionParams.of("a",
Types.IntegerType.get())};
+ FunctionDefinition[] definitions = new FunctionDefinition[]
{createSimpleDefinition(params)};
+
+ functionOperations.registerFunction(
+ funcIdent, "Test function", FunctionType.SCALAR, true,
Types.StringType.get(), definitions);
+
+ // Try to remove the only definition - should fail
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> functionOperations.alterFunction(funcIdent,
FunctionChange.removeDefinition(params)));
+ }
+
+ @Test
+ public void testAlterFunctionRemoveNonExistingDefinition() {
+ NameIdentifier funcIdent = getFunctionIdent("func_remove_nonexist");
+ FunctionParam[] params = new FunctionParam[] {FunctionParams.of("a",
Types.IntegerType.get())};
+ FunctionDefinition[] definitions = new FunctionDefinition[]
{createSimpleDefinition(params)};
+
+ functionOperations.registerFunction(
+ funcIdent, "Test function", FunctionType.SCALAR, true,
Types.StringType.get(), definitions);
+
+ // Try to remove a definition that doesn't exist
+ FunctionParam[] nonExistingParams =
+ new FunctionParam[] {FunctionParams.of("x", Types.StringType.get())};
+
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ functionOperations.alterFunction(
+ funcIdent,
FunctionChange.removeDefinition(nonExistingParams)));
+ }
+
+ @Test
+ public void testAlterFunctionAddImpl() {
+ NameIdentifier funcIdent = getFunctionIdent("func_add_impl");
+ FunctionParam[] params = new FunctionParam[] {FunctionParams.of("a",
Types.IntegerType.get())};
+ FunctionImpl sparkImpl =
+ FunctionImpls.ofJava(FunctionImpl.RuntimeType.SPARK,
"com.example.SparkUDF");
+ FunctionDefinition[] definitions =
+ new FunctionDefinition[] {
+ createDefinitionWithImpls(params, new FunctionImpl[] {sparkImpl})
+ };
+
+ functionOperations.registerFunction(
+ funcIdent, "Test function", FunctionType.SCALAR, true,
Types.StringType.get(), definitions);
+
+ // Add Trino implementation
+ FunctionImpl trinoImpl =
+ FunctionImpls.ofJava(FunctionImpl.RuntimeType.TRINO,
"com.example.TrinoUDF");
+ org.apache.gravitino.function.Function updatedFunc =
+ functionOperations.alterFunction(funcIdent,
FunctionChange.addImpl(params, trinoImpl));
+
+ Assertions.assertEquals(2, updatedFunc.definitions()[0].impls().length);
+ }
+
+ @Test
+ public void testAlterFunctionAddImplDuplicateRuntime() {
+ NameIdentifier funcIdent = getFunctionIdent("func_add_impl_dup");
+ FunctionParam[] params = new FunctionParam[] {FunctionParams.of("a",
Types.IntegerType.get())};
+ FunctionImpl sparkImpl =
+ FunctionImpls.ofJava(FunctionImpl.RuntimeType.SPARK,
"com.example.SparkUDF");
+ FunctionDefinition[] definitions =
+ new FunctionDefinition[] {
+ createDefinitionWithImpls(params, new FunctionImpl[] {sparkImpl})
+ };
+
+ functionOperations.registerFunction(
+ funcIdent, "Test function", FunctionType.SCALAR, true,
Types.StringType.get(), definitions);
+
+ // Try to add another Spark implementation - should fail
+ FunctionImpl anotherSparkImpl =
+ FunctionImpls.ofJava(FunctionImpl.RuntimeType.SPARK,
"com.example.AnotherSparkUDF");
+
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ functionOperations.alterFunction(
+ funcIdent, FunctionChange.addImpl(params, anotherSparkImpl)));
+ }
+
+ @Test
+ public void testAlterFunctionAddImplToNonExistingDefinition() {
+ NameIdentifier funcIdent = getFunctionIdent("func_add_impl_nodef");
+ FunctionParam[] params = new FunctionParam[] {FunctionParams.of("a",
Types.IntegerType.get())};
+ FunctionDefinition[] definitions = new FunctionDefinition[]
{createSimpleDefinition(params)};
+
+ functionOperations.registerFunction(
+ funcIdent, "Test function", FunctionType.SCALAR, true,
Types.StringType.get(), definitions);
+
+ // Try to add impl to non-existing definition
+ FunctionParam[] nonExistingParams =
+ new FunctionParam[] {FunctionParams.of("x", Types.StringType.get())};
+ FunctionImpl impl =
+ FunctionImpls.ofJava(FunctionImpl.RuntimeType.TRINO,
"com.example.TrinoUDF");
+
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ functionOperations.alterFunction(
+ funcIdent, FunctionChange.addImpl(nonExistingParams, impl)));
+ }
+
+ @Test
+ public void testAlterFunctionUpdateImpl() {
+ NameIdentifier funcIdent = getFunctionIdent("func_update_impl");
+ FunctionParam[] params = new FunctionParam[] {FunctionParams.of("a",
Types.IntegerType.get())};
+ FunctionImpl sparkImpl =
+ FunctionImpls.ofJava(FunctionImpl.RuntimeType.SPARK,
"com.example.OldSparkUDF");
+ FunctionDefinition[] definitions =
+ new FunctionDefinition[] {
+ createDefinitionWithImpls(params, new FunctionImpl[] {sparkImpl})
+ };
+
+ functionOperations.registerFunction(
+ funcIdent, "Test function", FunctionType.SCALAR, true,
Types.StringType.get(), definitions);
+
+ // Update Spark implementation
+ FunctionImpl newSparkImpl =
+ FunctionImpls.ofJava(FunctionImpl.RuntimeType.SPARK,
"com.example.NewSparkUDF");
+ org.apache.gravitino.function.Function updatedFunc =
+ functionOperations.alterFunction(
+ funcIdent,
+ FunctionChange.updateImpl(params, FunctionImpl.RuntimeType.SPARK,
newSparkImpl));
+
+ Assertions.assertEquals(1, updatedFunc.definitions()[0].impls().length);
+ // Verify the implementation was actually updated to the new class
+ FunctionImpl updatedImpl = updatedFunc.definitions()[0].impls()[0];
+ Assertions.assertEquals(FunctionImpl.RuntimeType.SPARK,
updatedImpl.runtime());
+ Assertions.assertInstanceOf(JavaImpl.class, updatedImpl);
+ Assertions.assertEquals("com.example.NewSparkUDF", ((JavaImpl)
updatedImpl).className());
+ }
+
+ @Test
+ public void testAlterFunctionUpdateImplNonExistingRuntime() {
+ NameIdentifier funcIdent = getFunctionIdent("func_update_impl_noruntime");
+ FunctionParam[] params = new FunctionParam[] {FunctionParams.of("a",
Types.IntegerType.get())};
+ FunctionImpl sparkImpl =
+ FunctionImpls.ofJava(FunctionImpl.RuntimeType.SPARK,
"com.example.SparkUDF");
+ FunctionDefinition[] definitions =
+ new FunctionDefinition[] {
+ createDefinitionWithImpls(params, new FunctionImpl[] {sparkImpl})
+ };
+
+ functionOperations.registerFunction(
+ funcIdent, "Test function", FunctionType.SCALAR, true,
Types.StringType.get(), definitions);
+
+ // Try to update Trino implementation which doesn't exist
+ FunctionImpl trinoImpl =
+ FunctionImpls.ofJava(FunctionImpl.RuntimeType.TRINO,
"com.example.TrinoUDF");
+
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ functionOperations.alterFunction(
+ funcIdent,
+ FunctionChange.updateImpl(params,
FunctionImpl.RuntimeType.TRINO, trinoImpl)));
+ }
+
+ @Test
+ public void testAlterFunctionRemoveImpl() {
+ NameIdentifier funcIdent = getFunctionIdent("func_remove_impl");
+ FunctionParam[] params = new FunctionParam[] {FunctionParams.of("a",
Types.IntegerType.get())};
+ FunctionImpl sparkImpl =
+ FunctionImpls.ofJava(FunctionImpl.RuntimeType.SPARK,
"com.example.SparkUDF");
+ FunctionImpl trinoImpl =
+ FunctionImpls.ofJava(FunctionImpl.RuntimeType.TRINO,
"com.example.TrinoUDF");
+ FunctionDefinition[] definitions =
+ new FunctionDefinition[] {
+ createDefinitionWithImpls(params, new FunctionImpl[] {sparkImpl,
trinoImpl})
+ };
+
+ functionOperations.registerFunction(
+ funcIdent, "Test function", FunctionType.SCALAR, true,
Types.StringType.get(), definitions);
+
+ // Remove Spark implementation
+ org.apache.gravitino.function.Function updatedFunc =
+ functionOperations.alterFunction(
+ funcIdent, FunctionChange.removeImpl(params,
FunctionImpl.RuntimeType.SPARK));
+
+ Assertions.assertEquals(1, updatedFunc.definitions()[0].impls().length);
+ Assertions.assertEquals(
+ FunctionImpl.RuntimeType.TRINO,
updatedFunc.definitions()[0].impls()[0].runtime());
+ }
+
+ @Test
+ public void testAlterFunctionRemoveOnlyImpl() {
+ NameIdentifier funcIdent = getFunctionIdent("func_remove_only_impl");
+ FunctionParam[] params = new FunctionParam[] {FunctionParams.of("a",
Types.IntegerType.get())};
+ FunctionImpl sparkImpl =
+ FunctionImpls.ofJava(FunctionImpl.RuntimeType.SPARK,
"com.example.SparkUDF");
+ FunctionDefinition[] definitions =
+ new FunctionDefinition[] {
+ createDefinitionWithImpls(params, new FunctionImpl[] {sparkImpl})
+ };
+
+ functionOperations.registerFunction(
+ funcIdent, "Test function", FunctionType.SCALAR, true,
Types.StringType.get(), definitions);
+
+ // Try to remove the only implementation - should fail
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ functionOperations.alterFunction(
+ funcIdent, FunctionChange.removeImpl(params,
FunctionImpl.RuntimeType.SPARK)));
+ }
+
+ @Test
+ public void testAlterFunctionRemoveImplNonExistingRuntime() {
+ NameIdentifier funcIdent = getFunctionIdent("func_remove_impl_noruntime");
+ FunctionParam[] params = new FunctionParam[] {FunctionParams.of("a",
Types.IntegerType.get())};
+ FunctionImpl sparkImpl =
+ FunctionImpls.ofJava(FunctionImpl.RuntimeType.SPARK,
"com.example.SparkUDF");
+ FunctionDefinition[] definitions =
+ new FunctionDefinition[] {
+ createDefinitionWithImpls(params, new FunctionImpl[] {sparkImpl})
+ };
+
+ functionOperations.registerFunction(
+ funcIdent, "Test function", FunctionType.SCALAR, true,
Types.StringType.get(), definitions);
+
+ // Try to remove Trino implementation which doesn't exist
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ functionOperations.alterFunction(
+ funcIdent, FunctionChange.removeImpl(params,
FunctionImpl.RuntimeType.TRINO)));
+ }
+
+ @Test
+ public void testAlterNonExistingFunction() {
+ NameIdentifier nonExistingIdent = getFunctionIdent("non_existing_func");
+
+ Assertions.assertThrows(
+ NoSuchFunctionException.class,
+ () ->
+ functionOperations.alterFunction(
+ nonExistingIdent, FunctionChange.updateComment("new
comment")));
+ }
+
+ @Test
+ public void testParameterOrderValidationInAlterFunction() {
+ // Test that parameter validation also works when altering functions
+ NameIdentifier funcIdent = getFunctionIdent("func_alter_invalid_params");
+
+ // First register a valid function
+ FunctionParam[] initialParams =
+ new FunctionParam[] {FunctionParams.of("a", Types.IntegerType.get())};
+ FunctionDefinition[] initialDefinitions =
+ new FunctionDefinition[] {createSimpleDefinition(initialParams)};
+
+ functionOperations.registerFunction(
+ funcIdent,
+ "Initial function",
+ FunctionType.SCALAR,
+ true,
+ Types.StringType.get(),
+ initialDefinitions);
+
+ // Try to add a definition with invalid parameter order
+ FunctionParam[] invalidParams =
+ new FunctionParam[] {
+ FunctionParams.of("x", Types.IntegerType.get(), "param x",
Literals.integerLiteral(1)),
+ FunctionParams.of("y", Types.StringType.get()), // Required after
optional
+ };
+
+ IllegalArgumentException ex =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ functionOperations.alterFunction(
+ funcIdent,
+
FunctionChange.addDefinition(createSimpleDefinition(invalidParams))));
+
+ Assertions.assertTrue(
+ ex.getMessage().contains("Invalid parameter order"),
+ "Expected error about invalid parameter order, got: " +
ex.getMessage());
+ Assertions.assertTrue(
+ ex.getMessage().contains("required parameter 'y'"),
+ "Expected error to mention parameter 'y', got: " + ex.getMessage());
+ }
+
@SuppressWarnings("unchecked")
private EntityStore createMockEntityStore() throws Exception {
EntityStore mockStore = mock(EntityStore.class);
@@ -421,6 +815,29 @@ public class TestManagedFunctionOperations {
return entity;
});
+ // Mock update operation
+ when(mockStore.update(
+ any(NameIdentifier.class),
+ eq(FunctionEntity.class),
+ eq(Entity.EntityType.FUNCTION),
+ any(java.util.function.Function.class)))
+ .thenAnswer(
+ invocation -> {
+ NameIdentifier ident = invocation.getArgument(0);
+ java.util.function.Function<FunctionEntity, FunctionEntity>
updater =
+ invocation.getArgument(3);
+
+ FunctionEntity oldEntity = findEntityByIdent(ident);
+ if (oldEntity == null) {
+ throw new NoSuchEntityException("Entity %s does not exist",
ident);
+ }
+
+ FunctionEntity newEntity = updater.apply(oldEntity);
+ NameIdentifier originalIdent = oldEntity.nameIdentifier();
+ entityMap.put(originalIdent, newEntity);
+ return newEntity;
+ });
+
// Mock delete operation (2 parameters - default method that calls
3-parameter version)
when(mockStore.delete(any(NameIdentifier.class),
eq(Entity.EntityType.FUNCTION)))
.thenAnswer(
@@ -490,4 +907,9 @@ public class TestManagedFunctionOperations {
FunctionImpl impl = FunctionImpls.ofJava(FunctionImpl.RuntimeType.SPARK,
"com.example.TestUDF");
return FunctionDefinitions.of(params, new FunctionImpl[] {impl});
}
+
+ private FunctionDefinition createDefinitionWithImpls(
+ FunctionParam[] params, FunctionImpl[] impls) {
+ return FunctionDefinitions.of(params, impls);
+ }
}