This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 00730ef8f [VL] Fix async io coredump (#5657)
00730ef8f is described below

commit 00730ef8fec72111a258cf54d985203cd2820f48
Author: Rong Ma <[email protected]>
AuthorDate: Sat May 11 17:28:04 2024 +0800

    [VL] Fix async io coredump (#5657)
---
 .../backendsapi/velox/VeloxListenerApi.scala       | 23 +++---------------
 cpp/velox/benchmarks/GenericBenchmark.cc           |  3 +++
 cpp/velox/compute/VeloxBackend.h                   |  7 ++++++
 cpp/velox/jni/VeloxJniWrapper.cc                   |  8 +++++++
 .../gluten/init/NativeBackendInitializer.java      | 28 +++++++++++++++++++++-
 5 files changed, 48 insertions(+), 21 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index 1eaf92b5a..bbeb3a271 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -28,11 +28,9 @@ import org.apache.gluten.vectorized.{JniLibLoader, JniWorkspace}
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.execution.datasources.velox.{VeloxOrcWriterInjects, VeloxParquetWriterInjects, VeloxRowSplitter}
 import org.apache.spark.sql.expression.UDFResolver
-import org.apache.spark.sql.internal.GlutenConfigUtil
-import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf}
 import org.apache.spark.util.SparkDirectoryUtil
 
-import VeloxListenerApi.initializeNative
 import org.apache.commons.lang3.StringUtils
 
 import scala.sys.process._
@@ -191,7 +189,7 @@ class VeloxListenerApi extends ListenerApi {
     }
 
     val parsed = GlutenConfigUtil.parseConfig(conf.getAll.toMap)
-    initializeNative(parsed)
+    NativeBackendInitializer.initializeBackend(parsed)
 
     // inject backend-specific implementations to override spark classes
     // FIXME: The following set instances twice in local mode?
@@ -205,19 +203,4 @@ class VeloxListenerApi extends ListenerApi {
   }
 }
 
-object VeloxListenerApi {
-  // Spark DriverPlugin/ExecutorPlugin will only invoke ContextInitializer#initialize method once
-  // in its init method.
-  // In cluster mode, ContextInitializer#initialize only will be invoked in different JVM.
-  // In local mode, ContextInitializer#initialize will be invoked twice in same thread,
-  // driver first then executor, initFlag ensure only invoke initializeBackend once,
-  // so there are no race condition here.
-  private var initFlag: Boolean = false
-  def initializeNative(conf: Map[String, String]): Unit = {
-    if (initFlag) {
-      return
-    }
-    NativeBackendInitializer.initializeBackend(conf)
-    initFlag = true
-  }
-}
+object VeloxListenerApi {}
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc
index ef88da2c3..14593c8df 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -26,6 +26,7 @@
 
 #include "benchmarks/common/BenchmarkUtils.h"
 #include "benchmarks/common/FileReaderIterator.h"
+#include "compute/VeloxBackend.h"
 #include "compute/VeloxPlanConverter.h"
 #include "compute/VeloxRuntime.h"
 #include "config/GlutenConfig.h"
@@ -260,6 +261,8 @@ auto BM_Generic = [](::benchmark::State& state,
       writerMetrics.splitTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000);
   state.counters["shuffle_compress_time"] = benchmark::Counter(
       writerMetrics.compressTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000);
+
+  gluten::VeloxBackend::get()->tearDown();
 };
 
 int main(int argc, char** argv) {
diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h
index 288bb5294..a601d715c 100644
--- a/cpp/velox/compute/VeloxBackend.h
+++ b/cpp/velox/compute/VeloxBackend.h
@@ -55,6 +55,13 @@ class VeloxBackend {
 
   const std::unordered_map<std::string, std::string>& getBackendConf() const;
 
+  void tearDown() {
+    // Destruct IOThreadPoolExecutor will join all threads.
+    // On threads exit, thread local variables can be constructed with referencing global variables.
+    // So, we need to destruct IOThreadPoolExecutor and stop the threads before global variables get destructed.
+    ioExecutor_.reset();
+  }
+
  private:
   explicit VeloxBackend(const std::unordered_map<std::string, std::string>& conf) {
     init(conf);
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 188d62ac5..7884280c3 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -77,6 +77,14 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_init_NativeBackendInitializer_init
   JNI_METHOD_END()
 }
 
+JNIEXPORT void JNICALL Java_org_apache_gluten_init_NativeBackendInitializer_shutdown( // NOLINT
+    JNIEnv* env,
+    jclass) {
+  JNI_METHOD_START
+  gluten::VeloxBackend::get()->tearDown();
+  JNI_METHOD_END()
+}
+
 JNIEXPORT void JNICALL Java_org_apache_gluten_udf_UdfJniWrapper_getFunctionSignatures( // NOLINT
     JNIEnv* env,
     jclass) {
diff --git a/gluten-data/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java b/gluten-data/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java
index a97af505b..4863f481f 100644
--- a/gluten-data/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java
+++ b/gluten-data/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java
@@ -19,17 +19,41 @@ package org.apache.gluten.init;
 import org.apache.gluten.GlutenConfig;
 import org.apache.gluten.backendsapi.BackendsApiManager;
 
+import org.apache.spark.util.GlutenShutdownManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import scala.runtime.BoxedUnit;
 
 // Initialize native backend before calling any native methods from Java side.
 public final class NativeBackendInitializer {
-
   private static final Logger LOG = LoggerFactory.getLogger(NativeBackendInitializer.class);
+  private static final AtomicBoolean initialized = new AtomicBoolean(false);
 
+  // Spark DriverPlugin/ExecutorPlugin will only invoke NativeBackendInitializer#initializeBackend
+  // method once in its init method.
+  // In cluster mode, NativeBackendInitializer#initializeBackend only will be invoked in different
+  // JVM.
+  // In local mode, NativeBackendInitializer#initializeBackend will be invoked twice in same
+  // thread, driver first then executor, initialized flag ensure only invoke initializeBackend once,
+  // so there are no race condition here.
   public static void initializeBackend(scala.collection.Map<String, String> conf) {
+    if (!initialized.compareAndSet(false, true)) {
+      // Already called.
+      return;
+    }
+    initialize0(conf);
+    GlutenShutdownManager.addHook(
+        () -> {
+          shutdown();
+          return BoxedUnit.UNIT;
+        });
+  }
+
+  private static void initialize0(scala.collection.Map<String, String> conf) {
     try {
       String prefix = BackendsApiManager.getSettings().getBackendConfigPrefix();
       Map<String, String> nativeConfMap = GlutenConfig.getNativeBackendConf(prefix, conf);
@@ -43,5 +67,7 @@ public final class NativeBackendInitializer {
 
   private static native void initialize(byte[] configPlan);
 
+  private static native void shutdown();
+
   private NativeBackendInitializer() {}
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to