This is an automated email from the ASF dual-hosted git repository. mchades pushed a commit to branch udf-poc in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit dd80a177943188b8d5c822c1d2cd93563a7abb66 Author: mchades <[email protected]> AuthorDate: Wed Dec 17 20:23:22 2025 +0800 Spark connector supports Gravitino UDF --- settings.gradle.kts | 1 + .../spark/connector/catalog/BaseCatalog.java | 98 ++++++++++++++++-- .../functions/BuiltinFunctionSupport.java | 53 ---------- .../connector/iceberg/GravitinoIcebergCatalog.java | 12 +-- .../functions/TestBuiltinFunctionSupport.java | 112 --------------------- spark-connector/spark-udf/build.gradle.kts | 51 ++++++++++ .../functions/MyStringLengthFunction.java | 2 +- 7 files changed, 146 insertions(+), 183 deletions(-) diff --git a/settings.gradle.kts b/settings.gradle.kts index cde9547aad..db41ec8f62 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -64,6 +64,7 @@ include("lance:lance-rest-server") include("authorizations:authorization-ranger", "authorizations:authorization-common", "authorizations:authorization-chain") include("trino-connector:trino-connector", "trino-connector:integration-test") include("spark-connector:spark-common") +include(":spark-connector:spark-udf") if (scalaVersion == "2.12") { // flink only support scala 2.12 include("flink-connector:flink") diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java index 13c90be32b..9f94d40631 100644 --- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java +++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.stream.IntStream; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.Catalog; import org.apache.gravitino.NameIdentifier; @@ -34,13 +35,15 @@ import org.apache.gravitino.exceptions.ForbiddenException; import org.apache.gravitino.exceptions.NoSuchSchemaException; import org.apache.gravitino.exceptions.NonEmptySchemaException; import org.apache.gravitino.exceptions.SchemaAlreadyExistsException; +import org.apache.gravitino.function.Function; +import org.apache.gravitino.function.FunctionImpl; +import org.apache.gravitino.function.JavaImpl; import org.apache.gravitino.spark.connector.ConnectorConstants; import org.apache.gravitino.spark.connector.PropertiesConverter; import org.apache.gravitino.spark.connector.SparkTableChangeConverter; import org.apache.gravitino.spark.connector.SparkTransformConverter; import org.apache.gravitino.spark.connector.SparkTransformConverter.DistributionAndSortOrdersInfo; import org.apache.gravitino.spark.connector.SparkTypeConverter; -import org.apache.gravitino.spark.connector.functions.BuiltinFunctionSupport; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; @@ -85,7 +88,6 @@ public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces, F protected Catalog gravitinoCatalogClient; private SparkTypeConverter sparkTypeConverter; private SparkTableChangeConverter sparkTableChangeConverter; - private final BuiltinFunctionSupport builtinFunctionSupport = new BuiltinFunctionSupport(); private String catalogName; private final GravitinoCatalogManager gravitinoCatalogManager; @@ -444,10 +446,37 @@ public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces, F @Override public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException { - String[] targetNamespace = - namespace.length == 0 ? new String[] {getCatalogDefaultNamespace()} : namespace; - validateNamespace(targetNamespace); - return builtinFunctionSupport.listFunctions(targetNamespace, getCatalogDefaultNamespace()); + String gravitinoNamespace; + if (namespace.length == 0) { + gravitinoNamespace = getCatalogDefaultNamespace(); + } else { + validateNamespace(namespace); + gravitinoNamespace = namespace[0]; + } + try { + org.apache.gravitino.function.FunctionCatalog functionCatalog = + gravitinoCatalogClient.asFunctionCatalog(); + NameIdentifier[] identifiers = + functionCatalog.listFunctions(Namespace.of(gravitinoNamespace)); + org.apache.gravitino.function.Function[] functions = + functionCatalog.listFunctionInfos(Namespace.of(gravitinoNamespace)); + + return IntStream.range(0, identifiers.length) + .filter( + idx -> + functions != null + && functions.length > idx + && functions[idx] != null + && Arrays.stream(functions[idx].impls()) + .anyMatch(this::isSparkImplementation)) + .mapToObj( + idx -> + Identifier.of( + new String[] {getDatabase(identifiers[idx])}, identifiers[idx].name())) + .toArray(Identifier[]::new); + } catch (NoSuchSchemaException e) { + throw new NoSuchNamespaceException(namespace); + } } @Override @@ -457,11 +486,31 @@ public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces, F namespace.length == 0 ? new String[] {getCatalogDefaultNamespace()} : namespace; validateNamespace(targetNamespace); Identifier targetIdent = Identifier.of(targetNamespace, ident.name()); - return builtinFunctionSupport.loadFunction(targetIdent, getCatalogDefaultNamespace()); - } - - protected BuiltinFunctionSupport getBuiltinFunctionSupport() { - return builtinFunctionSupport; + NameIdentifier gravitinoIdentifier = + NameIdentifier.of(targetIdent.namespace()[0], targetIdent.name()); + try { + Function[] functions = + gravitinoCatalogClient.asFunctionCatalog().getFunction(gravitinoIdentifier); + for (Function function : functions) { + for (FunctionImpl impl : function.impls()) { + if (!isSparkImplementation(impl)) { + continue; + } + String className = extractClassName(impl); + if (StringUtils.isBlank(className)) { + continue; + } + try { + return instantiateFunction(className, targetIdent); + } catch (NoSuchFunctionException e) { + // try the next implementation, such as SQL implementation + } + } + } + } catch (org.apache.gravitino.exceptions.NoSuchFunctionException e) { + // fall through + } + throw new NoSuchFunctionException(targetIdent); } private void validateNamespace(String[] namespace) { @@ -497,6 +546,33 @@ public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces, F return gravitinoIdentifier.namespace().level(0); } + private boolean isSparkImplementation(FunctionImpl impl) { + return FunctionImpl.RuntimeType.SPARK.equals(impl.runtime()); + } + + private String extractClassName(FunctionImpl impl) { + if (impl instanceof JavaImpl) { + return ((JavaImpl) impl).className(); + } + throw new IllegalArgumentException( + String.format("Unsupported function implementation %s", impl.getClass().getName())); + } + + private UnboundFunction instantiateFunction(String className, Identifier ident) + throws NoSuchFunctionException { + try { + Class<?> functionClass = Class.forName(className); + Object instance = functionClass.getDeclaredConstructor().newInstance(); + if (instance instanceof UnboundFunction) { + return (UnboundFunction) instance; + } + } catch (ReflectiveOperationException e) { + throw new RuntimeException( + String.format("Failed to instantiate function class: %s", className), e); + } + throw new NoSuchFunctionException(ident); + } + private Table loadSparkTable(Identifier ident) { try { return sparkCatalog.loadTable(ident); diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/functions/BuiltinFunctionSupport.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/functions/BuiltinFunctionSupport.java deleted file mode 100644 index de440e4536..0000000000 --- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/functions/BuiltinFunctionSupport.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.gravitino.spark.connector.functions; - -import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; -import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; -import org.apache.spark.sql.connector.catalog.Identifier; -import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; - -public class BuiltinFunctionSupport { - - private static final UnboundFunction MY_STR_LEN_FUNCTION = new MyStringLengthFunction(); - private static final Identifier[] BUILTIN_IDENTIFIERS = - new Identifier[] {Identifier.of(new String[0], MyStringLengthFunction.NAME)}; - - public Identifier[] listFunctions(String[] namespace, String defaultNamespace) - throws NoSuchNamespaceException { - if (isSupportedNamespace(namespace, defaultNamespace)) { - return BUILTIN_IDENTIFIERS; - } - throw new NoSuchNamespaceException(namespace); - } - - public UnboundFunction loadFunction(Identifier identifier, String defaultNamespace) - throws NoSuchFunctionException { - if (isSupportedNamespace(identifier.namespace(), defaultNamespace) - && MyStringLengthFunction.NAME.equalsIgnoreCase(identifier.name())) { - return MY_STR_LEN_FUNCTION; - } - throw new NoSuchFunctionException(identifier); - } - - private boolean isSupportedNamespace(String[] namespace, String defaultNamespace) { - return namespace.length == 0 - || (namespace.length == 1 && namespace[0].equalsIgnoreCase(defaultNamespace)); - } -} diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java index 5ecdb5c2b8..775113fce0 100644 --- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java +++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java @@ -108,19 +108,19 @@ public class GravitinoIcebergCatalog extends BaseCatalog @Override public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException { - Identifier[] builtin = - getBuiltinFunctionSupport().listFunctions(namespace, getCatalogDefaultNamespace()); + Identifier[] gravitinoFunctions = super.listFunctions(namespace); Identifier[] icebergFunctions = ((SparkCatalog) sparkCatalog).listFunctions(namespace); - Identifier[] combined = new Identifier[builtin.length + icebergFunctions.length]; - System.arraycopy(builtin, 0, combined, 0, builtin.length); - System.arraycopy(icebergFunctions, 0, combined, builtin.length, icebergFunctions.length); + Identifier[] combined = new Identifier[gravitinoFunctions.length + icebergFunctions.length]; + System.arraycopy(gravitinoFunctions, 0, combined, 0, gravitinoFunctions.length); + System.arraycopy( + icebergFunctions, 0, combined, gravitinoFunctions.length, icebergFunctions.length); return combined; } @Override public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException { try { - return getBuiltinFunctionSupport().loadFunction(ident, getCatalogDefaultNamespace()); + return super.loadFunction(ident); } catch (NoSuchFunctionException e) { return ((SparkCatalog) sparkCatalog).loadFunction(ident); } diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/functions/TestBuiltinFunctionSupport.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/functions/TestBuiltinFunctionSupport.java deleted file mode 100644 index 782b9d7ad4..0000000000 --- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/functions/TestBuiltinFunctionSupport.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.gravitino.spark.connector.functions; - -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; -import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.sql.connector.catalog.Identifier; -import org.apache.spark.sql.connector.catalog.functions.BoundFunction; -import org.apache.spark.sql.connector.catalog.functions.ScalarFunction; -import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; -import org.apache.spark.unsafe.types.UTF8String; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -class TestBuiltinFunctionSupport { - - private final BuiltinFunctionSupport builtinFunctionSupport = new BuiltinFunctionSupport(); - private static final String DEFAULT_NAMESPACE = "default"; - - @Test - void testListFunctionsInDefaultNamespace() throws NoSuchNamespaceException { - Identifier[] functions = builtinFunctionSupport.listFunctions(new String[0], DEFAULT_NAMESPACE); - Assertions.assertEquals(1, functions.length); - Assertions.assertEquals(MyStringLengthFunction.NAME, functions[0].name()); - - Identifier[] functionsInDefault = - builtinFunctionSupport.listFunctions(new String[] {DEFAULT_NAMESPACE}, DEFAULT_NAMESPACE); - Assertions.assertEquals(1, functionsInDefault.length); - Assertions.assertEquals(MyStringLengthFunction.NAME, functionsInDefault[0].name()); - } - - @Test - void testListFunctionsInUnsupportedNamespace() { - Assertions.assertThrows( - NoSuchNamespaceException.class, - () -> builtinFunctionSupport.listFunctions(new String[] {"other"}, DEFAULT_NAMESPACE)); - } - - @Test - void testLoadFunctionAndExecute() throws NoSuchFunctionException { - UnboundFunction unboundFunction = - builtinFunctionSupport.loadFunction( - Identifier.of(new String[0], MyStringLengthFunction.NAME), DEFAULT_NAMESPACE); - Assertions.assertInstanceOf(MyStringLengthFunction.class, unboundFunction); - - StructType inputSchema = - new StructType( - new StructField[] { - new StructField("col", DataTypes.StringType, true, Metadata.empty()) - }); - BoundFunction boundFunction = unboundFunction.bind(inputSchema); - ScalarFunction<?> scalarFunction = (ScalarFunction<?>) boundFunction; - - InternalRow row = new GenericInternalRow(new Object[] {UTF8String.fromString("abcde")}); - Assertions.assertEquals(5, scalarFunction.produceResult(row)); - - InternalRow nullRow = new GenericInternalRow(1); - nullRow.setNullAt(0); - Assertions.assertNull(scalarFunction.produceResult(nullRow)); - } - - @Test - void testLoadFunctionWithWrongNamespaceOrName() { - Assertions.assertThrows( - NoSuchFunctionException.class, - () -> - builtinFunctionSupport.loadFunction( - Identifier.of(new String[] {"other"}, MyStringLengthFunction.NAME), - DEFAULT_NAMESPACE)); - Assertions.assertThrows( - NoSuchFunctionException.class, - () -> - builtinFunctionSupport.loadFunction( - Identifier.of(new String[0], "unknown_func"), DEFAULT_NAMESPACE)); - } - - @Test - void testBindValidatesInputType() throws NoSuchFunctionException { - UnboundFunction unboundFunction = - builtinFunctionSupport.loadFunction( - Identifier.of(new String[0], MyStringLengthFunction.NAME), DEFAULT_NAMESPACE); - StructType invalidSchema = - new StructType( - new StructField[] { - new StructField("col", DataTypes.IntegerType, true, Metadata.empty()) - }); - Assertions.assertThrows( - IllegalArgumentException.class, () -> unboundFunction.bind(invalidSchema)); - } -} diff --git a/spark-connector/spark-udf/build.gradle.kts b/spark-connector/spark-udf/build.gradle.kts new file mode 100644 index 0000000000..e3431627bc --- /dev/null +++ b/spark-connector/spark-udf/build.gradle.kts @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +plugins { + `maven-publish` + id("java") + id("idea") +} + +repositories { + mavenCentral() +} + +val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString() +val sparkVersion: String = libs.versions.spark33.get() +val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".") +val icebergVersion: String = libs.versions.iceberg4connector.get() +val paimonVersion: String = libs.versions.paimon.get() +val kyuubiVersion: String = libs.versions.kyuubi4spark.get() +val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get() +val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get() + +if (hasProperty("excludePackagesForSparkConnector")) { + @Suppress("UNCHECKED_CAST") + val configureFunc = properties["excludePackagesForSparkConnector"] as? (Project) -> Unit + configureFunc?.invoke(project) +} + +dependencies { + + compileOnly("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion") + compileOnly("org.apache.spark:spark-core_$scalaVersion:$sparkVersion") + compileOnly("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") + compileOnly("org.scala-lang.modules:scala-java8-compat_$scalaVersion:$scalaJava8CompatVersion") + testRuntimeOnly(libs.junit.jupiter.engine) +} diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/functions/MyStringLengthFunction.java b/spark-connector/spark-udf/src/main/java/com/mycompany/functions/MyStringLengthFunction.java similarity index 97% rename from spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/functions/MyStringLengthFunction.java rename to spark-connector/spark-udf/src/main/java/com/mycompany/functions/MyStringLengthFunction.java index 173e720cc7..2bc68930dd 100644 --- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/functions/MyStringLengthFunction.java +++ b/spark-connector/spark-udf/src/main/java/com/mycompany/functions/MyStringLengthFunction.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.gravitino.spark.connector.functions; +package com.mycompany.functions; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
