This is an automated email from the ASF dual-hosted git repository.

mchades pushed a commit to branch udf-poc
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit dd80a177943188b8d5c822c1d2cd93563a7abb66
Author: mchades <[email protected]>
AuthorDate: Wed Dec 17 20:23:22 2025 +0800

    Spark connector supports Gravitino UDF
---
 settings.gradle.kts                                |   1 +
 .../spark/connector/catalog/BaseCatalog.java       |  98 ++++++++++++++++--
 .../functions/BuiltinFunctionSupport.java          |  53 ----------
 .../connector/iceberg/GravitinoIcebergCatalog.java |  12 +--
 .../functions/TestBuiltinFunctionSupport.java      | 112 ---------------------
 spark-connector/spark-udf/build.gradle.kts         |  51 ++++++++++
 .../functions/MyStringLengthFunction.java          |   2 +-
 7 files changed, 146 insertions(+), 183 deletions(-)

diff --git a/settings.gradle.kts b/settings.gradle.kts
index cde9547aad..db41ec8f62 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -64,6 +64,7 @@ include("lance:lance-rest-server")
 include("authorizations:authorization-ranger", "authorizations:authorization-common", "authorizations:authorization-chain")
 include("trino-connector:trino-connector", "trino-connector:integration-test")
 include("spark-connector:spark-common")
+include("spark-connector:spark-udf")
 if (scalaVersion == "2.12") {
   // flink only support scala 2.12
   include("flink-connector:flink")
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
index 13c90be32b..9f94d40631 100644
--- 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.IntStream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
@@ -34,13 +35,15 @@ import org.apache.gravitino.exceptions.ForbiddenException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.JavaImpl;
 import org.apache.gravitino.spark.connector.ConnectorConstants;
 import org.apache.gravitino.spark.connector.PropertiesConverter;
 import org.apache.gravitino.spark.connector.SparkTableChangeConverter;
 import org.apache.gravitino.spark.connector.SparkTransformConverter;
 import 
org.apache.gravitino.spark.connector.SparkTransformConverter.DistributionAndSortOrdersInfo;
 import org.apache.gravitino.spark.connector.SparkTypeConverter;
-import org.apache.gravitino.spark.connector.functions.BuiltinFunctionSupport;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
@@ -85,7 +88,6 @@ public abstract class BaseCatalog implements TableCatalog, 
SupportsNamespaces, F
   protected Catalog gravitinoCatalogClient;
   private SparkTypeConverter sparkTypeConverter;
   private SparkTableChangeConverter sparkTableChangeConverter;
-  private final BuiltinFunctionSupport builtinFunctionSupport = new 
BuiltinFunctionSupport();
 
   private String catalogName;
   private final GravitinoCatalogManager gravitinoCatalogManager;
@@ -444,10 +446,37 @@ public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces, F
 
   @Override
   public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
     String[] targetNamespace =
         namespace.length == 0 ? new String[] {getCatalogDefaultNamespace()} : namespace;
     validateNamespace(targetNamespace);
-    return builtinFunctionSupport.listFunctions(targetNamespace, getCatalogDefaultNamespace());
+    Namespace gravitinoNamespace = Namespace.of(targetNamespace[0]);
+    try {
+      org.apache.gravitino.function.FunctionCatalog functionCatalog =
+          gravitinoCatalogClient.asFunctionCatalog();
+      NameIdentifier[] identifiers = functionCatalog.listFunctions(gravitinoNamespace);
+      if (identifiers == null || identifiers.length == 0) {
+        return new Identifier[0];
+      }
+      Function[] functions = functionCatalog.listFunctionInfos(gravitinoNamespace);
+      // NOTE(review): names and details come from two separate calls and are paired by index,
+      // assuming both list functions in the same order; keep only Spark-runnable functions.
+      return IntStream.range(0, identifiers.length)
+          .filter(
+              idx ->
+                  functions != null
+                      && functions.length > idx
+                      && functions[idx] != null
+                      && Arrays.stream(functions[idx].impls())
+                          .anyMatch(this::isSparkImplementation))
+          .mapToObj(
+              idx ->
+                  Identifier.of(
+                      new String[] {getDatabase(identifiers[idx])}, identifiers[idx].name()))
+          .toArray(Identifier[]::new);
+    } catch (NoSuchSchemaException e) {
+      // Report the namespace actually queried, not the possibly-empty array Spark passed in.
+      throw new NoSuchNamespaceException(targetNamespace);
+    }
   }
 
   @Override
