mbutrovich commented on code in PR #4734:
URL: https://github.com/apache/datafusion-comet/pull/4734#discussion_r3481814866


##########
native/core/src/execution/jni_api.rs:
##########
@@ -118,7 +117,7 @@ use std::sync::OnceLock;
 #[cfg(feature = "jemalloc")]
 use tikv_jemalloc_ctl::{epoch, stats};
 

Review Comment:
   `docs/source/contributor-guide/development.md` is now stale in two spots and
   could be updated separately once this lands:
   
   - Line 87 calls the runtime a `Lazy<Runtime>` static. It is `OnceLock` today 
and
     becomes `Mutex<Option<Runtime>>` here, torn down on plugin shutdown.
   - Line 60 says the `AttachGuard` detaches the thread when dropped. The
     attachment is actually cached in thread-local storage and only released 
when
     the worker thread exits. That detail is exactly why the runtime has to be 
shut
     down for the JVM to exit, so it is worth correcting.



##########
spark/src/main/scala/org/apache/spark/Plugins.scala:
##########
@@ -148,12 +152,32 @@ object CometDriverPlugin extends Logging {
   }
 }
 
+class CometExecutorPlugin extends ExecutorPlugin with Logging {
+
+  override def init(ctx: PluginContext, extraConf: ju.Map[String, String]): 
Unit = {
+    logInfo("CometExecutorPlugin init")
+
+    super.init(ctx, extraConf)
+  }
+
+  override def shutdown(): Unit = {
+    logInfo("CometExecutorPlugin shutdown")

Review Comment:
   In local mode both plugins live in the same JVM and share the one native
   runtime, so both `shutdown()` paths call `NativeBase.release()`. It works
   because `release_runtime()` does `take()`, which makes the second call a 
no-op.
   Could we add a short comment noting that the double release is expected and
   safe? It is not obvious when reading either plugin on its own. The executor
   plugin shuts down before the driver plugin in local mode, so the executor 
wins
   the `take()`.



##########
native/core/src/execution/jni_api.rs:
##########
@@ -212,12 +211,39 @@ fn build_runtime(default_worker_threads: Option<usize>) 
-> Runtime {
 /// Initialize the global Tokio runtime with the given default worker thread 
count.
 /// If the runtime is already initialized, this is a no-op.
 pub fn init_runtime(default_worker_threads: usize) {
-    TOKIO_RUNTIME.get_or_init(|| build_runtime(Some(default_worker_threads)));
+    let mut guard = TOKIO_RUNTIME.lock();
+    if guard.is_none() {
+        *guard = Some(build_runtime(Some(default_worker_threads)));
+    }
 }
 
-/// Function to get a handle to the global Tokio runtime
-pub fn get_runtime() -> &'static Runtime {
-    TOKIO_RUNTIME.get_or_init(|| build_runtime(None))
+/// Returns a handle to the global Tokio runtime, lazily initializing it if 
needed.
+///
+/// A [`Handle`] is returned (rather than a `&'static Runtime`) so that the 
runtime
+/// can be torn down via [`release_runtime`]. The handle is cheap to clone and 
can be
+/// used with `spawn` / `block_on` just like a `Runtime`.
+pub fn get_runtime() -> Handle {
+    let mut guard = TOKIO_RUNTIME.lock();
+    guard
+        .get_or_insert_with(|| build_runtime(None))
+        .handle()
+        .clone()
+}
+
+/// Tears down the global Tokio runtime, if it has been initialized.
+///
+/// The runtime is moved out of the global slot and shut down in the 
background so the
+/// calling (JNI) thread is not blocked waiting for worker threads to finish. 
Any handles
+/// previously returned by [`get_runtime`] will start failing their spawns 
once the runtime
+/// is gone, so this must only be called when no native execution is in flight.
+///
+/// Must not be called from within the runtime's own worker threads, otherwise 
the shutdown
+/// would deadlock/panic.
+pub fn release_runtime() {
+    let runtime = TOKIO_RUNTIME.lock().take();

Review Comment:
   Nice catch on the root cause. The runtime lived in a `static`, and Rust never
   drops statics, so the worker threads never exited. Moving to
   `Mutex<Option<Runtime>>` + `take()` is what finally lets the runtime drop. 
That
   is the real fix.
   
   One question on `shutdown_background()`. It signals the workers and returns
   without joining them, so the JVM-exit only unblocks asynchronously as each
   worker runs its thread-local detach on the way out. Did you consider
   `shutdown_timeout(...)` instead? It would make teardown deterministic, and 
the
   plugin shutdown thread is not latency sensitive. Not blocking, just curious
   about the rationale so we can capture it in the doc comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to