This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 691c3f1fef [#9532] feat(spark-connector): Add FunctionCatalog support for Gravitino UDFs (#9580)
691c3f1fef is described below

commit 691c3f1feffe40a15ccfa9e06c1fa57721c861ee
Author: mchades <[email protected]>
AuthorDate: Wed Feb 11 23:21:33 2026 +0800

    [#9532] feat(spark-connector): Add FunctionCatalog support for Gravitino UDFs (#9580)
    
    ### What changes were proposed in this pull request?
    
    This PR enables the Spark connector's V2 FunctionCatalog to load and
    list UDFs stored in Gravitino.
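
    For reference, the registration side of this feature uses the Gravitino
    function API the same way the new SparkEnvIT test does. A minimal sketch
    (metalake/catalog/schema names and the implementation class are
    illustrative; client setup is elided):

    ```java
    // "client" is an already-initialized Gravitino client (setup elided).
    GravitinoMetalake metalake = client.loadMetalake("demo_metalake");
    Catalog catalog = metalake.loadCatalog("my_catalog");

    // Register a SCALAR UDF with a single Spark (Java) implementation; the
    // named class must implement Spark's UnboundFunction and be available
    // on the Spark driver/executor classpath.
    catalog
        .asFunctionCatalog()
        .registerFunction(
            NameIdentifier.of("my_schema", "my_string_length"),
            "Returns the length of a string",
            FunctionType.SCALAR,
            true /* deterministic */,
            new FunctionDefinition[] {
              FunctionDefinitions.of(
                  new FunctionParam[] {FunctionParams.of("input", Types.StringType.get())},
                  Types.IntegerType.get(),
                  new FunctionImpl[] {
                    FunctionImpls.ofJava(
                        FunctionImpl.RuntimeType.SPARK, "com.example.MyStringLength")
                  })
            });
    ```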
    
    ### Why are the changes needed?
    
    To support loading and listing User-Defined Functions (UDFs) from
    Gravitino in the Spark connector, as part of the broader UDF support
    initiative.
    
    Fix: #9532
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, Spark users can now use Gravitino-managed functions through the
    Spark SQL interface.
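
    For example, with the connector configured for a session (illustrative
    catalog/schema/function names; assumes a UDF registered as in the
    sketch above):

    ```java
    SparkSession spark = SparkSession.active();

    // Lists only the Gravitino functions that carry a SPARK runtime impl
    spark.sql("SHOW USER FUNCTIONS FROM my_catalog.my_schema").show();

    // Invokes the Gravitino-managed UDF by its qualified name (returns 3)
    spark.sql("SELECT my_catalog.my_schema.my_string_length('abc')").show();
    ```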
    
    ### How was this patch tested?
    
    Unit tests and integration tests are added to cover the new
    functionality.
---
 .../catalog/FunctionOperationDispatcher.java       |  36 ++++++
 .../catalog/ManagedFunctionOperations.java         |  27 +++++
 .../catalog/TestFunctionOperationDispatcher.java   | 120 +++++++++++++++++++
 .../catalog/TestManagedFunctionOperations.java     |  26 +++++
 .../spark/connector/catalog/BaseCatalog.java       | 127 ++++++++++++++++++---
 .../connector/iceberg/GravitinoIcebergCatalog.java |  19 ++-
 .../connector/functions/StringLengthFunction.java  |  88 ++++++++++++++
 .../connector/integration/test/SparkCommonIT.java  |  25 ++++
 .../connector/integration/test/SparkEnvIT.java     |  71 ++++++++++++
 .../test/iceberg/SparkIcebergCatalogIT.java        |  46 ++++++++
 .../integration/test/util/SparkUtilIT.java         |   4 +
 .../test/hive/SparkHiveCatalogIT33.java            |   6 +
 .../SparkIcebergCatalogHiveBackendIT33.java        |   6 +
 .../SparkIcebergCatalogRestBackendIT33.java        |   8 +-
 .../test/jdbc/SparkJdbcMysqlCatalogIT33.java       |   6 +
 .../test/jdbc/SparkJdbcPostgreSqlCatalogIT33.java  |   6 +
 .../SparkPaimonCatalogFilesystemBackendIT33.java   |   6 +
 17 files changed, 611 insertions(+), 16 deletions(-)

diff --git a/core/src/main/java/org/apache/gravitino/catalog/FunctionOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/FunctionOperationDispatcher.java
index 7d88196887..ebc5f0d15d 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/FunctionOperationDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/FunctionOperationDispatcher.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.catalog;
 
+import static org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
+
 import com.google.common.base.Preconditions;
 import java.util.Arrays;
 import org.apache.gravitino.EntityStore;
@@ -44,6 +46,9 @@ import org.apache.gravitino.storage.IdGenerator;
  */
 public class FunctionOperationDispatcher extends OperationDispatcher implements FunctionDispatcher {
 
+  private static final String ICEBERG_PROVIDER = "lakehouse-iceberg";
+  private static final String ICEBERG_SYSTEM_SCHEMA = "system";
+
   private final SchemaOperationDispatcher schemaOps;
   private final ManagedFunctionOperations managedFunctionOps;
 
@@ -133,6 +138,8 @@ public class FunctionOperationDispatcher extends OperationDispatcher implements
     Preconditions.checkArgument(
         definitions != null && definitions.length > 0, "At least one definition is required");
 
+    validateNotIcebergReservedSchema(ident);
+
     NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
     // Validate schema exists in the underlying catalog
     schemaOps.loadSchema(schemaIdent);
@@ -159,6 +166,8 @@ public class FunctionOperationDispatcher extends OperationDispatcher implements
       throws NoSuchFunctionException, IllegalArgumentException {
     Preconditions.checkArgument(
         changes != null && changes.length > 0, "At least one change is required");
+    validateNotIcebergReservedSchema(ident);
+
     NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
     if (!schemaOps.schemaExists(schemaIdent)) {
       throw new NoSuchFunctionException("Schema does not exist: %s", schemaIdent);
@@ -176,6 +185,8 @@ public class FunctionOperationDispatcher extends OperationDispatcher implements
    */
   @Override
   public boolean dropFunction(NameIdentifier ident) {
+    validateNotIcebergReservedSchema(ident);
+
     NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
     if (!schemaOps.schemaExists(schemaIdent)) {
       return false;
@@ -184,4 +195,29 @@ public class FunctionOperationDispatcher extends OperationDispatcher implements
     return TreeLockUtils.doWithTreeLock(
         ident, LockType.WRITE, () -> managedFunctionOps.dropFunction(ident));
   }
+
+  /**
+   * Validates that the function operation is not targeting the Iceberg reserved "system" schema.
+   * Iceberg's "system" schema is reserved for built-in functions (e.g., iceberg_version, bucket,
+   * truncate) and should not allow user-managed function operations.
+   *
+   * @param ident The function identifier.
+   * @throws IllegalArgumentException If the function targets the Iceberg "system" schema.
+   */
+  private void validateNotIcebergReservedSchema(NameIdentifier ident) {
+    String schemaName = ident.namespace().level(ident.namespace().length() - 1);
+    if (!ICEBERG_SYSTEM_SCHEMA.equals(schemaName)) {
+      return;
+    }
+
+    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+    String provider =
+        doWithCatalog(catalogIdent, c -> c.catalog().provider(), IllegalArgumentException.class);
+    if (ICEBERG_PROVIDER.equals(provider)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot operate on functions in the Iceberg reserved schema \"%s\"",
+              ICEBERG_SYSTEM_SCHEMA));
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java b/core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java
index 067d68c131..7341052cce 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java
@@ -164,6 +164,7 @@ public class ManagedFunctionOperations implements FunctionCatalog {
     Preconditions.checkArgument(
         definitions != null && definitions.length > 0,
         "At least one function definition must be provided");
+    validateDefinitionsNoRuntimeDuplicate(definitions);
     validateDefinitionsNoArityOverlap(definitions);
 
     String currentUser = PrincipalUtils.getCurrentUserName();
@@ -227,6 +228,32 @@ public class ManagedFunctionOperations implements FunctionCatalog {
     }
   }
 
+  /**
+   * Validates that no definition contains duplicate runtime types in its implementations. Each
+   * definition must have at most one implementation per runtime type.
+   *
+   * @param definitions The array of definitions to validate.
+   * @throws IllegalArgumentException If any definition has duplicate runtime types.
+   */
+  private void validateDefinitionsNoRuntimeDuplicate(FunctionDefinition[] definitions) {
+    for (int i = 0; i < definitions.length; i++) {
+      FunctionImpl[] impls = definitions[i].impls();
+      if (impls == null || impls.length <= 1) {
+        continue;
+      }
+      Set<FunctionImpl.RuntimeType> seenRuntimes = new HashSet<>();
+      for (FunctionImpl impl : impls) {
+        if (!seenRuntimes.add(impl.runtime())) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "Cannot register function: definition at index %d has duplicate runtime '%s'. "
+                      + "Each definition must have at most one implementation per runtime.",
+                  i, impl.runtime()));
+        }
+      }
+    }
+  }
+
   private FunctionEntity applyChanges(FunctionEntity oldEntity, FunctionChange... changes) {
     String newComment = oldEntity.comment();
     List<FunctionDefinition> newDefinitions =
diff --git a/core/src/test/java/org/apache/gravitino/catalog/TestFunctionOperationDispatcher.java b/core/src/test/java/org/apache/gravitino/catalog/TestFunctionOperationDispatcher.java
new file mode 100644
index 0000000000..cac62bf4f2
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/catalog/TestFunctionOperationDispatcher.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.connector.BaseCatalog;
+import org.apache.gravitino.function.FunctionChange;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestFunctionOperationDispatcher {
+
+  private static final String METALAKE = "test_metalake";
+  private static final String ICEBERG_CATALOG = "iceberg_catalog";
+  private static final String HIVE_CATALOG = "hive_catalog";
+
+  private final IdGenerator idGenerator = new RandomIdGenerator();
+
+  private FunctionOperationDispatcher dispatcher;
+  private CatalogManager catalogManager;
+  private SchemaOperationDispatcher schemaOps;
+  private EntityStore store;
+
+  @BeforeEach
+  public void setUp() {
+    catalogManager = mock(CatalogManager.class);
+    schemaOps = mock(SchemaOperationDispatcher.class);
+    store = mock(EntityStore.class);
+
+    CatalogManager.CatalogWrapper icebergWrapper = createMockCatalogWrapper("lakehouse-iceberg");
+    CatalogManager.CatalogWrapper hiveWrapper = createMockCatalogWrapper("hive");
+
+    when(catalogManager.loadCatalogAndWrap(NameIdentifier.of(METALAKE, ICEBERG_CATALOG)))
+        .thenReturn(icebergWrapper);
+    when(catalogManager.loadCatalogAndWrap(NameIdentifier.of(METALAKE, HIVE_CATALOG)))
+        .thenReturn(hiveWrapper);
+
+    dispatcher = new FunctionOperationDispatcher(catalogManager, schemaOps, store, idGenerator);
+  }
+
+  @Test
+  public void testIcebergSystemSchemaRestriction() {
+    NameIdentifier icebergSystemFunc =
+        NameIdentifier.of(Namespace.of(METALAKE, ICEBERG_CATALOG, "system"), "my_func");
+    FunctionDefinition[] defs = new FunctionDefinition[] {mock(FunctionDefinition.class)};
+
+    // All write operations on Iceberg system schema should be rejected
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> dispatcher.registerFunction(icebergSystemFunc, "c", FunctionType.SCALAR, true, defs));
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            dispatcher.alterFunction(
+                icebergSystemFunc, FunctionChange.updateComment("new comment")));
+    Assertions.assertThrows(
+        IllegalArgumentException.class, () -> dispatcher.dropFunction(icebergSystemFunc));
+
+    // Non-system schema in Iceberg catalog should pass validation
+    NameIdentifier icebergUserFunc =
+        NameIdentifier.of(Namespace.of(METALAKE, ICEBERG_CATALOG, "user_schema"), "my_func");
+    try {
+      dispatcher.registerFunction(icebergUserFunc, "c", FunctionType.SCALAR, true, defs);
+    } catch (IllegalArgumentException e) {
+      Assertions.assertFalse(
+          e.getMessage().contains("Iceberg reserved schema"),
+          "Should not reject non-system schema");
+    } catch (Exception ignored) {
+      // Other exceptions (e.g., LockManager not initialized) are expected in unit test
+    }
+
+    // System schema in non-Iceberg catalog should pass validation
+    NameIdentifier hiveSystemFunc =
+        NameIdentifier.of(Namespace.of(METALAKE, HIVE_CATALOG, "system"), "my_func");
+    try {
+      dispatcher.registerFunction(hiveSystemFunc, "c", FunctionType.SCALAR, true, defs);
+    } catch (IllegalArgumentException e) {
+      Assertions.assertFalse(
+          e.getMessage().contains("Iceberg reserved schema"),
+          "Should not reject system schema in non-Iceberg catalog");
+    } catch (Exception ignored) {
+      // Other exceptions (e.g., LockManager not initialized) are expected in unit test
+    }
+  }
+
+  private CatalogManager.CatalogWrapper createMockCatalogWrapper(String provider) {
+    BaseCatalog catalog = mock(BaseCatalog.class);
+    when(catalog.provider()).thenReturn(provider);
+
+    CatalogManager.CatalogWrapper wrapper = mock(CatalogManager.CatalogWrapper.class);
+    when(wrapper.catalog()).thenReturn(catalog);
+    return wrapper;
+  }
+}
diff --git a/core/src/test/java/org/apache/gravitino/catalog/TestManagedFunctionOperations.java b/core/src/test/java/org/apache/gravitino/catalog/TestManagedFunctionOperations.java
index 28a12ae951..ff3b6638e2 100644
--- a/core/src/test/java/org/apache/gravitino/catalog/TestManagedFunctionOperations.java
+++ b/core/src/test/java/org/apache/gravitino/catalog/TestManagedFunctionOperations.java
@@ -530,6 +530,32 @@ public class TestManagedFunctionOperations {
                 funcIdent, FunctionChange.addImpl(params, anotherSparkImpl)));
   }
 
+  @Test
+  public void testRegisterFunctionWithDuplicateRuntimeInDefinition() {
+    NameIdentifier funcIdent = getFunctionIdent("func_register_dup_runtime");
+    FunctionParam[] params = new FunctionParam[] {FunctionParams.of("a", Types.IntegerType.get())};
+    FunctionImpl sparkImpl1 =
+        FunctionImpls.ofJava(FunctionImpl.RuntimeType.SPARK, "com.example.SparkUDF1");
+    FunctionImpl sparkImpl2 =
+        FunctionImpls.ofJava(FunctionImpl.RuntimeType.SPARK, "com.example.SparkUDF2");
+
+    FunctionDefinition[] definitions =
+        new FunctionDefinition[] {
+          createDefinitionWithImpls(
+              params, Types.StringType.get(), new FunctionImpl[] {sparkImpl1, sparkImpl2})
+        };
+
+    IllegalArgumentException ex =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                functionOperations.registerFunction(
+                    funcIdent, "Test function", FunctionType.SCALAR, true, definitions));
+    Assertions.assertTrue(
+        ex.getMessage().contains("duplicate runtime"),
+        "Expected error about duplicate runtime, got: " + ex.getMessage());
+  }
+
   @Test
   public void testAlterFunctionAddImplToNonExistingDefinition() {
     NameIdentifier funcIdent = getFunctionIdent("func_add_impl_nodef");
diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
index 4ec2652cd2..fc02032593 100644
--- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
+++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
@@ -34,6 +34,10 @@ import org.apache.gravitino.exceptions.ForbiddenException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.JavaImpl;
 import org.apache.gravitino.spark.connector.ConnectorConstants;
 import org.apache.gravitino.spark.connector.PropertiesConverter;
 import org.apache.gravitino.spark.connector.SparkTableChangeConverter;
@@ -41,10 +45,12 @@ import org.apache.gravitino.spark.connector.SparkTransformConverter;
 import org.apache.gravitino.spark.connector.SparkTransformConverter.DistributionAndSortOrdersInfo;
 import org.apache.gravitino.spark.connector.SparkTypeConverter;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
 import org.apache.spark.sql.connector.catalog.NamespaceChange.SetProperty;
@@ -52,6 +58,7 @@ import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
@@ -69,7 +76,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
  * needed, optimizing resource utilization and minimizing the overhead associated with
  * initialization.
  */
-public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces {
+public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces, FunctionCatalog {
 
   // The specific Spark catalog to do IO operations, different catalogs have different spark catalog
   // implementations, like HiveTableCatalog for Hive, JDBCTableCatalog for JDBC, SparkCatalog for
@@ -168,13 +175,7 @@ public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces {
 
   @Override
   public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
-    String gravitinoNamespace;
-    if (namespace.length == 0) {
-      gravitinoNamespace = getCatalogDefaultNamespace();
-    } else {
-      validateNamespace(namespace);
-      gravitinoNamespace = namespace[0];
-    }
+    String gravitinoNamespace = getDatabase(namespace);
     try {
       NameIdentifier[] identifiers =
           gravitinoCatalogClient.asTableCatalog().listTables(Namespace.of(gravitinoNamespace));
@@ -431,10 +432,70 @@ public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces {
   }
 
   protected String getDatabase(Identifier sparkIdentifier) {
-    if (sparkIdentifier.namespace().length > 0) {
-      return sparkIdentifier.namespace()[0];
+    return getDatabase(sparkIdentifier.namespace());
+  }
+
+  /**
+   * Get the database name from the namespace array.
+   *
+   * @param namespace the namespace array
+   * @return the database name, or the catalog default namespace if the namespace is empty
+   */
+  protected String getDatabase(String[] namespace) {
+    if (namespace.length == 0) {
+      return getCatalogDefaultNamespace();
+    }
+    validateNamespace(namespace);
+    return namespace[0];
+  }
+
+  @Override
+  public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
+    String gravitinoNamespace = getDatabase(namespace);
+    try {
+      Function[] functions =
+          gravitinoCatalogClient
+              .asFunctionCatalog()
+              .listFunctionInfos(Namespace.of(gravitinoNamespace));
+      // Filter functions that have Spark runtime implementation
+      return Arrays.stream(functions)
+          .filter(this::hasSparkImplementation)
+          .map(f -> Identifier.of(new String[] {gravitinoNamespace}, f.name()))
+          .toArray(Identifier[]::new);
+    } catch (NoSuchSchemaException e) {
+      throw new NoSuchNamespaceException(namespace);
+    }
+  }
+
+  @Override
+  public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
+    String[] namespace = ident.namespace();
+    if (namespace.length == 0) {
+      namespace = new String[] {getCatalogDefaultNamespace()};
+      ident = Identifier.of(namespace, ident.name());
     }
-    return getCatalogDefaultNamespace();
+    validateNamespace(namespace);
+
+    NameIdentifier gravitinoIdentifier = NameIdentifier.of(getDatabase(ident), ident.name());
+    try {
+      Function function =
+          gravitinoCatalogClient.asFunctionCatalog().getFunction(gravitinoIdentifier);
+      for (FunctionDefinition definition : function.definitions()) {
+        for (FunctionImpl impl : definition.impls()) {
+          if (!isSparkImplementation(impl)) {
+            continue;
+          }
+          String className = extractClassName(impl);
+          if (StringUtils.isBlank(className)) {
+            continue;
+          }
+          return instantiateFunction(className, ident);
+        }
+      }
+    } catch (org.apache.gravitino.exceptions.NoSuchFunctionException e) {
+      throw new NoSuchFunctionException(ident);
+    }
+    throw new NoSuchFunctionException(ident);
   }
 
   private void validateNamespace(String[] namespace) {
@@ -443,11 +504,11 @@ public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces {
         "Doesn't support multi level namespaces: " + String.join(".", namespace));
   }
 
-  private String getCatalogDefaultNamespace() {
+  protected String getCatalogDefaultNamespace() {
     String[] catalogDefaultNamespace = sparkCatalog.defaultNamespace();
     Preconditions.checkArgument(
         catalogDefaultNamespace != null && catalogDefaultNamespace.length == 1,
-        "Catalog default namespace is not valid");
+        "Catalog default namespace is not valid: " + 
Arrays.toString(catalogDefaultNamespace));
     return catalogDefaultNamespace[0];
   }
 
@@ -470,6 +531,46 @@ public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces {
     return gravitinoIdentifier.namespace().level(0);
   }
 
+  private boolean hasSparkImplementation(Function function) {
+    for (FunctionDefinition definition : function.definitions()) {
+      for (FunctionImpl impl : definition.impls()) {
+        if (isSparkImplementation(impl)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private boolean isSparkImplementation(FunctionImpl impl) {
+    return FunctionImpl.RuntimeType.SPARK.equals(impl.runtime());
+  }
+
+  private String extractClassName(FunctionImpl impl) {
+    if (impl instanceof JavaImpl) {
+      return ((JavaImpl) impl).className();
+    }
+    throw new IllegalArgumentException(
+        String.format("Unsupported function implementation %s", 
impl.getClass().getName()));
+  }
+
+  private UnboundFunction instantiateFunction(String className, Identifier ident)
+      throws NoSuchFunctionException {
+    try {
+      // Use context classloader to work with Spark's isolated plugin classloaders
+      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+      Class<?> functionClass = Class.forName(className, true, classLoader);
+      Object instance = functionClass.getDeclaredConstructor().newInstance();
+      if (instance instanceof UnboundFunction) {
+        return (UnboundFunction) instance;
+      }
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException(
+          String.format("Failed to instantiate function class: %s", 
className), e);
+    }
+    throw new NoSuchFunctionException(ident);
+  }
+
   private Table loadSparkTable(Identifier ident) {
     try {
       return sparkCatalog.loadTable(ident);
diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
index e6d59c853b..4ccacdec1d 100644
--- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
+++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
@@ -109,12 +109,16 @@ public class GravitinoIcebergCatalog extends BaseCatalog
 
   @Override
   public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
-    return ((SparkCatalog) sparkCatalog).listFunctions(namespace);
+    return isIcebergFunctionNamespace(namespace)
+        ? ((SparkCatalog) sparkCatalog).listFunctions(namespace)
+        : super.listFunctions(namespace);
   }
 
   @Override
   public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
-    return ((SparkCatalog) sparkCatalog).loadFunction(ident);
+    return isIcebergFunctionNamespace(ident.namespace())
+        ? ((SparkCatalog) sparkCatalog).loadFunction(ident)
+        : super.loadFunction(ident);
   }
 
   /**
@@ -190,6 +194,17 @@ public class GravitinoIcebergCatalog extends BaseCatalog
     }
   }
 
+  private boolean isIcebergFunctionNamespace(String[] namespace) {
+    try {
+      return namespace.length == 0 || isSystemNamespace(namespace);
+    } catch (IllegalAccessException
+        | InvocationTargetException
+        | NoSuchMethodException
+        | ClassNotFoundException e) {
+      throw new RuntimeException("Failed to check Iceberg function namespace", 
e);
+    }
+  }
+
   private boolean isSystemNamespace(String[] namespace)
       throws NoSuchMethodException, InvocationTargetException, IllegalAccessException,
           ClassNotFoundException {
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/functions/StringLengthFunction.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/functions/StringLengthFunction.java
new file mode 100644
index 0000000000..56cd876247
--- /dev/null
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/functions/StringLengthFunction.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector.functions;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A test UDF implementation that returns the length of a string. This is used for testing
+ * Gravitino's UDF support in Spark.
+ */
+public class StringLengthFunction implements UnboundFunction {
+
+  @Override
+  public String name() {
+    return "my_string_length";
+  }
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.fields().length != 1) {
+      throw new UnsupportedOperationException("my_string_length expects 
exactly one argument");
+    }
+    if (!inputType.fields()[0].dataType().equals(DataTypes.StringType)) {
+      throw new UnsupportedOperationException(
+          "my_string_length expects a string argument, got: " + 
inputType.fields()[0].dataType());
+    }
+    return new BoundStringLengthFunction();
+  }
+
+  @Override
+  public String description() {
+    return "Returns the length of a string";
+  }
+
+  private static class BoundStringLengthFunction implements ScalarFunction<Integer> {
+    @Override
+    public String name() {
+      return "my_string_length";
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.StringType};
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+
+    @Override
+    public String canonicalName() {
+      return "gravitino.my_string_length";
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      if (input.isNullAt(0)) {
+        return null;
+      }
+      UTF8String str = input.getUTF8String(0);
+      return str.numChars();
+    }
+  }
+}
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
index c74f095dbd..ebc3f07d83 100644
--- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
@@ -196,6 +196,31 @@ public abstract class SparkCommonIT extends SparkEnvIT {
         NoSuchNamespaceException.class, () -> sql("SHOW TABLES IN nonexistent_schema"));
   }
 
+  @Test
+  @EnabledIf("supportsFunction")
+  void testCallUDF() {
+    // test call function
+    List<String> data =
+        getQueryData(String.format("SELECT %s.%s('abc')", functionSchemaName, 
functionName));
+    Assertions.assertEquals(1, data.size());
+    Assertions.assertEquals("3", data.get(0));
+  }
+
+  @Test
+  @EnabledIf("supportsFunction")
+  void testListFunctions() {
+    // Test list functions - should only include Spark runtime functions
+    Set<String> functionNames = listUserFunctions(functionSchemaName);
+    Assertions.assertTrue(
+        functionNames.contains(
+            String.join(".", getCatalogName(), functionSchemaName, 
functionName)));
+
+    // Non-Spark function should NOT be listed
+    Assertions.assertFalse(
+        functionNames.contains(
+            String.join(".", getCatalogName(), functionSchemaName, 
nonSparkFunctionName)));
+  }
+
   @Test
   void testLoadCatalogs() {
     Set<String> catalogs = getCatalogs();
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkEnvIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkEnvIT.java
index 9d1f72a1b3..d9c4e8a247 100644
--- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkEnvIT.java
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkEnvIT.java
@@ -27,10 +27,20 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionDefinitions;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionImpls;
+import org.apache.gravitino.function.FunctionParam;
+import org.apache.gravitino.function.FunctionParams;
+import org.apache.gravitino.function.FunctionType;
 import org.apache.gravitino.integration.test.container.ContainerSuite;
 import org.apache.gravitino.integration.test.container.HiveContainer;
+import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.spark.connector.GravitinoSparkConfig;
+import org.apache.gravitino.spark.connector.functions.StringLengthFunction;
 import org.apache.gravitino.spark.connector.iceberg.IcebergPropertiesConstants;
 import org.apache.gravitino.spark.connector.integration.test.util.SparkUtilIT;
 import org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin;
@@ -55,6 +65,9 @@ public abstract class SparkEnvIT extends SparkUtilIT {
   protected String warehouse;
   protected FileSystem hdfs;
   protected String icebergRestServiceUri;
+  protected final String functionSchemaName = "test_function_schema";
+  protected final String functionName = "my_string_length";
+  protected final String nonSparkFunctionName = "trino_upper";
 
   private final String metalakeName = "test";
   private SparkSession sparkSession;
@@ -67,6 +80,10 @@ public abstract class SparkEnvIT extends SparkUtilIT {
 
   protected abstract Map<String, String> getCatalogConfigs();
 
+  protected boolean supportsFunction() {
+    return true;
+  }
+
   @Override
   protected SparkSession getSparkSession() {
     Assertions.assertNotNull(sparkSession);
@@ -87,6 +104,9 @@ public abstract class SparkEnvIT extends SparkUtilIT {
     initHdfsFileSystem();
     initGravitinoEnv();
     initMetalakeAndCatalogs();
+    if (supportsFunction()) {
+      initSchemaAndFunction();
+    }
     initSparkEnv();
     LOG.info(
         "Startup Spark env successfully, Gravitino uri: {}, Hive metastore 
uri: {}",
@@ -130,6 +150,57 @@ public abstract class SparkEnvIT extends SparkUtilIT {
         getCatalogName(), Catalog.Type.RELATIONAL, getProvider(), "", properties);
   }
 
+  private void initSchemaAndFunction() {
+    GravitinoMetalake metalake = client.loadMetalake(metalakeName);
+    Catalog catalog = metalake.loadCatalog(getCatalogName());
+    if (!catalog.asSchemas().schemaExists(functionSchemaName)) {
+      catalog.asSchemas().createSchema(functionSchemaName, "", Collections.emptyMap());
+    }
+
+    // Register a Spark runtime function
+    if (!catalog
+        .asFunctionCatalog()
+        .functionExists(NameIdentifier.of(functionSchemaName, functionName))) {
+      catalog
+          .asFunctionCatalog()
+          .registerFunction(
+              NameIdentifier.of(functionSchemaName, functionName),
+              "Returns the length of a string",
+              FunctionType.SCALAR,
+              true /* deterministic */,
+              new FunctionDefinition[] {
+                FunctionDefinitions.of(
+                    new FunctionParam[] {FunctionParams.of("input", 
Types.StringType.get())},
+                    Types.IntegerType.get(),
+                    new FunctionImpl[] {
+                      FunctionImpls.ofJava(
+                          FunctionImpl.RuntimeType.SPARK, StringLengthFunction.class.getName())
+                    })
+              });
+    }
+
+    // Register a non-Spark (TRINO) runtime function for testing filtering
+    if (!catalog
+        .asFunctionCatalog()
+        .functionExists(NameIdentifier.of(functionSchemaName, nonSparkFunctionName))) {
+      catalog
+          .asFunctionCatalog()
+          .registerFunction(
+              NameIdentifier.of(functionSchemaName, nonSparkFunctionName),
+              "Converts a string to uppercase (Trino implementation)",
+              FunctionType.SCALAR,
+              true /* deterministic */,
+              new FunctionDefinition[] {
+                FunctionDefinitions.of(
+                    new FunctionParam[] {FunctionParams.of("input", 
Types.StringType.get())},
+                    Types.StringType.get(),
+                    new FunctionImpl[] {
+                      FunctionImpls.ofJava(FunctionImpl.RuntimeType.TRINO, "com.example.TrinoUpper")
+                    })
+              });
+    }
+  }
+
   private void initGravitinoEnv() {
     // Gravitino server is already started by AbstractIT, just construct gravitinoUrl
     int gravitinoPort = getGravitinoServerPort();
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
index 0360bea207..92d9d5ca8a 100644
--- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
@@ -62,6 +62,7 @@ import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
@@ -1272,4 +1273,49 @@ public abstract class SparkIcebergCatalogIT extends SparkCommonIT {
       return new IcebergTableWriteProperties(isPartitionedTable, formatVersion, writeMode);
     }
   }
+
+  @Test
+  @EnabledIf("supportsFunction")
+  void testListFunctionsWithIcebergBuiltins() {
+    // Test Gravitino function listing
+    Set<String> gravitinoFunctions = listUserFunctions(functionSchemaName);
+    Assertions.assertTrue(
+        gravitinoFunctions.contains(
+            String.join(".", getCatalogName(), functionSchemaName, 
functionName)));
+
+    // Non-Spark function should NOT be listed
+    Assertions.assertFalse(
+        gravitinoFunctions.contains(
+            String.join(".", getCatalogName(), functionSchemaName, 
nonSparkFunctionName)));
+
+    // Test Iceberg built-in functions are also listed
+    Set<String> systemFunctions = listUserFunctions("system");
+    Assertions.assertTrue(
+        systemFunctions.stream().anyMatch(f -> f.contains("iceberg_version")),
+        "Iceberg built-in function 'iceberg_version' should be listed");
+    Assertions.assertTrue(
+        systemFunctions.stream().anyMatch(f -> f.contains("bucket")),
+        "Iceberg built-in function 'bucket' should be listed");
+  }
+
+  @Test
+  @EnabledIf("supportsFunction")
+  void testCallUDFAndIcebergBuiltins() {
+    // Test Gravitino UDF
+    List<String> gravitinoUdfResult =
+        getQueryData(String.format("SELECT %s.%s('abc')", functionSchemaName, 
functionName));
+    Assertions.assertEquals(1, gravitinoUdfResult.size());
+    Assertions.assertEquals("3", gravitinoUdfResult.get(0));
+
+    // Test Iceberg built-in functions can be called
+    List<String> icebergVersionResult =
+        getQueryData(String.format("SELECT %s.system.iceberg_version()", 
getCatalogName()));
+    Assertions.assertEquals(1, icebergVersionResult.size());
+    Assertions.assertFalse(icebergVersionResult.get(0).isEmpty());
+
+    List<String> bucketResult =
+        getQueryData(String.format("SELECT %s.system.bucket(2, 100)", 
getCatalogName()));
+    Assertions.assertEquals(1, bucketResult.size());
+    Assertions.assertEquals("0", bucketResult.get(0));
+  }
 }
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
index f5f480ff28..0ea383e406 100644
--- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
@@ -68,6 +68,10 @@ public abstract class SparkUtilIT extends BaseIT {
     return convertToStringSet(sql("SHOW TABLES in " + database), 1);
   }
 
+  protected Set<String> listUserFunctions(String database) {
+    return convertToStringSet(sql("SHOW USER FUNCTIONS FROM " + database), 0);
+  }
+
   protected void dropDatabaseIfExists(String database) {
     sql("DROP DATABASE IF EXISTS " + database);
   }
diff --git a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT33.java b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT33.java
index cc0630a190..23773314df 100644
--- a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT33.java
+++ b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT33.java
@@ -32,4 +32,10 @@ public class SparkHiveCatalogIT33 extends SparkHiveCatalogIT {
             .getConfString("spark.sql.catalog." + getCatalogName());
     Assertions.assertEquals(GravitinoHiveCatalogSpark33.class.getName(), catalogClass);
   }
+
+  @Override
+  protected boolean supportsFunction() {
+    // Spark 3.3 does not support function operations
+    return false;
+  }
 }
diff --git a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT33.java b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT33.java
index 737c3c90e7..3c5fd19081 100644
--- a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT33.java
+++ b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT33.java
@@ -32,4 +32,10 @@ public class SparkIcebergCatalogHiveBackendIT33 extends SparkIcebergCatalogHiveB
             .getConfString("spark.sql.catalog." + getCatalogName());
     Assertions.assertEquals(GravitinoIcebergCatalogSpark33.class.getName(), catalogClass);
   }
+
+  @Override
+  protected boolean supportsFunction() {
+    // Spark 3.3 does not support function operations
+    return false;
+  }
 }
diff --git a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT33.java b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT33.java
index e04c1d2351..255627f84a 100644
--- a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT33.java
+++ b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT33.java
@@ -22,4 +22,10 @@ package org.apache.gravitino.spark.connector.integration.test.iceberg;
 import org.junit.jupiter.api.condition.DisabledIf;
 
 @DisabledIf("org.apache.gravitino.integration.test.util.ITUtils#isEmbedded")
-public class SparkIcebergCatalogRestBackendIT33 extends SparkIcebergCatalogRestBackendIT {}
+public class SparkIcebergCatalogRestBackendIT33 extends SparkIcebergCatalogRestBackendIT {
+  @Override
+  protected boolean supportsFunction() {
+    // Spark 3.3 does not support function operations
+    return false;
+  }
+}
diff --git a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT33.java b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT33.java
index cf190cfd4f..c96653ae38 100644
--- a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT33.java
+++ b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT33.java
@@ -33,4 +33,10 @@ public class SparkJdbcMysqlCatalogIT33 extends SparkJdbcMysqlCatalogIT {
             .getConfString("spark.sql.catalog." + getCatalogName());
     Assertions.assertEquals(GravitinoJdbcCatalogSpark33.class.getName(), catalogClass);
   }
+
+  @Override
+  protected boolean supportsFunction() {
+    // Spark 3.3 does not support function operations
+    return false;
+  }
 }
diff --git a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcPostgreSqlCatalogIT33.java b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcPostgreSqlCatalogIT33.java
index 0dba9cde07..b2e0c8be30 100644
--- a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcPostgreSqlCatalogIT33.java
+++ b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcPostgreSqlCatalogIT33.java
@@ -33,4 +33,10 @@ public class SparkJdbcPostgreSqlCatalogIT33 extends SparkJdbcPostgreSqlCatalogIT
             .getConfString("spark.sql.catalog." + getCatalogName());
     Assertions.assertEquals(GravitinoPostgreSqlCatalogSpark33.class.getName(), catalogClass);
   }
+
+  @Override
+  protected boolean supportsFunction() {
+    // Spark 3.3 does not support function operations
+    return false;
+  }
 }
diff --git a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT33.java b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT33.java
index 839b959c77..a4b825637f 100644
--- a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT33.java
+++ b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT33.java
@@ -32,4 +32,10 @@ public class SparkPaimonCatalogFilesystemBackendIT33 extends SparkPaimonCatalogF
             .getConfString("spark.sql.catalog." + getCatalogName());
     Assertions.assertEquals(GravitinoPaimonCatalogSpark33.class.getName(), catalogClass);
   }
+
+  @Override
+  protected boolean supportsFunction() {
+    // Spark 3.3 does not support function operations
+    return false;
+  }
 }

