This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new cb63b3867 [GLUTEN-5074][VL] fix: UDF load error in yarn-cluster mode (#5075)
cb63b3867 is described below

commit cb63b38676eac91cdbd78d169702a99e76e4b652
Author: zhaokuo <[email protected]>
AuthorDate: Mon Mar 25 16:14:43 2024 +0800

    [GLUTEN-5074][VL] fix: UDF load error in yarn-cluster mode (#5075)
---
 .../java/io/glutenproject/udf/UdfJniWrapper.java   |  2 +-
 .../backendsapi/velox/SparkPlanExecApiImpl.scala   |  2 +-
 .../apache/spark/sql/expression/UDFResolver.scala  | 75 +++++++++++++---------
 .../glutenproject/expression/VeloxUdfSuite.scala   |  6 +-
 cpp/velox/jni/JniUdf.cc                            |  5 +-
 cpp/velox/jni/JniUdf.h                             |  2 +-
 cpp/velox/jni/VeloxJniWrapper.cc                   |  7 +-
 docs/get-started/Velox.md                          | 17 +++--
 8 files changed, 67 insertions(+), 49 deletions(-)

diff --git a/backends-velox/src/main/java/io/glutenproject/udf/UdfJniWrapper.java b/backends-velox/src/main/java/io/glutenproject/udf/UdfJniWrapper.java
index 50c778239..83cb9a16f 100644
--- a/backends-velox/src/main/java/io/glutenproject/udf/UdfJniWrapper.java
+++ b/backends-velox/src/main/java/io/glutenproject/udf/UdfJniWrapper.java
@@ -20,5 +20,5 @@ public class UdfJniWrapper {
 
   public UdfJniWrapper() {}
 
-  public native void nativeLoadUdfLibraries(String udfLibPaths);
+  public native void getFunctionSignatures();
 }
diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala
index e7d2b2225..61ea50695 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala
@@ -649,7 +649,7 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi {
 
   override def genInjectedFunctions()
       : Seq[(FunctionIdentifier, ExpressionInfo, FunctionBuilder)] = {
-    UDFResolver.loadAndGetFunctionDescriptions
+    UDFResolver.getFunctionSignatures
   }
 
   override def rewriteSpillPath(path: String): String = {
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
index 05898b171..c6d8b5917 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
@@ -37,7 +37,8 @@ import org.apache.spark.util.Utils
 import com.google.common.collect.Lists
 
 import java.io.File
-import java.nio.file.{Files, Paths}
+import java.net.URI
+import java.nio.file.{Files, FileVisitOption, Paths}
 
 import scala.collection.JavaConverters.asScalaIteratorConverter
 import scala.collection.mutable
@@ -107,7 +108,7 @@ object UDFResolver extends Logging {
 
   def getFilesWithExtension(directory: java.nio.file.Path, extension: String): 
Seq[String] = {
     Files
-      .walk(directory)
+      .walk(directory, FileVisitOption.FOLLOW_LINKS)
       .iterator()
       .asScala
       .filter(p => Files.isRegularFile(p) && p.toString.endsWith(extension))
@@ -116,19 +117,19 @@ object UDFResolver extends Logging {
   }
 
   def resolveUdfConf(conf: java.util.Map[String, String]): Unit = {
-    if (isDriver) {
-      if (localLibraryPaths != null) {
-        conf.put(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS, localLibraryPaths)
-      }
+    val sparkConf = SparkEnv.get.conf
+    val udfLibPaths = if (isDriver) {
+      sparkConf
+        .getOption(BackendSettings.GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS)
+        .orElse(sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS))
     } else {
-      val sparkConf = SparkEnv.get.conf
-      Option(conf.get(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)) match {
-        case Some(libs) =>
-          conf.put(
-            BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS,
-            getAllLibraries(libs, sparkConf, canAccessSparkFiles = true))
-        case None =>
-      }
+      sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)
+    }
+
+    udfLibPaths match {
+      case Some(paths) =>
+        conf.put(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS, getAllLibraries(paths, sparkConf))
+      case None =>
     }
   }
 
@@ -150,24 +151,44 @@ object UDFResolver extends Logging {
     dest
   }
 
+  private def isRelativePath(path: String): Boolean = {
+    try {
+      val uri = new URI(path)
+      !uri.isAbsolute && uri.getPath == path
+    } catch {
+      case _: Exception => false
+    }
+  }
+
   // Get the full paths of all libraries.
   // If it's a directory, get all files ends with ".so" recursively.
-  def getAllLibraries(files: String, sparkConf: SparkConf, 
canAccessSparkFiles: Boolean): String = {
+  def getAllLibraries(files: String, sparkConf: SparkConf): String = {
     val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
+    val master = sparkConf.getOption("spark.master")
+    val isYarnCluster =
+      master.isDefined && master.get.equals("yarn") && 
!Utils.isClientMode(sparkConf)
+    val isYarnClient =
+      master.isDefined && master.get.equals("yarn") && 
Utils.isClientMode(sparkConf)
+
     files
       .split(",")
       .map {
         f =>
           val file = new File(f)
           // Relative paths should be uploaded via --files or --archives
-          // Use SparkFiles.get to download and unpack
-          if (!file.isAbsolute) {
-            if (!canAccessSparkFiles) {
+          if (isRelativePath(f)) {
+            logInfo(s"resolve relative path: $f")
+            if (isDriver && isYarnClient) {
               throw new IllegalArgumentException(
                 "On yarn-client mode, driver only accepts absolute paths, but 
got " + f)
             }
-            new File(SparkFiles.get(f))
+            if (isYarnCluster || isYarnClient) {
+              file
+            } else {
+              new File(SparkFiles.get(f))
+            }
           } else {
+            logInfo(s"resolve absolute URI path: $f")
             // Download or copy absolute paths to JniWorkspace.
             val uri = Utils.resolveURI(f)
             val name = file.getName
@@ -192,26 +213,16 @@ object UDFResolver extends Logging {
       .mkString(",")
   }
 
-  def loadAndGetFunctionDescriptions: Seq[(FunctionIdentifier, ExpressionInfo, 
FunctionBuilder)] = {
+  def getFunctionSignatures: Seq[(FunctionIdentifier, ExpressionInfo, 
FunctionBuilder)] = {
     val sparkContext = SparkContext.getActive.get
     val sparkConf = sparkContext.conf
-    val udfLibPaths = sparkConf
-      .getOption(BackendSettings.GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS)
-      .orElse(sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS))
+    val udfLibPaths = 
sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)
 
     udfLibPaths match {
       case None =>
         Seq.empty
       case Some(paths) =>
-        val master = sparkConf.getOption("spark.master")
-        val isYarnClient =
-          master.isDefined && master.get.equals("yarn") && 
Utils.isClientMode(sparkConf)
-        // For Yarn-client mode, driver cannot get uploaded files via 
SparkFiles.get.
-        localLibraryPaths = getAllLibraries(paths, sparkConf, 
canAccessSparkFiles = !isYarnClient)
-
-        logInfo(s"Loading UDF libraries from paths: $localLibraryPaths")
-        new UdfJniWrapper().nativeLoadUdfLibraries(localLibraryPaths)
-
+        new UdfJniWrapper().getFunctionSignatures()
         UDFMap.map {
           case (name, t) =>
             (
diff --git a/backends-velox/src/test/scala/io/glutenproject/expression/VeloxUdfSuite.scala b/backends-velox/src/test/scala/io/glutenproject/expression/VeloxUdfSuite.scala
index 4fd05f687..effc790b6 100644
--- a/backends-velox/src/test/scala/io/glutenproject/expression/VeloxUdfSuite.scala
+++ b/backends-velox/src/test/scala/io/glutenproject/expression/VeloxUdfSuite.scala
@@ -47,6 +47,9 @@ abstract class VeloxUdfSuite extends GlutenQueryTest with 
SQLHelper {
             "/path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so")
     }
 
+  protected lazy val udfLibRelativePath: String =
+    udfLibPath.split(",").map(p => 
Paths.get(p).getFileName.toString).mkString(",")
+
   override protected def beforeAll(): Unit = {
     super.beforeAll()
     if (_spark == null) {
@@ -82,7 +85,8 @@ class VeloxUdfSuiteLocal extends VeloxUdfSuite {
   override val master: String = "local[2]"
   override protected def sparkConf: SparkConf = {
     super.sparkConf
-      .set("spark.gluten.sql.columnar.backend.velox.udfLibraryPaths", 
udfLibPath)
+      .set("spark.files", udfLibPath)
+      .set("spark.gluten.sql.columnar.backend.velox.udfLibraryPaths", 
udfLibRelativePath)
   }
 }
 
diff --git a/cpp/velox/jni/JniUdf.cc b/cpp/velox/jni/JniUdf.cc
index 28551cfe8..180788bee 100644
--- a/cpp/velox/jni/JniUdf.cc
+++ b/cpp/velox/jni/JniUdf.cc
@@ -47,17 +47,14 @@ void gluten::finalizeVeloxJniUDF(JNIEnv* env) {
   env->DeleteGlobalRef(udfResolverClass);
 }
 
-void gluten::jniLoadUdf(JNIEnv* env, const std::string& libPaths) {
+void gluten::jniGetFunctionSignatures(JNIEnv* env) {
   auto udfLoader = gluten::UdfLoader::getInstance();
-  udfLoader->loadUdfLibraries(libPaths);
-
   const auto& udfMap = udfLoader->getUdfMap();
   for (const auto& udf : udfMap) {
     auto udfString = udf.second;
     jbyteArray returnType = env->NewByteArray(udf.second.length());
     env->SetByteArrayRegion(returnType, 0, udfString.length(), 
reinterpret_cast<const jbyte*>(udfString.c_str()));
     jstring name = env->NewStringUTF(udf.first.c_str());
-
     jobject instance = env->GetStaticObjectField(
         udfResolverClass, env->GetStaticFieldID(udfResolverClass, "MODULE$", 
kUdfResolverClassPath.c_str()));
     env->CallVoidMethod(instance, registerUDFMethod, name, returnType);
diff --git a/cpp/velox/jni/JniUdf.h b/cpp/velox/jni/JniUdf.h
index b666d8eb0..b91ac08de 100644
--- a/cpp/velox/jni/JniUdf.h
+++ b/cpp/velox/jni/JniUdf.h
@@ -27,6 +27,6 @@ void initVeloxJniUDF(JNIEnv* env);
 
 void finalizeVeloxJniUDF(JNIEnv* env);
 
-void jniLoadUdf(JNIEnv* env, const std::string& libPaths);
+void jniGetFunctionSignatures(JNIEnv* env);
 
 } // namespace gluten
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index cb1604629..e73fe7eb0 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -76,12 +76,11 @@ JNIEXPORT void JNICALL 
Java_io_glutenproject_init_NativeBackendInitializer_initi
   JNI_METHOD_END()
 }
 
-JNIEXPORT void JNICALL 
Java_io_glutenproject_udf_UdfJniWrapper_nativeLoadUdfLibraries( // NOLINT
+JNIEXPORT void JNICALL 
Java_io_glutenproject_udf_UdfJniWrapper_getFunctionSignatures( // NOLINT
     JNIEnv* env,
-    jclass,
-    jstring libPaths) {
+    jclass) {
   JNI_METHOD_START
-  gluten::jniLoadUdf(env, jStringToCString(env, libPaths));
+  gluten::jniGetFunctionSignatures(env);
   JNI_METHOD_END()
 }
 
diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md
index 0f7ca1964..019aa9606 100644
--- a/docs/get-started/Velox.md
+++ b/docs/get-started/Velox.md
@@ -434,15 +434,15 @@ Gluten loads the UDF libraries at runtime. You can upload 
UDF libraries via `--f
 
 Note if running on Yarn client mode, the uploaded files are not reachable on 
driver side. Users should copy those files to somewhere reachable for driver 
and set `spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths`. This 
configuration is also useful when the `udfLibraryPaths` is different between 
driver side and executor side.
 
-- Use `--files`
+- Use the `--files` option to upload a library and configure its relative path
 ```shell
 --files /path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
 --conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=libmyudf.so
 # Needed for Yarn client mode
---conf 
spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths=file:///path/to/libmyudf.so
+--conf 
spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths=file:///path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
 ```
 
-- Use `--archives`
+- Use the `--archives` option to upload an archive and configure its relative 
path
 ```shell
 --archives /path/to/udf_archives.zip#udf_archives
 --conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=udf_archives
@@ -450,7 +450,7 @@ Note if running on Yarn client mode, the uploaded files are 
not reachable on dri
 --conf 
spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths=file:///path/to/udf_archives.zip
 ```
 
-- Specify URI
+- Only configure URI
 
 You can also specify the local or HDFS URIs to the UDF libraries or archives. 
Local URIs should exist on driver and every worker nodes.
 ```shell
@@ -462,10 +462,17 @@ You can also specify the local or HDFS URIs to the UDF 
libraries or archives. Lo
 We provided an Velox UDF example file 
[MyUDF.cpp](../../cpp/velox/udf/examples/MyUDF.cpp). After building gluten cpp, 
you can find the example library at 
/path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
 
 Start spark-shell or spark-sql with below configuration 
-```
+```shell
+# Use the `--files` option to upload a library and configure its relative path
 --files /path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
 --conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=libmyudf.so
 ```
+or
+```shell
+# Only configure URI
+--conf 
spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=file:///path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
+```
+
 Run query. The functions `myudf1` and `myudf2` increment the input value by a 
constant of 5
 ```
 select myudf1(1), myudf2(100L)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to