This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 184c19464f [#9529] feat(server): Add server-side REST interface for
UDFs (part-2) (#9794)
184c19464f is described below
commit 184c19464f870eb8baaa2096552ce59a5d3a3f42
Author: mchades <[email protected]>
AuthorDate: Wed Jan 28 01:59:44 2026 +0800
[#9529] feat(server): Add server-side REST interface for UDFs (part-2)
(#9794)
### What changes were proposed in this pull request?
Changes include:
- Add FunctionOperations REST endpoint with register, get, update,
delete operations
- Add exception handlers for function-related errors
- Add comprehensive unit tests for all new components
### Why are the changes needed?
Fix: #9529
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
tests added
---
.../apache/gravitino/utils/NameIdentifierUtil.java | 15 +
.../org/apache/gravitino/utils/NamespaceUtil.java | 12 +
.../apache/gravitino/server/GravitinoServer.java | 2 +
.../web/filter/GravitinoInterceptionService.java | 2 +
.../server/web/rest/ExceptionHandlers.java | 43 ++
.../server/web/rest/FunctionOperations.java | 278 ++++++++
.../server/web/rest/TestFunctionOperations.java | 744 +++++++++++++++++++++
7 files changed, 1096 insertions(+)
diff --git
a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
index 02a6761fe7..32523b9725 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
@@ -225,6 +225,21 @@ public class NameIdentifierUtil {
return NameIdentifier.of(metalake, catalog, schema, model);
}
+ /**
+ * Create the function {@link NameIdentifier} with the given metalake,
catalog, schema and
+ * function name.
+ *
+ * @param metalake The metalake name
+ * @param catalog The catalog name
+ * @param schema The schema name
+ * @param function The function name
+ * @return The created function {@link NameIdentifier}
+ */
+ public static NameIdentifier ofFunction(
+ String metalake, String catalog, String schema, String function) {
+ return NameIdentifier.of(metalake, catalog, schema, function);
+ }
+
/**
* Create the model {@link NameIdentifier} from the give model version's
namespace.
*
diff --git a/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java
b/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java
index 3d6f76699f..d774c26bf3 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java
@@ -183,6 +183,18 @@ public class NamespaceUtil {
return Namespace.of(metalake, catalog, schema);
}
+ /**
+ * Create a namespace for function.
+ *
+ * @param metalake The metalake name
+ * @param catalog The catalog name
+ * @param schema The schema name
+ * @return A namespace for function
+ */
+ public static Namespace ofFunction(String metalake, String catalog, String
schema) {
+ return Namespace.of(metalake, catalog, schema);
+ }
+
/**
* Create a namespace for model version.
*
diff --git
a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
index f86e35e2e4..90a537f885 100644
--- a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
+++ b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
@@ -28,6 +28,7 @@ import org.apache.gravitino.Configs;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.catalog.CatalogDispatcher;
import org.apache.gravitino.catalog.FilesetDispatcher;
+import org.apache.gravitino.catalog.FunctionDispatcher;
import org.apache.gravitino.catalog.ModelDispatcher;
import org.apache.gravitino.catalog.PartitionDispatcher;
import org.apache.gravitino.catalog.SchemaDispatcher;
@@ -146,6 +147,7 @@ public class GravitinoServer extends ResourceConfig {
.to(CredentialOperationDispatcher.class)
.ranked(1);
bind(gravitinoEnv.modelDispatcher()).to(ModelDispatcher.class).ranked(1);
+
bind(gravitinoEnv.functionDispatcher()).to(FunctionDispatcher.class).ranked(1);
bind(lineageService).to(LineageDispatcher.class).ranked(1);
bind(gravitinoEnv.jobOperationDispatcher()).to(JobOperationDispatcher.class).ranked(1);
bind(gravitinoEnv.statisticDispatcher()).to(StatisticDispatcher.class).ranked(1);
diff --git
a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
index 0ed9ed9cf9..5dc0057fcd 100644
---
a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
+++
b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
@@ -50,6 +50,7 @@ import
org.apache.gravitino.server.web.filter.authorization.AuthorizationExecuto
import
org.apache.gravitino.server.web.filter.authorization.AuthorizeExecutorFactory;
import org.apache.gravitino.server.web.rest.CatalogOperations;
import org.apache.gravitino.server.web.rest.FilesetOperations;
+import org.apache.gravitino.server.web.rest.FunctionOperations;
import org.apache.gravitino.server.web.rest.GroupOperations;
import org.apache.gravitino.server.web.rest.JobOperations;
import org.apache.gravitino.server.web.rest.MetadataObjectCredentialOperations;
@@ -91,6 +92,7 @@ public class GravitinoInterceptionService implements
InterceptionService {
SchemaOperations.class.getName(),
TableOperations.class.getName(),
ModelOperations.class.getName(),
+ FunctionOperations.class.getName(),
TopicOperations.class.getName(),
FilesetOperations.class.getName(),
UserOperations.class.getName(),
diff --git
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
index a4476d6a7b..df6f67387b 100644
---
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
+++
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
@@ -27,6 +27,7 @@ import
org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
import org.apache.gravitino.exceptions.ConnectionFailedException;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.ForbiddenException;
+import org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
import org.apache.gravitino.exceptions.GroupAlreadyExistsException;
import org.apache.gravitino.exceptions.IllegalJobTemplateOperationException;
import org.apache.gravitino.exceptions.InUseException;
@@ -142,6 +143,11 @@ public class ExceptionHandlers {
return ModelExceptionHandler.INSTANCE.handle(op, model, schema, e);
}
+ public static Response handleFunctionException(
+ OperationType op, String function, String schema, Exception e) {
+ return FunctionExceptionHandler.INSTANCE.handle(op, function, schema, e);
+ }
+
public static Response handleJobTemplateException(
OperationType op, String jobTemplate, String metalake, Exception e) {
return JobTemplateExceptionHandler.INSTANCE.handle(op, jobTemplate,
metalake, e);
@@ -843,6 +849,43 @@ public class ExceptionHandlers {
}
}
+ private static class FunctionExceptionHandler extends BaseExceptionHandler {
+ private static final ExceptionHandler INSTANCE = new
FunctionExceptionHandler();
+
+ private static String getFunctionErrorMsg(
+ String function, String operation, String schema, String reason) {
+ return String.format(
+ "Failed to operate function(s)%s operation [%s] under schema [%s],
reason [%s]",
+ function, operation, schema, reason);
+ }
+
+ @Override
+ public Response handle(OperationType op, String function, String schema,
Exception e) {
+ String formatted = StringUtil.isBlank(function) ? "" : " [" + function +
"]";
+ String errorMsg = getFunctionErrorMsg(formatted, op.name(), schema,
getErrorMsg(e));
+ LOG.warn(errorMsg, e);
+
+ if (e instanceof IllegalArgumentException) {
+ return Utils.illegalArguments(errorMsg, e);
+
+ } else if (e instanceof NotFoundException) {
+ return Utils.notFound(errorMsg, e);
+
+ } else if (e instanceof FunctionAlreadyExistsException) {
+ return Utils.alreadyExists(errorMsg, e);
+
+ } else if (e instanceof ForbiddenException) {
+ return Utils.forbidden(errorMsg, e);
+
+ } else if (e instanceof NotInUseException) {
+ return Utils.notInUse(errorMsg, e);
+
+ } else {
+ return super.handle(op, function, schema, e);
+ }
+ }
+ }
+
private static class JobTemplateExceptionHandler extends
BaseExceptionHandler {
private static final ExceptionHandler INSTANCE = new
JobTemplateExceptionHandler();
diff --git
a/server/src/main/java/org/apache/gravitino/server/web/rest/FunctionOperations.java
b/server/src/main/java/org/apache/gravitino/server/web/rest/FunctionOperations.java
new file mode 100644
index 0000000000..4330754314
--- /dev/null
+++
b/server/src/main/java/org/apache/gravitino/server/web/rest/FunctionOperations.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.server.web.rest;
+
+import com.codahale.metrics.annotation.ResponseMetered;
+import com.codahale.metrics.annotation.Timed;
+import java.util.Arrays;
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.catalog.FunctionDispatcher;
+import org.apache.gravitino.dto.function.FunctionColumnDTO;
+import org.apache.gravitino.dto.function.FunctionDTO;
+import org.apache.gravitino.dto.function.FunctionDefinitionDTO;
+import org.apache.gravitino.dto.requests.FunctionRegisterRequest;
+import org.apache.gravitino.dto.requests.FunctionUpdateRequest;
+import org.apache.gravitino.dto.requests.FunctionUpdatesRequest;
+import org.apache.gravitino.dto.responses.DropResponse;
+import org.apache.gravitino.dto.responses.EntityListResponse;
+import org.apache.gravitino.dto.responses.FunctionListResponse;
+import org.apache.gravitino.dto.responses.FunctionResponse;
+import org.apache.gravitino.dto.util.DTOConverters;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionChange;
+import org.apache.gravitino.function.FunctionColumn;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.metrics.MetricNames;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.server.web.Utils;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** REST operations for function management. */
+@Path("metalakes/{metalake}/catalogs/{catalog}/schemas/{schema}/functions")
+public class FunctionOperations {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FunctionOperations.class);
+
+ private final FunctionDispatcher dispatcher;
+
+ @Context private HttpServletRequest httpRequest;
+
+ @Inject
+ public FunctionOperations(FunctionDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ // TODO: Add authorization support for function operations
+ @GET
+ @Produces("application/vnd.gravitino.v1+json")
+ @Timed(name = "list-function." + MetricNames.HTTP_PROCESS_DURATION, absolute
= true)
+ @ResponseMetered(name = "list-function", absolute = true)
+ public Response listFunctions(
+ @PathParam("metalake") String metalake,
+ @PathParam("catalog") String catalog,
+ @PathParam("schema") String schema,
+ @QueryParam("details") @DefaultValue("false") boolean details) {
+ try {
+ LOG.info("Received list functions request for schema: {}.{}.{}",
metalake, catalog, schema);
+ return Utils.doAs(
+ httpRequest,
+ () -> {
+ Namespace namespace = NamespaceUtil.ofFunction(metalake, catalog,
schema);
+ if (!details) {
+ NameIdentifier[] identifiers =
dispatcher.listFunctions(namespace);
+ LOG.info(
+ "List {} function names under schema: {}.{}.{}",
+ identifiers.length,
+ metalake,
+ catalog,
+ schema);
+ return Utils.ok(new EntityListResponse(identifiers));
+ }
+
+ Function[] functions = dispatcher.listFunctionInfos(namespace);
+ FunctionDTO[] functionDTOs =
+ Arrays.stream(functions)
+ .map(DTOConverters::toDTO)
+ .toList()
+ .toArray(new FunctionDTO[0]);
+ LOG.info(
+ "List {} function definitions under schema: {}.{}.{}",
+ functionDTOs.length,
+ metalake,
+ catalog,
+ schema);
+ return Utils.ok(new FunctionListResponse(functionDTOs));
+ });
+ } catch (Exception e) {
+ return ExceptionHandlers.handleFunctionException(OperationType.LIST, "",
schema, e);
+ }
+ }
+
+ @POST
+ @Produces("application/vnd.gravitino.v1+json")
+ @Timed(name = "register-function." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
+ @ResponseMetered(name = "register-function", absolute = true)
+ public Response registerFunction(
+ @PathParam("metalake") String metalake,
+ @PathParam("catalog") String catalog,
+ @PathParam("schema") String schema,
+ FunctionRegisterRequest request) {
+ LOG.info(
+ "Received register function request: {}.{}.{}.{}",
+ metalake,
+ catalog,
+ schema,
+ request.getName());
+ try {
+ return Utils.doAs(
+ httpRequest,
+ () -> {
+ request.validate();
+ NameIdentifier ident =
+ NameIdentifierUtil.ofFunction(metalake, catalog, schema,
request.getName());
+
+ FunctionDefinition[] definitions =
+ Arrays.stream(request.getDefinitions())
+ .map(FunctionDefinitionDTO::toFunctionDefinition)
+ .toArray(FunctionDefinition[]::new);
+
+ Function function;
+ if (request.getFunctionType() == FunctionType.TABLE) {
+ FunctionColumn[] returnColumns =
+ Arrays.stream(request.getReturnColumns())
+ .map(FunctionColumnDTO::toFunctionColumn)
+ .toArray(FunctionColumn[]::new);
+ function =
+ dispatcher.registerFunction(
+ ident,
+ request.getComment(),
+ request.isDeterministic(),
+ returnColumns,
+ definitions);
+ } else {
+ Type returnType = request.getReturnType();
+ function =
+ dispatcher.registerFunction(
+ ident,
+ request.getComment(),
+ request.getFunctionType(),
+ request.isDeterministic(),
+ returnType,
+ definitions);
+ }
+
+ Response response = Utils.ok(new
FunctionResponse(DTOConverters.toDTO(function)));
+ LOG.info(
+ "Function registered: {}.{}.{}.{}", metalake, catalog, schema,
request.getName());
+ return response;
+ });
+ } catch (Exception e) {
+ return ExceptionHandlers.handleFunctionException(
+ OperationType.REGISTER, request.getName(), schema, e);
+ }
+ }
+
+ @GET
+ @Path("{function}")
+ @Produces("application/vnd.gravitino.v1+json")
+ @Timed(name = "get-function." + MetricNames.HTTP_PROCESS_DURATION, absolute
= true)
+ @ResponseMetered(name = "get-function", absolute = true)
+ public Response getFunction(
+ @PathParam("metalake") String metalake,
+ @PathParam("catalog") String catalog,
+ @PathParam("schema") String schema,
+ @PathParam("function") String function) {
+ LOG.info("Received get function request: {}.{}.{}.{}", metalake, catalog,
schema, function);
+ try {
+ return Utils.doAs(
+ httpRequest,
+ () -> {
+ NameIdentifier ident =
+ NameIdentifierUtil.ofFunction(metalake, catalog, schema,
function);
+ Function f = dispatcher.getFunction(ident);
+ Response response = Utils.ok(new
FunctionResponse(DTOConverters.toDTO(f)));
+ LOG.info("Function loaded: {}.{}.{}.{}", metalake, catalog,
schema, function);
+ return response;
+ });
+ } catch (Exception e) {
+ return ExceptionHandlers.handleFunctionException(OperationType.LOAD,
function, schema, e);
+ }
+ }
+
+ @PUT
+ @Path("{function}")
+ @Produces("application/vnd.gravitino.v1+json")
+ @Timed(name = "alter-function." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
+ @ResponseMetered(name = "alter-function", absolute = true)
+ public Response alterFunction(
+ @PathParam("metalake") String metalake,
+ @PathParam("catalog") String catalog,
+ @PathParam("schema") String schema,
+ @PathParam("function") String function,
+ FunctionUpdatesRequest request) {
+ LOG.info("Received alter function request: {}.{}.{}.{}", metalake,
catalog, schema, function);
+ try {
+ return Utils.doAs(
+ httpRequest,
+ () -> {
+ request.validate();
+ NameIdentifier ident =
+ NameIdentifierUtil.ofFunction(metalake, catalog, schema,
function);
+ FunctionChange[] changes =
+ request.getUpdates().stream()
+ .map(FunctionUpdateRequest::functionChange)
+ .toArray(FunctionChange[]::new);
+ Function f = dispatcher.alterFunction(ident, changes);
+ Response response = Utils.ok(new
FunctionResponse(DTOConverters.toDTO(f)));
+ LOG.info("Function altered: {}.{}.{}.{}", metalake, catalog,
schema, f.name());
+ return response;
+ });
+ } catch (Exception e) {
+ return ExceptionHandlers.handleFunctionException(OperationType.ALTER,
function, schema, e);
+ }
+ }
+
+ @DELETE
+ @Path("{function}")
+ @Produces("application/vnd.gravitino.v1+json")
+ @Timed(name = "drop-function." + MetricNames.HTTP_PROCESS_DURATION, absolute
= true)
+ @ResponseMetered(name = "drop-function", absolute = true)
+ public Response dropFunction(
+ @PathParam("metalake") String metalake,
+ @PathParam("catalog") String catalog,
+ @PathParam("schema") String schema,
+ @PathParam("function") String function) {
+ LOG.info("Received drop function request: {}.{}.{}.{}", metalake, catalog,
schema, function);
+ try {
+ return Utils.doAs(
+ httpRequest,
+ () -> {
+ NameIdentifier ident =
+ NameIdentifierUtil.ofFunction(metalake, catalog, schema,
function);
+ boolean dropped = dispatcher.dropFunction(ident);
+ if (!dropped) {
+ LOG.warn("Cannot find to be dropped function {} under schema
{}", function, schema);
+ }
+ Response response = Utils.ok(new DropResponse(dropped));
+ LOG.info("Function dropped: {}.{}.{}.{}", metalake, catalog,
schema, function);
+ return response;
+ });
+ } catch (Exception e) {
+ return ExceptionHandlers.handleFunctionException(OperationType.DROP,
function, schema, e);
+ }
+ }
+}
diff --git
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestFunctionOperations.java
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestFunctionOperations.java
new file mode 100644
index 0000000000..67a173270a
--- /dev/null
+++
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestFunctionOperations.java
@@ -0,0 +1,744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.server.web.rest;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.catalog.FunctionDispatcher;
+import org.apache.gravitino.dto.function.FunctionDefinitionDTO;
+import org.apache.gravitino.dto.function.FunctionImplDTO;
+import org.apache.gravitino.dto.function.FunctionParamDTO;
+import org.apache.gravitino.dto.function.SQLImplDTO;
+import org.apache.gravitino.dto.requests.FunctionRegisterRequest;
+import org.apache.gravitino.dto.requests.FunctionUpdateRequest;
+import org.apache.gravitino.dto.requests.FunctionUpdatesRequest;
+import org.apache.gravitino.dto.responses.DropResponse;
+import org.apache.gravitino.dto.responses.EntityListResponse;
+import org.apache.gravitino.dto.responses.ErrorConstants;
+import org.apache.gravitino.dto.responses.ErrorResponse;
+import org.apache.gravitino.dto.responses.FunctionListResponse;
+import org.apache.gravitino.dto.responses.FunctionResponse;
+import org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionChange;
+import org.apache.gravitino.function.FunctionColumn;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionDefinitions;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionImpls;
+import org.apache.gravitino.function.FunctionParam;
+import org.apache.gravitino.function.FunctionParams;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.rest.RESTUtils;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.glassfish.jersey.internal.inject.AbstractBinder;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.TestProperties;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestFunctionOperations extends BaseOperationsTest {
+
+ private static class MockServletRequestFactory extends
ServletRequestFactoryBase {
+
+ @Override
+ public HttpServletRequest get() {
+ HttpServletRequest request = mock(HttpServletRequest.class);
+ when(request.getRemoteUser()).thenReturn(null);
+ return request;
+ }
+ }
+
+ private final FunctionDispatcher functionDispatcher =
mock(FunctionDispatcher.class);
+
+ private final AuditInfo testAuditInfo =
+
AuditInfo.builder().withCreator("user1").withCreateTime(Instant.now()).build();
+
+ private final String metalake = "metalake_for_function_test";
+
+ private final String catalog = "catalog_for_function_test";
+
+ private final String schema = "schema_for_function_test";
+
+ private final Namespace functionNs = NamespaceUtil.ofFunction(metalake,
catalog, schema);
+
+ @Override
+ protected Application configure() {
+ try {
+ forceSet(
+ TestProperties.CONTAINER_PORT,
String.valueOf(RESTUtils.findAvailablePort(2000, 3000)));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ ResourceConfig resourceConfig = new ResourceConfig();
+ resourceConfig.register(FunctionOperations.class);
+ resourceConfig.register(
+ new AbstractBinder() {
+ @Override
+ protected void configure() {
+ bind(functionDispatcher).to(FunctionDispatcher.class).ranked(2);
+ bindFactory(TestFunctionOperations.MockServletRequestFactory.class)
+ .to(HttpServletRequest.class);
+ }
+ });
+
+ return resourceConfig;
+ }
+
+ @Test
+ public void testListFunctions() {
+ NameIdentifier funcId1 = NameIdentifierUtil.ofFunction(metalake, catalog,
schema, "func1");
+ NameIdentifier funcId2 = NameIdentifierUtil.ofFunction(metalake, catalog,
schema, "func2");
+ NameIdentifier[] funcIds = new NameIdentifier[] {funcId1, funcId2};
+ when(functionDispatcher.listFunctions(functionNs)).thenReturn(funcIds);
+
+ Response response =
+ target(functionPath())
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .get();
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
response.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
response.getMediaType());
+
+ EntityListResponse resp = response.readEntity(EntityListResponse.class);
+ Assertions.assertEquals(0, resp.getCode());
+ Assertions.assertArrayEquals(funcIds, resp.identifiers());
+
+ // test listFunctions with details=true
+ Function mockFunction1 = mockFunction("func1", "comment1",
FunctionType.SCALAR);
+ Function mockFunction2 = mockFunction("func2", "comment2",
FunctionType.SCALAR);
+ Function[] functions = new Function[] {mockFunction1, mockFunction2};
+
when(functionDispatcher.listFunctionInfos(functionNs)).thenReturn(functions);
+
+ Response detailsResp =
+ target(functionPath())
+ .queryParam("details", true)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .get();
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
detailsResp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
detailsResp.getMediaType());
+
+ FunctionListResponse funcListResp =
detailsResp.readEntity(FunctionListResponse.class);
+ Assertions.assertEquals(0, funcListResp.getCode());
+ Assertions.assertEquals(2, funcListResp.getFunctions().length);
+ Assertions.assertEquals("func1", funcListResp.getFunctions()[0].name());
+ Assertions.assertEquals("func2", funcListResp.getFunctions()[1].name());
+
+ // Test mock return empty array for listFunctions
+ when(functionDispatcher.listFunctions(functionNs)).thenReturn(new
NameIdentifier[0]);
+ Response resp3 =
+ target(functionPath())
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .get();
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp3.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp3.getMediaType());
+
+ EntityListResponse resp4 = resp3.readEntity(EntityListResponse.class);
+ Assertions.assertEquals(0, resp4.getCode());
+ Assertions.assertEquals(0, resp4.identifiers().length);
+
+ // Test mock throw NoSuchSchemaException
+ doThrow(new NoSuchSchemaException("mock error"))
+ .when(functionDispatcher)
+ .listFunctions(functionNs);
+ Response resp5 =
+ target(functionPath())
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .get();
+
+ Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
resp5.getStatus());
+
+ ErrorResponse errorResp = resp5.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE,
errorResp.getCode());
+ Assertions.assertEquals(NoSuchSchemaException.class.getSimpleName(),
errorResp.getType());
+
+ // Test mock throw RuntimeException
+ doThrow(new RuntimeException("mock
error")).when(functionDispatcher).listFunctions(functionNs);
+ Response resp6 =
+ target(functionPath())
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .get();
+
+ Assertions.assertEquals(
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resp6.getStatus());
+
+ ErrorResponse errorResp1 = resp6.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE,
errorResp1.getCode());
+ Assertions.assertEquals(RuntimeException.class.getSimpleName(),
errorResp1.getType());
+ }
+
+ @Test
+ public void testGetFunction() {
+ Function mockFunction = mockFunction("func1", "test comment",
FunctionType.SCALAR);
+ NameIdentifier funcId = NameIdentifierUtil.ofFunction(metalake, catalog,
schema, "func1");
+ when(functionDispatcher.getFunction(funcId)).thenReturn(mockFunction);
+
+ Response resp =
+ target(functionPath())
+ .path("func1")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .get();
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+
+ FunctionResponse funcResp = resp.readEntity(FunctionResponse.class);
+ Assertions.assertEquals(0, funcResp.getCode());
+
+ Function resultFunction = funcResp.getFunction();
+ compare(mockFunction, resultFunction);
+
+ // Test mock throw NoSuchFunctionException
+ doThrow(new NoSuchFunctionException("mock
error")).when(functionDispatcher).getFunction(funcId);
+ Response resp1 =
+ target(functionPath())
+ .path("func1")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .get();
+
+ Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
resp1.getStatus());
+
+ ErrorResponse errorResp = resp1.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE,
errorResp.getCode());
+ Assertions.assertEquals(NoSuchFunctionException.class.getSimpleName(),
errorResp.getType());
+
+ // Test mock throw RuntimeException
+ doThrow(new RuntimeException("mock
error")).when(functionDispatcher).getFunction(funcId);
+ Response resp2 =
+ target(functionPath())
+ .path("func1")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .get();
+
+ Assertions.assertEquals(
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resp2.getStatus());
+
+ ErrorResponse errorResp1 = resp2.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE,
errorResp1.getCode());
+ Assertions.assertEquals(RuntimeException.class.getSimpleName(),
errorResp1.getType());
+ }
+
+ @Test
+ public void testRegisterScalarFunction() {
+ NameIdentifier funcId = NameIdentifierUtil.ofFunction(metalake, catalog,
schema, "func1");
+ Function mockFunction = mockFunction("func1", "test comment",
FunctionType.SCALAR);
+
+ when(functionDispatcher.registerFunction(
+ eq(funcId),
+ anyString(),
+ eq(FunctionType.SCALAR),
+ anyBoolean(),
+ any(Type.class),
+ any(FunctionDefinition[].class)))
+ .thenReturn(mockFunction);
+
+ FunctionDefinitionDTO[] definitionDTOs = createMockDefinitionDTOs();
+ FunctionRegisterRequest req =
+ FunctionRegisterRequest.builder()
+ .withName("func1")
+ .withComment("test comment")
+ .withFunctionType(FunctionType.SCALAR)
+ .withDeterministic(true)
+ .withReturnType(Types.IntegerType.get())
+ .withDefinitions(definitionDTOs)
+ .build();
+
+ Response resp =
+ target(functionPath())
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .post(Entity.entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+
+ FunctionResponse funcResp = resp.readEntity(FunctionResponse.class);
+ Assertions.assertEquals(0, funcResp.getCode());
+ compare(mockFunction, funcResp.getFunction());
+
+ // Test mock throw NoSuchSchemaException
+ doThrow(new NoSuchSchemaException("mock error"))
+ .when(functionDispatcher)
+ .registerFunction(
+ eq(funcId),
+ anyString(),
+ eq(FunctionType.SCALAR),
+ anyBoolean(),
+ any(Type.class),
+ any(FunctionDefinition[].class));
+
+ Response resp1 =
+ target(functionPath())
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .post(Entity.entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
resp1.getStatus());
+
+ ErrorResponse errorResp = resp1.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE,
errorResp.getCode());
+ Assertions.assertEquals(NoSuchSchemaException.class.getSimpleName(),
errorResp.getType());
+
+ // Test mock throw FunctionAlreadyExistsException
+ doThrow(new FunctionAlreadyExistsException("mock error"))
+ .when(functionDispatcher)
+ .registerFunction(
+ eq(funcId),
+ anyString(),
+ eq(FunctionType.SCALAR),
+ anyBoolean(),
+ any(Type.class),
+ any(FunctionDefinition[].class));
+
+ Response resp2 =
+ target(functionPath())
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .post(Entity.entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.CONFLICT.getStatusCode(),
resp2.getStatus());
+
+ ErrorResponse errorResp1 = resp2.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.ALREADY_EXISTS_CODE,
errorResp1.getCode());
+ Assertions.assertEquals(
+ FunctionAlreadyExistsException.class.getSimpleName(),
errorResp1.getType());
+
+ // Test mock throw RuntimeException
+ doThrow(new RuntimeException("mock error"))
+ .when(functionDispatcher)
+ .registerFunction(
+ eq(funcId),
+ anyString(),
+ eq(FunctionType.SCALAR),
+ anyBoolean(),
+ any(Type.class),
+ any(FunctionDefinition[].class));
+
+ Response resp3 =
+ target(functionPath())
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .post(Entity.entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resp3.getStatus());
+
+ ErrorResponse errorResp2 = resp3.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE,
errorResp2.getCode());
+ Assertions.assertEquals(RuntimeException.class.getSimpleName(),
errorResp2.getType());
+ }
+
+ @Test
+ public void testRegisterTableFunction() {
+ NameIdentifier funcId = NameIdentifierUtil.ofFunction(metalake, catalog,
schema, "tableFunc1");
+ Function mockFunction = mockTableFunction("tableFunc1", "test comment");
+
+ when(functionDispatcher.registerFunction(
+ eq(funcId),
+ anyString(),
+ anyBoolean(),
+ any(FunctionColumn[].class),
+ any(FunctionDefinition[].class)))
+ .thenReturn(mockFunction);
+
+ FunctionRegisterRequest req =
+ FunctionRegisterRequest.builder()
+ .withName("tableFunc1")
+ .withComment("test comment")
+ .withFunctionType(FunctionType.TABLE)
+ .withDeterministic(true)
+ .withReturnColumns(createMockReturnColumnDTOs())
+ .withDefinitions(createMockDefinitionDTOs())
+ .build();
+
+ Response resp =
+ target(functionPath())
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .post(Entity.entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+
+ FunctionResponse funcResp = resp.readEntity(FunctionResponse.class);
+ Assertions.assertEquals(0, funcResp.getCode());
+ Assertions.assertEquals(FunctionType.TABLE,
funcResp.getFunction().functionType());
+ }
+
+ @Test
+ public void testAlterFunction() {
+ NameIdentifier funcId = NameIdentifierUtil.ofFunction(metalake, catalog,
schema, "func1");
+ Function mockFunction = mockFunction("func1", "new comment",
FunctionType.SCALAR);
+
+ FunctionChange updateComment = FunctionChange.updateComment("new comment");
+ when(functionDispatcher.alterFunction(funcId,
updateComment)).thenReturn(mockFunction);
+
+ FunctionUpdatesRequest req =
+ new FunctionUpdatesRequest(
+ Collections.singletonList(
+ new FunctionUpdateRequest.UpdateCommentRequest("new
comment")));
+
+ Response resp =
+ target(functionPath())
+ .path("func1")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .put(Entity.entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+
+ FunctionResponse funcResp = resp.readEntity(FunctionResponse.class);
+ Assertions.assertEquals(0, funcResp.getCode());
+ Assertions.assertEquals("new comment", funcResp.getFunction().comment());
+
+ // Test mock throw NoSuchFunctionException
+ doThrow(new NoSuchFunctionException("mock error"))
+ .when(functionDispatcher)
+ .alterFunction(funcId, updateComment);
+
+ Response resp1 =
+ target(functionPath())
+ .path("func1")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .put(Entity.entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
resp1.getStatus());
+
+ ErrorResponse errorResp = resp1.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE,
errorResp.getCode());
+ Assertions.assertEquals(NoSuchFunctionException.class.getSimpleName(),
errorResp.getType());
+
+ // Test mock throw IllegalArgumentException
+ doThrow(new IllegalArgumentException("mock error"))
+ .when(functionDispatcher)
+ .alterFunction(funcId, updateComment);
+
+ Response resp2 =
+ target(functionPath())
+ .path("func1")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .put(Entity.entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(),
resp2.getStatus());
+
+ ErrorResponse errorResp1 = resp2.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.ILLEGAL_ARGUMENTS_CODE,
errorResp1.getCode());
+
+ // Test mock throw RuntimeException
+ doThrow(new RuntimeException("mock error"))
+ .when(functionDispatcher)
+ .alterFunction(funcId, updateComment);
+
+ Response resp3 =
+ target(functionPath())
+ .path("func1")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .put(Entity.entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resp3.getStatus());
+
+ ErrorResponse errorResp2 = resp3.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE,
errorResp2.getCode());
+ Assertions.assertEquals(RuntimeException.class.getSimpleName(),
errorResp2.getType());
+ }
+
+ @Test
+ public void testAlterFunctionAddDefinition() {
+ NameIdentifier funcId = NameIdentifierUtil.ofFunction(metalake, catalog,
schema, "func1");
+ Function mockFunction = mockFunction("func1", "comment",
FunctionType.SCALAR);
+
+ FunctionDefinitionDTO newDef = createMockDefinitionDTOs()[0];
+ FunctionChange addDef =
FunctionChange.addDefinition(newDef.toFunctionDefinition());
+ when(functionDispatcher.alterFunction(funcId,
addDef)).thenReturn(mockFunction);
+
+ FunctionUpdatesRequest req =
+ new FunctionUpdatesRequest(
+ Collections.singletonList(new
FunctionUpdateRequest.AddDefinitionRequest(newDef)));
+
+ Response resp =
+ target(functionPath())
+ .path("func1")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .put(Entity.entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ }
+
+ @Test
+ public void testAlterFunctionRemoveDefinition() {
+ NameIdentifier funcId = NameIdentifierUtil.ofFunction(metalake, catalog,
schema, "func1");
+ Function mockFunction = mockFunction("func1", "comment",
FunctionType.SCALAR);
+
+ FunctionParamDTO[] params = createMockParamDTOs();
+ FunctionParam[] functionParams = new FunctionParam[params.length];
+ for (int i = 0; i < params.length; i++) {
+ functionParams[i] = params[i].toFunctionParam();
+ }
+ FunctionChange removeDef = FunctionChange.removeDefinition(functionParams);
+ when(functionDispatcher.alterFunction(funcId,
removeDef)).thenReturn(mockFunction);
+
+ FunctionUpdatesRequest req =
+ new FunctionUpdatesRequest(
+ Collections.singletonList(new
FunctionUpdateRequest.RemoveDefinitionRequest(params)));
+
+ Response resp =
+ target(functionPath())
+ .path("func1")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .put(Entity.entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ }
+
+ @Test
+ public void testAlterFunctionAddImpl() {
+ NameIdentifier funcId = NameIdentifierUtil.ofFunction(metalake, catalog,
schema, "func1");
+ Function mockFunction = mockFunction("func1", "comment",
FunctionType.SCALAR);
+
+ FunctionParamDTO[] params = createMockParamDTOs();
+ FunctionParam[] functionParams = new FunctionParam[params.length];
+ for (int i = 0; i < params.length; i++) {
+ functionParams[i] = params[i].toFunctionParam();
+ }
+ FunctionImplDTO implDTO = createMockSqlImplDTO();
+ FunctionImpl impl = implDTO.toFunctionImpl();
+ FunctionChange addImpl = FunctionChange.addImpl(functionParams, impl);
+ when(functionDispatcher.alterFunction(funcId,
addImpl)).thenReturn(mockFunction);
+
+ FunctionUpdatesRequest req =
+ new FunctionUpdatesRequest(
+ Collections.singletonList(new
FunctionUpdateRequest.AddImplRequest(params, implDTO)));
+
+ Response resp =
+ target(functionPath())
+ .path("func1")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .put(Entity.entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ }
+
+ @Test
+ public void testAlterFunctionRemoveImpl() {
+ NameIdentifier funcId = NameIdentifierUtil.ofFunction(metalake, catalog,
schema, "func1");
+ Function mockFunction = mockFunction("func1", "comment",
FunctionType.SCALAR);
+
+ FunctionParamDTO[] params = createMockParamDTOs();
+ FunctionParam[] functionParams = new FunctionParam[params.length];
+ for (int i = 0; i < params.length; i++) {
+ functionParams[i] = params[i].toFunctionParam();
+ }
+ FunctionChange removeImpl =
+ FunctionChange.removeImpl(functionParams,
FunctionImpl.RuntimeType.SPARK);
+ when(functionDispatcher.alterFunction(funcId,
removeImpl)).thenReturn(mockFunction);
+
+ FunctionUpdatesRequest req =
+ new FunctionUpdatesRequest(
+ Collections.singletonList(
+ new FunctionUpdateRequest.RemoveImplRequest(params, "SPARK")));
+
+ Response resp =
+ target(functionPath())
+ .path("func1")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .put(Entity.entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ }
+
+ @Test
+ public void testDropFunction() {
+ NameIdentifier funcId = NameIdentifierUtil.ofFunction(metalake, catalog,
schema, "func1");
+ when(functionDispatcher.dropFunction(funcId)).thenReturn(true);
+
+ Response resp =
+ target(functionPath())
+ .path("func1")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .delete();
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+
+ DropResponse dropResp = resp.readEntity(DropResponse.class);
+ Assertions.assertEquals(0, dropResp.getCode());
+ Assertions.assertTrue(dropResp.dropped());
+
+ // Test mock return false for dropFunction
+ when(functionDispatcher.dropFunction(funcId)).thenReturn(false);
+ Response resp1 =
+ target(functionPath())
+ .path("func1")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .delete();
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp1.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp1.getMediaType());
+
+ DropResponse dropResp1 = resp1.readEntity(DropResponse.class);
+ Assertions.assertEquals(0, dropResp1.getCode());
+ Assertions.assertFalse(dropResp1.dropped());
+
+ // Test mock throw RuntimeException
+ doThrow(new RuntimeException("mock
error")).when(functionDispatcher).dropFunction(funcId);
+ Response resp2 =
+ target(functionPath())
+ .path("func1")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .delete();
+
+ Assertions.assertEquals(
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resp2.getStatus());
+
+ ErrorResponse errorResp1 = resp2.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE,
errorResp1.getCode());
+ Assertions.assertEquals(RuntimeException.class.getSimpleName(),
errorResp1.getType());
+ }
+
+ private String functionPath() {
+ return "/metalakes/" + metalake + "/catalogs/" + catalog + "/schemas/" +
schema + "/functions";
+ }
+
+ private Function mockFunction(String name, String comment, FunctionType
functionType) {
+ Function mockFunction = mock(Function.class);
+ when(mockFunction.name()).thenReturn(name);
+ when(mockFunction.comment()).thenReturn(comment);
+ when(mockFunction.functionType()).thenReturn(functionType);
+ when(mockFunction.deterministic()).thenReturn(true);
+ when(mockFunction.returnType()).thenReturn(Types.IntegerType.get());
+ when(mockFunction.returnColumns()).thenReturn(new FunctionColumn[0]);
+ when(mockFunction.definitions()).thenReturn(createMockDefinitions());
+ when(mockFunction.auditInfo()).thenReturn(testAuditInfo);
+ return mockFunction;
+ }
+
+ private Function mockTableFunction(String name, String comment) {
+ Function mockFunction = mock(Function.class);
+ when(mockFunction.name()).thenReturn(name);
+ when(mockFunction.comment()).thenReturn(comment);
+ when(mockFunction.functionType()).thenReturn(FunctionType.TABLE);
+ when(mockFunction.deterministic()).thenReturn(true);
+ when(mockFunction.returnType()).thenReturn(null);
+ when(mockFunction.returnColumns()).thenReturn(createMockReturnColumns());
+ when(mockFunction.definitions()).thenReturn(createMockDefinitions());
+ when(mockFunction.auditInfo()).thenReturn(testAuditInfo);
+ return mockFunction;
+ }
+
+ private FunctionDefinition[] createMockDefinitions() {
+ FunctionParam[] params =
+ new FunctionParam[] {FunctionParams.of("param1",
Types.IntegerType.get())};
+ FunctionImpl[] impls =
+ new FunctionImpl[] {
+ FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, "SELECT param1 +
1")
+ };
+ return new FunctionDefinition[] {FunctionDefinitions.of(params, impls)};
+ }
+
+ private FunctionDefinitionDTO[] createMockDefinitionDTOs() {
+ return new FunctionDefinitionDTO[] {
+ FunctionDefinitionDTO.builder()
+ .withParameters(createMockParamDTOs())
+ .withImpls(new FunctionImplDTO[] {createMockSqlImplDTO()})
+ .build()
+ };
+ }
+
+ private FunctionParamDTO[] createMockParamDTOs() {
+ return new FunctionParamDTO[] {
+
FunctionParamDTO.builder().withName("param1").withDataType(Types.IntegerType.get()).build()
+ };
+ }
+
+ private FunctionImplDTO createMockSqlImplDTO() {
+ return new SQLImplDTO("SPARK", null, null, "SELECT param1 + 1");
+ }
+
+ private FunctionColumn[] createMockReturnColumns() {
+ return new FunctionColumn[] {
+ FunctionColumn.of("col1", Types.IntegerType.get(), "column comment")
+ };
+ }
+
+ private org.apache.gravitino.dto.function.FunctionColumnDTO[]
createMockReturnColumnDTOs() {
+ return new org.apache.gravitino.dto.function.FunctionColumnDTO[] {
+ org.apache.gravitino.dto.function.FunctionColumnDTO.builder()
+ .withName("col1")
+ .withDataType(Types.IntegerType.get())
+ .withComment("column comment")
+ .build()
+ };
+ }
+
+ private void compare(Function left, Function right) {
+ Assertions.assertEquals(left.name(), right.name());
+ Assertions.assertEquals(left.comment(), right.comment());
+ Assertions.assertEquals(left.functionType(), right.functionType());
+ Assertions.assertEquals(left.deterministic(), right.deterministic());
+
+ Assertions.assertNotNull(right.auditInfo());
+ Assertions.assertEquals(left.auditInfo().creator(),
right.auditInfo().creator());
+ Assertions.assertEquals(left.auditInfo().createTime(),
right.auditInfo().createTime());
+ }
+}