This is an automated email from the ASF dual-hosted git repository.

mchades pushed a commit to branch udf-poc
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit fe06e24864fddd379a6a9a8fb1de9ba3752d983e
Author: mchades <[email protected]>
AuthorDate: Wed Dec 17 20:23:01 2025 +0800

    Java client supports function operation
---
 .../org/apache/gravitino/client/ErrorHandlers.java |  71 ++++
 .../org/apache/gravitino/client/HTTPClient.java    |  52 +++
 .../org/apache/gravitino/client/RESTClient.java    |  41 +++
 .../apache/gravitino/client/RelationalCatalog.java | 401 ++++++++++++++++++++-
 .../client/integration/test/FunctionCatalogIT.java | 243 +++++++++++++
 5 files changed, 807 insertions(+), 1 deletion(-)

diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
index 705731808f..9c739d2656 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
@@ -33,6 +33,7 @@ import 
org.apache.gravitino.exceptions.CatalogNotInUseException;
 import org.apache.gravitino.exceptions.ConnectionFailedException;
 import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.ForbiddenException;
+import org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
 import org.apache.gravitino.exceptions.GroupAlreadyExistsException;
 import org.apache.gravitino.exceptions.IllegalMetadataObjectException;
 import org.apache.gravitino.exceptions.IllegalPrivilegeException;
@@ -47,6 +48,8 @@ import 
org.apache.gravitino.exceptions.ModelAlreadyExistsException;
 import 
org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.exceptions.NoSuchFunctionVersionException;
 import org.apache.gravitino.exceptions.NoSuchGroupException;
 import org.apache.gravitino.exceptions.NoSuchJobException;
 import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
@@ -117,6 +120,15 @@ public class ErrorHandlers {
     return SchemaErrorHandler.INSTANCE;
   }
 
