This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 364710299b [#9530] feat(client): Add Java client support for UDF 
operations (#9576)
364710299b is described below

commit 364710299b623a2a11abb1e4131669a46efd4bfe
Author: mchades <[email protected]>
AuthorDate: Thu Feb 5 15:49:45 2026 +0800

    [#9530] feat(client): Add Java client support for UDF operations (#9576)
    
    ### What changes were proposed in this pull request?
    
    This PR adds Java client support for UDF (User-Defined Function)
    operations, including:
    
    1. **FunctionCatalog interface implementation in BaseSchemaCatalog**:
    All catalog types now support function operations
    2. **Client-side DTOs**: Added `FunctionDTO`, `FunctionParamDTO`,
    `FunctionDefinitionDTO`, and `FunctionImplDTO` with subclasses for
    different implementation types (Java, Python, External)
    3. **REST API client integration**: Implemented REST client methods for:
       - `registerFunction`: Register a new function
       - `getFunction`: Get function by name (with optional version)
       - `listFunctions`: List all functions in a schema
       - `dropFunction`: Drop a function
    - `alterFunction`: Alter function with various changes (add/remove
    definition, add/update/remove impl, set/remove properties)
    4. **Integration tests**: Added comprehensive `FunctionIT` test class
    covering all function operations
    
    ### Why are the changes needed?
    
    This is part of the UDF feature implementation for Apache Gravitino. The
    Java client needs to support function operations so that users can
    programmatically manage UDFs through the Gravitino client library.
    
    Fix: #9530
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this PR introduces new user-facing APIs:
    - `FunctionCatalog` interface with methods: `registerFunction`,
    `getFunction`, `listFunctions`, `dropFunction`, `alterFunction`
    - All catalog types (Relational, Fileset, Messaging, Model) now
    implement `FunctionCatalog`
    
    ### How was this patch tested?
    
    1. Added comprehensive integration tests in `FunctionIT.java` covering:
    - Register and get functions with different types (scalar, table,
    aggregate)
    - Register functions with multiple definitions and implementations
    (Java, Python, External)
    - Alter function operations (add/remove definition, add/update/remove
    impl)
    - Error cases (function not found, schema not found, function already
    exists, definition conflicts)
       - Drop function operations
       - List functions operations
    
    Co-authored-by: Jerry Shao <[email protected]>
---
 .../hive/integration/test/CatalogHive2IT.java      |  74 +++
 .../hive/converter/HiveDatabaseConverter.java      |   8 +-
 .../apache/gravitino/client/BaseSchemaCatalog.java |  69 ++-
 .../org/apache/gravitino/client/DTOConverters.java |  66 +++
 .../org/apache/gravitino/client/ErrorHandlers.java |  72 +++
 .../client/FunctionCatalogOperations.java          | 355 ++++++++++++
 .../gravitino/client/TestFunctionCatalog.java      | 328 +++++++++++
 .../client/integration/test/FunctionIT.java        | 610 +++++++++++++++++++++
 .../gravitino/storage/relational/JDBCBackend.java  |   4 +-
 9 files changed, 1581 insertions(+), 5 deletions(-)

diff --git 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHive2IT.java
 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHive2IT.java
index 7f9eb0e771..ec332c3072 100644
--- 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHive2IT.java
+++ 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHive2IT.java
@@ -66,10 +66,18 @@ import org.apache.gravitino.catalog.hive.TableType;
 import org.apache.gravitino.client.GravitinoMetalake;
 import org.apache.gravitino.connector.BaseCatalog;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
 import org.apache.gravitino.exceptions.NoSuchPartitionException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionChange;
+import org.apache.gravitino.function.FunctionDefinitions;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionImpls;
+import org.apache.gravitino.function.FunctionParams;
+import org.apache.gravitino.function.FunctionType;
 import org.apache.gravitino.hive.HiveClientPool;
 import org.apache.gravitino.hive.HivePartition;
 import org.apache.gravitino.hive.HiveSchema;
@@ -78,6 +86,7 @@ import 
org.apache.gravitino.integration.test.container.ContainerSuite;
 import org.apache.gravitino.integration.test.container.HiveContainer;
 import org.apache.gravitino.integration.test.util.BaseIT;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableCatalog;
@@ -762,6 +771,71 @@ public class CatalogHive2IT extends BaseIT {
     Assertions.assertEquals(1, tables.length);
   }
 
+  @Test
+  public void testFunctions() throws InterruptedException {
+    // test list functions in a schema which was not created by Gravitino
+    String schemaName1 = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+    hiveClientPool.run(
+        client -> {
+          client.createDatabase(
+              HiveSchema.builder()
+                  .withName(schemaName1)
+                  .withCatalogName(hmsCatalog)
+                  .withAuditInfo(AuditInfo.EMPTY)
+                  .build());
+          return null;
+        });
+    NameIdentifier[] functionIdents =
+        catalog.asFunctionCatalog().listFunctions(Namespace.of(schemaName1));
+    Assertions.assertEquals(0, functionIdents.length);
+
+    // test register functions in a schema which was not created by Gravitino
+    String schemaName2 = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+    hiveClientPool.run(
+        client -> {
+          client.createDatabase(
+              HiveSchema.builder()
+                  .withName(schemaName2)
+                  .withCatalogName(hmsCatalog)
+                  .withAuditInfo(AuditInfo.EMPTY)
+                  .build());
+          return null;
+        });
+    Function function =
+        catalog
+            .asFunctionCatalog()
+            .registerFunction(
+                NameIdentifier.of(schemaName2, "test_func"),
+                "test comment",
+                FunctionType.SCALAR,
+                true,
+                Types.StringType.get(),
+                FunctionDefinitions.of(
+                    FunctionDefinitions.of(
+                        FunctionParams.of(FunctionParams.of("input", 
Types.StringType.get())),
+                        FunctionImpls.of(
+                            FunctionImpls.ofJava(
+                                FunctionImpl.RuntimeType.SPARK, 
"mock.udf.class.name")))));
+    Assertions.assertEquals("test_func", function.name());
+
+    // test alter a non-existing function under a schema which was not created 
by Gravitino
+    String schemaName3 = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+    NameIdentifier id = NameIdentifier.of(schemaName3, "test_func");
+    NoSuchFunctionException exception =
+        assertThrows(
+            NoSuchFunctionException.class,
+            () ->
+                catalog
+                    .asFunctionCatalog()
+                    .alterFunction(id, FunctionChange.updateComment("new 
comment")));
+    Assertions.assertTrue(exception.getMessage().contains("does not exist"));
+
+    // test drop a non-existing function under a schema which was not created 
by Gravitino
+    String schemaName4 = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+    NameIdentifier id2 = NameIdentifier.of(schemaName4, "test_func");
+    Assertions.assertFalse(catalog.asFunctionCatalog().dropFunction(id2));
+  }
+
   @Test
   public void testHiveSchemaProperties() throws TException, 
InterruptedException {
     Assumptions.assumeTrue(enableSparkTest);
diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveDatabaseConverter.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveDatabaseConverter.java
index d3ba428161..4d7fec4130 100644
--- 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveDatabaseConverter.java
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveDatabaseConverter.java
@@ -58,6 +58,7 @@ public class HiveDatabaseConverter {
             .build();
     return hiveSchema;
   }
+
  /**
   * Converts a Gravitino schema into a Hive Metastore database representation.
   * (NOTE(review): the original comment text here was accidentally pasted
   * GitHub UI residue, not a real Javadoc — confirm intended wording.)
@@ -78,14 +79,17 @@ public class HiveDatabaseConverter {
     Database hiveDb = new Database();
 
     hiveDb.setName(hiveSchema.name());
-    
Optional.ofNullable(hiveSchema.properties().get(LOCATION)).ifPresent(hiveDb::setLocationUri);
+    Optional.ofNullable(hiveSchema.properties())
+        .map(props -> props.get(LOCATION))
+        .ifPresent(hiveDb::setLocationUri);
     
Optional.ofNullable(hiveSchema.comment()).ifPresent(hiveDb::setDescription);
 
     // TODO: Add more privilege info to Hive's Database object after Gravitino 
supports privilege.
     hiveDb.setOwnerName(hiveSchema.auditInfo().creator());
     hiveDb.setOwnerType(PrincipalType.USER);
 
-    Map<String, String> parameters = new HashMap<>(hiveSchema.properties());
+    Map<String, String> parameters =
+        
Optional.ofNullable(hiveSchema.properties()).map(HashMap::new).orElseGet(HashMap::new);
     parameters.remove(LOCATION);
     hiveDb.setParameters(parameters);
 
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/BaseSchemaCatalog.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/BaseSchemaCatalog.java
index c402df22e3..64a3ba8732 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/BaseSchemaCatalog.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/BaseSchemaCatalog.java
@@ -46,6 +46,12 @@ import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.PolicyAlreadyAssociatedException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionCatalog;
+import org.apache.gravitino.function.FunctionChange;
+import org.apache.gravitino.function.FunctionColumn;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionType;
 import org.apache.gravitino.policy.Policy;
 import org.apache.gravitino.policy.SupportsPolicies;
 import org.apache.gravitino.rest.RESTUtils;
@@ -58,7 +64,12 @@ import org.apache.gravitino.tag.Tag;
  * create, load, alter and drop a schema with specified identifier.
  */
 abstract class BaseSchemaCatalog extends CatalogDTO
-    implements Catalog, SupportsSchemas, SupportsTags, SupportsRoles, 
SupportsPolicies {
+    implements Catalog,
+        SupportsSchemas,
+        SupportsTags,
+        SupportsRoles,
+        SupportsPolicies,
+        FunctionCatalog {
 
   /** The REST client to send the requests. */
   protected final RESTClient restClient;
@@ -70,6 +81,7 @@ abstract class BaseSchemaCatalog extends CatalogDTO
   private final MetadataObjectPolicyOperations objectPolicyOperations;
   private final MetadataObjectRoleOperations objectRoleOperations;
   protected final MetadataObjectCredentialOperations 
objectCredentialOperations;
+  private final FunctionCatalogOperations functionOperations;
 
   BaseSchemaCatalog(
       Namespace catalogNamespace,
@@ -100,6 +112,8 @@ abstract class BaseSchemaCatalog extends CatalogDTO
     this.objectCredentialOperations =
         new MetadataObjectCredentialOperations(
             catalogNamespace.level(0), metadataObject, restClient);
+    this.functionOperations =
+        new FunctionCatalogOperations(restClient, catalogNamespace, 
this.name());
   }
 
   @Override
@@ -317,4 +331,57 @@ abstract class BaseSchemaCatalog extends CatalogDTO
         .append("/schemas")
         .toString();
   }
+
+  @Override
+  public FunctionCatalog asFunctionCatalog() {
+    return this;
+  }
+
+  @Override
+  public NameIdentifier[] listFunctions(Namespace namespace) {
+    return functionOperations.listFunctions(namespace);
+  }
+
+  @Override
+  public Function[] listFunctionInfos(Namespace namespace) throws 
NoSuchSchemaException {
+    return functionOperations.listFunctionInfos(namespace);
+  }
+
+  @Override
+  public Function getFunction(NameIdentifier ident) {
+    return functionOperations.getFunction(ident);
+  }
+
+  @Override
+  public Function registerFunction(
+      NameIdentifier ident,
+      String comment,
+      FunctionType functionType,
+      boolean deterministic,
+      org.apache.gravitino.rel.types.Type returnType,
+      FunctionDefinition[] definitions) {
+    return functionOperations.registerFunction(
+        ident, comment, functionType, deterministic, returnType, definitions);
+  }
+
+  @Override
+  public Function registerFunction(
+      NameIdentifier ident,
+      String comment,
+      boolean deterministic,
+      FunctionColumn[] returnColumns,
+      FunctionDefinition[] definitions) {
+    return functionOperations.registerFunction(
+        ident, comment, deterministic, returnColumns, definitions);
+  }
+
+  @Override
+  public Function alterFunction(NameIdentifier ident, FunctionChange... 
changes) {
+    return functionOperations.alterFunction(ident, changes);
+  }
+
+  @Override
+  public boolean dropFunction(NameIdentifier ident) {
+    return functionOperations.dropFunction(ident);
+  }
 }
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/DTOConverters.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/DTOConverters.java
index 8521501c01..781331a553 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/DTOConverters.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/DTOConverters.java
@@ -36,6 +36,10 @@ import org.apache.gravitino.dto.CatalogDTO;
 import org.apache.gravitino.dto.MetalakeDTO;
 import org.apache.gravitino.dto.authorization.PrivilegeDTO;
 import org.apache.gravitino.dto.authorization.SecurableObjectDTO;
+import org.apache.gravitino.dto.function.FunctionColumnDTO;
+import org.apache.gravitino.dto.function.FunctionDefinitionDTO;
+import org.apache.gravitino.dto.function.FunctionImplDTO;
+import org.apache.gravitino.dto.function.FunctionParamDTO;
 import org.apache.gravitino.dto.job.JobTemplateDTO;
 import org.apache.gravitino.dto.job.ShellJobTemplateDTO;
 import org.apache.gravitino.dto.job.ShellTemplateUpdateDTO;
@@ -44,6 +48,7 @@ import org.apache.gravitino.dto.job.SparkTemplateUpdateDTO;
 import org.apache.gravitino.dto.job.TemplateUpdateDTO;
 import org.apache.gravitino.dto.requests.CatalogUpdateRequest;
 import org.apache.gravitino.dto.requests.FilesetUpdateRequest;
+import org.apache.gravitino.dto.requests.FunctionUpdateRequest;
 import org.apache.gravitino.dto.requests.JobTemplateUpdateRequest;
 import org.apache.gravitino.dto.requests.MetalakeUpdateRequest;
 import org.apache.gravitino.dto.requests.ModelUpdateRequest;
@@ -54,6 +59,10 @@ import org.apache.gravitino.dto.requests.TableUpdateRequest;
 import org.apache.gravitino.dto.requests.TagUpdateRequest;
 import org.apache.gravitino.dto.requests.TopicUpdateRequest;
 import org.apache.gravitino.file.FilesetChange;
+import org.apache.gravitino.function.FunctionChange;
+import org.apache.gravitino.function.FunctionColumn;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionParam;
 import org.apache.gravitino.job.JobTemplate;
 import org.apache.gravitino.job.JobTemplateChange;
 import org.apache.gravitino.job.ShellJobTemplate;
@@ -552,4 +561,61 @@ class DTOConverters {
           "Unknown template update type: " + 
change.getClass().getSimpleName());
     }
   }
+
+  static FunctionUpdateRequest toFunctionUpdateRequest(FunctionChange change) {
+    if (change instanceof FunctionChange.UpdateComment) {
+      return new FunctionUpdateRequest.UpdateCommentRequest(
+          ((FunctionChange.UpdateComment) change).newComment());
+
+    } else if (change instanceof FunctionChange.AddDefinition) {
+      FunctionDefinition def = ((FunctionChange.AddDefinition) 
change).definition();
+      return new FunctionUpdateRequest.AddDefinitionRequest(
+          FunctionDefinitionDTO.fromFunctionDefinition(def));
+
+    } else if (change instanceof FunctionChange.RemoveDefinition) {
+      FunctionParam[] params = ((FunctionChange.RemoveDefinition) 
change).parameters();
+      return new 
FunctionUpdateRequest.RemoveDefinitionRequest(toFunctionParamDTOs(params));
+
+    } else if (change instanceof FunctionChange.AddImpl) {
+      FunctionChange.AddImpl addImpl = (FunctionChange.AddImpl) change;
+      return new FunctionUpdateRequest.AddImplRequest(
+          toFunctionParamDTOs(addImpl.parameters()),
+          FunctionImplDTO.fromFunctionImpl(addImpl.implementation()));
+
+    } else if (change instanceof FunctionChange.UpdateImpl) {
+      FunctionChange.UpdateImpl updateImpl = (FunctionChange.UpdateImpl) 
change;
+      return new FunctionUpdateRequest.UpdateImplRequest(
+          toFunctionParamDTOs(updateImpl.parameters()),
+          updateImpl.runtime().name(),
+          FunctionImplDTO.fromFunctionImpl(updateImpl.implementation()));
+
+    } else if (change instanceof FunctionChange.RemoveImpl) {
+      FunctionChange.RemoveImpl removeImpl = (FunctionChange.RemoveImpl) 
change;
+      return new FunctionUpdateRequest.RemoveImplRequest(
+          toFunctionParamDTOs(removeImpl.parameters()), 
removeImpl.runtime().name());
+
+    } else {
+      throw new IllegalArgumentException(
+          "Unknown function change type: " + 
change.getClass().getSimpleName());
+    }
+  }
+
+  static FunctionDefinitionDTO toFunctionDefinitionDTO(FunctionDefinition 
definition) {
+    return FunctionDefinitionDTO.fromFunctionDefinition(definition);
+  }
+
+  static FunctionColumnDTO toFunctionColumnDTO(FunctionColumn column) {
+    return FunctionColumnDTO.fromFunctionColumn(column);
+  }
+
+  private static FunctionParamDTO[] toFunctionParamDTOs(FunctionParam[] 
params) {
+    if (params == null) {
+      return new FunctionParamDTO[0];
+    }
+    FunctionParamDTO[] dtos = new FunctionParamDTO[params.length];
+    for (int i = 0; i < params.length; i++) {
+      dtos[i] = FunctionParamDTO.fromFunctionParam(params[i]);
+    }
+    return dtos;
+  }
 }
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
index 62806e106f..2dd515eb9a 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
@@ -33,6 +33,7 @@ import 
org.apache.gravitino.exceptions.CatalogNotInUseException;
 import org.apache.gravitino.exceptions.ConnectionFailedException;
 import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.ForbiddenException;
+import org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
 import org.apache.gravitino.exceptions.GroupAlreadyExistsException;
 import org.apache.gravitino.exceptions.IllegalJobTemplateOperationException;
 import org.apache.gravitino.exceptions.IllegalMetadataObjectException;
@@ -48,6 +49,7 @@ import 
org.apache.gravitino.exceptions.ModelAlreadyExistsException;
 import 
org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
 import org.apache.gravitino.exceptions.NoSuchGroupException;
 import org.apache.gravitino.exceptions.NoSuchJobException;
 import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
@@ -270,6 +272,15 @@ public class ErrorHandlers {
     return StatisticsErrorHandler.INSTANCE;
   }
 
+  /**
+   * Creates an error handler specific to Function operations.
+   *
+   * @return A Consumer representing the Function error handler.
+   */
+  public static Consumer<ErrorResponse> functionErrorHandler() {
+    return FunctionErrorHandler.INSTANCE;
+  }
+
   private ErrorHandlers() {}
 
   /**
@@ -1303,4 +1314,65 @@ public class ErrorHandlers {
       throw new RESTException("Unable to process: %s", 
formatErrorMessage(errorResponse));
     }
   }
+
+  /** Error handler specific to Function operations. */
+  @SuppressWarnings("FormatStringAnnotation")
+  private static class FunctionErrorHandler extends RestErrorHandler {
+
+    private static final FunctionErrorHandler INSTANCE = new 
FunctionErrorHandler();
+
+    @Override
+    public void accept(ErrorResponse errorResponse) {
+      String errorMessage = formatErrorMessage(errorResponse);
+
+      switch (errorResponse.getCode()) {
+        case ErrorConstants.ILLEGAL_ARGUMENTS_CODE:
+          throw new IllegalArgumentException(errorMessage);
+
+        case ErrorConstants.NOT_FOUND_CODE:
+          if 
(errorResponse.getType().equals(NoSuchMetalakeException.class.getSimpleName())) 
{
+            throw new NoSuchMetalakeException(errorMessage);
+
+          } else if 
(errorResponse.getType().equals(NoSuchCatalogException.class.getSimpleName())) {
+            throw new NoSuchCatalogException(errorMessage);
+
+          } else if 
(errorResponse.getType().equals(NoSuchSchemaException.class.getSimpleName())) {
+            throw new NoSuchSchemaException(errorMessage);
+
+          } else if (errorResponse
+              .getType()
+              .equals(NoSuchFunctionException.class.getSimpleName())) {
+            throw new NoSuchFunctionException(errorMessage);
+
+          } else {
+            throw new NotFoundException(errorMessage);
+          }
+
+        case ErrorConstants.ALREADY_EXISTS_CODE:
+          throw new FunctionAlreadyExistsException(errorMessage);
+
+        case ErrorConstants.FORBIDDEN_CODE:
+          throw new ForbiddenException(errorMessage);
+
+        case ErrorConstants.INTERNAL_ERROR_CODE:
+          throw new RuntimeException(errorMessage);
+
+        case ErrorConstants.NOT_IN_USE_CODE:
+          if 
(errorResponse.getType().equals(CatalogNotInUseException.class.getSimpleName()))
 {
+            throw new CatalogNotInUseException(errorMessage);
+
+          } else if (errorResponse
+              .getType()
+              .equals(MetalakeNotInUseException.class.getSimpleName())) {
+            throw new MetalakeNotInUseException(errorMessage);
+
+          } else {
+            throw new NotInUseException(errorMessage);
+          }
+
+        default:
+          super.accept(errorResponse);
+      }
+    }
+  }
 }
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/FunctionCatalogOperations.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/FunctionCatalogOperations.java
new file mode 100644
index 0000000000..1e8abade24
--- /dev/null
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/FunctionCatalogOperations.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.dto.function.FunctionColumnDTO;
+import org.apache.gravitino.dto.function.FunctionDefinitionDTO;
+import org.apache.gravitino.dto.requests.FunctionRegisterRequest;
+import org.apache.gravitino.dto.requests.FunctionUpdateRequest;
+import org.apache.gravitino.dto.requests.FunctionUpdatesRequest;
+import org.apache.gravitino.dto.responses.DropResponse;
+import org.apache.gravitino.dto.responses.EntityListResponse;
+import org.apache.gravitino.dto.responses.FunctionListResponse;
+import org.apache.gravitino.dto.responses.FunctionResponse;
+import org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionCatalog;
+import org.apache.gravitino.function.FunctionChange;
+import org.apache.gravitino.function.FunctionColumn;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rest.RESTUtils;
+
+/**
+ * Function catalog operations helper class that provides implementations for 
function management.
+ * This class is used by catalogs that support function operations (e.g., 
RelationalCatalog).
+ */
+class FunctionCatalogOperations implements FunctionCatalog {
+
+  private final RESTClient restClient;
+  private final Namespace catalogNamespace;
+  private final String catalogName;
+
+  FunctionCatalogOperations(RESTClient restClient, Namespace catalogNamespace, 
String catalogName) {
+    this.restClient = restClient;
+    this.catalogNamespace = catalogNamespace;
+    this.catalogName = catalogName;
+  }
+
+  /**
+   * List the functions in a schema namespace from the catalog.
+   *
+   * @param namespace A schema namespace. This namespace should have 1 level, 
which is the schema
+   *     name;
+   * @return An array of {@link NameIdentifier} of functions under the given 
namespace.
+   * @throws NoSuchSchemaException If the schema does not exist.
+   */
+  @Override
+  public NameIdentifier[] listFunctions(Namespace namespace) throws 
NoSuchSchemaException {
+    checkFunctionNamespace(namespace);
+
+    Namespace fullNamespace = getFunctionFullNamespace(namespace);
+    EntityListResponse resp =
+        restClient.get(
+            formatFunctionRequestPath(fullNamespace),
+            EntityListResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.functionErrorHandler());
+    resp.validate();
+
+    return Arrays.stream(resp.identifiers())
+        .map(ident -> NameIdentifier.of(ident.namespace().level(2), 
ident.name()))
+        .toArray(NameIdentifier[]::new);
+  }
+
+  /**
+   * List the functions with details in a schema namespace from the catalog.
+   *
+   * @param namespace A namespace.
+   * @return An array of functions in the namespace.
+   * @throws NoSuchSchemaException If the schema does not exist.
+   */
+  @Override
+  public Function[] listFunctionInfos(Namespace namespace) throws 
NoSuchSchemaException {
+    checkFunctionNamespace(namespace);
+
+    Namespace fullNamespace = getFunctionFullNamespace(namespace);
+    Map<String, String> params = new HashMap<>();
+    params.put("details", "true");
+
+    FunctionListResponse resp =
+        restClient.get(
+            formatFunctionRequestPath(fullNamespace),
+            params,
+            FunctionListResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.functionErrorHandler());
+    resp.validate();
+    return resp.getFunctions();
+  }
+
+  /**
+   * Get a function by {@link NameIdentifier} from the catalog. This method 
returns the latest
+   * version of the function.
+   *
+   * @param ident A function identifier, which should be "schema.function" 
format.
+   * @return The function metadata.
+   * @throws NoSuchFunctionException If the function does not exist.
+   */
+  @Override
+  public Function getFunction(NameIdentifier ident) throws 
NoSuchFunctionException {
+    checkFunctionNameIdentifier(ident);
+
+    Namespace fullNamespace = getFunctionFullNamespace(ident.namespace());
+    FunctionResponse resp =
+        restClient.get(
+            formatFunctionRequestPath(fullNamespace) + "/" + 
RESTUtils.encodeString(ident.name()),
+            FunctionResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.functionErrorHandler());
+    resp.validate();
+
+    return resp.getFunction();
+  }
+
+  /**
+   * Register a scalar or aggregate function with one or more definitions 
(overloads).
+   *
+   * @param ident The function identifier.
+   * @param comment The optional function comment.
+   * @param functionType The function type.
+   * @param deterministic Whether the function is deterministic.
+   * @param returnType The return type.
+   * @param definitions The function definitions.
+   * @return The registered function.
+   * @throws NoSuchSchemaException If the schema does not exist.
+   * @throws FunctionAlreadyExistsException If the function already exists.
+   */
+  @Override
+  public Function registerFunction(
+      NameIdentifier ident,
+      String comment,
+      FunctionType functionType,
+      boolean deterministic,
+      Type returnType,
+      FunctionDefinition[] definitions)
+      throws NoSuchSchemaException, FunctionAlreadyExistsException {
+    checkFunctionNameIdentifier(ident);
+
+    Namespace fullNamespace = getFunctionFullNamespace(ident.namespace());
+    FunctionRegisterRequest req =
+        FunctionRegisterRequest.builder()
+            .withName(ident.name())
+            .withComment(comment)
+            .withFunctionType(functionType)
+            .withDeterministic(deterministic)
+            .withReturnType(returnType)
+            .withDefinitions(toFunctionDefinitionDTOs(definitions))
+            .build();
+    req.validate();
+
+    FunctionResponse resp =
+        restClient.post(
+            formatFunctionRequestPath(fullNamespace),
+            req,
+            FunctionResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.functionErrorHandler());
+    resp.validate();
+
+    return resp.getFunction();
+  }
+
+  /**
+   * Register a table-valued function with one or more definitions (overloads).
+   *
+   * @param ident The function identifier.
+   * @param comment The optional function comment.
+   * @param deterministic Whether the function is deterministic.
+   * @param returnColumns The return columns.
+   * @param definitions The function definitions.
+   * @return The registered function.
+   * @throws NoSuchSchemaException If the schema does not exist.
+   * @throws FunctionAlreadyExistsException If the function already exists.
+   */
+  @Override
+  public Function registerFunction(
+      NameIdentifier ident,
+      String comment,
+      boolean deterministic,
+      FunctionColumn[] returnColumns,
+      FunctionDefinition[] definitions)
+      throws NoSuchSchemaException, FunctionAlreadyExistsException {
+    checkFunctionNameIdentifier(ident);
+
+    Namespace fullNamespace = getFunctionFullNamespace(ident.namespace());
+    FunctionRegisterRequest req =
+        FunctionRegisterRequest.builder()
+            .withName(ident.name())
+            .withComment(comment)
+            .withFunctionType(FunctionType.TABLE)
+            .withDeterministic(deterministic)
+            .withReturnColumns(toFunctionColumnDTOs(returnColumns))
+            .withDefinitions(toFunctionDefinitionDTOs(definitions))
+            .build();
+    req.validate();
+
+    FunctionResponse resp =
+        restClient.post(
+            formatFunctionRequestPath(fullNamespace),
+            req,
+            FunctionResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.functionErrorHandler());
+    resp.validate();
+
+    return resp.getFunction();
+  }
+
+  /**
+   * Alter a function in the catalog.
+   *
+   * @param ident A function identifier, which should be "schema.function" 
format.
+   * @param changes The changes to apply to the function.
+   * @return The updated function metadata.
+   * @throws NoSuchFunctionException If the function does not exist.
+   * @throws IllegalArgumentException If the changes are invalid.
+   */
+  @Override
+  public Function alterFunction(NameIdentifier ident, FunctionChange... 
changes)
+      throws NoSuchFunctionException, IllegalArgumentException {
+    checkFunctionNameIdentifier(ident);
+
+    Namespace fullNamespace = getFunctionFullNamespace(ident.namespace());
+    List<FunctionUpdateRequest> updates =
+        Arrays.stream(changes)
+            .map(DTOConverters::toFunctionUpdateRequest)
+            .collect(Collectors.toList());
+    FunctionUpdatesRequest req = new FunctionUpdatesRequest(updates);
+    req.validate();
+
+    FunctionResponse resp =
+        restClient.put(
+            formatFunctionRequestPath(fullNamespace) + "/" + 
RESTUtils.encodeString(ident.name()),
+            req,
+            FunctionResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.functionErrorHandler());
+    resp.validate();
+
+    return resp.getFunction();
+  }
+
+  /**
+   * Drop a function from the catalog.
+   *
+   * @param ident A function identifier, which should be "schema.function" 
format.
+   * @return true If the function is dropped, false the function did not exist.
+   */
+  @Override
+  public boolean dropFunction(NameIdentifier ident) {
+    checkFunctionNameIdentifier(ident);
+
+    Namespace fullNamespace = getFunctionFullNamespace(ident.namespace());
+    DropResponse resp =
+        restClient.delete(
+            formatFunctionRequestPath(fullNamespace) + "/" + 
RESTUtils.encodeString(ident.name()),
+            DropResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.functionErrorHandler());
+    resp.validate();
+
+    return resp.dropped();
+  }
+
+  @VisibleForTesting
+  String formatFunctionRequestPath(Namespace ns) {
+    Namespace schemaNs = Namespace.of(ns.level(0), ns.level(1));
+    return new StringBuilder()
+        .append(BaseSchemaCatalog.formatSchemaRequestPath(schemaNs))
+        .append("/")
+        .append(RESTUtils.encodeString(ns.level(2)))
+        .append("/functions")
+        .toString();
+  }
+
+  /**
+   * Check whether the namespace of a function is valid.
+   *
+   * @param namespace The namespace to check.
+   */
+  static void checkFunctionNamespace(Namespace namespace) {
+    Namespace.check(
+        namespace != null && namespace.length() == 1,
+        "Function namespace must be non-null and have 1 level, the input 
namespace is %s",
+        namespace);
+  }
+
+  /**
+   * Check whether the NameIdentifier of a function is valid.
+   *
+   * @param ident The NameIdentifier to check, which should be 
"schema.function" format.
+   */
+  static void checkFunctionNameIdentifier(NameIdentifier ident) {
+    NameIdentifier.check(ident != null, "NameIdentifier must not be null");
+    NameIdentifier.check(
+        ident.name() != null && !ident.name().isEmpty(), "NameIdentifier name 
must not be empty");
+    checkFunctionNamespace(ident.namespace());
+  }
+
+  /**
+   * Get the full namespace of the function with the given function's short 
namespace (schema name).
+   *
+   * @param functionNamespace The function's short namespace, which is the 
schema name.
+   * @return full namespace of the function, which is 
"metalake.catalog.schema" format.
+   */
+  private Namespace getFunctionFullNamespace(Namespace functionNamespace) {
+    return Namespace.of(catalogNamespace.level(0), catalogName, 
functionNamespace.level(0));
+  }
+
+  private FunctionDefinitionDTO[] 
toFunctionDefinitionDTOs(FunctionDefinition[] definitions) {
+    if (definitions == null) {
+      return null;
+    }
+    return Arrays.stream(definitions)
+        .map(DTOConverters::toFunctionDefinitionDTO)
+        .toArray(FunctionDefinitionDTO[]::new);
+  }
+
+  private FunctionColumnDTO[] toFunctionColumnDTOs(FunctionColumn[] columns) {
+    if (columns == null) {
+      return null;
+    }
+    return Arrays.stream(columns)
+        .map(DTOConverters::toFunctionColumnDTO)
+        .toArray(FunctionColumnDTO[]::new);
+  }
+}
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestFunctionCatalog.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestFunctionCatalog.java
new file mode 100644
index 0000000000..e6790fec2d
--- /dev/null
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestFunctionCatalog.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client;
+
+import static org.apache.hc.core5.http.HttpStatus.SC_CONFLICT;
+import static org.apache.hc.core5.http.HttpStatus.SC_NOT_FOUND;
+import static org.apache.hc.core5.http.HttpStatus.SC_OK;
+import static org.apache.hc.core5.http.HttpStatus.SC_SERVER_ERROR;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
+import java.time.Instant;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.dto.AuditDTO;
+import org.apache.gravitino.dto.CatalogDTO;
+import org.apache.gravitino.dto.function.FunctionDTO;
+import org.apache.gravitino.dto.function.FunctionDefinitionDTO;
+import org.apache.gravitino.dto.function.FunctionImplDTO;
+import org.apache.gravitino.dto.function.FunctionParamDTO;
+import org.apache.gravitino.dto.function.SQLImplDTO;
+import org.apache.gravitino.dto.requests.CatalogCreateRequest;
+import org.apache.gravitino.dto.responses.CatalogResponse;
+import org.apache.gravitino.dto.responses.DropResponse;
+import org.apache.gravitino.dto.responses.EntityListResponse;
+import org.apache.gravitino.dto.responses.ErrorResponse;
+import org.apache.gravitino.dto.responses.FunctionResponse;
+import org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionCatalog;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.hc.core5.http.Method;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
/**
 * Unit tests for the function operations exposed via {@code Catalog#asFunctionCatalog()}, backed
 * by the mock REST server provided by {@code TestBase}. Each test registers an expected
 * request/response pair with {@code buildMockResource} and then exercises the client call.
 */
public class TestFunctionCatalog extends TestBase {

  protected static Catalog catalog;

  private static GravitinoMetalake metalake;

  protected static final String metalakeName = "testMetalake";

  protected static final String catalogName = "testCatalog";

  private static final String provider = "test";

  /** Creates the metalake and catalog fixtures against the mocked REST backend. */
  @BeforeAll
  public static void setUp() throws Exception {
    TestBase.setUp();

    metalake = TestGravitinoMetalake.createMetalake(client, metalakeName);

    CatalogDTO mockCatalog =
        CatalogDTO.builder()
            .withName(catalogName)
            .withType(CatalogDTO.Type.RELATIONAL)
            .withProvider(provider)
            .withComment("comment")
            .withProperties(ImmutableMap.of("k1", "k2"))
            .withAudit(
                AuditDTO.builder().withCreator("creator").withCreateTime(Instant.now()).build())
            .build();

    CatalogCreateRequest catalogCreateRequest =
        new CatalogCreateRequest(
            catalogName,
            CatalogDTO.Type.RELATIONAL,
            provider,
            "comment",
            ImmutableMap.of("k1", "k2"));
    CatalogResponse catalogResponse = new CatalogResponse(mockCatalog);
    buildMockResource(
        Method.POST,
        "/api/metalakes/" + metalakeName + "/catalogs",
        catalogCreateRequest,
        catalogResponse,
        SC_OK);

    catalog =
        metalake.createCatalog(
            catalogName,
            CatalogDTO.Type.RELATIONAL,
            provider,
            "comment",
            ImmutableMap.of("k1", "k2"));
  }

  /** Verifies listing returns short identifiers and that server errors are mapped to exceptions. */
  @Test
  public void testListFunctions() throws JsonProcessingException {
    NameIdentifier func1 = NameIdentifier.of("schema1", "func1");
    NameIdentifier func2 = NameIdentifier.of("schema1", "func2");
    // The server answers with fully-qualified identifiers; the client is expected
    // to strip them back down to "schema.function" form.
    NameIdentifier expectedResultFunc1 =
        NameIdentifier.of(metalakeName, catalogName, "schema1", "func1");
    NameIdentifier expectedResultFunc2 =
        NameIdentifier.of(metalakeName, catalogName, "schema1", "func2");
    String functionPath =
        withSlash(formatFunctionRequestPath(Namespace.of(metalakeName, catalogName, "schema1")));

    EntityListResponse resp =
        new EntityListResponse(new NameIdentifier[] {expectedResultFunc1, expectedResultFunc2});
    buildMockResource(Method.GET, functionPath, null, resp, SC_OK);
    NameIdentifier[] functions = catalog.asFunctionCatalog().listFunctions(func1.namespace());

    Assertions.assertEquals(2, functions.length);
    Assertions.assertEquals(func1, functions[0]);
    Assertions.assertEquals(func2, functions[1]);

    // Throw schema not found exception
    ErrorResponse errResp =
        ErrorResponse.notFound(NoSuchSchemaException.class.getSimpleName(), "schema not found");
    buildMockResource(Method.GET, functionPath, null, errResp, SC_NOT_FOUND);
    Assertions.assertThrows(
        NoSuchSchemaException.class,
        () -> catalog.asFunctionCatalog().listFunctions(func1.namespace()),
        "schema not found");

    // Throw Runtime exception
    ErrorResponse errResp2 = ErrorResponse.internalError("internal error");
    buildMockResource(Method.GET, functionPath, null, errResp2, SC_SERVER_ERROR);
    Assertions.assertThrows(
        RuntimeException.class,
        () -> catalog.asFunctionCatalog().listFunctions(func1.namespace()),
        "internal error");
  }

  /** Verifies a successful get plus the two not-found error mappings (schema vs. function). */
  @Test
  public void testGetFunction() throws JsonProcessingException {
    NameIdentifier func = NameIdentifier.of("schema1", "func1");
    String functionPath =
        withSlash(
            formatFunctionRequestPath(Namespace.of(metalakeName, catalogName, "schema1"))
                + "/func1");

    FunctionDTO mockFunction =
        mockFunctionDTO(func.name(), FunctionType.SCALAR, "mock comment", true);
    FunctionResponse resp = new FunctionResponse(mockFunction);
    buildMockResource(Method.GET, functionPath, null, resp, SC_OK);
    Function loadedFunction = catalog.asFunctionCatalog().getFunction(func);
    Assertions.assertNotNull(loadedFunction);
    assertFunction(mockFunction, loadedFunction);

    // Throw schema not found exception
    ErrorResponse errResp =
        ErrorResponse.notFound(NoSuchSchemaException.class.getSimpleName(), "schema not found");
    buildMockResource(Method.GET, functionPath, null, errResp, SC_NOT_FOUND);
    Assertions.assertThrows(
        NoSuchSchemaException.class,
        () -> catalog.asFunctionCatalog().getFunction(func),
        "schema not found");

    // Throw function not found exception
    ErrorResponse errResp1 =
        ErrorResponse.notFound(NoSuchFunctionException.class.getSimpleName(), "function not found");
    buildMockResource(Method.GET, functionPath, null, errResp1, SC_NOT_FOUND);
    Assertions.assertThrows(
        NoSuchFunctionException.class,
        () -> catalog.asFunctionCatalog().getFunction(func),
        "function not found");
  }

  /** Verifies registration round-trips the mocked function and maps the conflict error. */
  @Test
  public void testRegisterFunction() throws JsonProcessingException {
    NameIdentifier func = NameIdentifier.of("schema1", "func1");
    String functionPath =
        withSlash(formatFunctionRequestPath(Namespace.of(metalakeName, catalogName, "schema1")));

    FunctionDTO mockFunction =
        mockFunctionDTO(func.name(), FunctionType.SCALAR, "mock comment", true);

    FunctionResponse resp = new FunctionResponse(mockFunction);
    // Use null for request body to match any request body
    buildMockResource(Method.POST, functionPath, null, resp, SC_OK);

    Function registeredFunction =
        catalog
            .asFunctionCatalog()
            .registerFunction(
                func,
                "mock comment",
                FunctionType.SCALAR,
                true,
                Types.StringType.get(),
                mockFunction.definitions());

    Assertions.assertNotNull(registeredFunction);
    assertFunction(mockFunction, registeredFunction);

    // Throw function already exists exception
    ErrorResponse errResp =
        ErrorResponse.alreadyExists(
            FunctionAlreadyExistsException.class.getSimpleName(), "function already exists");
    buildMockResource(Method.POST, functionPath, null, errResp, SC_CONFLICT);
    Assertions.assertThrows(
        FunctionAlreadyExistsException.class,
        () ->
            catalog
                .asFunctionCatalog()
                .registerFunction(
                    func,
                    "mock comment",
                    FunctionType.SCALAR,
                    true,
                    Types.StringType.get(),
                    mockFunction.definitions()),
        "function already exists");
  }

  /** Verifies alterFunction applies a comment update and maps the not-found error. */
  @Test
  public void testAlterFunction() throws JsonProcessingException {
    NameIdentifier func = NameIdentifier.of("schema1", "func1");
    String functionPath =
        withSlash(
            formatFunctionRequestPath(Namespace.of(metalakeName, catalogName, "schema1"))
                + "/func1");

    FunctionDTO mockFunction =
        mockFunctionDTO(func.name(), FunctionType.SCALAR, "updated comment", true);

    FunctionResponse resp = new FunctionResponse(mockFunction);
    // Use null for request body to match any request body
    buildMockResource(Method.PUT, functionPath, null, resp, SC_OK);

    FunctionCatalog functionCatalog = catalog.asFunctionCatalog();
    Function alteredFunction =
        functionCatalog.alterFunction(
            func, org.apache.gravitino.function.FunctionChange.updateComment("updated comment"));

    Assertions.assertNotNull(alteredFunction);
    Assertions.assertEquals("updated comment", alteredFunction.comment());

    // Throw function not found exception
    ErrorResponse errResp =
        ErrorResponse.notFound(NoSuchFunctionException.class.getSimpleName(), "function not found");
    buildMockResource(Method.PUT, functionPath, null, errResp, SC_NOT_FOUND);
    Assertions.assertThrows(
        NoSuchFunctionException.class,
        () ->
            functionCatalog.alterFunction(
                func,
                org.apache.gravitino.function.FunctionChange.updateComment("updated comment")),
        "function not found");
  }

  /** Verifies dropFunction returns the server's dropped flag for both true and false. */
  @Test
  public void testDropFunction() throws JsonProcessingException {
    NameIdentifier func = NameIdentifier.of("schema1", "func1");
    String functionPath =
        withSlash(
            formatFunctionRequestPath(Namespace.of(metalakeName, catalogName, "schema1"))
                + "/func1");

    DropResponse resp = new DropResponse(true);
    buildMockResource(Method.DELETE, functionPath, null, resp, SC_OK);
    Assertions.assertTrue(catalog.asFunctionCatalog().dropFunction(func));

    // Return false when the function does not exist
    DropResponse resp1 = new DropResponse(false);
    buildMockResource(Method.DELETE, functionPath, null, resp1, SC_OK);
    Assertions.assertFalse(catalog.asFunctionCatalog().dropFunction(func));
  }

  /**
   * Builds a FunctionDTO with one int parameter, one Spark SQL implementation, and a string
   * return type, for use as the mocked server payload.
   */
  private static FunctionDTO mockFunctionDTO(
      String name, FunctionType functionType, String comment, boolean deterministic) {
    FunctionParamDTO[] params =
        new FunctionParamDTO[] {
          FunctionParamDTO.builder()
              .withName("param1")
              .withDataType(Types.IntegerType.get())
              .build()
        };

    SQLImplDTO impl =
        new SQLImplDTO(FunctionImpl.RuntimeType.SPARK.name(), null, null, "SELECT param1 + 1");

    FunctionDefinitionDTO definition =
        FunctionDefinitionDTO.builder()
            .withParameters(params)
            .withImpls(new FunctionImplDTO[] {impl})
            .build();

    return FunctionDTO.builder()
        .withName(name)
        .withFunctionType(functionType)
        .withComment(comment)
        .withDeterministic(deterministic)
        .withReturnType(Types.StringType.get())
        .withDefinitions(new FunctionDefinitionDTO[] {definition})
        .withAudit(AuditDTO.builder().withCreator("creator").withCreateTime(Instant.now()).build())
        .build();
  }

  /** Asserts the fields of the loaded Function match the DTO returned by the mock server. */
  private static void assertFunction(FunctionDTO expected, Function actual) {
    Assertions.assertEquals(expected.name(), actual.name());
    Assertions.assertEquals(expected.functionType(), actual.functionType());
    Assertions.assertEquals(expected.comment(), actual.comment());
    Assertions.assertEquals(expected.deterministic(), actual.deterministic());
    Assertions.assertEquals(expected.returnType(), actual.returnType());
    Assertions.assertEquals(expected.definitions().length, actual.definitions().length);
  }

  // Mirrors the client-side path construction so the mock server can be primed with the
  // exact URL the client will request. Note: unlike the client, this does not URL-encode
  // the schema name, which is fine for the plain names used in these tests.
  private static String formatFunctionRequestPath(Namespace ns) {
    Namespace schemaNs = Namespace.of(ns.level(0), ns.level(1));
    return BaseSchemaCatalog.formatSchemaRequestPath(schemaNs) + "/" + ns.level(2) + "/functions";
  }
}
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/FunctionIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/FunctionIT.java
new file mode 100644
index 0000000000..598c47a23a
--- /dev/null
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/FunctionIT.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client.integration.test;
+
+import com.google.common.collect.Maps;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionCatalog;
+import org.apache.gravitino.function.FunctionChange;
+import org.apache.gravitino.function.FunctionColumn;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionDefinitions;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionImpls;
+import org.apache.gravitino.function.FunctionParam;
+import org.apache.gravitino.function.FunctionParams;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.container.HiveContainer;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+@Tag("gravitino-docker-test")
+public class FunctionIT extends BaseIT {
+
+  private static final ContainerSuite containerSuite = 
ContainerSuite.getInstance();
+
+  private static final String METALAKE_NAME =
+      GravitinoITUtils.genRandomName("function_it_metalake");
+
+  private static String hmsUri;
+
+  private GravitinoMetalake metalake;
+  private Catalog catalog;
+  private String schemaName;
+  private FunctionCatalog functionCatalog;
+
  /** Starts the shared Hive container and creates the test metalake used by all tests. */
  @BeforeAll
  public void setUp() {
    containerSuite.startHiveContainer();
    // Thrift URI of the containerized Hive Metastore, consumed by the Hive catalog below.
    hmsUri =
        String.format(
            "thrift://%s:%d",
            containerSuite.getHiveContainer().getContainerIpAddress(),
            HiveContainer.HIVE_METASTORE_PORT);

    metalake = client.createMetalake(METALAKE_NAME, "metalake comment", Collections.emptyMap());
  }
+
  /** Force-drops the metalake, then releases the client and any registered closeables. */
  @AfterAll
  public void tearDown() {
    client.dropMetalake(METALAKE_NAME, true);

    if (client != null) {
      client.close();
      client = null;
    }

    try {
      closer.close();
    } catch (Exception e) {
      // Swallow exceptions
    }
  }
+
  /**
   * Creates a fresh Hive catalog and schema for each test so tests do not see each other's
   * functions, and caches the FunctionCatalog facade under test.
   */
  @BeforeEach
  public void createCatalogAndSchema() {
    String catalogName = GravitinoITUtils.genRandomName("function_it_catalog");
    Map<String, String> properties = Maps.newHashMap();
    properties.put("metastore.uris", hmsUri);

    catalog =
        metalake.createCatalog(
            catalogName, Catalog.Type.RELATIONAL, "hive", "catalog comment", properties);

    schemaName = GravitinoITUtils.genRandomName("function_it_schema");
    catalog.asSchemas().createSchema(schemaName, "schema comment", Collections.emptyMap());

    functionCatalog = catalog.asFunctionCatalog();
  }
+
  /** Force-drops the per-test catalog (and with it the schema and any registered functions). */
  @AfterEach
  public void cleanUp() {
    metalake.dropCatalog(catalog.name(), true);
  }
+
  /**
   * End-to-end check of register/get/exists for three variants: a deterministic scalar function,
   * a non-deterministic scalar function, and a table-valued function.
   */
  @Test
  public void testRegisterAndGetFunction() {
    // Test 1: Register and get a deterministic scalar function
    String functionName = GravitinoITUtils.genRandomName("test_func");
    NameIdentifier ident = NameIdentifier.of(schemaName, functionName);

    // Single int parameter with a default value of 0.
    FunctionParam param =
        FunctionParams.of("x", Types.IntegerType.get(), null, Literals.integerLiteral(0));
    FunctionImpl impl = FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, "SELECT x + 1");
    FunctionDefinition definition =
        FunctionDefinitions.of(new FunctionParam[] {param}, new FunctionImpl[] {impl});

    Function registered =
        functionCatalog.registerFunction(
            ident,
            "Add one to input",
            FunctionType.SCALAR,
            true,
            Types.IntegerType.get(),
            new FunctionDefinition[] {definition});

    Assertions.assertEquals(functionName, registered.name());
    Assertions.assertEquals(FunctionType.SCALAR, registered.functionType());
    Assertions.assertTrue(registered.deterministic());
    Assertions.assertEquals("Add one to input", registered.comment());
    Assertions.assertEquals(Types.IntegerType.get(), registered.returnType());
    Assertions.assertEquals(1, registered.definitions().length);

    // The loaded function must reflect exactly what was registered.
    Function loaded = functionCatalog.getFunction(ident);
    Assertions.assertEquals(functionName, loaded.name());
    Assertions.assertEquals(FunctionType.SCALAR, loaded.functionType());
    Assertions.assertTrue(loaded.deterministic());
    Assertions.assertEquals("Add one to input", loaded.comment());
    Assertions.assertEquals(Types.IntegerType.get(), loaded.returnType());

    Assertions.assertTrue(functionCatalog.functionExists(ident));
    Assertions.assertFalse(
        functionCatalog.functionExists(NameIdentifier.of(schemaName, "non_existent_func")));

    // Test 2: Register a non-deterministic function
    String nonDetFuncName = GravitinoITUtils.genRandomName("nondeterministic_func");
    NameIdentifier nonDetIdent = NameIdentifier.of(schemaName, nonDetFuncName);

    FunctionImpl nonDetImpl = FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, "SELECT RAND()");
    FunctionDefinition nonDetDefinition =
        FunctionDefinitions.of(new FunctionParam[0], new FunctionImpl[] {nonDetImpl});

    Function nonDetRegistered =
        functionCatalog.registerFunction(
            nonDetIdent,
            "Non-deterministic function",
            FunctionType.SCALAR,
            false,
            Types.DoubleType.get(),
            new FunctionDefinition[] {nonDetDefinition});

    Assertions.assertFalse(nonDetRegistered.deterministic());

    // Test 3: Register a table function
    String tableFuncName = GravitinoITUtils.genRandomName("table_func");
    NameIdentifier tableFuncIdent = NameIdentifier.of(schemaName, tableFuncName);

    FunctionParam tableParam = FunctionParams.of("n", Types.IntegerType.get());
    FunctionImpl tableImpl =
        FunctionImpls.ofJava(FunctionImpl.RuntimeType.SPARK, "com.example.GenerateRowsUDTF");
    FunctionDefinition tableDefinition =
        FunctionDefinitions.of(new FunctionParam[] {tableParam}, new FunctionImpl[] {tableImpl});

    // The table-valued overload takes return columns instead of a scalar return type
    // and implies FunctionType.TABLE.
    Function tableRegistered =
        functionCatalog.registerFunction(
            tableFuncIdent,
            "Table function",
            true,
            new FunctionColumn[] {FunctionColumn.of("x", Types.StringType.get(), "comment")},
            new FunctionDefinition[] {tableDefinition});

    Assertions.assertEquals(FunctionType.TABLE, tableRegistered.functionType());
  }
+
  /**
   * Verifies registration rejects both a duplicate function name and a set of definitions whose
   * default parameters make overload resolution ambiguous.
   */
  @Test
  public void testRegisterFunctionConflict() {
    // Test 1: Register function with same name should fail
    String functionName = GravitinoITUtils.genRandomName("test_func");
    NameIdentifier ident = NameIdentifier.of(schemaName, functionName);

    FunctionParam param = FunctionParams.of("x", Types.IntegerType.get());
    FunctionImpl impl = FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, "SELECT x + 1");
    FunctionDefinition definition =
        FunctionDefinitions.of(new FunctionParam[] {param}, new FunctionImpl[] {impl});

    functionCatalog.registerFunction(
        ident,
        "comment",
        FunctionType.SCALAR,
        true,
        Types.IntegerType.get(),
        new FunctionDefinition[] {definition});

    Assertions.assertThrows(
        FunctionAlreadyExistsException.class,
        () ->
            functionCatalog.registerFunction(
                ident,
                "comment",
                FunctionType.SCALAR,
                true,
                Types.IntegerType.get(),
                new FunctionDefinition[] {definition}));

    // Test 2: Register function with ambiguous definitions should fail
    // foo(int, float default 1.0) and foo(int, string default 'x') both support call foo(1)
    String ambiguousFuncName = GravitinoITUtils.genRandomName("ambiguous_func");
    NameIdentifier ambiguousIdent = NameIdentifier.of(schemaName, ambiguousFuncName);

    FunctionParam intParam = FunctionParams.of("x", Types.IntegerType.get());
    FunctionParam floatParamWithDefault =
        FunctionParams.of("y", Types.FloatType.get(), "1.0", Literals.floatLiteral(1.0f));
    FunctionParam stringParamWithDefault =
        FunctionParams.of("z", Types.StringType.get(), "'x'", Literals.stringLiteral("x"));

    FunctionImpl ambiguousImpl =
        FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, "SELECT x + 1");

    // Definition 1: foo(int, float default 1.0) supports arities: (int), (int, float)
    FunctionDefinition def1 =
        FunctionDefinitions.of(
            new FunctionParam[] {intParam, floatParamWithDefault},
            new FunctionImpl[] {ambiguousImpl});

    // Definition 2: foo(int, string default 'x') supports arities: (int), (int, string)
    FunctionDefinition def2 =
        FunctionDefinitions.of(
            new FunctionParam[] {intParam, stringParamWithDefault},
            new FunctionImpl[] {ambiguousImpl});

    // Both definitions support call foo(1), so this should fail
    Assertions.assertThrows(
        IllegalArgumentException.class,
        () ->
            functionCatalog.registerFunction(
                ambiguousIdent,
                "Ambiguous function",
                FunctionType.SCALAR,
                true,
                Types.IntegerType.get(),
                new FunctionDefinition[] {def1, def2}));
  }
+
  /**
   * Verifies getFunction throws NoSuchFunctionException both when the function is missing from an
   * existing schema and when the schema itself does not exist.
   */
  @Test
  public void testGetFunctionNotFound() {
    // Test 1: Function not found in existing schema
    NameIdentifier ident = NameIdentifier.of(schemaName, "non_existent_func");

    Assertions.assertThrows(
        NoSuchFunctionException.class, () -> functionCatalog.getFunction(ident));

    // Test 2: Function not found when schema does not exist
    NameIdentifier identWithNonExistentSchema =
        NameIdentifier.of("non_existent_schema", "some_func");

    Assertions.assertThrows(
        NoSuchFunctionException.class,
        () -> functionCatalog.getFunction(identWithNonExistentSchema));
  }
+
  /**
   * Registers two functions, then verifies both listFunctions (identifiers only) and
   * listFunctionInfos (full metadata) return them, order-independent.
   */
  @Test
  public void testListFunctions() {
    // Register multiple functions
    String func1Name = GravitinoITUtils.genRandomName("list_func1");
    String func2Name = GravitinoITUtils.genRandomName("list_func2");

    FunctionParam param = FunctionParams.of("x", Types.IntegerType.get());
    FunctionImpl impl = FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, "SELECT x + 1");
    FunctionDefinition definition =
        FunctionDefinitions.of(new FunctionParam[] {param}, new FunctionImpl[] {impl});

    Function func1 =
        functionCatalog.registerFunction(
            NameIdentifier.of(schemaName, func1Name),
            "comment1",
            FunctionType.SCALAR,
            true,
            Types.IntegerType.get(),
            new FunctionDefinition[] {definition});

    Function func2 =
        functionCatalog.registerFunction(
            NameIdentifier.of(schemaName, func2Name),
            "comment2",
            FunctionType.SCALAR,
            true,
            Types.IntegerType.get(),
            new FunctionDefinition[] {definition});

    // List functions
    NameIdentifier[] functions = functionCatalog.listFunctions(Namespace.of(schemaName));
    Assertions.assertEquals(2, functions.length);

    // Compare names as a set; listing order is not guaranteed.
    Set<String> functionNames =
        Arrays.stream(functions).map(NameIdentifier::name).collect(Collectors.toSet());
    Assertions.assertTrue(functionNames.contains(func1Name));
    Assertions.assertTrue(functionNames.contains(func2Name));

    // List function infos
    Function[] functionInfos = functionCatalog.listFunctionInfos(Namespace.of(schemaName));
    Assertions.assertEquals(2, functionInfos.length);

    Set<Function> functionSet = new HashSet<>(Arrays.asList(functionInfos));
    Assertions.assertTrue(functionSet.contains(func1));
    Assertions.assertTrue(functionSet.contains(func2));
  }
