This is an automated email from the ASF dual-hosted git repository. mchades pushed a commit to branch udf-poc in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit fe06e24864fddd379a6a9a8fb1de9ba3752d983e Author: mchades <[email protected]> AuthorDate: Wed Dec 17 20:23:01 2025 +0800 Java client supports function operation --- .../org/apache/gravitino/client/ErrorHandlers.java | 71 ++++ .../org/apache/gravitino/client/HTTPClient.java | 52 +++ .../org/apache/gravitino/client/RESTClient.java | 41 +++ .../apache/gravitino/client/RelationalCatalog.java | 401 ++++++++++++++++++++- .../client/integration/test/FunctionCatalogIT.java | 243 +++++++++++++ 5 files changed, 807 insertions(+), 1 deletion(-) diff --git a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java index 705731808f..9c739d2656 100644 --- a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java +++ b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java @@ -33,6 +33,7 @@ import org.apache.gravitino.exceptions.CatalogNotInUseException; import org.apache.gravitino.exceptions.ConnectionFailedException; import org.apache.gravitino.exceptions.FilesetAlreadyExistsException; import org.apache.gravitino.exceptions.ForbiddenException; +import org.apache.gravitino.exceptions.FunctionAlreadyExistsException; import org.apache.gravitino.exceptions.GroupAlreadyExistsException; import org.apache.gravitino.exceptions.IllegalMetadataObjectException; import org.apache.gravitino.exceptions.IllegalPrivilegeException; @@ -47,6 +48,8 @@ import org.apache.gravitino.exceptions.ModelAlreadyExistsException; import org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException; import org.apache.gravitino.exceptions.NoSuchCatalogException; import org.apache.gravitino.exceptions.NoSuchFilesetException; +import org.apache.gravitino.exceptions.NoSuchFunctionException; +import org.apache.gravitino.exceptions.NoSuchFunctionVersionException; import org.apache.gravitino.exceptions.NoSuchGroupException; import org.apache.gravitino.exceptions.NoSuchJobException; import org.apache.gravitino.exceptions.NoSuchJobTemplateException; @@ -117,6 +120,15 @@ public class ErrorHandlers { return SchemaErrorHandler.INSTANCE; } + /** + * Creates an error handler specific to Function operations. + * + * @return A Consumer representing the Function error handler. + */ + public static Consumer<ErrorResponse> functionErrorHandler() { + return FunctionErrorHandler.INSTANCE; + } + /** * Creates an error handler specific to Table operations. * @@ -412,6 +424,65 @@ public class ErrorHandlers { } } + /** Error handler specific to Function operations. */ + @SuppressWarnings("FormatStringAnnotation") + private static class FunctionErrorHandler extends RestErrorHandler { + private static final ErrorHandler INSTANCE = new FunctionErrorHandler(); + + @Override + public void accept(ErrorResponse errorResponse) { + String errorMessage = formatErrorMessage(errorResponse); + + switch (errorResponse.getCode()) { + case ErrorConstants.ILLEGAL_ARGUMENTS_CODE: + throw new IllegalArgumentException(errorMessage); + + case ErrorConstants.NOT_FOUND_CODE: + if (errorResponse.getType().equals(NoSuchSchemaException.class.getSimpleName())) { + throw new NoSuchSchemaException(errorMessage); + } else if (errorResponse + .getType() + .equals(NoSuchFunctionException.class.getSimpleName())) { + throw new NoSuchFunctionException(errorMessage); + } else if (errorResponse + .getType() + .equals(NoSuchFunctionVersionException.class.getSimpleName())) { + throw new NoSuchFunctionVersionException(errorMessage); + } else { + throw new NotFoundException(errorMessage); + } + + case ErrorConstants.ALREADY_EXISTS_CODE: + throw new FunctionAlreadyExistsException(errorMessage); + + case ErrorConstants.INTERNAL_ERROR_CODE: + throw new RuntimeException(errorMessage); + + case ErrorConstants.UNSUPPORTED_OPERATION_CODE: + throw new UnsupportedOperationException(errorMessage); + + case ErrorConstants.FORBIDDEN_CODE: + throw new ForbiddenException(errorMessage); + + case ErrorConstants.NOT_IN_USE_CODE: + if (errorResponse.getType().equals(CatalogNotInUseException.class.getSimpleName())) { + throw new CatalogNotInUseException(errorMessage); + + } else if (errorResponse + .getType() + .equals(MetalakeNotInUseException.class.getSimpleName())) { + throw new MetalakeNotInUseException(errorMessage); + + } else { + throw new NotInUseException(errorMessage); + } + + default: + super.accept(errorResponse); + } + } + } + /** Error handler specific to Schema operations. */ @SuppressWarnings("FormatStringAnnotation") private static class SchemaErrorHandler extends RestErrorHandler { diff --git a/clients/client-java/src/main/java/org/apache/gravitino/client/HTTPClient.java b/clients/client-java/src/main/java/org/apache/gravitino/client/HTTPClient.java index ba961fb296..ae0a8843f9 100644 --- a/clients/client-java/src/main/java/org/apache/gravitino/client/HTTPClient.java +++ b/clients/client-java/src/main/java/org/apache/gravitino/client/HTTPClient.java @@ -645,6 +645,58 @@ public class HTTPClient implements RESTClient { return execute(Method.DELETE, path, queryParams, null, responseType, headers, errorHandler); } + /** + * Sends an HTTP DELETE request with a request body to the specified path and processes the + * response. + * + * @param path The URL path to send the DELETE request to. + * @param body The request body to place in the DELETE request. + * @param responseType The class type of the response for deserialization (Must be registered with + * the ObjectMapper). + * @param headers A map of request headers (key-value pairs) to include in the request (can be + * null). + * @param errorHandler The error handler delegated for HTTP responses, which handles server error + * responses. + * @param <T> The class type of the response for deserialization. + * @return The response entity parsed and converted to its type T. + */ + @Override + public <T extends RESTResponse> T delete( + String path, + RESTRequest body, + Class<T> responseType, + Map<String, String> headers, + Consumer<ErrorResponse> errorHandler) { + return execute(Method.DELETE, path, null, body, responseType, headers, errorHandler); + } + + /** + * Sends an HTTP DELETE request with query parameters and a request body to the specified path and + * processes the response. + * + * @param path The URL path to send the DELETE request to. + * @param queryParams A map of query parameters (key-value pairs) to include in the request URL. + * @param body The request body to place in the DELETE request. + * @param responseType The class type of the response for deserialization (Must be registered with + * the ObjectMapper). + * @param headers A map of request headers (key-value pairs) to include in the request (can be + * null). + * @param errorHandler The error handler delegated for HTTP responses, which handles server error + * responses. + * @param <T> The class type of the response for deserialization. + * @return The response entity parsed and converted to its type T. + */ + @Override + public <T extends RESTResponse> T delete( + String path, + Map<String, String> queryParams, + RESTRequest body, + Class<T> responseType, + Map<String, String> headers, + Consumer<ErrorResponse> errorHandler) { + return execute(Method.DELETE, path, queryParams, body, responseType, headers, errorHandler); + } + /** * Sends an HTTP POST request with form data to the specified path and processes the response. * diff --git a/clients/client-java/src/main/java/org/apache/gravitino/client/RESTClient.java b/clients/client-java/src/main/java/org/apache/gravitino/client/RESTClient.java index 384a635823..4fe1b1d6a3 100644 --- a/clients/client-java/src/main/java/org/apache/gravitino/client/RESTClient.java +++ b/clients/client-java/src/main/java/org/apache/gravitino/client/RESTClient.java @@ -129,6 +129,47 @@ public interface RESTClient extends Closeable { Map<String, String> headers, Consumer<ErrorResponse> errorHandler); + /** + * Perform a DELETE request on the specified path with the given request body. + * + * @param path The path to be requested. + * @param body The request body to be included in the delete request. + * @param responseType The class representing the type of the response. + * @param headers The headers to be included in the request. + * @param errorHandler The consumer for handling error responses. + * @param <T> The type of the response. + * @return The response of the DELETE request. + */ + default <T extends RESTResponse> T delete( + String path, + RESTRequest body, + Class<T> responseType, + Map<String, String> headers, + Consumer<ErrorResponse> errorHandler) { + return delete(path, ImmutableMap.of(), body, responseType, headers, errorHandler); + } + + /** + * Perform a DELETE request on the specified path with the given request body and query + * parameters. + * + * @param path The path to be requested. + * @param queryParams The query parameters to be included in the request. + * @param body The request body to be included in the delete request. + * @param responseType The class representing the type of the response. + * @param headers The headers to be included in the request. + * @param errorHandler The consumer for handling error responses. + * @param <T> The type of the response. + * @return The response of the DELETE request. + */ + <T extends RESTResponse> T delete( + String path, + Map<String, String> queryParams, + RESTRequest body, + Class<T> responseType, + Map<String, String> headers, + Consumer<ErrorResponse> errorHandler); + /** * Perform a GET request on the specified path with given information and no query parameters. * diff --git a/clients/client-java/src/main/java/org/apache/gravitino/client/RelationalCatalog.java b/clients/client-java/src/main/java/org/apache/gravitino/client/RelationalCatalog.java index de48ca7555..f4deb2f678 100644 --- a/clients/client-java/src/main/java/org/apache/gravitino/client/RelationalCatalog.java +++ b/clients/client-java/src/main/java/org/apache/gravitino/client/RelationalCatalog.java @@ -20,6 +20,7 @@ package org.apache.gravitino.client; import static org.apache.gravitino.dto.util.DTOConverters.toDTO; import static org.apache.gravitino.dto.util.DTOConverters.toDTOs; +import static org.apache.gravitino.dto.util.DTOConverters.toFunctionArg; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -35,15 +36,36 @@ import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; import org.apache.gravitino.dto.AuditDTO; import org.apache.gravitino.dto.CatalogDTO; +import org.apache.gravitino.dto.function.FunctionColumnDTO; +import org.apache.gravitino.dto.function.FunctionImplDTO; +import org.apache.gravitino.dto.function.FunctionParamDTO; +import org.apache.gravitino.dto.function.FunctionSignatureDTO; +import org.apache.gravitino.dto.requests.FunctionDeleteRequest; +import org.apache.gravitino.dto.requests.FunctionRegisterRequest; +import org.apache.gravitino.dto.requests.FunctionUpdateRequest; +import org.apache.gravitino.dto.requests.FunctionUpdatesRequest; import org.apache.gravitino.dto.requests.TableCreateRequest; import org.apache.gravitino.dto.requests.TableUpdateRequest; import org.apache.gravitino.dto.requests.TableUpdatesRequest; import org.apache.gravitino.dto.responses.DropResponse; import org.apache.gravitino.dto.responses.EntityListResponse; +import org.apache.gravitino.dto.responses.FunctionListResponse; +import org.apache.gravitino.dto.responses.FunctionResponse; import org.apache.gravitino.dto.responses.TableResponse; +import org.apache.gravitino.exceptions.FunctionAlreadyExistsException; +import org.apache.gravitino.exceptions.NoSuchFunctionException; +import org.apache.gravitino.exceptions.NoSuchFunctionVersionException; import org.apache.gravitino.exceptions.NoSuchSchemaException; import org.apache.gravitino.exceptions.NoSuchTableException; import org.apache.gravitino.exceptions.TableAlreadyExistsException; +import org.apache.gravitino.function.Function; +import org.apache.gravitino.function.FunctionCatalog; +import org.apache.gravitino.function.FunctionChange; +import org.apache.gravitino.function.FunctionColumn; +import org.apache.gravitino.function.FunctionImpl; +import org.apache.gravitino.function.FunctionParam; +import org.apache.gravitino.function.FunctionSignature; +import org.apache.gravitino.function.FunctionType; import org.apache.gravitino.rel.Column; import org.apache.gravitino.rel.Table; import org.apache.gravitino.rel.TableCatalog; @@ -59,7 +81,7 @@ import org.apache.gravitino.rest.RESTUtils; * operations, for example, schemas and tables list, creation, update and deletion. A Relational * catalog is under the metalake. */ -class RelationalCatalog extends BaseSchemaCatalog implements TableCatalog { +class RelationalCatalog extends BaseSchemaCatalog implements TableCatalog, FunctionCatalog { RelationalCatalog( Namespace namespace, @@ -78,6 +100,11 @@ class RelationalCatalog extends BaseSchemaCatalog implements TableCatalog { return this; } + @Override + public FunctionCatalog asFunctionCatalog() { + return this; + } + /** * List all the tables under the given Schema namespace. * @@ -257,6 +284,284 @@ class RelationalCatalog extends BaseSchemaCatalog implements TableCatalog { return resp.dropped(); } + /** + * List all the functions under the given schema namespace. + * + * @param namespace The namespace to list the functions under it. This namespace should have 1 + * level, which is the schema name; + * @return A list of {@link NameIdentifier} of the functions under the given namespace. + * @throws NoSuchSchemaException if the schema with a specified namespace does not exist. + */ + @Override + public NameIdentifier[] listFunctions(Namespace namespace) throws NoSuchSchemaException { + checkFunctionNamespace(namespace); + + Namespace fullNamespace = getFunctionFullNamespace(namespace); + EntityListResponse resp = + restClient.get( + formatFunctionRequestPath(fullNamespace), + EntityListResponse.class, + Collections.emptyMap(), + ErrorHandlers.functionErrorHandler()); + resp.validate(); + + return Arrays.stream(resp.identifiers()) + .map(ident -> NameIdentifier.of(ident.namespace().level(2), ident.name())) + .toArray(NameIdentifier[]::new); + } + + @Override + public Function[] listFunctionInfos(Namespace namespace) throws NoSuchSchemaException { + checkFunctionNamespace(namespace); + + Namespace fullNamespace = getFunctionFullNamespace(namespace); + Map<String, String> params = new HashMap<>(); + params.put("details", "true"); + + FunctionListResponse resp = + restClient.get( + formatFunctionRequestPath(fullNamespace), + params, + FunctionListResponse.class, + Collections.emptyMap(), + ErrorHandlers.functionErrorHandler()); + resp.validate(); + return resp.getFunctions(); + } + + /** + * Load the functions with a specified identifier. + * + * @param ident The identifier of the functions to load, which should be "schema.function" format. + * @return The functions with specified identifier. + * @throws NoSuchFunctionException if the function with a specified identifier does not exist. + */ + @Override + public Function[] getFunction(NameIdentifier ident) throws NoSuchFunctionException { + checkFunctionNameIdentifier(ident); + + Namespace fullNamespace = getFunctionFullNamespace(ident.namespace()); + FunctionListResponse resp = + restClient.get( + formatFunctionRequestPath(fullNamespace) + "/" + RESTUtils.encodeString(ident.name()), + FunctionListResponse.class, + Collections.emptyMap(), + ErrorHandlers.functionErrorHandler()); + resp.validate(); + + return resp.getFunctions(); + } + + /** + * Load the functions with a specified identifier and version. + * + * @param ident The identifier of the functions to load, which should be "schema.function" format. + * @param version The version to load. + * @return The functions with specified identifier and version. + * @throws NoSuchFunctionException if the function with a specified identifier does not exist. + * @throws NoSuchFunctionVersionException if the function version does not exist. + */ + @Override + public Function[] getFunction(NameIdentifier ident, int version) + throws NoSuchFunctionException, NoSuchFunctionVersionException { + checkFunctionNameIdentifier(ident); + + Namespace fullNamespace = getFunctionFullNamespace(ident.namespace()); + Map<String, String> params = new HashMap<>(); + params.put("version", String.valueOf(version)); + FunctionListResponse resp = + restClient.get( + formatFunctionRequestPath(fullNamespace) + "/" + RESTUtils.encodeString(ident.name()), + params, + FunctionListResponse.class, + Collections.emptyMap(), + ErrorHandlers.functionErrorHandler()); + resp.validate(); + + return resp.getFunctions(); + } + + /** + * Register a scalar or aggregate function. + * + * @param ident The identifier of the function, which should be "schema.function" format. + * @param comment The comment of the function. + * @param functionType The function type. + * @param deterministic Whether the function is deterministic. + * @param functionParams The parameters of the function. + * @param returnType The return type of the function. + * @param functionImpls The implementations of the function. + * @return The created {@link Function}. + * @throws NoSuchSchemaException if the schema with specified namespace does not exist. + * @throws FunctionAlreadyExistsException if the function with specified identifier already + * exists. + */ + @Override + public Function registerFunction( + NameIdentifier ident, + String comment, + FunctionType functionType, + boolean deterministic, + FunctionParam[] functionParams, + org.apache.gravitino.rel.types.Type returnType, + FunctionImpl[] functionImpls) + throws NoSuchSchemaException, FunctionAlreadyExistsException { + checkFunctionNameIdentifier(ident); + + FunctionRegisterRequest request = + new FunctionRegisterRequest( + new FunctionSignatureDTO(ident.name(), toFunctionParamDTOs(functionParams)), + comment, + functionType, + deterministic, + returnType, + null, + toFunctionImplDTOs(functionImpls)); + request.validate(); + + Namespace fullNamespace = getFunctionFullNamespace(ident.namespace()); + FunctionResponse resp = + restClient.post( + formatFunctionRequestPath(fullNamespace), + request, + FunctionResponse.class, + Collections.emptyMap(), + ErrorHandlers.functionErrorHandler()); + resp.validate(); + + return resp.getFunction(); + } + + /** + * Register a table-valued function. + * + * @param ident The identifier of the function, which should be "schema.function" format. + * @param comment The comment of the function. + * @param deterministic Whether the function is deterministic. + * @param functionParams The parameters of the function. + * @param returnColumns The return columns of the function. + * @param functionImpls The implementations of the function. + * @return The created {@link Function}. + * @throws NoSuchSchemaException if the schema with specified namespace does not exist. + * @throws FunctionAlreadyExistsException if the function with specified identifier already + * exists. + */ + @Override + public Function registerFunction( + NameIdentifier ident, + String comment, + boolean deterministic, + FunctionParam[] functionParams, + FunctionColumn[] returnColumns, + FunctionImpl[] functionImpls) + throws NoSuchSchemaException, FunctionAlreadyExistsException { + checkFunctionNameIdentifier(ident); + + FunctionRegisterRequest request = + new FunctionRegisterRequest( + new FunctionSignatureDTO(ident.name(), toFunctionParamDTOs(functionParams)), + comment, + FunctionType.TABLE, + deterministic, + null, + toFunctionColumnDTOs(returnColumns), + toFunctionImplDTOs(functionImpls)); + request.validate(); + + Namespace fullNamespace = getFunctionFullNamespace(ident.namespace()); + FunctionResponse resp = + restClient.post( + formatFunctionRequestPath(fullNamespace), + request, + FunctionResponse.class, + Collections.emptyMap(), + ErrorHandlers.functionErrorHandler()); + resp.validate(); + + return resp.getFunction(); + } + + /** + * Alter the function with a specified identifier by applying the changes. + * + * @param ident The identifier of the function, which should be "schema.function" format. + * @param changes Function changes to apply to the function. + * @return The altered {@link Function}. + * @throws NoSuchFunctionException if the function with specified identifier does not exist. + * @throws IllegalArgumentException if the changes are invalid. + */ + @Override + public Function alterFunction(NameIdentifier ident, FunctionChange... changes) + throws NoSuchFunctionException, IllegalArgumentException { + checkFunctionNameIdentifier(ident); + + List<FunctionUpdateRequest> requests = + Arrays.stream(changes) + .map(RelationalCatalog::toFunctionUpdateRequest) + .collect(Collectors.toList()); + FunctionUpdatesRequest updatesRequest = new FunctionUpdatesRequest(requests); + updatesRequest.validate(); + + Namespace fullNamespace = getFunctionFullNamespace(ident.namespace()); + FunctionResponse resp = + restClient.put( + formatFunctionRequestPath(fullNamespace) + "/" + RESTUtils.encodeString(ident.name()), + updatesRequest, + FunctionResponse.class, + Collections.emptyMap(), + ErrorHandlers.functionErrorHandler()); + resp.validate(); + + return resp.getFunction(); + } + + /** + * Drop the function with specified identifier. + * + * @param ident The identifier of the function, which should be "schema.function" format. + * @return true if the function is dropped successfully, false if the function does not exist. + */ + @Override + public boolean deleteFunction(NameIdentifier ident) { + checkFunctionNameIdentifier(ident); + + Namespace fullNamespace = getFunctionFullNamespace(ident.namespace()); + DropResponse resp = + restClient.delete( + formatFunctionRequestPath(fullNamespace) + "/" + RESTUtils.encodeString(ident.name()), + DropResponse.class, + Collections.emptyMap(), + ErrorHandlers.functionErrorHandler()); + resp.validate(); + return resp.dropped(); + } + + /** + * Drop the function with a specified identifier and signature. + * + * @param ident The identifier of the function, which should be "schema.function" format. + * @param signature The signature of the function. + * @return true if the function is dropped successfully, false if the function does not exist. + */ + @Override + public boolean deleteFunction(NameIdentifier ident, FunctionSignature signature) { + checkFunctionNameIdentifier(ident); + + Namespace fullNamespace = getFunctionFullNamespace(ident.namespace()); + FunctionDeleteRequest request = new FunctionDeleteRequest(toFunctionSignatureDTO(signature)); + request.validate(); + + DropResponse resp = + restClient.delete( + formatFunctionRequestPath(fullNamespace) + "/" + RESTUtils.encodeString(ident.name()), + request, + DropResponse.class, + Collections.emptyMap(), + ErrorHandlers.functionErrorHandler()); + resp.validate(); + return resp.dropped(); + } + @VisibleForTesting static String formatTableRequestPath(Namespace ns) { Namespace schemaNs = Namespace.of(ns.level(0), ns.level(1)); @@ -292,6 +597,100 @@ class RelationalCatalog extends BaseSchemaCatalog implements TableCatalog { checkTableNamespace(ident.namespace()); } + private static String formatFunctionRequestPath(Namespace ns) { + Namespace schemaNs = Namespace.of(ns.level(0), ns.level(1)); + return formatSchemaRequestPath(schemaNs) + + "/" + + RESTUtils.encodeString(ns.level(2)) + + "/functions"; + } + + private static void checkFunctionNamespace(Namespace namespace) { + Namespace.check( + namespace != null && namespace.length() == 1, + "Function namespace must be non-null and have 1 level, the input namespace is %s", + namespace); + } + + private static void checkFunctionNameIdentifier(NameIdentifier ident) { + NameIdentifier.check(ident != null, "NameIdentifier must not be null"); + NameIdentifier.check( + ident.name() != null && !ident.name().isEmpty(), "NameIdentifier name must not be empty"); + checkFunctionNamespace(ident.namespace()); + } + + private Namespace getFunctionFullNamespace(Namespace functionNamespace) { + return Namespace.of(this.catalogNamespace().level(0), this.name(), functionNamespace.level(0)); + } + + private static FunctionParamDTO[] toFunctionParamDTOs(FunctionParam[] params) { + if (params == null) { + return null; + } + return Arrays.stream(params) + .map( + param -> + new FunctionParamDTO( + param.name(), + param.dataType(), + param.comment(), + param.defaultValue() == null + || Column.DEFAULT_VALUE_NOT_SET.equals(param.defaultValue()) + ? Column.DEFAULT_VALUE_NOT_SET + : toFunctionArg(param.defaultValue()))) + .toArray(FunctionParamDTO[]::new); + } + + private static FunctionImplDTO[] toFunctionImplDTOs(FunctionImpl[] impls) { + if (impls == null) { + return new FunctionImplDTO[0]; + } + return Arrays.stream(impls).map(FunctionImplDTO::from).toArray(FunctionImplDTO[]::new); + } + + private static FunctionColumnDTO[] toFunctionColumnDTOs(FunctionColumn[] columns) { + if (columns == null) { + return new FunctionColumnDTO[0]; + } + return Arrays.stream(columns) + .map(column -> new FunctionColumnDTO(column.name(), column.dataType(), column.comment())) + .toArray(FunctionColumnDTO[]::new); + } + + private static FunctionSignatureDTO toFunctionSignatureDTO(FunctionSignature signature) { + if (signature == null) { + return null; + } + return new FunctionSignatureDTO( + signature.name(), toFunctionParamDTOs(signature.functionParams())); + } + + private static FunctionUpdateRequest toFunctionUpdateRequest(FunctionChange change) { + if (change instanceof FunctionChange.UpdateComment) { + FunctionChange.UpdateComment updateComment = (FunctionChange.UpdateComment) change; + return new FunctionUpdateRequest.UpdateCommentRequest( + toFunctionSignatureDTO(updateComment.signature()), updateComment.newComment()); + + } else if (change instanceof FunctionChange.UpdateImplementations) { + FunctionChange.UpdateImplementations updateImplementations = + (FunctionChange.UpdateImplementations) change; + return new FunctionUpdateRequest.UpdateImplementationsRequest( + toFunctionSignatureDTO(updateImplementations.signature()), + toFunctionImplDTOs(updateImplementations.newImplementations())); + + } else if (change instanceof FunctionChange.AddImplementation) { + FunctionChange.AddImplementation addImplementation = + (FunctionChange.AddImplementation) change; + return new FunctionUpdateRequest.AddImplementationRequest( + toFunctionSignatureDTO(addImplementation.signature()), + FunctionImplDTO.from(addImplementation.implementation())); + + } else { + throw new IllegalArgumentException( + "Unknown change type: " + change.getClass().getSimpleName()); + } + } + /** * Get the full namespace of the table with the given table's short namespace (schema name). * diff --git a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/FunctionCatalogIT.java b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/FunctionCatalogIT.java new file mode 100644 index 0000000000..0f74c06219 --- /dev/null +++ b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/FunctionCatalogIT.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.client.integration.test; + +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.Schema; +import org.apache.gravitino.client.GravitinoMetalake; +import org.apache.gravitino.exceptions.FunctionAlreadyExistsException; +import org.apache.gravitino.exceptions.NoSuchFunctionException; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.function.Function; +import org.apache.gravitino.function.FunctionCatalog; +import org.apache.gravitino.function.FunctionChange; +import org.apache.gravitino.function.FunctionImpl; +import org.apache.gravitino.function.FunctionParam; +import org.apache.gravitino.function.FunctionParams; +import org.apache.gravitino.function.FunctionSignature; +import org.apache.gravitino.function.FunctionType; +import org.apache.gravitino.integration.test.util.BaseIT; +import org.apache.gravitino.integration.test.util.GravitinoITUtils; +import org.apache.gravitino.rel.types.Types; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * Integration test that verifies function operations against an Iceberg catalog with memory + * backend. + */ +public class FunctionCatalogIT extends BaseIT { + + /** Metalake used for function catalog verification. */ + private static final String metalakeName = GravitinoITUtils.genRandomName("function_metalake"); + + /** Warehouse directory for the Iceberg memory catalog. */ + private static Path warehousePath; + + /** The metalake instance for the test. */ + private static GravitinoMetalake metalake; + + /** The Iceberg catalog under test. */ + private static Catalog icebergCatalog; + + /** The schema used for registering functions. */ + private static Schema schema; + + /** + * Create the metalake, Iceberg catalog, and schema for function tests. + * + * @throws IOException if creating temporary warehouse directory fails + */ + @BeforeAll + public void setUp() throws IOException { + warehousePath = Files.createTempDirectory("iceberg_memory_warehouse"); + Assertions.assertFalse(client.metalakeExists(metalakeName)); + metalake = + client.createMetalake( + metalakeName, "Metalake for function catalog tests", Collections.emptyMap()); + + String catalogName = GravitinoITUtils.genRandomName("function_catalog"); + Map<String, String> properties = + ImmutableMap.<String, String>builder() + .put("catalog-backend", "memory") + .put("uri", "") + .put("warehouse", warehousePath.toString()) + .build(); + icebergCatalog = + metalake.createCatalog( + catalogName, + Catalog.Type.RELATIONAL, + "lakehouse-iceberg", + "Iceberg catalog for function tests", + properties); + + String schemaName = GravitinoITUtils.genRandomName("function_schema"); + schema = + icebergCatalog + .asSchemas() + .createSchema(schemaName, "Schema for functions", Collections.emptyMap()); + } + + /** + * Drop created resources and clean temporary files. + * + * @throws IOException if deleting warehouse directory fails + */ + @AfterAll + public void tearDown() throws IOException { + if (icebergCatalog != null && schema != null) { + metalake.dropCatalog(icebergCatalog.name(), true); + } + + if (metalake != null) { + client.dropMetalake(metalakeName, true); + } + + if (warehousePath != null) { + FileUtils.deleteQuietly(warehousePath.toFile()); + } + + if (client != null) { + client.close(); + } + + try { + closer.close(); + } catch (Exception e) { + // Ignore close exception + } + } + + /** + * Verify registering, listing, loading, altering, and deleting functions on an Iceberg catalog. + */ + @Test + public void testFunctionLifecycleOnIcebergCatalog() + throws NoSuchFunctionException, NoSuchSchemaException, FunctionAlreadyExistsException { + FunctionCatalog functionCatalog = icebergCatalog.asFunctionCatalog(); + Namespace functionNamespace = Namespace.of(schema.name()); + + String functionName = GravitinoITUtils.genRandomName("function"); + NameIdentifier functionIdent = NameIdentifier.of(schema.name(), functionName); + + Assertions.assertEquals(0, functionCatalog.listFunctions(functionNamespace).length); + Assertions.assertFalse(functionCatalog.functionExists(functionIdent)); + + FunctionImpl initialImpl = + FunctionImpl.ofSql( + FunctionImpl.RuntimeType.SPARK, + "CREATE FUNCTION " + functionName + "(x INT) RETURNS INT RETURN x + 1"); + Function createdFunction = + functionCatalog.registerFunction( + functionIdent, + "initial comment", + FunctionType.SCALAR, + true, + new FunctionParam[] {FunctionParams.of("x", Types.IntegerType.get(), "input value")}, + Types.IntegerType.get(), + new FunctionImpl[] {initialImpl}); + + Assertions.assertEquals(functionName, createdFunction.signature().name()); + Assertions.assertEquals(FunctionType.SCALAR, createdFunction.functionType()); + Assertions.assertTrue(createdFunction.deterministic()); + Assertions.assertEquals("initial comment", createdFunction.comment()); + Assertions.assertEquals(Types.IntegerType.get(), createdFunction.returnType()); + Assertions.assertEquals(1, createdFunction.version()); + Assertions.assertEquals(1, createdFunction.impls().length); + + NameIdentifier[] listedFunctions = functionCatalog.listFunctions(functionNamespace); + Assertions.assertEquals(1, listedFunctions.length); + Assertions.assertEquals(functionIdent, listedFunctions[0]); + + Function[] loadedFunctions = functionCatalog.getFunction(functionIdent); + Assertions.assertEquals(1, loadedFunctions.length); + Assertions.assertEquals(createdFunction.signature(), loadedFunctions[0].signature()); + + Function[] versionedFunctions = + functionCatalog.getFunction(functionIdent, createdFunction.version()); + Assertions.assertEquals(1, versionedFunctions.length); + Assertions.assertEquals(createdFunction.signature(), versionedFunctions[0].signature()); + + FunctionImpl additionalImpl = + FunctionImpl.ofSql( + FunctionImpl.RuntimeType.TRINO, + "CREATE FUNCTION " + functionName + "(x INT) RETURNS INT RETURN x * 2"); + Function updatedFunction = + functionCatalog.alterFunction( + functionIdent, + FunctionChange.updateComment("updated comment"), + FunctionChange.addImplementation(additionalImpl)); + + Assertions.assertEquals(createdFunction.version() + 1, updatedFunction.version()); + Assertions.assertEquals("updated comment", updatedFunction.comment()); + Assertions.assertEquals(2, updatedFunction.impls().length); + + Function[] historicalVersion = + functionCatalog.getFunction(functionIdent, createdFunction.version()); + Assertions.assertEquals(1, historicalVersion.length); + Assertions.assertEquals("initial comment", historicalVersion[0].comment()); + Assertions.assertEquals(1, historicalVersion[0].impls().length); + + FunctionSignature functionSignature = + FunctionSignature.of(functionName, createdFunction.signature().functionParams()); + Assertions.assertTrue(functionCatalog.deleteFunction(functionIdent, functionSignature)); + Assertions.assertEquals(0, functionCatalog.listFunctions(functionNamespace).length); + Assertions.assertThrows( + NoSuchFunctionException.class, () -> functionCatalog.getFunction(functionIdent)); + } + + @Test + public void testListFunctionInfos() + throws NoSuchSchemaException, FunctionAlreadyExistsException, NoSuchFunctionException { + FunctionCatalog functionCatalog = icebergCatalog.asFunctionCatalog(); + Namespace functionNamespace = Namespace.of(schema.name()); + String functionName = GravitinoITUtils.genRandomName("function_info"); + NameIdentifier functionIdent = NameIdentifier.of(schema.name(), functionName); + + FunctionImpl impl = + FunctionImpl.ofSql( + FunctionImpl.RuntimeType.SPARK, + "CREATE FUNCTION " + functionName + "(x INT) RETURNS INT RETURN x + 1"); + + functionCatalog.registerFunction( + functionIdent, + "info comment", + FunctionType.SCALAR, + true, + new FunctionParam[] {FunctionParams.of("x", Types.IntegerType.get(), "input value")}, + Types.IntegerType.get(), + new FunctionImpl[] {impl}); + + Function[] functions = functionCatalog.listFunctionInfos(functionNamespace); + Assertions.assertEquals(1, functions.length); + Assertions.assertEquals(functionName, functions[0].signature().name()); + Assertions.assertEquals(FunctionImpl.RuntimeType.SPARK, functions[0].impls()[0].runtime()); + } +}