+  /**
+   * Creates an error handler specific to Function operations.
+   *
+   * @return A Consumer representing the Function error handler.
+   */
+  public static Consumer<ErrorResponse> functionErrorHandler() {
+    return FunctionErrorHandler.INSTANCE;
+  }
+
   /**
    * Creates an error handler specific to Table operations.
    *
@@ -412,6 +424,65 @@ public class ErrorHandlers {
     }
   }
 
+  /** Error handler specific to Function operations. */
+  @SuppressWarnings("FormatStringAnnotation")
+  private static class FunctionErrorHandler extends RestErrorHandler {
+    private static final ErrorHandler INSTANCE = new FunctionErrorHandler();
+
+    @Override
+    public void accept(ErrorResponse errorResponse) {
+      String errorMessage = formatErrorMessage(errorResponse);
+
+      switch (errorResponse.getCode()) {
+        case ErrorConstants.ILLEGAL_ARGUMENTS_CODE:
+          throw new IllegalArgumentException(errorMessage);
+
+        case ErrorConstants.NOT_FOUND_CODE:
+          if 
(errorResponse.getType().equals(NoSuchSchemaException.class.getSimpleName())) {
+            throw new NoSuchSchemaException(errorMessage);
+          } else if (errorResponse
+              .getType()
+              .equals(NoSuchFunctionException.class.getSimpleName())) {
+            throw new NoSuchFunctionException(errorMessage);
+          } else if (errorResponse
+              .getType()
+              .equals(NoSuchFunctionVersionException.class.getSimpleName())) {
+            throw new NoSuchFunctionVersionException(errorMessage);
+          } else {
+            throw new NotFoundException(errorMessage);
+          }
+
+        case ErrorConstants.ALREADY_EXISTS_CODE:
+          throw new FunctionAlreadyExistsException(errorMessage);
+
+        case ErrorConstants.INTERNAL_ERROR_CODE:
+          throw new RuntimeException(errorMessage);
+
+        case ErrorConstants.UNSUPPORTED_OPERATION_CODE:
+          throw new UnsupportedOperationException(errorMessage);
+
+        case ErrorConstants.FORBIDDEN_CODE:
+          throw new ForbiddenException(errorMessage);
+
+        case ErrorConstants.NOT_IN_USE_CODE:
+          if 
(errorResponse.getType().equals(CatalogNotInUseException.class.getSimpleName()))
 {
+            throw new CatalogNotInUseException(errorMessage);
+
+          } else if (errorResponse
+              .getType()
+              .equals(MetalakeNotInUseException.class.getSimpleName())) {
+            throw new MetalakeNotInUseException(errorMessage);
+
+          } else {
+            throw new NotInUseException(errorMessage);
+          }
+
+        default:
+          super.accept(errorResponse);
+      }
+    }
+  }
+
   /** Error handler specific to Schema operations. */
   @SuppressWarnings("FormatStringAnnotation")
   private static class SchemaErrorHandler extends RestErrorHandler {
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/HTTPClient.java 
b/clients/client-java/src/main/java/org/apache/gravitino/client/HTTPClient.java
index ba961fb296..ae0a8843f9 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/HTTPClient.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/HTTPClient.java
@@ -645,6 +645,58 @@ public class HTTPClient implements RESTClient {
     return execute(Method.DELETE, path, queryParams, null, responseType, 
headers, errorHandler);
   }
 
+  /**
+   * Sends an HTTP DELETE request with a request body to the specified path 
and processes the
+   * response.
+   *
+   * @param path The URL path to send the DELETE request to.
+   * @param body The request body to place in the DELETE request.
+   * @param responseType The class type of the response for deserialization 
(Must be registered with
+   *     the ObjectMapper).
+   * @param headers A map of request headers (key-value pairs) to include in 
the request (can be
+   *     null).
+   * @param errorHandler The error handler delegated for HTTP responses, which 
handles server error
+   *     responses.
+   * @param <T> The class type of the response for deserialization.
+   * @return The response entity parsed and converted to its type T.
+   */
+  @Override
+  public <T extends RESTResponse> T delete(
+      String path,
+      RESTRequest body,
+      Class<T> responseType,
+      Map<String, String> headers,
+      Consumer<ErrorResponse> errorHandler) {
+    return execute(Method.DELETE, path, null, body, responseType, headers, 
errorHandler);
+  }
+
+  /**
+   * Sends an HTTP DELETE request with query parameters and a request body to 
the specified path and
+   * processes the response.
+   *
+   * @param path The URL path to send the DELETE request to.
+   * @param queryParams A map of query parameters (key-value pairs) to include 
in the request URL.
+   * @param body The request body to place in the DELETE request.
+   * @param responseType The class type of the response for deserialization 
(Must be registered with
+   *     the ObjectMapper).
+   * @param headers A map of request headers (key-value pairs) to include in 
the request (can be
+   *     null).
+   * @param errorHandler The error handler delegated for HTTP responses, which 
handles server error
+   *     responses.
+   * @param <T> The class type of the response for deserialization.
+   * @return The response entity parsed and converted to its type T.
+   */
+  @Override
+  public <T extends RESTResponse> T delete(
+      String path,
+      Map<String, String> queryParams,
+      RESTRequest body,
+      Class<T> responseType,
+      Map<String, String> headers,
+      Consumer<ErrorResponse> errorHandler) {
+    return execute(Method.DELETE, path, queryParams, body, responseType, 
headers, errorHandler);
+  }
+
   /**
    * Sends an HTTP POST request with form data to the specified path and 
processes the response.
    *
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/RESTClient.java 
b/clients/client-java/src/main/java/org/apache/gravitino/client/RESTClient.java
index 384a635823..4fe1b1d6a3 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/RESTClient.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/RESTClient.java
@@ -129,6 +129,47 @@ public interface RESTClient extends Closeable {
       Map<String, String> headers,
       Consumer<ErrorResponse> errorHandler);
 
+  /**
+   * Perform a DELETE request on the specified path with the given request 
body.
+   *
+   * @param path The path to be requested.
+   * @param body The request body to be included in the delete request.
+   * @param responseType The class representing the type of the response.
+   * @param headers The headers to be included in the request.
+   * @param errorHandler The consumer for handling error responses.
+   * @param <T> The type of the response.
+   * @return The response of the DELETE request.
+   */
+  default <T extends RESTResponse> T delete(
+      String path,
+      RESTRequest body,
+      Class<T> responseType,
+      Map<String, String> headers,
+      Consumer<ErrorResponse> errorHandler) {
+    return delete(path, ImmutableMap.of(), body, responseType, headers, 
errorHandler);
+  }
+
+  /**
+   * Perform a DELETE request on the specified path with the given request 
body and query
+   * parameters.
+   *
+   * @param path The path to be requested.
+   * @param queryParams The query parameters to be included in the request.
+   * @param body The request body to be included in the delete request.
+   * @param responseType The class representing the type of the response.
+   * @param headers The headers to be included in the request.
+   * @param errorHandler The consumer for handling error responses.
+   * @param <T> The type of the response.
+   * @return The response of the DELETE request.
+   */
+  <T extends RESTResponse> T delete(
+      String path,
+      Map<String, String> queryParams,
+      RESTRequest body,
+      Class<T> responseType,
+      Map<String, String> headers,
+      Consumer<ErrorResponse> errorHandler);
+
   /**
    * Perform a GET request on the specified path with given information and no 
query parameters.
    *
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/RelationalCatalog.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/RelationalCatalog.java
index de48ca7555..f4deb2f678 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/RelationalCatalog.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/RelationalCatalog.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.client;
 
 import static org.apache.gravitino.dto.util.DTOConverters.toDTO;
 import static org.apache.gravitino.dto.util.DTOConverters.toDTOs;
+import static org.apache.gravitino.dto.util.DTOConverters.toFunctionArg;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -35,15 +36,36 @@ import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.dto.AuditDTO;
 import org.apache.gravitino.dto.CatalogDTO;
+import org.apache.gravitino.dto.function.FunctionColumnDTO;
+import org.apache.gravitino.dto.function.FunctionImplDTO;
+import org.apache.gravitino.dto.function.FunctionParamDTO;
+import org.apache.gravitino.dto.function.FunctionSignatureDTO;
+import org.apache.gravitino.dto.requests.FunctionDeleteRequest;
+import org.apache.gravitino.dto.requests.FunctionRegisterRequest;
+import org.apache.gravitino.dto.requests.FunctionUpdateRequest;
+import org.apache.gravitino.dto.requests.FunctionUpdatesRequest;
 import org.apache.gravitino.dto.requests.TableCreateRequest;
 import org.apache.gravitino.dto.requests.TableUpdateRequest;
 import org.apache.gravitino.dto.requests.TableUpdatesRequest;
 import org.apache.gravitino.dto.responses.DropResponse;
 import org.apache.gravitino.dto.responses.EntityListResponse;
+import org.apache.gravitino.dto.responses.FunctionListResponse;
+import org.apache.gravitino.dto.responses.FunctionResponse;
 import org.apache.gravitino.dto.responses.TableResponse;
+import org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.exceptions.NoSuchFunctionVersionException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionCatalog;
+import org.apache.gravitino.function.FunctionChange;
+import org.apache.gravitino.function.FunctionColumn;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionParam;
+import org.apache.gravitino.function.FunctionSignature;
+import org.apache.gravitino.function.FunctionType;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableCatalog;
@@ -59,7 +81,7 @@ import org.apache.gravitino.rest.RESTUtils;
  * operations, for example, schemas and tables list, creation, update and 
deletion. A Relational
  * catalog is under the metalake.
  */
-class RelationalCatalog extends BaseSchemaCatalog implements TableCatalog {
+class RelationalCatalog extends BaseSchemaCatalog implements TableCatalog, 
FunctionCatalog {
 
   RelationalCatalog(
       Namespace namespace,
@@ -78,6 +100,11 @@ class RelationalCatalog extends BaseSchemaCatalog 
implements TableCatalog {
     return this;
   }
 
+  @Override
+  public FunctionCatalog asFunctionCatalog() {
+    return this;
+  }
+
   /**
    * List all the tables under the given Schema namespace.
    *
@@ -257,6 +284,284 @@ class RelationalCatalog extends BaseSchemaCatalog 
implements TableCatalog {
     return resp.dropped();
   }
 
+  /**
+   * List all the functions under the given schema namespace.
+   *
+   * @param namespace The namespace to list the functions under it. This 
namespace should have 1
+   *     level, which is the schema name;
+   * @return A list of {@link NameIdentifier} of the functions under the given 
namespace.
+   * @throws NoSuchSchemaException if the schema with a specified namespace 
does not exist.
+   */
+  @Override
+  public NameIdentifier[] listFunctions(Namespace namespace) throws 
NoSuchSchemaException {
+    checkFunctionNamespace(namespace);
+
+    Namespace fullNamespace = getFunctionFullNamespace(namespace);
+    EntityListResponse resp =
+        restClient.get(
+            formatFunctionRequestPath(fullNamespace),
+            EntityListResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.functionErrorHandler());
+    resp.validate();
+
+    return Arrays.stream(resp.identifiers())
+        .map(ident -> NameIdentifier.of(ident.namespace().level(2), 
ident.name()))
+        .toArray(NameIdentifier[]::new);
+  }
+
+  @Override
+  public Function[] listFunctionInfos(Namespace namespace) throws 
NoSuchSchemaException {
+    checkFunctionNamespace(namespace);
+
+    Namespace fullNamespace = getFunctionFullNamespace(namespace);
+    Map<String, String> params = new HashMap<>();
+    params.put("details", "true");
+
+    FunctionListResponse resp =
+        restClient.get(
+            formatFunctionRequestPath(fullNamespace),
+            params,
+            FunctionListResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.functionErrorHandler());
+    resp.validate();
+    return resp.getFunctions();
+  }
+
+  /**
+   * Load the functions with a specified identifier.
+   *
+   * @param ident The identifier of the functions to load, which should be 
"schema.function" format.
+   * @return The functions with specified identifier.
+   * @throws NoSuchFunctionException if the function with a specified 
identifier does not exist.
+   */
+  @Override
+  public Function[] getFunction(NameIdentifier ident) throws 
NoSuchFunctionException {
+    checkFunctionNameIdentifier(ident);
+
+    Namespace fullNamespace = getFunctionFullNamespace(ident.namespace());
+    FunctionListResponse resp =
+        restClient.get(
+            formatFunctionRequestPath(fullNamespace) + "/" + 
RESTUtils.encodeString(ident.name()),
+            FunctionListResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.functionErrorHandler());
+    resp.validate();
+
+    return resp.getFunctions();
+  }
+
+  /**
+   * Load the functions with a specified identifier and version.
+   *
+   * @param ident The identifier of the functions to load, which should be 
"schema.function" format.
+   * @param version The version to load.
+   * @return The functions with specified identifier and version.
+   * @throws NoSuchFunctionException if the function with a specified 
identifier does not exist.
+   * @throws NoSuchFunctionVersionException if the function version does not 
exist.
+   */
+  @Override
+  public Function[] getFunction(NameIdentifier ident, int version)
+      throws NoSuchFunctionException, NoSuchFunctionVersionException {
+    checkFunctionNameIdentifier(ident);
+
+    Namespace fullNamespace = getFunctionFullNamespace(ident.namespace());
+    Map<String, String> params = new HashMap<>();
+    params.put("version", String.valueOf(version));
+    FunctionListResponse resp =
+        restClient.get(
+            formatFunctionRequestPath(fullNamespace) + "/" + 
RESTUtils.encodeString(ident.name()),
+            params,
+            FunctionListResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.functionErrorHandler());
+    resp.validate();
+
+    return resp.getFunctions();
+  }
+
+  /**
+   * Register a scalar or aggregate function.
+   *
+   * @param ident The identifier of the function, which should be 
"schema.function" format.
+   * @param comment The comment of the function.
+   * @param functionType The function type.
+   * @param deterministic Whether the function is deterministic.
+   * @param functionParams The parameters of the function.
+   * @param returnType The return type of the function.
+   * @param functionImpls The implementations of the function.
+   * @return The created {@link Function}.
+   * @throws NoSuchSchemaException if the schema with specified namespace does 
not exist.
+   * @throws FunctionAlreadyExistsException if the function with specified 
identifier already
+   *     exists.
+   */
+  @Override
+  public Function registerFunction(
+      NameIdentifier ident,
+      String comment,
+      FunctionType functionType,
+      boolean deterministic,
+      FunctionParam[] functionParams,
+      org.apache.gravitino.rel.types.Type returnType,
+      FunctionImpl[] functionImpls)
+      throws NoSuchSchemaException, FunctionAlreadyExistsException {
+    checkFunctionNameIdentifier(ident);
+
+    FunctionRegisterRequest request =
+        new FunctionRegisterRequest(
+            new FunctionSignatureDTO(ident.name(), 
toFunctionParamDTOs(functionParams)),
+            comment,
+            functionType,
+            deterministic,
+            returnType,
+            null,
+            toFunctionImplDTOs(functionImpls));
+    request.validate();
+
+    Namespace fullNamespace = getFunctionFullNamespace(ident.namespace());
+    FunctionResponse resp =
+        restClient.post(
+            formatFunctionRequestPath(fullNamespace),
+            request,
+            FunctionResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.functionErrorHandler());
+    resp.validate();
+
+    return resp.getFunction();
+  }
+
+  /**
+   * Register a table-valued function.
+   *
+   * @param ident The identifier of the function, which should be 
"schema.function" format.
+   * @param comment The comment of the function.
+   * @param deterministic Whether the function is deterministic.
+   * @param functionParams The parameters of the function.
+   * @param returnColumns The return columns of the function.
+   * @param functionImpls The implementations of the function.
+   * @return The created {@link Function}.
+   * @throws NoSuchSchemaException if the schema with specified namespace does 
not exist.
+   * @throws FunctionAlreadyExistsException if the function with specified 
identifier already
+   *     exists.
+   */
+  @Override
+  public Function registerFunction(
+      NameIdentifier ident,
+      String comment,
+      boolean deterministic,
+      FunctionParam[] functionParams,
+      FunctionColumn[] returnColumns,
+      FunctionImpl[] functionImpls)
+      throws NoSuchSchemaException, FunctionAlreadyExistsException {
+    checkFunctionNameIdentifier(ident);
+
+    FunctionRegisterRequest request =
+        new FunctionRegisterRequest(
+            new FunctionSignatureDTO(ident.name(), 
toFunctionParamDTOs(functionParams)),
+            comment,
+            FunctionType.TABLE,
+            deterministic,
+            null,
+            toFunctionColumnDTOs(returnColumns),
+            toFunctionImplDTOs(functionImpls));
+    request.validate();
+
+    Namespace fullNamespace = getFunctionFullNamespace(ident.namespace());
+    FunctionResponse resp =
+        restClient.post(
+            formatFunctionRequestPath(fullNamespace),
+            request,
+            FunctionResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.functionErrorHandler());
+    resp.validate();
+
+    return resp.getFunction();
+  }
+
+  /**
+   * Alter the function with a specified identifier by applying the changes.
+   *
+   * @param ident The identifier of the function, which should be 
"schema.function" format.
+   * @param changes Function changes to apply to the function.
+   * @return The altered {@link Function}.
+   * @throws NoSuchFunctionException if the function with specified identifier 
does not exist.
+   * @throws IllegalArgumentException if the changes are invalid.
+   */
+  @Override
+  public Function alterFunction(NameIdentifier ident, FunctionChange... 
changes)
+      throws NoSuchFunctionException, IllegalArgumentException {
+    checkFunctionNameIdentifier(ident);
+
+    List<FunctionUpdateRequest> requests =
+        Arrays.stream(changes)
+            .map(RelationalCatalog::toFunctionUpdateRequest)
+            .collect(Collectors.toList());
+    FunctionUpdatesRequest updatesRequest = new 
FunctionUpdatesRequest(requests);
+    updatesRequest.validate();
+
+    Namespace fullNamespace = getFunctionFullNamespace(ident.namespace());
+    FunctionResponse resp =
+        restClient.put(
+            formatFunctionRequestPath(fullNamespace) + "/" + 
RESTUtils.encodeString(ident.name()),
+            updatesRequest,
+            FunctionResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.functionErrorHandler());
+    resp.validate();
+
+    return resp.getFunction();
+  }
+
+  /**
+   * Drop the function with specified identifier.
+   *
+   * @param ident The identifier of the function, which should be 
"schema.function" format.
+   * @return true if the function is dropped successfully, false if the 
function does not exist.
+   */
+  @Override
+  public boolean deleteFunction(NameIdentifier ident) {
+    checkFunctionNameIdentifier(ident);
+
+    Namespace fullNamespace = getFunctionFullNamespace(ident.namespace());
+    DropResponse resp =
+        restClient.delete(
+            formatFunctionRequestPath(fullNamespace) + "/" + 
RESTUtils.encodeString(ident.name()),
+            DropResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.functionErrorHandler());
+    resp.validate();
+    return resp.dropped();
+  }
+
+  /**
+   * Drop the function with a specified identifier and signature.
+   *
+   * @param ident The identifier of the function, which should be 
"schema.function" format.
+   * @param signature The signature of the function.
+   * @return true if the function is dropped successfully, false if the 
function does not exist.
+   */
+  @Override
+  public boolean deleteFunction(NameIdentifier ident, FunctionSignature 
signature) {
+    checkFunctionNameIdentifier(ident);
+
+    Namespace fullNamespace = getFunctionFullNamespace(ident.namespace());
+    FunctionDeleteRequest request = new 
FunctionDeleteRequest(toFunctionSignatureDTO(signature));
+    request.validate();
+
+    DropResponse resp =
+        restClient.delete(
+            formatFunctionRequestPath(fullNamespace) + "/" + 
RESTUtils.encodeString(ident.name()),
+            request,
+            DropResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.functionErrorHandler());
+    resp.validate();
+    return resp.dropped();
+  }
+
   @VisibleForTesting
   static String formatTableRequestPath(Namespace ns) {
     Namespace schemaNs = Namespace.of(ns.level(0), ns.level(1));
@@ -292,6 +597,100 @@ class RelationalCatalog extends BaseSchemaCatalog 
implements TableCatalog {
     checkTableNamespace(ident.namespace());
   }
 
+  private static String formatFunctionRequestPath(Namespace ns) {
+    Namespace schemaNs = Namespace.of(ns.level(0), ns.level(1));
+    return formatSchemaRequestPath(schemaNs)
+        + "/"
+        + RESTUtils.encodeString(ns.level(2))
+        + "/functions";
+  }
+
+  private static void checkFunctionNamespace(Namespace namespace) {
+    Namespace.check(
+        namespace != null && namespace.length() == 1,
+        "Function namespace must be non-null and have 1 level, the input 
namespace is %s",
+        namespace);
+  }
+
+  private static void checkFunctionNameIdentifier(NameIdentifier ident) {
+    NameIdentifier.check(ident != null, "NameIdentifier must not be null");
+    NameIdentifier.check(
+        ident.name() != null && !ident.name().isEmpty(), "NameIdentifier name 
must not be empty");
+    checkFunctionNamespace(ident.namespace());
+  }
+
+  private Namespace getFunctionFullNamespace(Namespace functionNamespace) {
+    return Namespace.of(this.catalogNamespace().level(0), this.name(), 
functionNamespace.level(0));
+  }
+
+  private static FunctionParamDTO[] toFunctionParamDTOs(FunctionParam[] 
params) {
+    if (params == null) {
+      return null;
+    }
+    return Arrays.stream(params)
+        .map(
+            param ->
+                new FunctionParamDTO(
+                    param.name(),
+                    param.dataType(),
+                    param.comment(),
+                    param.defaultValue() == null
+                            || 
Column.DEFAULT_VALUE_NOT_SET.equals(param.defaultValue())
+                        ? Column.DEFAULT_VALUE_NOT_SET
+                        : toFunctionArg(param.defaultValue())))
+        .toArray(FunctionParamDTO[]::new);
+  }
+
+  private static FunctionImplDTO[] toFunctionImplDTOs(FunctionImpl[] impls) {
+    if (impls == null) {
+      return new FunctionImplDTO[0];
+    }
+    return 
Arrays.stream(impls).map(FunctionImplDTO::from).toArray(FunctionImplDTO[]::new);
+  }
+
+  private static FunctionColumnDTO[] toFunctionColumnDTOs(FunctionColumn[] 
columns) {
+    if (columns == null) {
+      return new FunctionColumnDTO[0];
+    }
+    return Arrays.stream(columns)
+        .map(column -> new FunctionColumnDTO(column.name(), column.dataType(), 
column.comment()))
+        .toArray(FunctionColumnDTO[]::new);
+  }
+
+  private static FunctionSignatureDTO toFunctionSignatureDTO(FunctionSignature 
signature) {
+    if (signature == null) {
+      return null;
+    }
+    return new FunctionSignatureDTO(
+        signature.name(), toFunctionParamDTOs(signature.functionParams()));
+  }
+
+  private static FunctionUpdateRequest toFunctionUpdateRequest(FunctionChange 
change) {
+    if (change instanceof FunctionChange.UpdateComment) {
+      FunctionChange.UpdateComment updateComment = 
(FunctionChange.UpdateComment) change;
+      return new FunctionUpdateRequest.UpdateCommentRequest(
+          toFunctionSignatureDTO(updateComment.signature()), 
updateComment.newComment());
+
+    } else if (change instanceof FunctionChange.UpdateImplementations) {
+      FunctionChange.UpdateImplementations updateImplementations =
+          (FunctionChange.UpdateImplementations) change;
+      return new FunctionUpdateRequest.UpdateImplementationsRequest(
+          toFunctionSignatureDTO(updateImplementations.signature()),
+          toFunctionImplDTOs(updateImplementations.newImplementations()));
+
+    } else if (change instanceof FunctionChange.AddImplementation) {
+      FunctionChange.AddImplementation addImplementation =
+          (FunctionChange.AddImplementation) change;
+      return new FunctionUpdateRequest.AddImplementationRequest(
+          toFunctionSignatureDTO(addImplementation.signature()),
+          FunctionImplDTO.from(addImplementation.implementation()));
+
+    } else {
+      throw new IllegalArgumentException(
+          "Unknown change type: " + change.getClass().getSimpleName());
+    }
+  }
+
   /**
    * Get the full namespace of the table with the given table's short 
namespace (schema name).
    *
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/FunctionCatalogIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/FunctionCatalogIT.java
new file mode 100644
index 0000000000..0f74c06219
--- /dev/null
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/FunctionCatalogIT.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionCatalog;
+import org.apache.gravitino.function.FunctionChange;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionParam;
+import org.apache.gravitino.function.FunctionParams;
+import org.apache.gravitino.function.FunctionSignature;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration test that verifies function operations against an Iceberg 
catalog with memory
+ * backend.
+ */
+public class FunctionCatalogIT extends BaseIT {
+
+  /** Metalake used for function catalog verification. */
+  private static final String metalakeName = 
GravitinoITUtils.genRandomName("function_metalake");
+
+  /** Warehouse directory for the Iceberg memory catalog. */
+  private static Path warehousePath;
+
+  /** The metalake instance for the test. */
+  private static GravitinoMetalake metalake;
+
+  /** The Iceberg catalog under test. */
+  private static Catalog icebergCatalog;
+
+  /** The schema used for registering functions. */
+  private static Schema schema;
+
+  /**
+   * Create the metalake, Iceberg catalog, and schema for function tests.
+   *
+   * @throws IOException if creating temporary warehouse directory fails
+   */
+  @BeforeAll
+  public void setUp() throws IOException {
+    warehousePath = Files.createTempDirectory("iceberg_memory_warehouse");
+    Assertions.assertFalse(client.metalakeExists(metalakeName));
+    metalake =
+        client.createMetalake(
+            metalakeName, "Metalake for function catalog tests", 
Collections.emptyMap());
+
+    String catalogName = GravitinoITUtils.genRandomName("function_catalog");
+    Map<String, String> properties =
+        ImmutableMap.<String, String>builder()
+            .put("catalog-backend", "memory")
+            .put("uri", "")
+            .put("warehouse", warehousePath.toString())
+            .build();
+    icebergCatalog =
+        metalake.createCatalog(
+            catalogName,
+            Catalog.Type.RELATIONAL,
+            "lakehouse-iceberg",
+            "Iceberg catalog for function tests",
+            properties);
+
+    String schemaName = GravitinoITUtils.genRandomName("function_schema");
+    schema =
+        icebergCatalog
+            .asSchemas()
+            .createSchema(schemaName, "Schema for functions", 
Collections.emptyMap());
+  }
+
+  /**
+   * Drop created resources and clean temporary files.
+   *
+   * @throws IOException if deleting warehouse directory fails
+   */
+  @AfterAll
+  public void tearDown() throws IOException {
+    if (icebergCatalog != null && schema != null) {
+      metalake.dropCatalog(icebergCatalog.name(), true);
+    }
+
+    if (metalake != null) {
+      client.dropMetalake(metalakeName, true);
+    }
+
+    if (warehousePath != null) {
+      FileUtils.deleteQuietly(warehousePath.toFile());
+    }
+
+    if (client != null) {
+      client.close();
+    }
+
+    try {
+      closer.close();
+    } catch (Exception e) {
+      // Ignore close exception
+    }
+  }
+
+  /**
+   * Verify registering, listing, loading, altering, and deleting functions on 
an Iceberg catalog.
+   */
+  @Test
+  public void testFunctionLifecycleOnIcebergCatalog()
+      throws NoSuchFunctionException, NoSuchSchemaException, 
FunctionAlreadyExistsException {
+    FunctionCatalog functionCatalog = icebergCatalog.asFunctionCatalog();
+    Namespace functionNamespace = Namespace.of(schema.name());
+
+    String functionName = GravitinoITUtils.genRandomName("function");
+    NameIdentifier functionIdent = NameIdentifier.of(schema.name(), 
functionName);
+
+    Assertions.assertEquals(0, 
functionCatalog.listFunctions(functionNamespace).length);
+    Assertions.assertFalse(functionCatalog.functionExists(functionIdent));
+
+    FunctionImpl initialImpl =
+        FunctionImpl.ofSql(
+            FunctionImpl.RuntimeType.SPARK,
+            "CREATE FUNCTION " + functionName + "(x INT) RETURNS INT RETURN x 
+ 1");
+    Function createdFunction =
+        functionCatalog.registerFunction(
+            functionIdent,
+            "initial comment",
+            FunctionType.SCALAR,
+            true,
+            new FunctionParam[] {FunctionParams.of("x", 
Types.IntegerType.get(), "input value")},
+            Types.IntegerType.get(),
+            new FunctionImpl[] {initialImpl});
+
+    Assertions.assertEquals(functionName, createdFunction.signature().name());
+    Assertions.assertEquals(FunctionType.SCALAR, 
createdFunction.functionType());
+    Assertions.assertTrue(createdFunction.deterministic());
+    Assertions.assertEquals("initial comment", createdFunction.comment());
+    Assertions.assertEquals(Types.IntegerType.get(), 
createdFunction.returnType());
+    Assertions.assertEquals(1, createdFunction.version());
+    Assertions.assertEquals(1, createdFunction.impls().length);
+
+    NameIdentifier[] listedFunctions = 
functionCatalog.listFunctions(functionNamespace);
+    Assertions.assertEquals(1, listedFunctions.length);
+    Assertions.assertEquals(functionIdent, listedFunctions[0]);
+
+    Function[] loadedFunctions = functionCatalog.getFunction(functionIdent);
+    Assertions.assertEquals(1, loadedFunctions.length);
+    Assertions.assertEquals(createdFunction.signature(), 
loadedFunctions[0].signature());
+
+    Function[] versionedFunctions =
+        functionCatalog.getFunction(functionIdent, createdFunction.version());
+    Assertions.assertEquals(1, versionedFunctions.length);
+    Assertions.assertEquals(createdFunction.signature(), 
versionedFunctions[0].signature());
+
+    FunctionImpl additionalImpl =
+        FunctionImpl.ofSql(
+            FunctionImpl.RuntimeType.TRINO,
+            "CREATE FUNCTION " + functionName + "(x INT) RETURNS INT RETURN x 
* 2");
+    Function updatedFunction =
+        functionCatalog.alterFunction(
+            functionIdent,
+            FunctionChange.updateComment("updated comment"),
+            FunctionChange.addImplementation(additionalImpl));
+
+    Assertions.assertEquals(createdFunction.version() + 1, 
updatedFunction.version());
+    Assertions.assertEquals("updated comment", updatedFunction.comment());
+    Assertions.assertEquals(2, updatedFunction.impls().length);
+
+    Function[] historicalVersion =
+        functionCatalog.getFunction(functionIdent, createdFunction.version());
+    Assertions.assertEquals(1, historicalVersion.length);
+    Assertions.assertEquals("initial comment", historicalVersion[0].comment());
+    Assertions.assertEquals(1, historicalVersion[0].impls().length);
+
+    FunctionSignature functionSignature =
+        FunctionSignature.of(functionName, 
createdFunction.signature().functionParams());
+    Assertions.assertTrue(functionCatalog.deleteFunction(functionIdent, 
functionSignature));
+    Assertions.assertEquals(0, 
functionCatalog.listFunctions(functionNamespace).length);
+    Assertions.assertThrows(
+        NoSuchFunctionException.class, () -> 
functionCatalog.getFunction(functionIdent));
+  }
+
+  @Test
+  public void testListFunctionInfos()
+      throws NoSuchSchemaException, FunctionAlreadyExistsException, 
NoSuchFunctionException {
+    FunctionCatalog functionCatalog = icebergCatalog.asFunctionCatalog();
+    Namespace functionNamespace = Namespace.of(schema.name());
+    String functionName = GravitinoITUtils.genRandomName("function_info");
+    NameIdentifier functionIdent = NameIdentifier.of(schema.name(), 
functionName);
+
+    FunctionImpl impl =
+        FunctionImpl.ofSql(
+            FunctionImpl.RuntimeType.SPARK,
+            "CREATE FUNCTION " + functionName + "(x INT) RETURNS INT RETURN x 
+ 1");
+
+    functionCatalog.registerFunction(
+        functionIdent,
+        "info comment",
+        FunctionType.SCALAR,
+        true,
+        new FunctionParam[] {FunctionParams.of("x", Types.IntegerType.get(), 
"input value")},
+        Types.IntegerType.get(),
+        new FunctionImpl[] {impl});
+
+    Function[] functions = 
functionCatalog.listFunctionInfos(functionNamespace);
+    Assertions.assertEquals(1, functions.length);
+    Assertions.assertEquals(functionName, functions[0].signature().name());
+    Assertions.assertEquals(FunctionImpl.RuntimeType.SPARK, 
functions[0].impls()[0].runtime());
+  }
+}


Reply via email to