@@ -457,11 +486,31 @@ public abstract class BaseCatalog implements 
TableCatalog, SupportsNamespaces, F
         namespace.length == 0 ? new String[] {getCatalogDefaultNamespace()} : 
namespace;
     validateNamespace(targetNamespace);
     Identifier targetIdent = Identifier.of(targetNamespace, ident.name());
-    return builtinFunctionSupport.loadFunction(targetIdent, 
getCatalogDefaultNamespace());
-  }
-
-  protected BuiltinFunctionSupport getBuiltinFunctionSupport() {
-    return builtinFunctionSupport;
+    NameIdentifier gravitinoIdentifier =
+        NameIdentifier.of(targetIdent.namespace()[0], targetIdent.name());
+    try {
+      Function[] functions =
+          
gravitinoCatalogClient.asFunctionCatalog().getFunction(gravitinoIdentifier);
+      for (Function function : functions) {
+        for (FunctionImpl impl : function.impls()) {
+          if (!isSparkImplementation(impl)) {
+            continue;
+          }
+          String className = extractClassName(impl);
+          if (StringUtils.isBlank(className)) {
+            continue;
+          }
+          try {
+            return instantiateFunction(className, targetIdent);
+          } catch (NoSuchFunctionException e) {
+            // try the next implementation, such as SQL implementation
+          }
+        }
+      }
+    } catch (org.apache.gravitino.exceptions.NoSuchFunctionException e) {
+      // fall through
+    }
+    throw new NoSuchFunctionException(targetIdent);
   }
 
   private void validateNamespace(String[] namespace) {
@@ -497,6 +546,49 @@ public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces, F
     return gravitinoIdentifier.namespace().level(0);
   }
 