+
+  @Test
+  public void testDropFunction() {
+    String functionName = GravitinoITUtils.genRandomName("drop_func");
+    NameIdentifier ident = NameIdentifier.of(schemaName, functionName);
+
+    FunctionParam param = FunctionParams.of("x", Types.IntegerType.get());
+    FunctionImpl impl = FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, 
"SELECT x + 1");
+    FunctionDefinition definition =
+        FunctionDefinitions.of(new FunctionParam[] {param}, new FunctionImpl[] 
{impl});
+
+    functionCatalog.registerFunction(
+        ident,
+        "comment",
+        FunctionType.SCALAR,
+        true,
+        Types.IntegerType.get(),
+        new FunctionDefinition[] {definition});
+
+    // Function exists
+    Assertions.assertTrue(functionCatalog.functionExists(ident));
+
+    // Drop function
+    Assertions.assertTrue(functionCatalog.dropFunction(ident));
+
+    // Function no longer exists
+    Assertions.assertFalse(functionCatalog.functionExists(ident));
+
+    // Drop again should return false
+    Assertions.assertFalse(functionCatalog.dropFunction(ident));
+  }
+
+  @Test
+  public void testAlterFunctionUpdateComment() {
+    String functionName = GravitinoITUtils.genRandomName("alter_func");
+    NameIdentifier ident = NameIdentifier.of(schemaName, functionName);
+
+    FunctionParam param = FunctionParams.of("x", Types.IntegerType.get());
+    FunctionImpl impl = FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, 
"SELECT x + 1");
+    FunctionDefinition definition =
+        FunctionDefinitions.of(new FunctionParam[] {param}, new FunctionImpl[] 
{impl});
+
+    functionCatalog.registerFunction(
+        ident,
+        "original comment",
+        FunctionType.SCALAR,
+        true,
+        Types.IntegerType.get(),
+        new FunctionDefinition[] {definition});
+
+    // Alter comment
+    Function altered =
+        functionCatalog.alterFunction(ident, 
FunctionChange.updateComment("updated comment"));
+
+    Assertions.assertEquals("updated comment", altered.comment());
+
+    // Verify the change persisted
+    Function loaded = functionCatalog.getFunction(ident);
+    Assertions.assertEquals("updated comment", loaded.comment());
+  }
+
+  @Test
+  public void testAlterFunctionAddDefinition() {
+    String functionName = GravitinoITUtils.genRandomName("alter_def_func");
+    NameIdentifier ident = NameIdentifier.of(schemaName, functionName);
+
+    // Create initial function with one definition
+    FunctionParam param1 = FunctionParams.of("x", Types.IntegerType.get());
+    FunctionImpl impl1 = FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, 
"SELECT x + 1");
+    FunctionDefinition definition1 =
+        FunctionDefinitions.of(new FunctionParam[] {param1}, new 
FunctionImpl[] {impl1});
+
+    functionCatalog.registerFunction(
+        ident,
+        "comment",
+        FunctionType.SCALAR,
+        true,
+        Types.IntegerType.get(),
+        new FunctionDefinition[] {definition1});
+
+    // Add a new definition with different parameter type
+    FunctionParam param2 = FunctionParams.of("x", Types.StringType.get());
+    FunctionImpl impl2 =
+        FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, "SELECT CONCAT(x, 
'_suffix')");
+    FunctionDefinition definition2 =
+        FunctionDefinitions.of(new FunctionParam[] {param2}, new 
FunctionImpl[] {impl2});
+
+    Function altered =
+        functionCatalog.alterFunction(ident, 
FunctionChange.addDefinition(definition2));
+
+    Assertions.assertEquals(2, altered.definitions().length);
+  }
+
+  @Test
+  public void testAlterFunctionImplOperations() {
+    String functionName = GravitinoITUtils.genRandomName("impl_ops_func");
+    NameIdentifier ident = NameIdentifier.of(schemaName, functionName);
+
+    // Create function with Spark implementation
+    FunctionParam param = FunctionParams.of("x", Types.IntegerType.get());
+    FunctionImpl sparkImpl = 
FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, "SELECT x + 1");
+    FunctionDefinition definition =
+        FunctionDefinitions.of(new FunctionParam[] {param}, new FunctionImpl[] 
{sparkImpl});
+
+    functionCatalog.registerFunction(
+        ident,
+        "comment",
+        FunctionType.SCALAR,
+        true,
+        Types.IntegerType.get(),
+        new FunctionDefinition[] {definition});
+
+    // Test 1: Add Trino implementation
+    FunctionImpl trinoImpl = 
FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, "SELECT x + 1");
+    Function afterAdd =
+        functionCatalog.alterFunction(
+            ident, FunctionChange.addImpl(new FunctionParam[] {param}, 
trinoImpl));
+
+    Assertions.assertEquals(1, afterAdd.definitions().length);
+    Assertions.assertEquals(2, afterAdd.definitions()[0].impls().length);
+
+    // Test 2: Update Spark implementation
+    FunctionImpl newSparkImpl =
+        FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, "SELECT x + 10");
+    Function afterUpdate =
+        functionCatalog.alterFunction(
+            ident,
+            FunctionChange.updateImpl(
+                new FunctionParam[] {param}, FunctionImpl.RuntimeType.SPARK, 
newSparkImpl));
+
+    Assertions.assertEquals(1, afterUpdate.definitions().length);
+    Assertions.assertEquals(2, afterUpdate.definitions()[0].impls().length);
+
+    // Test 3: Remove Trino implementation
+    Function afterRemove =
+        functionCatalog.alterFunction(
+            ident,
+            FunctionChange.removeImpl(new FunctionParam[] {param}, 
FunctionImpl.RuntimeType.TRINO));
+
+    Assertions.assertEquals(1, afterRemove.definitions().length);
+    Assertions.assertEquals(1, afterRemove.definitions()[0].impls().length);
+    Assertions.assertEquals(
+        FunctionImpl.RuntimeType.SPARK, 
afterRemove.definitions()[0].impls()[0].runtime());
+
+    // Test 4: Remove the last impl should fail
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            functionCatalog.alterFunction(
+                ident,
+                FunctionChange.removeImpl(
+                    new FunctionParam[] {param}, 
FunctionImpl.RuntimeType.SPARK)));
+  }
+
+  @Test
+  public void testRegisterFunctionWithImpls() {
+    // Test 1: Register function with Java impl
+    String javaFuncName = GravitinoITUtils.genRandomName("java_func");
+    NameIdentifier javaIdent = NameIdentifier.of(schemaName, javaFuncName);
+
+    FunctionParam javaParam = FunctionParams.of("x", Types.IntegerType.get());
+    FunctionImpl javaImpl =
+        FunctionImpls.ofJava(FunctionImpl.RuntimeType.SPARK, 
"com.example.AddOneUDF");
+    FunctionDefinition javaDefinition =
+        FunctionDefinitions.of(new FunctionParam[] {javaParam}, new 
FunctionImpl[] {javaImpl});
+
+    Function javaRegistered =
+        functionCatalog.registerFunction(
+            javaIdent,
+            "Java UDF",
+            FunctionType.SCALAR,
+            true,
+            Types.IntegerType.get(),
+            new FunctionDefinition[] {javaDefinition});
+
+    Assertions.assertEquals(javaFuncName, javaRegistered.name());
+    Assertions.assertEquals(
+        FunctionImpl.Language.JAVA, 
javaRegistered.definitions()[0].impls()[0].language());
+
+    // Test 2: Register function with Python impl
+    String pythonFuncName = GravitinoITUtils.genRandomName("python_func");
+    NameIdentifier pythonIdent = NameIdentifier.of(schemaName, pythonFuncName);
+
+    FunctionParam pythonParam = FunctionParams.of("x", 
Types.IntegerType.get());
+    FunctionImpl pythonImpl =
+        FunctionImpls.ofPython(
+            FunctionImpl.RuntimeType.SPARK,
+            "add_one",
+            "def add_one(x):\n    return x + 1",
+            null,
+            null);
+    FunctionDefinition pythonDefinition =
+        FunctionDefinitions.of(new FunctionParam[] {pythonParam}, new 
FunctionImpl[] {pythonImpl});
+
+    Function pythonRegistered =
+        functionCatalog.registerFunction(
+            pythonIdent,
+            "Python UDF",
+            FunctionType.SCALAR,
+            true,
+            Types.IntegerType.get(),
+            new FunctionDefinition[] {pythonDefinition});
+
+    Assertions.assertEquals(pythonFuncName, pythonRegistered.name());
+    Assertions.assertEquals(
+        FunctionImpl.Language.PYTHON, 
pythonRegistered.definitions()[0].impls()[0].language());
+
+    // Test 3: Register function with multiple definitions
+    String multiFuncName = GravitinoITUtils.genRandomName("multi_def_func");
+    NameIdentifier multiIdent = NameIdentifier.of(schemaName, multiFuncName);
+
+    FunctionParam param1 = FunctionParams.of("x", Types.IntegerType.get());
+    FunctionImpl impl1 = FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, 
"SELECT x + 1");
+    FunctionDefinition definition1 =
+        FunctionDefinitions.of(new FunctionParam[] {param1}, new 
FunctionImpl[] {impl1});
+
+    FunctionParam param2 = FunctionParams.of("x", Types.StringType.get());
+    FunctionImpl impl2 =
+        FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, "SELECT CONCAT(x, 
'_suffix')");
+    FunctionDefinition definition2 =
+        FunctionDefinitions.of(new FunctionParam[] {param2}, new 
FunctionImpl[] {impl2});
+
+    Function multiRegistered =
+        functionCatalog.registerFunction(
+            multiIdent,
+            "Overloaded function",
+            FunctionType.SCALAR,
+            true,
+            Types.StringType.get(),
+            new FunctionDefinition[] {definition1, definition2});
+
+    Assertions.assertEquals(2, multiRegistered.definitions().length);
+  }
+
+  @Test
+  public void testAlterFunctionRemoveDefinition() {
+    String functionName = GravitinoITUtils.genRandomName("remove_def_func");
+    NameIdentifier ident = NameIdentifier.of(schemaName, functionName);
+
+    // Create function with two definitions
+    FunctionParam param1 = FunctionParams.of("x", Types.IntegerType.get());
+    FunctionImpl impl1 = FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, 
"SELECT x + 1");
+    FunctionDefinition definition1 =
+        FunctionDefinitions.of(new FunctionParam[] {param1}, new 
FunctionImpl[] {impl1});
+
+    FunctionParam param2 = FunctionParams.of("x", Types.StringType.get());
+    FunctionImpl impl2 =
+        FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, "SELECT CONCAT(x, 
'_suffix')");
+    FunctionDefinition definition2 =
+        FunctionDefinitions.of(new FunctionParam[] {param2}, new 
FunctionImpl[] {impl2});
+
+    functionCatalog.registerFunction(
+        ident,
+        "comment",
+        FunctionType.SCALAR,
+        true,
+        Types.StringType.get(),
+        new FunctionDefinition[] {definition1, definition2});
+
+    // Remove one definition
+    Function altered =
+        functionCatalog.alterFunction(
+            ident, FunctionChange.removeDefinition(new FunctionParam[] 
{param1}));
+
+    Assertions.assertEquals(1, altered.definitions().length);
+    // The remaining definition should be the one with String parameter
+    Assertions.assertEquals(
+        Types.StringType.get(), 
altered.definitions()[0].parameters()[0].dataType());
+
+    // Test removing the last definition should fail
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            functionCatalog.alterFunction(
+                ident, FunctionChange.removeDefinition(new FunctionParam[] 
{param2})));
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index 1a1110143a..ceeb6baef7 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -254,7 +254,7 @@ public class JDBCBackend implements RelationalBackend {
       case MODEL_VERSION:
         return (E) 
ModelVersionMetaService.getInstance().updateModelVersion(ident, updater);
       case FUNCTION:
-        throw new UnsupportedOperationException("updateFunction not 
implemented yet");
+        return (E) FunctionMetaService.getInstance().updateFunction(ident, 
updater);
       case POLICY:
         return (E) PolicyMetaService.getInstance().updatePolicy(ident, 
updater);
       case JOB_TEMPLATE:
@@ -353,7 +353,7 @@ public class JDBCBackend implements RelationalBackend {
       case MODEL_VERSION:
         return ModelVersionMetaService.getInstance().deleteModelVersion(ident);
       case FUNCTION:
-        throw new UnsupportedOperationException("deleteFunction not 
implemented yet");
+        return FunctionMetaService.getInstance().deleteFunction(ident);
       case POLICY:
         return PolicyMetaService.getInstance().deletePolicy(ident);
       case JOB_TEMPLATE:

Reply via email to