mchades commented on code in PR #9560:
URL: https://github.com/apache/gravitino/pull/9560#discussion_r2671167546


##########
core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java:
##########
@@ -117,21 +159,448 @@ public Function registerFunction(
       FunctionColumn[] returnColumns,
       FunctionDefinition[] definitions)
       throws NoSuchSchemaException, FunctionAlreadyExistsException {
-    // TODO: Implement when FunctionEntity is available
-    throw new UnsupportedOperationException(
-        "registerFunction for table-valued functions: FunctionEntity not yet implemented");
+    return doRegisterFunction(
+        ident, comment, FunctionType.TABLE, deterministic, null, returnColumns, definitions);
   }
 
   @Override
   public Function alterFunction(NameIdentifier ident, FunctionChange... changes)
       throws NoSuchFunctionException, IllegalArgumentException {
-    // TODO: Implement when FunctionEntity is available
-    throw new UnsupportedOperationException("alterFunction: FunctionEntity not yet implemented");
+    try {
+      return store.update(
+          ident,
+          FunctionEntity.class,
+          Entity.EntityType.FUNCTION,
+          oldEntity -> applyChanges(oldEntity, changes));
+
+    } catch (NoSuchEntityException e) {
+      throw new NoSuchFunctionException(e, "Function %s does not exist", ident);
+    } catch (EntityAlreadyExistsException e) {
+      throw new IllegalArgumentException("Failed to alter function " + ident, e);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to alter function " + ident, e);
+    }
   }
 
   @Override
   public boolean dropFunction(NameIdentifier ident) {
-    // TODO: Implement when FunctionEntity is available
-    throw new UnsupportedOperationException("dropFunction: FunctionEntity not yet implemented");
+    try {
+      return store.delete(ident, Entity.EntityType.FUNCTION);
+    } catch (NoSuchEntityException e) {
+      return false;
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to drop function " + ident, e);
+    }
+  }
+
+  /**
+   * Converts a function identifier to a versioned identifier. The versioned identifier uses the
+   * version number as the name to allow the store to retrieve specific versions.
+   *
+   * @param ident The function identifier.
+   * @param version The version number, or {@link FunctionEntity#LATEST_VERSION} for the latest.
+   * @return The versioned identifier.
+   */
+  private NameIdentifier toVersionedIdent(NameIdentifier ident, int version) {
+    return NameIdentifier.of(
+        ident.namespace().level(0),
+        ident.namespace().level(1),
+        ident.namespace().level(2),
+        ident.name(),
+        String.valueOf(version));
+  }
+
+  private Function doRegisterFunction(
+      NameIdentifier ident,
+      String comment,
+      FunctionType functionType,
+      boolean deterministic,
+      Type returnType,
+      FunctionColumn[] returnColumns,
+      FunctionDefinition[] definitions)
+      throws NoSuchSchemaException, FunctionAlreadyExistsException {
+    Preconditions.checkArgument(
+        definitions != null && definitions.length > 0,
+        "At least one function definition must be provided");
+
+    // Validate definitions for arity overlap when there are multiple definitions
+    if (definitions.length > 1) {
+      validateDefinitionsNoArityOverlap(definitions);
+    }
+
+    String currentUser = PrincipalUtils.getCurrentUserName();
+    Instant now = Instant.now();
+    AuditInfo auditInfo = AuditInfo.builder().withCreator(currentUser).withCreateTime(now).build();
+
+    FunctionEntity functionEntity =
+        FunctionEntity.builder()
+            .withId(idGenerator.nextId())
+            .withName(ident.name())
+            .withNamespace(ident.namespace())
+            .withComment(comment)
+            .withFunctionType(functionType)
+            .withDeterministic(deterministic)
+            .withReturnType(returnType)
+            .withReturnColumns(returnColumns)
+            .withDefinitions(definitions)
+            .withVersion(INIT_VERSION)
+            .withAuditInfo(auditInfo)
+            .build();
+
+    try {
+      store.put(functionEntity, false /* overwrite */);
+      return functionEntity;
+
+    } catch (NoSuchEntityException e) {
+      throw new NoSuchSchemaException(e, "Schema %s does not exist", ident.namespace());
+    } catch (EntityAlreadyExistsException e) {
+      throw new FunctionAlreadyExistsException(e, "Function %s already exists", ident);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to register function " + ident, e);
+    }
+  }
+
+  private FunctionEntity applyChanges(FunctionEntity oldEntity, FunctionChange... changes) {
+    String newComment = oldEntity.comment();
+    List<FunctionDefinition> newDefinitions =
+        new ArrayList<>(Arrays.asList(oldEntity.definitions()));
+
+    for (FunctionChange change : changes) {
+      if (change instanceof FunctionChange.UpdateComment) {
+        newComment = ((FunctionChange.UpdateComment) change).newComment();
+
+      } else if (change instanceof FunctionChange.AddDefinition) {
+        FunctionDefinition defToAdd = ((FunctionChange.AddDefinition) change).definition();
+        validateNoArityOverlap(newDefinitions, defToAdd);
+        newDefinitions.add(defToAdd);
+
+      } else if (change instanceof FunctionChange.RemoveDefinition) {
+        FunctionParam[] paramsToRemove = ((FunctionChange.RemoveDefinition) change).parameters();
+        validateRemoveDefinition(newDefinitions, paramsToRemove);
+        newDefinitions.removeIf(def -> parametersMatch(def.parameters(), paramsToRemove));
+
+      } else if (change instanceof FunctionChange.AddImpl) {
+        FunctionChange.AddImpl addImpl = (FunctionChange.AddImpl) change;
+        FunctionParam[] targetParams = addImpl.parameters();
+        FunctionImpl implToAdd = addImpl.implementation();
+        newDefinitions = addImplToDefinition(newDefinitions, targetParams, implToAdd);
+
+      } else if (change instanceof FunctionChange.UpdateImpl) {
+        FunctionChange.UpdateImpl updateImpl = (FunctionChange.UpdateImpl) change;
+        FunctionParam[] targetParams = updateImpl.parameters();
+        FunctionImpl.RuntimeType runtime = updateImpl.runtime();
+        FunctionImpl newImpl = updateImpl.implementation();
+        newDefinitions = updateImplInDefinition(newDefinitions, targetParams, runtime, newImpl);
+
+      } else if (change instanceof FunctionChange.RemoveImpl) {
+        FunctionChange.RemoveImpl removeImpl = (FunctionChange.RemoveImpl) change;
+        FunctionParam[] targetParams = removeImpl.parameters();
+        FunctionImpl.RuntimeType runtime = removeImpl.runtime();
+        newDefinitions = removeImplFromDefinition(newDefinitions, targetParams, runtime);
+
+      } else {
+        throw new IllegalArgumentException("Unknown function change: " + change);
+      }
+    }
+
+    String currentUser = PrincipalUtils.getCurrentUserName();
+    Instant now = Instant.now();
+    AuditInfo newAuditInfo =
+        AuditInfo.builder()
+            .withCreator(oldEntity.auditInfo().creator())
+            .withCreateTime(oldEntity.auditInfo().createTime())
+            .withLastModifier(currentUser)
+            .withLastModifiedTime(now)
+            .build();
+
+    return FunctionEntity.builder()
+        .withId(oldEntity.id())
+        .withName(oldEntity.name())
+        .withNamespace(oldEntity.namespace())
+        .withComment(newComment)
+        .withFunctionType(oldEntity.functionType())
+        .withDeterministic(oldEntity.deterministic())
+        .withReturnType(oldEntity.returnType())
+        .withReturnColumns(oldEntity.returnColumns())
+        .withDefinitions(newDefinitions.toArray(new FunctionDefinition[0]))
+        .withVersion(oldEntity.version() + 1)
+        .withAuditInfo(newAuditInfo)
+        .build();
+  }
+
+  /**
+   * Validates that all definitions in the array do not have overlapping arities. This is used when
+   * registering a function with multiple definitions.
+   *
+   * <p>Gravitino enforces strict validation to prevent ambiguity. Operations MUST fail if any
+   * definition's invocation arities overlap with another. For example, if an existing definition
+   * {@code foo(int, float default 1.0)} supports arities {@code (int)} and {@code (int, float)},
+   * adding a new definition {@code foo(int, string default 'x')} (which supports {@code (int)} and
+   * {@code (int, string)}) will be REJECTED because both support the call {@code foo(1)}. This
+   * ensures every function invocation deterministically maps to a single definition.
+   *
+   * @param definitions The array of definitions to validate.
+   * @throws IllegalArgumentException If any two definitions have overlapping arities.
+   */
+  private void validateDefinitionsNoArityOverlap(FunctionDefinition[] definitions) {
+    for (int i = 0; i < definitions.length; i++) {
+      Set<String> aritiesI = computeArities(definitions[i]);
+      for (int j = i + 1; j < definitions.length; j++) {
+        Set<String> aritiesJ = computeArities(definitions[j]);
+        for (String arity : aritiesI) {
+          if (aritiesJ.contains(arity)) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Cannot register function: definitions at index %d and %d have overlapping "
+                        + "arity '%s'. This would create ambiguous function invocations.",
+                    i, j, arity));
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Validates that a new definition does not create ambiguous function arities with existing
+   * definitions. Each definition can support multiple arities based on parameters with default
+   * values.
+   *
+   * <p>Gravitino enforces strict validation to prevent ambiguity. Operations MUST fail if a new
+   * definition's invocation arities overlap with existing ones. For example, if an existing
+   * definition {@code foo(int, float default 1.0)} supports arities {@code (int)} and {@code (int,
+   * float)}, adding a new definition {@code foo(int, string default 'x')} (which supports {@code
+   * (int)} and {@code (int, string)}) will be REJECTED because both support the call {@code
+   * foo(1)}. This ensures every function invocation deterministically maps to a single definition.
+   *
+   * @param existingDefinitions The current definitions.
+   * @param newDefinition The definition to add.
+   * @throws IllegalArgumentException If the new definition creates overlapping arities.
+   */
+  private void validateNoArityOverlap(
+      List<FunctionDefinition> existingDefinitions, FunctionDefinition newDefinition) {
+    Set<String> newArities = computeArities(newDefinition);
+
+    for (FunctionDefinition existing : existingDefinitions) {
+      Set<String> existingArities = computeArities(existing);
+      for (String arity : newArities) {
+        if (existingArities.contains(arity)) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "Cannot add definition: arity '%s' overlaps with an existing definition. "
+                      + "This would create ambiguous function invocations.",
+                  arity));
+        }
+      }
+    }
+  }
+
+  /**
+   * Computes all possible invocation arities for a function definition. A definition with N
+   * parameters where the last M have default values supports arities from (N-M) to N parameters.
+   *
+   * <p>For example:
+   *
+   * <ul>
+   *   <li>{@code foo(int a)} → arities: {@code ["int"]}
+   *   <li>{@code foo(int a, float b)} → arities: {@code ["int,float"]}
+   *   <li>{@code foo(int a, float b default 1.0)} → arities: {@code ["int", "int,float"]}
+   *   <li>{@code foo(int a, float b default 1.0, string c default 'x')} → arities: {@code ["int",
+   *       "int,float", "int,float,string"]}
+   *   <li>{@code foo()} (no args) → arities: {@code [""]}
+   * </ul>
+   *
+   * @param definition The function definition.
+   * @return A set of arity signatures (e.g., "int", "int,float", "").
+   */
+  private Set<String> computeArities(FunctionDefinition definition) {
+    Set<String> arities = new HashSet<>();
+    FunctionParam[] params = definition.parameters();
+
+    // Find the first parameter with a default value
+    int firstDefaultIndex = params.length;
+    for (int i = 0; i < params.length; i++) {
+      if (params[i].defaultValue() != Column.DEFAULT_VALUE_NOT_SET) {
+        firstDefaultIndex = i;
+        break;
+      }
+    }
+
+    // Generate all possible arities from firstDefaultIndex to params.length
+    for (int i = firstDefaultIndex; i <= params.length; i++) {
+      StringBuilder arity = new StringBuilder();
+      for (int j = 0; j < i; j++) {
+        if (j > 0) {
+          arity.append(",");
+        }
+        arity.append(params[j].dataType().simpleString());
+      }
+      arities.add(arity.toString());
+    }
+
+    return arities;
+  }

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to