This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 744d854c06 [GLUTEN-9149][CORE] Remove Spark-specific code from
JniLibLoader & JniWorkspace (#9150)
744d854c06 is described below
commit 744d854c060ef9f2356b7d7b38f7c6f16bf55043
Author: shuai.xu <[email protected]>
AuthorDate: Mon Apr 7 16:00:48 2025 +0800
[GLUTEN-9149][CORE] Remove Spark-specific code from JniLibLoader &
JniWorkspace (#9150)
[GLUTEN-9149][CORE] Make JniWorkspace independent of Spark
---
.../backendsapi/clickhouse/CHListenerApi.scala | 17 ++++++++++-
.../backendsapi/velox/VeloxListenerApi.scala | 35 ++++++++++++++++------
.../java/org/apache/gluten/jni/JniLibLoader.java | 11 -------
.../java/org/apache/gluten/jni/JniWorkspace.java | 26 +++++-----------
4 files changed, 50 insertions(+), 39 deletions(-)
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 4b661b3c2d..120584ce2e 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -36,7 +36,7 @@ import org.apache.spark.rpc.{GlutenDriverEndpoint, GlutenExecutorEndpoint}
import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules
import org.apache.spark.sql.execution.datasources.v1._
import org.apache.spark.sql.utils.ExpressionUtil
-import org.apache.spark.util.SparkDirectoryUtil
+import org.apache.spark.util.{SparkDirectoryUtil, SparkShutdownManagerUtil}
import org.apache.commons.lang3.StringUtils
@@ -87,6 +87,7 @@ class CHListenerApi extends ListenerApi with Logging {
val executorLibPath =
conf.get(GlutenConfig.GLUTEN_EXECUTOR_LIB_PATH.key, libPath)
JniLibLoader.loadFromPath(executorLibPath, true)
}
+ CHListenerApi.addShutdownHook
// Add configs
import org.apache.gluten.backendsapi.clickhouse.CHConfig._
conf.setCHConfig(
@@ -133,3 +134,17 @@ class CHListenerApi extends ListenerApi with Logging {
CHNativeExpressionEvaluator.finalizeNative()
}
}
+
+object CHListenerApi {
+ var initialized = false
+
+ def addShutdownHook: Unit = {
+ if (!initialized) {
+ initialized = true
+ SparkShutdownManagerUtil.addHookForLibUnloading(
+ () => {
+ JniLibLoader.forceUnloadAll
+ })
+ }
+ }
+}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index 2aa19afe7a..109f3015a6 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -41,10 +41,11 @@ import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules
import
org.apache.spark.sql.execution.datasources.velox.{VeloxParquetWriterInjects,
VeloxRowSplitter}
import org.apache.spark.sql.expression.UDFResolver
import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf}
-import org.apache.spark.util.{SparkDirectoryUtil, SparkResourceUtil}
+import org.apache.spark.util.{SparkDirectoryUtil, SparkResourceUtil,
SparkShutdownManagerUtil}
import org.apache.commons.lang3.StringUtils
+import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean
class VeloxListenerApi extends ListenerApi with Logging {
@@ -116,7 +117,6 @@ class VeloxListenerApi extends ListenerApi with Logging {
}
SparkDirectoryUtil.init(conf)
- UDFResolver.resolveUdfConf(conf, isDriver = true)
initialize(conf, isDriver = true)
UdfJniWrapper.registerFunctionSignatures()
}
@@ -143,13 +143,29 @@ class VeloxListenerApi extends ListenerApi with Logging {
}
SparkDirectoryUtil.init(conf)
- UDFResolver.resolveUdfConf(conf, isDriver = false)
initialize(conf, isDriver = false)
}
override def onExecutorShutdown(): Unit = shutdown()
private def initialize(conf: SparkConf, isDriver: Boolean): Unit = {
+ addShutdownHook
+ // Sets this configuration only once, since not undoable.
+ // DebugInstance should be created first.
+ if (conf.getBoolean(GlutenConfig.DEBUG_KEEP_JNI_WORKSPACE.key,
defaultValue = false)) {
+ val debugDir = conf.get(GlutenConfig.DEBUG_KEEP_JNI_WORKSPACE_DIR.key)
+ JniWorkspace.enableDebug(debugDir)
+ } else {
+ JniWorkspace.initializeDefault(
+ () =>
+ SparkDirectoryUtil.get
+ .namespace("jni")
+ .mkChildDirRandomly(UUID.randomUUID.toString)
+ .getAbsolutePath)
+ }
+
+ UDFResolver.resolveUdfConf(conf, isDriver)
+
// Do row / batch type initializations.
Convention.ensureSparkRowAndBatchTypesRegistered()
ArrowJavaBatch.ensureRegistered()
@@ -169,12 +185,6 @@ class VeloxListenerApi extends ListenerApi with Logging {
classOf[ColumnarShuffleManager].getName
)
- // Sets this configuration only once, since not undoable.
- if (conf.getBoolean(GlutenConfig.DEBUG_KEEP_JNI_WORKSPACE.key,
defaultValue = false)) {
- val debugDir = conf.get(GlutenConfig.DEBUG_KEEP_JNI_WORKSPACE_DIR.key)
- JniWorkspace.enableDebug(debugDir)
- }
-
// Set the system properties.
// Use appending policy for children with the same name in a arrow struct
vector.
System.setProperty("arrow.struct.conflict.policy", "CONFLICT_APPEND")
@@ -241,4 +251,11 @@ object VeloxListenerApi {
private def inLocalMode(conf: SparkConf): Boolean = {
SparkResourceUtil.isLocalMaster(conf)
}
+
+ private def addShutdownHook: Unit = {
+ SparkShutdownManagerUtil.addHookForLibUnloading(
+ () => {
+ JniLibLoader.forceUnloadAll
+ })
+ }
}
diff --git a/gluten-core/src/main/java/org/apache/gluten/jni/JniLibLoader.java b/gluten-core/src/main/java/org/apache/gluten/jni/JniLibLoader.java
index 2d07c3d79b..72898823f5 100644
--- a/gluten-core/src/main/java/org/apache/gluten/jni/JniLibLoader.java
+++ b/gluten-core/src/main/java/org/apache/gluten/jni/JniLibLoader.java
@@ -18,7 +18,6 @@ package org.apache.gluten.jni;
import org.apache.gluten.exception.GlutenException;
-import org.apache.spark.util.SparkShutdownManagerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,22 +39,12 @@ import java.util.List;
import java.util.Set;
import java.util.Vector;
-import scala.runtime.BoxedUnit;
-
public class JniLibLoader {
private static final Logger LOG =
LoggerFactory.getLogger(JniLibLoader.class);
private static final Set<String> LOADED_LIBRARY_PATHS = new HashSet<>();
private static final Set<String> REQUIRE_UNLOAD_LIBRARY_PATHS = new
LinkedHashSet<>();
- static {
- SparkShutdownManagerUtil.addHookForLibUnloading(
- () -> {
- forceUnloadAll();
- return BoxedUnit.UNIT;
- });
- }
-
private final String workDir;
private final Set<String> loadedLibraries = new HashSet<>();
diff --git a/gluten-core/src/main/java/org/apache/gluten/jni/JniWorkspace.java b/gluten-core/src/main/java/org/apache/gluten/jni/JniWorkspace.java
index a37aac03c5..50827c7a01 100644
--- a/gluten-core/src/main/java/org/apache/gluten/jni/JniWorkspace.java
+++ b/gluten-core/src/main/java/org/apache/gluten/jni/JniWorkspace.java
@@ -20,7 +20,6 @@ import org.apache.gluten.exception.GlutenException;
import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
-import org.apache.spark.util.SparkDirectoryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +31,7 @@ import java.nio.file.Paths;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
public class JniWorkspace {
private static final Logger LOG =
LoggerFactory.getLogger(JniWorkspace.class);
@@ -60,19 +60,6 @@ public class JniWorkspace {
}
}
- private static JniWorkspace createDefault() {
- try {
- final String tempRoot =
- SparkDirectoryUtil.get()
- .namespace("jni")
- .mkChildDirRandomly(UUID.randomUUID().toString())
- .getAbsolutePath();
- return createOrGet(tempRoot);
- } catch (Exception e) {
- throw new GlutenException(e);
- }
- }
-
public static void enableDebug(String debugDir) {
// Preserve the JNI libraries even after process exits.
// This is useful for debugging native code if the debug symbols were
embedded in
@@ -99,16 +86,19 @@ public class JniWorkspace {
}
}
- public static JniWorkspace getDefault() {
+ public static void initializeDefault(Supplier<String> rootDir) {
synchronized (DEFAULT_INSTANCE_INIT_LOCK) {
if (DEFAULT_INSTANCE == null) {
- DEFAULT_INSTANCE = createDefault();
+ DEFAULT_INSTANCE = createOrGet(rootDir.get());
}
- Preconditions.checkNotNull(DEFAULT_INSTANCE);
- return DEFAULT_INSTANCE;
}
}
+ public static JniWorkspace getDefault() {
+ Preconditions.checkNotNull(DEFAULT_INSTANCE, "Not call initializeDefault
yet");
+ return DEFAULT_INSTANCE;
+ }
+
private static JniWorkspace createOrGet(String rootDir) {
return INSTANCES.computeIfAbsent(rootDir, JniWorkspace::new);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]