+  /** Returns whether {@code impl} targets the Spark runtime. */
+  private boolean isSparkImplementation(FunctionImpl impl) {
+    return FunctionImpl.RuntimeType.SPARK.equals(impl.runtime());
+  }
+
+  /**
+   * Returns the class name of a Java implementation, or {@code null} for any other kind of
+   * implementation (e.g. SQL) so that {@code loadFunction} skips it and tries the next one.
+   */
+  private String extractClassName(FunctionImpl impl) {
+    if (impl instanceof JavaImpl) {
+      return ((JavaImpl) impl).className();
+    }
+    return null;
+  }
+
+  /**
+   * Instantiates {@code className} through its no-arg constructor.
+   *
+   * @throws NoSuchFunctionException if the instance is not a Spark {@link UnboundFunction}
+   * @throws RuntimeException if the class cannot be loaded or instantiated
+   */
+  private UnboundFunction instantiateFunction(String className, Identifier ident)
+      throws NoSuchFunctionException {
+    // Resolve through the thread context class loader first, as Spark does for user classes,
+    // so jars registered with the session are visible; fall back to this class's loader.
+    ClassLoader loader = Thread.currentThread().getContextClassLoader();
+    if (loader == null) {
+      loader = BaseCatalog.class.getClassLoader();
+    }
+    try {
+      Class<?> functionClass = Class.forName(className, true, loader);
+      Object instance = functionClass.getDeclaredConstructor().newInstance();
+      if (instance instanceof UnboundFunction) {
+        return (UnboundFunction) instance;
+      }
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException(
+          String.format("Failed to instantiate function class: %s", className), e);
+    }
+    throw new NoSuchFunctionException(ident);
+  }
+
   private Table loadSparkTable(Identifier ident) {
     try {
       return sparkCatalog.loadTable(ident);
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/functions/BuiltinFunctionSupport.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/functions/BuiltinFunctionSupport.java
deleted file mode 100644
index de440e4536..0000000000
--- 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/functions/BuiltinFunctionSupport.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.spark.connector.functions;
-
-import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
-import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
-
-public class BuiltinFunctionSupport {
-
-  private static final UnboundFunction MY_STR_LEN_FUNCTION = new 
MyStringLengthFunction();
-  private static final Identifier[] BUILTIN_IDENTIFIERS =
-      new Identifier[] {Identifier.of(new String[0], 
MyStringLengthFunction.NAME)};
-
-  public Identifier[] listFunctions(String[] namespace, String 
defaultNamespace)
-      throws NoSuchNamespaceException {
-    if (isSupportedNamespace(namespace, defaultNamespace)) {
-      return BUILTIN_IDENTIFIERS;
-    }
-    throw new NoSuchNamespaceException(namespace);
-  }
-
-  public UnboundFunction loadFunction(Identifier identifier, String 
defaultNamespace)
-      throws NoSuchFunctionException {
-    if (isSupportedNamespace(identifier.namespace(), defaultNamespace)
-        && MyStringLengthFunction.NAME.equalsIgnoreCase(identifier.name())) {
-      return MY_STR_LEN_FUNCTION;
-    }
-    throw new NoSuchFunctionException(identifier);
-  }
-
-  private boolean isSupportedNamespace(String[] namespace, String 
defaultNamespace) {
-    return namespace.length == 0
-        || (namespace.length == 1 && 
namespace[0].equalsIgnoreCase(defaultNamespace));
-  }
-}
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
index 5ecdb5c2b8..775113fce0 100644
--- 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
@@ -108,19 +108,19 @@ public class GravitinoIcebergCatalog extends BaseCatalog
 
   @Override
   public Identifier[] listFunctions(String[] namespace) throws 
NoSuchNamespaceException {
-    Identifier[] builtin =
-        getBuiltinFunctionSupport().listFunctions(namespace, 
getCatalogDefaultNamespace());
+    Identifier[] gravitinoFunctions = super.listFunctions(namespace);
     Identifier[] icebergFunctions = ((SparkCatalog) 
sparkCatalog).listFunctions(namespace);
-    Identifier[] combined = new Identifier[builtin.length + 
icebergFunctions.length];
-    System.arraycopy(builtin, 0, combined, 0, builtin.length);
-    System.arraycopy(icebergFunctions, 0, combined, builtin.length, 
icebergFunctions.length);
+    Identifier[] combined = new Identifier[gravitinoFunctions.length + 
icebergFunctions.length];
+    System.arraycopy(gravitinoFunctions, 0, combined, 0, 
gravitinoFunctions.length);
+    System.arraycopy(
+        icebergFunctions, 0, combined, gravitinoFunctions.length, 
icebergFunctions.length);
     return combined;
   }
 
   @Override
   public UnboundFunction loadFunction(Identifier ident) throws 
NoSuchFunctionException {
     try {
-      return getBuiltinFunctionSupport().loadFunction(ident, 
getCatalogDefaultNamespace());
+      return super.loadFunction(ident);
     } catch (NoSuchFunctionException e) {
       return ((SparkCatalog) sparkCatalog).loadFunction(ident);
     }
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/functions/TestBuiltinFunctionSupport.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/functions/TestBuiltinFunctionSupport.java
deleted file mode 100644
index 782b9d7ad4..0000000000
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/functions/TestBuiltinFunctionSupport.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.spark.connector.functions;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
-import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
-import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.Metadata;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.UTF8String;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-class TestBuiltinFunctionSupport {
-
-  private final BuiltinFunctionSupport builtinFunctionSupport = new 
BuiltinFunctionSupport();
-  private static final String DEFAULT_NAMESPACE = "default";
-
-  @Test
-  void testListFunctionsInDefaultNamespace() throws NoSuchNamespaceException {
-    Identifier[] functions = builtinFunctionSupport.listFunctions(new 
String[0], DEFAULT_NAMESPACE);
-    Assertions.assertEquals(1, functions.length);
-    Assertions.assertEquals(MyStringLengthFunction.NAME, functions[0].name());
-
-    Identifier[] functionsInDefault =
-        builtinFunctionSupport.listFunctions(new String[] {DEFAULT_NAMESPACE}, 
DEFAULT_NAMESPACE);
-    Assertions.assertEquals(1, functionsInDefault.length);
-    Assertions.assertEquals(MyStringLengthFunction.NAME, 
functionsInDefault[0].name());
-  }
-
-  @Test
-  void testListFunctionsInUnsupportedNamespace() {
-    Assertions.assertThrows(
-        NoSuchNamespaceException.class,
-        () -> builtinFunctionSupport.listFunctions(new String[] {"other"}, 
DEFAULT_NAMESPACE));
-  }
-
-  @Test
-  void testLoadFunctionAndExecute() throws NoSuchFunctionException {
-    UnboundFunction unboundFunction =
-        builtinFunctionSupport.loadFunction(
-            Identifier.of(new String[0], MyStringLengthFunction.NAME), 
DEFAULT_NAMESPACE);
-    Assertions.assertInstanceOf(MyStringLengthFunction.class, unboundFunction);
-
-    StructType inputSchema =
-        new StructType(
-            new StructField[] {
-              new StructField("col", DataTypes.StringType, true, 
Metadata.empty())
-            });
-    BoundFunction boundFunction = unboundFunction.bind(inputSchema);
-    ScalarFunction<?> scalarFunction = (ScalarFunction<?>) boundFunction;
-
-    InternalRow row = new GenericInternalRow(new Object[] 
{UTF8String.fromString("abcde")});
-    Assertions.assertEquals(5, scalarFunction.produceResult(row));
-
-    InternalRow nullRow = new GenericInternalRow(1);
-    nullRow.setNullAt(0);
-    Assertions.assertNull(scalarFunction.produceResult(nullRow));
-  }
-
-  @Test
-  void testLoadFunctionWithWrongNamespaceOrName() {
-    Assertions.assertThrows(
-        NoSuchFunctionException.class,
-        () ->
-            builtinFunctionSupport.loadFunction(
-                Identifier.of(new String[] {"other"}, 
MyStringLengthFunction.NAME),
-                DEFAULT_NAMESPACE));
-    Assertions.assertThrows(
-        NoSuchFunctionException.class,
-        () ->
-            builtinFunctionSupport.loadFunction(
-                Identifier.of(new String[0], "unknown_func"), 
DEFAULT_NAMESPACE));
-  }
-
-  @Test
-  void testBindValidatesInputType() throws NoSuchFunctionException {
-    UnboundFunction unboundFunction =
-        builtinFunctionSupport.loadFunction(
-            Identifier.of(new String[0], MyStringLengthFunction.NAME), 
DEFAULT_NAMESPACE);
-    StructType invalidSchema =
-        new StructType(
-            new StructField[] {
-              new StructField("col", DataTypes.IntegerType, true, 
Metadata.empty())
-            });
-    Assertions.assertThrows(
-        IllegalArgumentException.class, () -> 
unboundFunction.bind(invalidSchema));
-  }
-}
diff --git a/spark-connector/spark-udf/build.gradle.kts b/spark-connector/spark-udf/build.gradle.kts
new file mode 100644
index 0000000000..e3431627bc
--- /dev/null
+++ b/spark-connector/spark-udf/build.gradle.kts
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+plugins {
+  `maven-publish`
+  id("java")
+  id("idea")
+}
+
+repositories {
+  mavenCentral()
+}
+
+val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString()
+val sparkVersion: String = libs.versions.spark33.get()
+val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get()
+
+if (hasProperty("excludePackagesForSparkConnector")) {
+  @Suppress("UNCHECKED_CAST")
+  val configureFunc = properties["excludePackagesForSparkConnector"] as? (Project) -> Unit
+  configureFunc?.invoke(project)
+}
+
+dependencies {
+  compileOnly("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
+  compileOnly("org.apache.spark:spark-core_$scalaVersion:$sparkVersion")
+  compileOnly("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion")
+  compileOnly("org.scala-lang.modules:scala-java8-compat_$scalaVersion:$scalaJava8CompatVersion")
+  testRuntimeOnly(libs.junit.jupiter.engine)
+}
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/functions/MyStringLengthFunction.java
 
b/spark-connector/spark-udf/src/main/java/com/mycompany/functions/MyStringLengthFunction.java
similarity index 97%
rename from 
spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/functions/MyStringLengthFunction.java
rename to 
spark-connector/spark-udf/src/main/java/com/mycompany/functions/MyStringLengthFunction.java
index 173e720cc7..2bc68930dd 100644
--- 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/functions/MyStringLengthFunction.java
+++ 
b/spark-connector/spark-udf/src/main/java/com/mycompany/functions/MyStringLengthFunction.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.spark.connector.functions;
+package com.mycompany.functions;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.functions.BoundFunction;

Reply via email to