This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 691c3f1fef [#9532] feat(spark-connector): Add FunctionCatalog support
for Gravitino UDFs (#9580)
691c3f1fef is described below
commit 691c3f1feffe40a15ccfa9e06c1fa57721c861ee
Author: mchades <[email protected]>
AuthorDate: Wed Feb 11 23:21:33 2026 +0800
[#9532] feat(spark-connector): Add FunctionCatalog support for Gravitino
UDFs (#9580)
### What changes were proposed in this pull request?
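This PR implements Spark's V2 FunctionCatalog interface in the Gravitino
Spark connector so that Spark can load and list UDFs stored in Gravitino.
It also makes the core function dispatcher reject register/alter/drop
operations on the reserved Iceberg "system" schema, and rejects function
definitions that declare more than one implementation for the same
runtime.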
### Why are the changes needed?
These changes are needed to support loading and listing user-defined
functions (UDFs) from Gravitino in the Spark connector, as part of the
broader UDF support initiative.
Fix: #9532
### Does this PR introduce _any_ user-facing change?
Yes. Spark users can now list and call Gravitino-managed functions that
have a Spark runtime implementation through the Spark SQL interface.
### How was this patch tested?
Unit tests and integration tests are added to cover the new
functionality.
---
.../catalog/FunctionOperationDispatcher.java | 36 ++++++
.../catalog/ManagedFunctionOperations.java | 27 +++++
.../catalog/TestFunctionOperationDispatcher.java | 120 +++++++++++++++++++
.../catalog/TestManagedFunctionOperations.java | 26 +++++
.../spark/connector/catalog/BaseCatalog.java | 127 ++++++++++++++++++---
.../connector/iceberg/GravitinoIcebergCatalog.java | 19 ++-
.../connector/functions/StringLengthFunction.java | 88 ++++++++++++++
.../connector/integration/test/SparkCommonIT.java | 25 ++++
.../connector/integration/test/SparkEnvIT.java | 71 ++++++++++++
.../test/iceberg/SparkIcebergCatalogIT.java | 46 ++++++++
.../integration/test/util/SparkUtilIT.java | 4 +
.../test/hive/SparkHiveCatalogIT33.java | 6 +
.../SparkIcebergCatalogHiveBackendIT33.java | 6 +
.../SparkIcebergCatalogRestBackendIT33.java | 8 +-
.../test/jdbc/SparkJdbcMysqlCatalogIT33.java | 6 +
.../test/jdbc/SparkJdbcPostgreSqlCatalogIT33.java | 6 +
.../SparkPaimonCatalogFilesystemBackendIT33.java | 6 +
17 files changed, 611 insertions(+), 16 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/FunctionOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/FunctionOperationDispatcher.java
index 7d88196887..ebc5f0d15d 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/FunctionOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/FunctionOperationDispatcher.java
@@ -18,6 +18,8 @@
*/
package org.apache.gravitino.catalog;
+import static
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
+
import com.google.common.base.Preconditions;
import java.util.Arrays;
import org.apache.gravitino.EntityStore;
@@ -44,6 +46,9 @@ import org.apache.gravitino.storage.IdGenerator;
*/
public class FunctionOperationDispatcher extends OperationDispatcher
implements FunctionDispatcher {
+ private static final String ICEBERG_PROVIDER = "lakehouse-iceberg";
+ private static final String ICEBERG_SYSTEM_SCHEMA = "system";
+
private final SchemaOperationDispatcher schemaOps;
private final ManagedFunctionOperations managedFunctionOps;
@@ -133,6 +138,8 @@ public class FunctionOperationDispatcher extends
OperationDispatcher implements
Preconditions.checkArgument(
definitions != null && definitions.length > 0, "At least one
definition is required");
+ validateNotIcebergReservedSchema(ident);
+
NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
// Validate schema exists in the underlying catalog
schemaOps.loadSchema(schemaIdent);
@@ -159,6 +166,8 @@ public class FunctionOperationDispatcher extends
OperationDispatcher implements
throws NoSuchFunctionException, IllegalArgumentException {
Preconditions.checkArgument(
changes != null && changes.length > 0, "At least one change is
required");
+ validateNotIcebergReservedSchema(ident);
+
NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
if (!schemaOps.schemaExists(schemaIdent)) {
throw new NoSuchFunctionException("Schema does not exist: %s",
schemaIdent);
@@ -176,6 +185,8 @@ public class FunctionOperationDispatcher extends
OperationDispatcher implements
*/
@Override
public boolean dropFunction(NameIdentifier ident) {
+ validateNotIcebergReservedSchema(ident);
+
NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
if (!schemaOps.schemaExists(schemaIdent)) {
return false;
@@ -184,4 +195,29 @@ public class FunctionOperationDispatcher extends
OperationDispatcher implements
return TreeLockUtils.doWithTreeLock(
ident, LockType.WRITE, () -> managedFunctionOps.dropFunction(ident));
}
+
+ /**
+ * Validates that the function operation is not targeting the Iceberg
reserved "system" schema.
+ * Iceberg's "system" schema is reserved for built-in functions (e.g.,
iceberg_version, bucket,
+ * truncate) and should not allow user-managed function operations.
+ *
+ * @param ident The function identifier.
+ * @throws IllegalArgumentException If the function targets the Iceberg
"system" schema.
+ */
+ private void validateNotIcebergReservedSchema(NameIdentifier ident) {
+ String schemaName = ident.namespace().level(ident.namespace().length() -
1);
+ if (!ICEBERG_SYSTEM_SCHEMA.equals(schemaName)) {
+ return;
+ }
+
+ NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+ String provider =
+ doWithCatalog(catalogIdent, c -> c.catalog().provider(),
IllegalArgumentException.class);
+ if (ICEBERG_PROVIDER.equals(provider)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot operate on functions in the Iceberg reserved schema
\"%s\"",
+ ICEBERG_SYSTEM_SCHEMA));
+ }
+ }
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java
b/core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java
index 067d68c131..7341052cce 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java
@@ -164,6 +164,7 @@ public class ManagedFunctionOperations implements
FunctionCatalog {
Preconditions.checkArgument(
definitions != null && definitions.length > 0,
"At least one function definition must be provided");
+ validateDefinitionsNoRuntimeDuplicate(definitions);
validateDefinitionsNoArityOverlap(definitions);
String currentUser = PrincipalUtils.getCurrentUserName();
@@ -227,6 +228,32 @@ public class ManagedFunctionOperations implements
FunctionCatalog {
}
}
+ /**
+ * Validates that no definition contains duplicate runtime types in its
implementations. Each
+ * definition must have at most one implementation per runtime type.
+ *
+ * @param definitions The array of definitions to validate.
+ * @throws IllegalArgumentException If any definition has duplicate runtime
types.
+ */
+ private void validateDefinitionsNoRuntimeDuplicate(FunctionDefinition[]
definitions) {
+ for (int i = 0; i < definitions.length; i++) {
+ FunctionImpl[] impls = definitions[i].impls();
+ if (impls == null || impls.length <= 1) {
+ continue;
+ }
+ Set<FunctionImpl.RuntimeType> seenRuntimes = new HashSet<>();
+ for (FunctionImpl impl : impls) {
+ if (!seenRuntimes.add(impl.runtime())) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot register function: definition at index %d has
duplicate runtime '%s'. "
+ + "Each definition must have at most one implementation
per runtime.",
+ i, impl.runtime()));
+ }
+ }
+ }
+ }
+
private FunctionEntity applyChanges(FunctionEntity oldEntity,
FunctionChange... changes) {
String newComment = oldEntity.comment();
List<FunctionDefinition> newDefinitions =
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestFunctionOperationDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestFunctionOperationDispatcher.java
new file mode 100644
index 0000000000..cac62bf4f2
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestFunctionOperationDispatcher.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.connector.BaseCatalog;
+import org.apache.gravitino.function.FunctionChange;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestFunctionOperationDispatcher {
+
+ private static final String METALAKE = "test_metalake";
+ private static final String ICEBERG_CATALOG = "iceberg_catalog";
+ private static final String HIVE_CATALOG = "hive_catalog";
+
+ private final IdGenerator idGenerator = new RandomIdGenerator();
+
+ private FunctionOperationDispatcher dispatcher;
+ private CatalogManager catalogManager;
+ private SchemaOperationDispatcher schemaOps;
+ private EntityStore store;
+
+ @BeforeEach
+ public void setUp() {
+ catalogManager = mock(CatalogManager.class);
+ schemaOps = mock(SchemaOperationDispatcher.class);
+ store = mock(EntityStore.class);
+
+ CatalogManager.CatalogWrapper icebergWrapper =
createMockCatalogWrapper("lakehouse-iceberg");
+ CatalogManager.CatalogWrapper hiveWrapper =
createMockCatalogWrapper("hive");
+
+ when(catalogManager.loadCatalogAndWrap(NameIdentifier.of(METALAKE,
ICEBERG_CATALOG)))
+ .thenReturn(icebergWrapper);
+ when(catalogManager.loadCatalogAndWrap(NameIdentifier.of(METALAKE,
HIVE_CATALOG)))
+ .thenReturn(hiveWrapper);
+
+ dispatcher = new FunctionOperationDispatcher(catalogManager, schemaOps,
store, idGenerator);
+ }
+
+ @Test
+ public void testIcebergSystemSchemaRestriction() {
+ NameIdentifier icebergSystemFunc =
+ NameIdentifier.of(Namespace.of(METALAKE, ICEBERG_CATALOG, "system"),
"my_func");
+ FunctionDefinition[] defs = new FunctionDefinition[]
{mock(FunctionDefinition.class)};
+
+ // All write operations on Iceberg system schema should be rejected
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> dispatcher.registerFunction(icebergSystemFunc, "c",
FunctionType.SCALAR, true, defs));
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ dispatcher.alterFunction(
+ icebergSystemFunc, FunctionChange.updateComment("new
comment")));
+ Assertions.assertThrows(
+ IllegalArgumentException.class, () ->
dispatcher.dropFunction(icebergSystemFunc));
+
+ // Non-system schema in Iceberg catalog should pass validation
+ NameIdentifier icebergUserFunc =
+ NameIdentifier.of(Namespace.of(METALAKE, ICEBERG_CATALOG,
"user_schema"), "my_func");
+ try {
+ dispatcher.registerFunction(icebergUserFunc, "c", FunctionType.SCALAR,
true, defs);
+ } catch (IllegalArgumentException e) {
+ Assertions.assertFalse(
+ e.getMessage().contains("Iceberg reserved schema"),
+ "Should not reject non-system schema");
+ } catch (Exception ignored) {
+ // Other exceptions (e.g., LockManager not initialized) are expected in
unit test
+ }
+
+ // System schema in non-Iceberg catalog should pass validation
+ NameIdentifier hiveSystemFunc =
+ NameIdentifier.of(Namespace.of(METALAKE, HIVE_CATALOG, "system"),
"my_func");
+ try {
+ dispatcher.registerFunction(hiveSystemFunc, "c", FunctionType.SCALAR,
true, defs);
+ } catch (IllegalArgumentException e) {
+ Assertions.assertFalse(
+ e.getMessage().contains("Iceberg reserved schema"),
+ "Should not reject system schema in non-Iceberg catalog");
+ } catch (Exception ignored) {
+ // Other exceptions (e.g., LockManager not initialized) are expected in
unit test
+ }
+ }
+
+ private CatalogManager.CatalogWrapper createMockCatalogWrapper(String
provider) {
+ BaseCatalog catalog = mock(BaseCatalog.class);
+ when(catalog.provider()).thenReturn(provider);
+
+ CatalogManager.CatalogWrapper wrapper =
mock(CatalogManager.CatalogWrapper.class);
+ when(wrapper.catalog()).thenReturn(catalog);
+ return wrapper;
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestManagedFunctionOperations.java
b/core/src/test/java/org/apache/gravitino/catalog/TestManagedFunctionOperations.java
index 28a12ae951..ff3b6638e2 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestManagedFunctionOperations.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestManagedFunctionOperations.java
@@ -530,6 +530,32 @@ public class TestManagedFunctionOperations {
funcIdent, FunctionChange.addImpl(params, anotherSparkImpl)));
}
+ @Test
+ public void testRegisterFunctionWithDuplicateRuntimeInDefinition() {
+ NameIdentifier funcIdent = getFunctionIdent("func_register_dup_runtime");
+ FunctionParam[] params = new FunctionParam[] {FunctionParams.of("a",
Types.IntegerType.get())};
+ FunctionImpl sparkImpl1 =
+ FunctionImpls.ofJava(FunctionImpl.RuntimeType.SPARK,
"com.example.SparkUDF1");
+ FunctionImpl sparkImpl2 =
+ FunctionImpls.ofJava(FunctionImpl.RuntimeType.SPARK,
"com.example.SparkUDF2");
+
+ FunctionDefinition[] definitions =
+ new FunctionDefinition[] {
+ createDefinitionWithImpls(
+ params, Types.StringType.get(), new FunctionImpl[] {sparkImpl1,
sparkImpl2})
+ };
+
+ IllegalArgumentException ex =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ functionOperations.registerFunction(
+ funcIdent, "Test function", FunctionType.SCALAR, true,
definitions));
+ Assertions.assertTrue(
+ ex.getMessage().contains("duplicate runtime"),
+ "Expected error about duplicate runtime, got: " + ex.getMessage());
+ }
+
@Test
public void testAlterFunctionAddImplToNonExistingDefinition() {
NameIdentifier funcIdent = getFunctionIdent("func_add_impl_nodef");
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
index 4ec2652cd2..fc02032593 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
@@ -34,6 +34,10 @@ import org.apache.gravitino.exceptions.ForbiddenException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NonEmptySchemaException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.JavaImpl;
import org.apache.gravitino.spark.connector.ConnectorConstants;
import org.apache.gravitino.spark.connector.PropertiesConverter;
import org.apache.gravitino.spark.connector.SparkTableChangeConverter;
@@ -41,10 +45,12 @@ import
org.apache.gravitino.spark.connector.SparkTransformConverter;
import
org.apache.gravitino.spark.connector.SparkTransformConverter.DistributionAndSortOrdersInfo;
import org.apache.gravitino.spark.connector.SparkTypeConverter;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.NamespaceChange.SetProperty;
@@ -52,6 +58,7 @@ import
org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
@@ -69,7 +76,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
* needed, optimizing resource utilization and minimizing the overhead
associated with
* initialization.
*/
-public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces {
+public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces,
FunctionCatalog {
// The specific Spark catalog to do IO operations, different catalogs have
different spark catalog
// implementations, like HiveTableCatalog for Hive, JDBCTableCatalog for
JDBC, SparkCatalog for
@@ -168,13 +175,7 @@ public abstract class BaseCatalog implements TableCatalog,
SupportsNamespaces {
@Override
public Identifier[] listTables(String[] namespace) throws
NoSuchNamespaceException {
- String gravitinoNamespace;
- if (namespace.length == 0) {
- gravitinoNamespace = getCatalogDefaultNamespace();
- } else {
- validateNamespace(namespace);
- gravitinoNamespace = namespace[0];
- }
+ String gravitinoNamespace = getDatabase(namespace);
try {
NameIdentifier[] identifiers =
gravitinoCatalogClient.asTableCatalog().listTables(Namespace.of(gravitinoNamespace));
@@ -431,10 +432,70 @@ public abstract class BaseCatalog implements
TableCatalog, SupportsNamespaces {
}
protected String getDatabase(Identifier sparkIdentifier) {
- if (sparkIdentifier.namespace().length > 0) {
- return sparkIdentifier.namespace()[0];
+ return getDatabase(sparkIdentifier.namespace());
+ }
+
+ /**
+ * Get the database name from the namespace array.
+ *
+ * @param namespace the namespace array
+ * @return the database name, or the catalog default namespace if the
namespace is empty
+ */
+ protected String getDatabase(String[] namespace) {
+ if (namespace.length == 0) {
+ return getCatalogDefaultNamespace();
+ }
+ validateNamespace(namespace);
+ return namespace[0];
+ }
+
+ @Override
+ public Identifier[] listFunctions(String[] namespace) throws
NoSuchNamespaceException {
+ String gravitinoNamespace = getDatabase(namespace);
+ try {
+ Function[] functions =
+ gravitinoCatalogClient
+ .asFunctionCatalog()
+ .listFunctionInfos(Namespace.of(gravitinoNamespace));
+ // Filter functions that have Spark runtime implementation
+ return Arrays.stream(functions)
+ .filter(this::hasSparkImplementation)
+ .map(f -> Identifier.of(new String[] {gravitinoNamespace}, f.name()))
+ .toArray(Identifier[]::new);
+ } catch (NoSuchSchemaException e) {
+ throw new NoSuchNamespaceException(namespace);
+ }
+ }
+
+ @Override
+ public UnboundFunction loadFunction(Identifier ident) throws
NoSuchFunctionException {
+ String[] namespace = ident.namespace();
+ if (namespace.length == 0) {
+ namespace = new String[] {getCatalogDefaultNamespace()};
+ ident = Identifier.of(namespace, ident.name());
}
- return getCatalogDefaultNamespace();
+ validateNamespace(namespace);
+
+ NameIdentifier gravitinoIdentifier = NameIdentifier.of(getDatabase(ident),
ident.name());
+ try {
+ Function function =
+
gravitinoCatalogClient.asFunctionCatalog().getFunction(gravitinoIdentifier);
+ for (FunctionDefinition definition : function.definitions()) {
+ for (FunctionImpl impl : definition.impls()) {
+ if (!isSparkImplementation(impl)) {
+ continue;
+ }
+ String className = extractClassName(impl);
+ if (StringUtils.isBlank(className)) {
+ continue;
+ }
+ return instantiateFunction(className, ident);
+ }
+ }
+ } catch (org.apache.gravitino.exceptions.NoSuchFunctionException e) {
+ throw new NoSuchFunctionException(ident);
+ }
+ throw new NoSuchFunctionException(ident);
}
private void validateNamespace(String[] namespace) {
@@ -443,11 +504,11 @@ public abstract class BaseCatalog implements
TableCatalog, SupportsNamespaces {
"Doesn't support multi level namespaces: " + String.join(".",
namespace));
}
- private String getCatalogDefaultNamespace() {
+ protected String getCatalogDefaultNamespace() {
String[] catalogDefaultNamespace = sparkCatalog.defaultNamespace();
Preconditions.checkArgument(
catalogDefaultNamespace != null && catalogDefaultNamespace.length == 1,
- "Catalog default namespace is not valid");
+ "Catalog default namespace is not valid: " +
Arrays.toString(catalogDefaultNamespace));
return catalogDefaultNamespace[0];
}
@@ -470,6 +531,46 @@ public abstract class BaseCatalog implements TableCatalog,
SupportsNamespaces {
return gravitinoIdentifier.namespace().level(0);
}
+ private boolean hasSparkImplementation(Function function) {
+ for (FunctionDefinition definition : function.definitions()) {
+ for (FunctionImpl impl : definition.impls()) {
+ if (isSparkImplementation(impl)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private boolean isSparkImplementation(FunctionImpl impl) {
+ return FunctionImpl.RuntimeType.SPARK.equals(impl.runtime());
+ }
+
+ private String extractClassName(FunctionImpl impl) {
+ if (impl instanceof JavaImpl) {
+ return ((JavaImpl) impl).className();
+ }
+ throw new IllegalArgumentException(
+ String.format("Unsupported function implementation %s",
impl.getClass().getName()));
+ }
+
+ private UnboundFunction instantiateFunction(String className, Identifier
ident)
+ throws NoSuchFunctionException {
+ try {
+ // Use context classloader to work with Spark's isolated plugin
classloaders
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ Class<?> functionClass = Class.forName(className, true, classLoader);
+ Object instance = functionClass.getDeclaredConstructor().newInstance();
+ if (instance instanceof UnboundFunction) {
+ return (UnboundFunction) instance;
+ }
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException(
+ String.format("Failed to instantiate function class: %s",
className), e);
+ }
+ throw new NoSuchFunctionException(ident);
+ }
+
private Table loadSparkTable(Identifier ident) {
try {
return sparkCatalog.loadTable(ident);
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
index e6d59c853b..4ccacdec1d 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
@@ -109,12 +109,16 @@ public class GravitinoIcebergCatalog extends BaseCatalog
@Override
public Identifier[] listFunctions(String[] namespace) throws
NoSuchNamespaceException {
- return ((SparkCatalog) sparkCatalog).listFunctions(namespace);
+ return isIcebergFunctionNamespace(namespace)
+ ? ((SparkCatalog) sparkCatalog).listFunctions(namespace)
+ : super.listFunctions(namespace);
}
@Override
public UnboundFunction loadFunction(Identifier ident) throws
NoSuchFunctionException {
- return ((SparkCatalog) sparkCatalog).loadFunction(ident);
+ return isIcebergFunctionNamespace(ident.namespace())
+ ? ((SparkCatalog) sparkCatalog).loadFunction(ident)
+ : super.loadFunction(ident);
}
/**
@@ -190,6 +194,17 @@ public class GravitinoIcebergCatalog extends BaseCatalog
}
}
+ private boolean isIcebergFunctionNamespace(String[] namespace) {
+ try {
+ return namespace.length == 0 || isSystemNamespace(namespace);
+ } catch (IllegalAccessException
+ | InvocationTargetException
+ | NoSuchMethodException
+ | ClassNotFoundException e) {
+ throw new RuntimeException("Failed to check Iceberg function namespace",
e);
+ }
+ }
+
private boolean isSystemNamespace(String[] namespace)
throws NoSuchMethodException, InvocationTargetException,
IllegalAccessException,
ClassNotFoundException {
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/functions/StringLengthFunction.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/functions/StringLengthFunction.java
new file mode 100644
index 0000000000..56cd876247
--- /dev/null
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/functions/StringLengthFunction.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector.functions;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A test UDF implementation that returns the length of a string. This is used
for testing
+ * Gravitino's UDF support in Spark.
+ */
+public class StringLengthFunction implements UnboundFunction {
+
+ @Override
+ public String name() {
+ return "my_string_length";
+ }
+
+ @Override
+ public BoundFunction bind(StructType inputType) {
+ if (inputType.fields().length != 1) {
+ throw new UnsupportedOperationException("my_string_length expects
exactly one argument");
+ }
+ if (!inputType.fields()[0].dataType().equals(DataTypes.StringType)) {
+ throw new UnsupportedOperationException(
+ "my_string_length expects a string argument, got: " +
inputType.fields()[0].dataType());
+ }
+ return new BoundStringLengthFunction();
+ }
+
+ @Override
+ public String description() {
+ return "Returns the length of a string";
+ }
+
+ private static class BoundStringLengthFunction implements
ScalarFunction<Integer> {
+ @Override
+ public String name() {
+ return "my_string_length";
+ }
+
+ @Override
+ public DataType[] inputTypes() {
+ return new DataType[] {DataTypes.StringType};
+ }
+
+ @Override
+ public DataType resultType() {
+ return DataTypes.IntegerType;
+ }
+
+ @Override
+ public String canonicalName() {
+ return "gravitino.my_string_length";
+ }
+
+ @Override
+ public Integer produceResult(InternalRow input) {
+ if (input.isNullAt(0)) {
+ return null;
+ }
+ UTF8String str = input.getUTF8String(0);
+ return str.numChars();
+ }
+ }
+}
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
index c74f095dbd..ebc3f07d83 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
@@ -196,6 +196,31 @@ public abstract class SparkCommonIT extends SparkEnvIT {
NoSuchNamespaceException.class, () -> sql("SHOW TABLES IN
nonexistent_schema"));
}
+ @Test
+ @EnabledIf("supportsFunction")
+ void testCallUDF() {
+ // test call function
+ List<String> data =
+ getQueryData(String.format("SELECT %s.%s('abc')", functionSchemaName,
functionName));
+ Assertions.assertEquals(1, data.size());
+ Assertions.assertEquals("3", data.get(0));
+ }
+
+ @Test
+ @EnabledIf("supportsFunction")
+ void testListFunctions() {
+ // Test list functions - should only include Spark runtime functions
+ Set<String> functionNames = listUserFunctions(functionSchemaName);
+ Assertions.assertTrue(
+ functionNames.contains(
+ String.join(".", getCatalogName(), functionSchemaName,
functionName)));
+
+ // Non-Spark function should NOT be listed
+ Assertions.assertFalse(
+ functionNames.contains(
+ String.join(".", getCatalogName(), functionSchemaName,
nonSparkFunctionName)));
+ }
+
@Test
void testLoadCatalogs() {
Set<String> catalogs = getCatalogs();
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkEnvIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkEnvIT.java
index 9d1f72a1b3..d9c4e8a247 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkEnvIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkEnvIT.java
@@ -27,10 +27,20 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionDefinitions;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionImpls;
+import org.apache.gravitino.function.FunctionParam;
+import org.apache.gravitino.function.FunctionParams;
+import org.apache.gravitino.function.FunctionType;
import org.apache.gravitino.integration.test.container.ContainerSuite;
import org.apache.gravitino.integration.test.container.HiveContainer;
+import org.apache.gravitino.rel.types.Types;
import org.apache.gravitino.spark.connector.GravitinoSparkConfig;
+import org.apache.gravitino.spark.connector.functions.StringLengthFunction;
import org.apache.gravitino.spark.connector.iceberg.IcebergPropertiesConstants;
import org.apache.gravitino.spark.connector.integration.test.util.SparkUtilIT;
import org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin;
@@ -55,6 +65,9 @@ public abstract class SparkEnvIT extends SparkUtilIT {
protected String warehouse;
protected FileSystem hdfs;
protected String icebergRestServiceUri;
+ protected final String functionSchemaName = "test_function_schema";
+ protected final String functionName = "my_string_length";
+ protected final String nonSparkFunctionName = "trino_upper";
private final String metalakeName = "test";
private SparkSession sparkSession;
@@ -67,6 +80,10 @@ public abstract class SparkEnvIT extends SparkUtilIT {
protected abstract Map<String, String> getCatalogConfigs();
+ protected boolean supportsFunction() { // Hook: startup runs initSchemaAndFunction() only when this returns true.
+ return true; // Enabled by default; subclasses can override this to opt out.
+ }
+
@Override
protected SparkSession getSparkSession() {
Assertions.assertNotNull(sparkSession);
@@ -87,6 +104,9 @@ public abstract class SparkEnvIT extends SparkUtilIT {
initHdfsFileSystem();
initGravitinoEnv();
initMetalakeAndCatalogs();
+ if (supportsFunction()) {
+ initSchemaAndFunction();
+ }
initSparkEnv();
LOG.info(
"Startup Spark env successfully, Gravitino uri: {}, Hive metastore
uri: {}",
@@ -130,6 +150,57 @@ public abstract class SparkEnvIT extends SparkUtilIT {
getCatalogName(), Catalog.Type.RELATIONAL, getProvider(), "",
properties);
}
+ private void initSchemaAndFunction() { // Seeds functionSchemaName with one SPARK-runtime and one TRINO-runtime function.
+ GravitinoMetalake metalake = client.loadMetalake(metalakeName);
+ Catalog catalog = metalake.loadCatalog(getCatalogName());
+ if (!catalog.asSchemas().schemaExists(functionSchemaName)) { // create the schema only when it is missing
+ catalog.asSchemas().createSchema(functionSchemaName, "",
Collections.emptyMap());
+ }
+
+ // Register a Spark runtime function (skipped when functionExists already reports it)
+ if (!catalog
+ .asFunctionCatalog()
+ .functionExists(NameIdentifier.of(functionSchemaName, functionName))) {
+ catalog
+ .asFunctionCatalog()
+ .registerFunction(
+ NameIdentifier.of(functionSchemaName, functionName),
+ "Returns the length of a string",
+ FunctionType.SCALAR,
+ true /* deterministic */,
+ new FunctionDefinition[] {
+ FunctionDefinitions.of(
+ new FunctionParam[] {FunctionParams.of("input",
Types.StringType.get())},
+ Types.IntegerType.get(),
+ new FunctionImpl[] {
+ FunctionImpls.ofJava(
+ FunctionImpl.RuntimeType.SPARK,
StringLengthFunction.class.getName()) // implementation is referenced by class name
+ })
+ });
+ }
+
+ // Register a non-Spark (TRINO) runtime function for testing filtering (the Iceberg IT asserts it is not listed)
+ if (!catalog
+ .asFunctionCatalog()
+ .functionExists(NameIdentifier.of(functionSchemaName,
nonSparkFunctionName))) {
+ catalog
+ .asFunctionCatalog()
+ .registerFunction(
+ NameIdentifier.of(functionSchemaName, nonSparkFunctionName),
+ "Converts a string to uppercase (Trino implementation)",
+ FunctionType.SCALAR,
+ true /* deterministic */,
+ new FunctionDefinition[] {
+ FunctionDefinitions.of(
+ new FunctionParam[] {FunctionParams.of("input",
Types.StringType.get())},
+ Types.StringType.get(),
+ new FunctionImpl[] {
+ FunctionImpls.ofJava(FunctionImpl.RuntimeType.TRINO,
"com.example.TrinoUpper") // NOTE(review): looks like a placeholder class name — confirm it is never loaded
+ })
+ });
+ }
+ }
+
private void initGravitinoEnv() {
// Gravitino server is already started by AbstractIT, just construct
gravitinoUrl
int gravitinoPort = getGravitinoServerPort();
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
index 0360bea207..92d9d5ca8a 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
@@ -62,6 +62,7 @@ import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@@ -1272,4 +1273,49 @@ public abstract class SparkIcebergCatalogIT extends
SparkCommonIT {
return new IcebergTableWriteProperties(isPartitionedTable,
formatVersion, writeMode);
}
}
+
+ @Test
+ @EnabledIf("supportsFunction") // condition method: the test is enabled only when supportsFunction() returns true
+ void testListFunctionsWithIcebergBuiltins() { // Checks listUserFunctions output for the test schema and for "system".
+ // Test Gravitino function listing
+ Set<String> gravitinoFunctions = listUserFunctions(functionSchemaName);
+ Assertions.assertTrue( // expected entries are spelled "<catalog>.<schema>.<function>"
+ gravitinoFunctions.contains(
+ String.join(".", getCatalogName(), functionSchemaName,
functionName)));
+
+ // Non-Spark function should NOT be listed
+ Assertions.assertFalse(
+ gravitinoFunctions.contains(
+ String.join(".", getCatalogName(), functionSchemaName,
nonSparkFunctionName)));
+
+ // Test Iceberg built-in functions are also listed (queried through the "system" namespace)
+ Set<String> systemFunctions = listUserFunctions("system");
+ Assertions.assertTrue(
+ systemFunctions.stream().anyMatch(f -> f.contains("iceberg_version")),
+ "Iceberg built-in function 'iceberg_version' should be listed");
+ Assertions.assertTrue(
+ systemFunctions.stream().anyMatch(f -> f.contains("bucket")), // substring match: any listed name containing "bucket" passes
+ "Iceberg built-in function 'bucket' should be listed");
+ }
+
+ @Test
+ @EnabledIf("supportsFunction") // condition method: the test is enabled only when supportsFunction() returns true
+ void testCallUDFAndIcebergBuiltins() { // Invokes the registered UDF and two Iceberg "system" functions via SQL.
+ // Test Gravitino UDF (schema-qualified only; NOTE(review): presumably resolved against the current catalog — confirm)
+ List<String> gravitinoUdfResult =
+ getQueryData(String.format("SELECT %s.%s('abc')", functionSchemaName,
functionName));
+ Assertions.assertEquals(1, gravitinoUdfResult.size());
+ Assertions.assertEquals("3", gravitinoUdfResult.get(0)); // length of 'abc', compared as a string
+
+ // Test Iceberg built-in functions can be called (these calls are fully qualified with the catalog name)
+ List<String> icebergVersionResult =
+ getQueryData(String.format("SELECT %s.system.iceberg_version()",
getCatalogName()));
+ Assertions.assertEquals(1, icebergVersionResult.size());
+ Assertions.assertFalse(icebergVersionResult.get(0).isEmpty()); // only non-emptiness is checked, not a specific version
+
+ List<String> bucketResult =
+ getQueryData(String.format("SELECT %s.system.bucket(2, 100)",
getCatalogName()));
+ Assertions.assertEquals(1, bucketResult.size());
+ Assertions.assertEquals("0", bucketResult.get(0));
+ }
}
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
index f5f480ff28..0ea383e406 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
@@ -68,6 +68,10 @@ public abstract class SparkUtilIT extends BaseIT {
return convertToStringSet(sql("SHOW TABLES in " + database), 1);
}
+ protected Set<String> listUserFunctions(String database) { // Runs SHOW USER FUNCTIONS for the given name and returns the rows as a set of strings.
+ return convertToStringSet(sql("SHOW USER FUNCTIONS FROM " + database), 0); // NOTE(review): 0 is presumably the result column index (the SHOW TABLES helper above passes 1) — confirm
+ }
+
protected void dropDatabaseIfExists(String database) {
sql("DROP DATABASE IF EXISTS " + database);
}
diff --git
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT33.java
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT33.java
index cc0630a190..23773314df 100644
---
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT33.java
+++
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT33.java
@@ -32,4 +32,10 @@ public class SparkHiveCatalogIT33 extends SparkHiveCatalogIT
{
.getConfString("spark.sql.catalog." + getCatalogName());
Assertions.assertEquals(GravitinoHiveCatalogSpark33.class.getName(),
catalogClass);
}
+
+ @Override
+ protected boolean supportsFunction() {
+ // Spark 3.3 does not support function operations, so this Hive suite reports false (presumably the SparkEnvIT hook that gates function setup — confirm)
+ return false;
+ }
}
diff --git
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT33.java
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT33.java
index 737c3c90e7..3c5fd19081 100644
---
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT33.java
+++
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT33.java
@@ -32,4 +32,10 @@ public class SparkIcebergCatalogHiveBackendIT33 extends
SparkIcebergCatalogHiveB
.getConfString("spark.sql.catalog." + getCatalogName());
Assertions.assertEquals(GravitinoIcebergCatalogSpark33.class.getName(),
catalogClass);
}
+
+ @Override
+ protected boolean supportsFunction() {
+ // Spark 3.3 does not support function operations; returning false also disables tests annotated @EnabledIf("supportsFunction")
+ return false;
+ }
}
diff --git
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT33.java
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT33.java
index e04c1d2351..255627f84a 100644
---
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT33.java
+++
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT33.java
@@ -22,4 +22,10 @@ package
org.apache.gravitino.spark.connector.integration.test.iceberg;
import org.junit.jupiter.api.condition.DisabledIf;
@DisabledIf("org.apache.gravitino.integration.test.util.ITUtils#isEmbedded")
-public class SparkIcebergCatalogRestBackendIT33 extends
SparkIcebergCatalogRestBackendIT {}
+public class SparkIcebergCatalogRestBackendIT33 extends
SparkIcebergCatalogRestBackendIT { // Spark 3.3 variant; its only member is the supportsFunction() override below
+ @Override
+ protected boolean supportsFunction() {
+ // Spark 3.3 does not support function operations; returning false also disables tests annotated @EnabledIf("supportsFunction")
+ return false;
+ }
+}
diff --git
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT33.java
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT33.java
index cf190cfd4f..c96653ae38 100644
---
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT33.java
+++
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT33.java
@@ -33,4 +33,10 @@ public class SparkJdbcMysqlCatalogIT33 extends
SparkJdbcMysqlCatalogIT {
.getConfString("spark.sql.catalog." + getCatalogName());
Assertions.assertEquals(GravitinoJdbcCatalogSpark33.class.getName(),
catalogClass);
}
+
+ @Override
+ protected boolean supportsFunction() {
+ // Spark 3.3 does not support function operations, so this MySQL JDBC suite reports false (presumably the SparkEnvIT hook that gates function setup — confirm)
+ return false;
+ }
}
diff --git
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcPostgreSqlCatalogIT33.java
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcPostgreSqlCatalogIT33.java
index 0dba9cde07..b2e0c8be30 100644
---
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcPostgreSqlCatalogIT33.java
+++
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcPostgreSqlCatalogIT33.java
@@ -33,4 +33,10 @@ public class SparkJdbcPostgreSqlCatalogIT33 extends
SparkJdbcPostgreSqlCatalogIT
.getConfString("spark.sql.catalog." + getCatalogName());
Assertions.assertEquals(GravitinoPostgreSqlCatalogSpark33.class.getName(),
catalogClass);
}
+
+ @Override
+ protected boolean supportsFunction() {
+ // Spark 3.3 does not support function operations, so this PostgreSQL JDBC suite reports false (presumably the SparkEnvIT hook that gates function setup — confirm)
+ return false;
+ }
}
diff --git
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT33.java
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT33.java
index 839b959c77..a4b825637f 100644
---
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT33.java
+++
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT33.java
@@ -32,4 +32,10 @@ public class SparkPaimonCatalogFilesystemBackendIT33 extends
SparkPaimonCatalogF
.getConfString("spark.sql.catalog." + getCatalogName());
Assertions.assertEquals(GravitinoPaimonCatalogSpark33.class.getName(),
catalogClass);
}
+
+ @Override
+ protected boolean supportsFunction() {
+ // Spark 3.3 does not support function operations, so this Paimon suite reports false (presumably the SparkEnvIT hook that gates function setup — confirm)
+ return false;
+ }
}