This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new cb63b3867 [GLUTEN-5074][VL] fix: UDF load error in yarn-cluster mode
(#5075)
cb63b3867 is described below
commit cb63b38676eac91cdbd78d169702a99e76e4b652
Author: zhaokuo <[email protected]>
AuthorDate: Mon Mar 25 16:14:43 2024 +0800
[GLUTEN-5074][VL] fix: UDF load error in yarn-cluster mode (#5075)
---
.../java/io/glutenproject/udf/UdfJniWrapper.java | 2 +-
.../backendsapi/velox/SparkPlanExecApiImpl.scala | 2 +-
.../apache/spark/sql/expression/UDFResolver.scala | 75 +++++++++++++---------
.../glutenproject/expression/VeloxUdfSuite.scala | 6 +-
cpp/velox/jni/JniUdf.cc | 5 +-
cpp/velox/jni/JniUdf.h | 2 +-
cpp/velox/jni/VeloxJniWrapper.cc | 7 +-
docs/get-started/Velox.md | 17 +++--
8 files changed, 67 insertions(+), 49 deletions(-)
diff --git
a/backends-velox/src/main/java/io/glutenproject/udf/UdfJniWrapper.java
b/backends-velox/src/main/java/io/glutenproject/udf/UdfJniWrapper.java
index 50c778239..83cb9a16f 100644
--- a/backends-velox/src/main/java/io/glutenproject/udf/UdfJniWrapper.java
+++ b/backends-velox/src/main/java/io/glutenproject/udf/UdfJniWrapper.java
@@ -20,5 +20,5 @@ public class UdfJniWrapper {
public UdfJniWrapper() {}
- public native void nativeLoadUdfLibraries(String udfLibPaths);
+ public native void getFunctionSignatures();
}
diff --git
a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala
b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala
index e7d2b2225..61ea50695 100644
---
a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala
+++
b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala
@@ -649,7 +649,7 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi {
override def genInjectedFunctions()
: Seq[(FunctionIdentifier, ExpressionInfo, FunctionBuilder)] = {
- UDFResolver.loadAndGetFunctionDescriptions
+ UDFResolver.getFunctionSignatures
}
override def rewriteSpillPath(path: String): String = {
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
index 05898b171..c6d8b5917 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
@@ -37,7 +37,8 @@ import org.apache.spark.util.Utils
import com.google.common.collect.Lists
import java.io.File
-import java.nio.file.{Files, Paths}
+import java.net.URI
+import java.nio.file.{Files, FileVisitOption, Paths}
import scala.collection.JavaConverters.asScalaIteratorConverter
import scala.collection.mutable
@@ -107,7 +108,7 @@ object UDFResolver extends Logging {
def getFilesWithExtension(directory: java.nio.file.Path, extension: String):
Seq[String] = {
Files
- .walk(directory)
+ .walk(directory, FileVisitOption.FOLLOW_LINKS)
.iterator()
.asScala
.filter(p => Files.isRegularFile(p) && p.toString.endsWith(extension))
@@ -116,19 +117,19 @@ object UDFResolver extends Logging {
}
def resolveUdfConf(conf: java.util.Map[String, String]): Unit = {
- if (isDriver) {
- if (localLibraryPaths != null) {
- conf.put(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS, localLibraryPaths)
- }
+ val sparkConf = SparkEnv.get.conf
+ val udfLibPaths = if (isDriver) {
+ sparkConf
+ .getOption(BackendSettings.GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS)
+ .orElse(sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS))
} else {
- val sparkConf = SparkEnv.get.conf
- Option(conf.get(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)) match {
- case Some(libs) =>
- conf.put(
- BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS,
- getAllLibraries(libs, sparkConf, canAccessSparkFiles = true))
- case None =>
- }
+ sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)
+ }
+
+ udfLibPaths match {
+ case Some(paths) =>
+ conf.put(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS, getAllLibraries(paths, sparkConf))
+ case None =>
}
}
@@ -150,24 +151,44 @@ object UDFResolver extends Logging {
dest
}
+ private def isRelativePath(path: String): Boolean = {
+ try {
+ val uri = new URI(path)
+ !uri.isAbsolute && uri.getPath == path
+ } catch {
+ case _: Exception => false
+ }
+ }
+
// Get the full paths of all libraries.
// If it's a directory, get all files ends with ".so" recursively.
- def getAllLibraries(files: String, sparkConf: SparkConf, canAccessSparkFiles: Boolean): String = {
+ def getAllLibraries(files: String, sparkConf: SparkConf): String = {
val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
+ val master = sparkConf.getOption("spark.master")
+ val isYarnCluster =
+ master.isDefined && master.get.equals("yarn") && !Utils.isClientMode(sparkConf)
+ val isYarnClient =
+ master.isDefined && master.get.equals("yarn") && Utils.isClientMode(sparkConf)
+
files
.split(",")
.map {
f =>
val file = new File(f)
// Relative paths should be uploaded via --files or --archives
- // Use SparkFiles.get to download and unpack
- if (!file.isAbsolute) {
- if (!canAccessSparkFiles) {
+ if (isRelativePath(f)) {
+ logInfo(s"resolve relative path: $f")
+ if (isDriver && isYarnClient) {
throw new IllegalArgumentException(
"On yarn-client mode, driver only accepts absolute paths, but got " + f)
}
- new File(SparkFiles.get(f))
+ if (isYarnCluster || isYarnClient) {
+ file
+ } else {
+ new File(SparkFiles.get(f))
+ }
} else {
+ logInfo(s"resolve absolute URI path: $f")
// Download or copy absolute paths to JniWorkspace.
val uri = Utils.resolveURI(f)
val name = file.getName
@@ -192,26 +213,16 @@ object UDFResolver extends Logging {
.mkString(",")
}
- def loadAndGetFunctionDescriptions: Seq[(FunctionIdentifier, ExpressionInfo, FunctionBuilder)] = {
+ def getFunctionSignatures: Seq[(FunctionIdentifier, ExpressionInfo, FunctionBuilder)] = {
val sparkContext = SparkContext.getActive.get
val sparkConf = sparkContext.conf
- val udfLibPaths = sparkConf
- .getOption(BackendSettings.GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS)
- .orElse(sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS))
+ val udfLibPaths = sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)
udfLibPaths match {
case None =>
Seq.empty
case Some(paths) =>
- val master = sparkConf.getOption("spark.master")
- val isYarnClient =
- master.isDefined && master.get.equals("yarn") && Utils.isClientMode(sparkConf)
- // For Yarn-client mode, driver cannot get uploaded files via SparkFiles.get.
- localLibraryPaths = getAllLibraries(paths, sparkConf, canAccessSparkFiles = !isYarnClient)
-
- logInfo(s"Loading UDF libraries from paths: $localLibraryPaths")
- new UdfJniWrapper().nativeLoadUdfLibraries(localLibraryPaths)
-
+ new UdfJniWrapper().getFunctionSignatures()
UDFMap.map {
case (name, t) =>
(
diff --git
a/backends-velox/src/test/scala/io/glutenproject/expression/VeloxUdfSuite.scala
b/backends-velox/src/test/scala/io/glutenproject/expression/VeloxUdfSuite.scala
index 4fd05f687..effc790b6 100644
---
a/backends-velox/src/test/scala/io/glutenproject/expression/VeloxUdfSuite.scala
+++
b/backends-velox/src/test/scala/io/glutenproject/expression/VeloxUdfSuite.scala
@@ -47,6 +47,9 @@ abstract class VeloxUdfSuite extends GlutenQueryTest with
SQLHelper {
"/path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so")
}
+ protected lazy val udfLibRelativePath: String =
+ udfLibPath.split(",").map(p => Paths.get(p).getFileName.toString).mkString(",")
+
override protected def beforeAll(): Unit = {
super.beforeAll()
if (_spark == null) {
@@ -82,7 +85,8 @@ class VeloxUdfSuiteLocal extends VeloxUdfSuite {
override val master: String = "local[2]"
override protected def sparkConf: SparkConf = {
super.sparkConf
- .set("spark.gluten.sql.columnar.backend.velox.udfLibraryPaths", udfLibPath)
+ .set("spark.files", udfLibPath)
+ .set("spark.gluten.sql.columnar.backend.velox.udfLibraryPaths", udfLibRelativePath)
}
}
diff --git a/cpp/velox/jni/JniUdf.cc b/cpp/velox/jni/JniUdf.cc
index 28551cfe8..180788bee 100644
--- a/cpp/velox/jni/JniUdf.cc
+++ b/cpp/velox/jni/JniUdf.cc
@@ -47,17 +47,14 @@ void gluten::finalizeVeloxJniUDF(JNIEnv* env) {
env->DeleteGlobalRef(udfResolverClass);
}
-void gluten::jniLoadUdf(JNIEnv* env, const std::string& libPaths) {
+void gluten::jniGetFunctionSignatures(JNIEnv* env) {
auto udfLoader = gluten::UdfLoader::getInstance();
- udfLoader->loadUdfLibraries(libPaths);
-
const auto& udfMap = udfLoader->getUdfMap();
for (const auto& udf : udfMap) {
auto udfString = udf.second;
jbyteArray returnType = env->NewByteArray(udf.second.length());
env->SetByteArrayRegion(returnType, 0, udfString.length(),
reinterpret_cast<const jbyte*>(udfString.c_str()));
jstring name = env->NewStringUTF(udf.first.c_str());
-
jobject instance = env->GetStaticObjectField(
udfResolverClass, env->GetStaticFieldID(udfResolverClass, "MODULE$",
kUdfResolverClassPath.c_str()));
env->CallVoidMethod(instance, registerUDFMethod, name, returnType);
diff --git a/cpp/velox/jni/JniUdf.h b/cpp/velox/jni/JniUdf.h
index b666d8eb0..b91ac08de 100644
--- a/cpp/velox/jni/JniUdf.h
+++ b/cpp/velox/jni/JniUdf.h
@@ -27,6 +27,6 @@ void initVeloxJniUDF(JNIEnv* env);
void finalizeVeloxJniUDF(JNIEnv* env);
-void jniLoadUdf(JNIEnv* env, const std::string& libPaths);
+void jniGetFunctionSignatures(JNIEnv* env);
} // namespace gluten
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index cb1604629..e73fe7eb0 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -76,12 +76,11 @@ JNIEXPORT void JNICALL
Java_io_glutenproject_init_NativeBackendInitializer_initi
JNI_METHOD_END()
}
-JNIEXPORT void JNICALL Java_io_glutenproject_udf_UdfJniWrapper_nativeLoadUdfLibraries( // NOLINT
+JNIEXPORT void JNICALL Java_io_glutenproject_udf_UdfJniWrapper_getFunctionSignatures( // NOLINT
JNIEnv* env,
- jclass,
- jstring libPaths) {
+ jclass) {
JNI_METHOD_START
- gluten::jniLoadUdf(env, jStringToCString(env, libPaths));
+ gluten::jniGetFunctionSignatures(env);
JNI_METHOD_END()
}
diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md
index 0f7ca1964..019aa9606 100644
--- a/docs/get-started/Velox.md
+++ b/docs/get-started/Velox.md
@@ -434,15 +434,15 @@ Gluten loads the UDF libraries at runtime. You can upload
UDF libraries via `--f
Note if running on Yarn client mode, the uploaded files are not reachable on
driver side. Users should copy those files to somewhere reachable for driver
and set `spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths`. This
configuration is also useful when the `udfLibraryPaths` is different between
driver side and executor side.
-- Use `--files`
+- Use the `--files` option to upload a library and configure its relative path
```shell
--files /path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
--conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=libmyudf.so
# Needed for Yarn client mode
---conf spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths=file:///path/to/libmyudf.so
+--conf spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths=file:///path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
```
-- Use `--archives`
+- Use the `--archives` option to upload an archive and configure its relative path
```shell
--archives /path/to/udf_archives.zip#udf_archives
--conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=udf_archives
@@ -450,7 +450,7 @@ Note if running on Yarn client mode, the uploaded files are
not reachable on dri
--conf
spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths=file:///path/to/udf_archives.zip
```
-- Specify URI
+- Only configure URI
You can also specify the local or HDFS URIs to the UDF libraries or archives.
Local URIs should exist on driver and every worker nodes.
```shell
@@ -462,10 +462,17 @@ You can also specify the local or HDFS URIs to the UDF
libraries or archives. Lo
We provided an Velox UDF example file
[MyUDF.cpp](../../cpp/velox/udf/examples/MyUDF.cpp). After building gluten cpp,
you can find the example library at
/path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
Start spark-shell or spark-sql with below configuration
-```
+```shell
+# Use the `--files` option to upload a library and configure its relative path
--files /path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
--conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=libmyudf.so
```
+or
+```shell
+# Only configure URI
+--conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=file:///path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
+```
+
Run query. The functions `myudf1` and `myudf2` increment the input value by a
constant of 5
```
select myudf1(1), myudf2(100L)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]