This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 00730ef8f [VL] Fix async io coredump (#5657)
00730ef8f is described below
commit 00730ef8fec72111a258cf54d985203cd2820f48
Author: Rong Ma <[email protected]>
AuthorDate: Sat May 11 17:28:04 2024 +0800
[VL] Fix async io coredump (#5657)
---
.../backendsapi/velox/VeloxListenerApi.scala | 23 +++---------------
cpp/velox/benchmarks/GenericBenchmark.cc | 3 +++
cpp/velox/compute/VeloxBackend.h | 7 ++++++
cpp/velox/jni/VeloxJniWrapper.cc | 8 +++++++
.../gluten/init/NativeBackendInitializer.java | 28 +++++++++++++++++++++-
5 files changed, 48 insertions(+), 21 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index 1eaf92b5a..bbeb3a271 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -28,11 +28,9 @@ import org.apache.gluten.vectorized.{JniLibLoader, JniWorkspace}
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.datasources.velox.{VeloxOrcWriterInjects, VeloxParquetWriterInjects, VeloxRowSplitter}
import org.apache.spark.sql.expression.UDFResolver
-import org.apache.spark.sql.internal.GlutenConfigUtil
-import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf}
import org.apache.spark.util.SparkDirectoryUtil
-import VeloxListenerApi.initializeNative
import org.apache.commons.lang3.StringUtils
import scala.sys.process._
@@ -191,7 +189,7 @@ class VeloxListenerApi extends ListenerApi {
}
val parsed = GlutenConfigUtil.parseConfig(conf.getAll.toMap)
- initializeNative(parsed)
+ NativeBackendInitializer.initializeBackend(parsed)
// inject backend-specific implementations to override spark classes
// FIXME: The following set instances twice in local mode?
@@ -205,19 +203,4 @@ class VeloxListenerApi extends ListenerApi {
}
}
-object VeloxListenerApi {
- // Spark DriverPlugin/ExecutorPlugin will only invoke ContextInitializer#initialize method once
- // in its init method.
- // In cluster mode, ContextInitializer#initialize only will be invoked in different JVM.
- // In local mode, ContextInitializer#initialize will be invoked twice in same thread,
- // driver first then executor, initFlag ensure only invoke initializeBackend once,
- // so there are no race condition here.
- private var initFlag: Boolean = false
- def initializeNative(conf: Map[String, String]): Unit = {
- if (initFlag) {
- return
- }
- NativeBackendInitializer.initializeBackend(conf)
- initFlag = true
- }
-}
+object VeloxListenerApi {}
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc
index ef88da2c3..14593c8df 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -26,6 +26,7 @@
#include "benchmarks/common/BenchmarkUtils.h"
#include "benchmarks/common/FileReaderIterator.h"
+#include "compute/VeloxBackend.h"
#include "compute/VeloxPlanConverter.h"
#include "compute/VeloxRuntime.h"
#include "config/GlutenConfig.h"
@@ -260,6 +261,8 @@ auto BM_Generic = [](::benchmark::State& state,
writerMetrics.splitTime, benchmark::Counter::kAvgIterations,
benchmark::Counter::OneK::kIs1000);
state.counters["shuffle_compress_time"] = benchmark::Counter(
writerMetrics.compressTime, benchmark::Counter::kAvgIterations,
benchmark::Counter::OneK::kIs1000);
+
+ gluten::VeloxBackend::get()->tearDown();
};
int main(int argc, char** argv) {
diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h
index 288bb5294..a601d715c 100644
--- a/cpp/velox/compute/VeloxBackend.h
+++ b/cpp/velox/compute/VeloxBackend.h
@@ -55,6 +55,13 @@ class VeloxBackend {
const std::unordered_map<std::string, std::string>& getBackendConf() const;
+ void tearDown() {
+ // Destruct IOThreadPoolExecutor will join all threads.
+ // On threads exit, thread local variables can be constructed with referencing global variables.
+ // So, we need to destruct IOThreadPoolExecutor and stop the threads before global variables get destructed.
+ ioExecutor_.reset();
+ }
+
private:
explicit VeloxBackend(const std::unordered_map<std::string, std::string>& conf) {
init(conf);
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 188d62ac5..7884280c3 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -77,6 +77,14 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_init_NativeBackendInitializer_init
JNI_METHOD_END()
}
+JNIEXPORT void JNICALL Java_org_apache_gluten_init_NativeBackendInitializer_shutdown( // NOLINT
+ JNIEnv* env,
+ jclass) {
+ JNI_METHOD_START
+ gluten::VeloxBackend::get()->tearDown();
+ JNI_METHOD_END()
+}
+
JNIEXPORT void JNICALL Java_org_apache_gluten_udf_UdfJniWrapper_getFunctionSignatures( // NOLINT
JNIEnv* env,
jclass) {
diff --git a/gluten-data/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java b/gluten-data/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java
index a97af505b..4863f481f 100644
--- a/gluten-data/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java
+++ b/gluten-data/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java
@@ -19,17 +19,41 @@ package org.apache.gluten.init;
import org.apache.gluten.GlutenConfig;
import org.apache.gluten.backendsapi.BackendsApiManager;
+import org.apache.spark.util.GlutenShutdownManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import scala.runtime.BoxedUnit;
// Initialize native backend before calling any native methods from Java side.
public final class NativeBackendInitializer {
-
private static final Logger LOG = LoggerFactory.getLogger(NativeBackendInitializer.class);
+ private static final AtomicBoolean initialized = new AtomicBoolean(false);
+ // Spark DriverPlugin/ExecutorPlugin will only invoke NativeBackendInitializer#initializeBackend
+ // method once in its init method.
+ // In cluster mode, NativeBackendInitializer#initializeBackend only will be invoked in different
+ // JVM.
+ // In local mode, NativeBackendInitializer#initializeBackend will be invoked twice in same
+ // thread, driver first then executor, initialized flag ensure only invoke initializeBackend once,
+ // so there are no race condition here.
public static void initializeBackend(scala.collection.Map<String, String> conf) {
+ if (!initialized.compareAndSet(false, true)) {
+ // Already called.
+ return;
+ }
+ initialize0(conf);
+ GlutenShutdownManager.addHook(
+ () -> {
+ shutdown();
+ return BoxedUnit.UNIT;
+ });
+ }
+
+ private static void initialize0(scala.collection.Map<String, String> conf) {
try {
String prefix = BackendsApiManager.getSettings().getBackendConfigPrefix();
Map<String, String> nativeConfMap = GlutenConfig.getNativeBackendConf(prefix, conf);
@@ -43,5 +67,7 @@ public final class NativeBackendInitializer {
private static native void initialize(byte[] configPlan);
+ private static native void shutdown();
+
private NativeBackendInitializer() {}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]