mbutrovich commented on code in PR #4734:
URL: https://github.com/apache/datafusion-comet/pull/4734#discussion_r3481814866
##########
native/core/src/execution/jni_api.rs:
##########
@@ -118,7 +117,7 @@ use std::sync::OnceLock;
#[cfg(feature = "jemalloc")]
use tikv_jemalloc_ctl::{epoch, stats};
Review Comment:
`docs/source/contributor-guide/development.md` is now stale in two spots and
could be updated separately once this lands:
- Line 87 calls the runtime a `Lazy<Runtime>` static. It is `OnceLock` today and
  becomes `Mutex<Option<Runtime>>` here, torn down on plugin shutdown.
- Line 60 says the `AttachGuard` detaches the thread when dropped. The
  attachment is actually cached in thread-local storage and only released when
  the worker thread exits. That detail is exactly why the runtime has to be shut
  down for the JVM to exit, so it is worth correcting.
##########
spark/src/main/scala/org/apache/spark/Plugins.scala:
##########
@@ -148,12 +152,32 @@ object CometDriverPlugin extends Logging {
}
}
+class CometExecutorPlugin extends ExecutorPlugin with Logging {
+
+ override def init(ctx: PluginContext, extraConf: ju.Map[String, String]): Unit = {
+ logInfo("CometExecutorPlugin init")
+
+ super.init(ctx, extraConf)
+ }
+
+ override def shutdown(): Unit = {
+ logInfo("CometExecutorPlugin shutdown")
Review Comment:
In local mode both plugins live in the same JVM and share the one native
runtime, so both `shutdown()` paths call `NativeBase.release()`. It works
because `release_runtime()` does `take()`, which makes the second call a no-op.
Could we add a short comment noting that the double release is expected and
safe? It is not obvious when reading either plugin on its own. The executor
plugin shuts down before the driver plugin in local mode, so the executor wins
the `take()`.
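To illustrate why the double release is harmless, here is a minimal, standard-library-only sketch of the `Mutex<Option<_>>` + `take()` pattern. It is not the PR's code: `FakeRuntime`, `SLOT`, `init_slot`, and `release_slot` are hypothetical names, and `std::sync::Mutex` is assumed as a stand-in for whatever lock type the PR actually uses.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for the Tokio runtime (not the real type).
struct FakeRuntime;

/// Global slot with the same shape as the `Mutex<Option<Runtime>>` in the PR.
static SLOT: Mutex<Option<FakeRuntime>> = Mutex::new(None);

/// Fills the slot if it is empty; a second call is a no-op.
fn init_slot() {
    let mut guard = SLOT.lock().unwrap();
    if guard.is_none() {
        *guard = Some(FakeRuntime);
    }
}

/// Moves the value out of the slot, leaving `None` behind.
/// Returns `true` only for the caller that actually took the value.
fn release_slot() -> bool {
    let taken = SLOT.lock().unwrap().take();
    // `taken` is dropped here; for a real runtime this is where teardown starts.
    taken.is_some()
}
```

Under these assumptions, whichever plugin calls `release_slot()` first gets `true` and every later call gets `false`, which matches the "executor wins the `take()`" ordering described above.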
##########
native/core/src/execution/jni_api.rs:
##########
@@ -212,12 +211,39 @@ fn build_runtime(default_worker_threads: Option<usize>) -> Runtime {
/// Initialize the global Tokio runtime with the given default worker thread count.
/// If the runtime is already initialized, this is a no-op.
pub fn init_runtime(default_worker_threads: usize) {
- TOKIO_RUNTIME.get_or_init(|| build_runtime(Some(default_worker_threads)));
+ let mut guard = TOKIO_RUNTIME.lock();
+ if guard.is_none() {
+ *guard = Some(build_runtime(Some(default_worker_threads)));
+ }
}
-/// Function to get a handle to the global Tokio runtime
-pub fn get_runtime() -> &'static Runtime {
- TOKIO_RUNTIME.get_or_init(|| build_runtime(None))
+/// Returns a handle to the global Tokio runtime, lazily initializing it if needed.
+///
+/// A [`Handle`] is returned (rather than a `&'static Runtime`) so that the runtime
+/// can be torn down via [`release_runtime`]. The handle is cheap to clone and can be
+/// used with `spawn` / `block_on` just like a `Runtime`.
+pub fn get_runtime() -> Handle {
+ let mut guard = TOKIO_RUNTIME.lock();
+ guard
+ .get_or_insert_with(|| build_runtime(None))
+ .handle()
+ .clone()
+}
+
+/// Tears down the global Tokio runtime, if it has been initialized.
+///
+/// The runtime is moved out of the global slot and shut down in the background so the
+/// calling (JNI) thread is not blocked waiting for worker threads to finish. Any handles
+/// previously returned by [`get_runtime`] will start failing their spawns once the runtime
+/// is gone, so this must only be called when no native execution is in flight.
+///
+/// Must not be called from within the runtime's own worker threads, otherwise the shutdown
+/// would deadlock/panic.
+pub fn release_runtime() {
+ let runtime = TOKIO_RUNTIME.lock().take();
Review Comment:
Nice catch on the root cause. The runtime lived in a `static`, and Rust never
drops statics, so the worker threads never exited. Moving to
`Mutex<Option<Runtime>>` + `take()` is what finally lets the runtime drop. That
is the real fix.
One question on `shutdown_background()`. It signals the workers and returns
without joining them, so JVM exit is only unblocked asynchronously, as each
worker runs its thread-local detach on the way out. Did you consider
`shutdown_timeout(...)` instead? It would make teardown deterministic, and the
plugin shutdown thread is not latency sensitive. Not blocking, just curious
about the rationale so we can capture it in the doc comment.
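To make the trade-off concrete, here is a rough standard-library analogue of the two shutdown styles. It is only a sketch and deliberately does not use Tokio: `Pool`, `start`, and the polling workers are hypothetical, and the two methods merely mimic the "signal and return" versus "signal and wait up to a limit" behavior discussed above.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for a runtime: worker threads polling a stop flag.
struct Pool {
    stop: Arc<AtomicBool>,
    done: Receiver<()>,
    workers: usize,
}

impl Pool {
    fn start(workers: usize) -> Pool {
        let stop = Arc::new(AtomicBool::new(false));
        let (tx, done) = channel();
        for _ in 0..workers {
            let stop = Arc::clone(&stop);
            let tx = tx.clone();
            thread::spawn(move || {
                while !stop.load(Ordering::SeqCst) {
                    thread::sleep(Duration::from_millis(1));
                }
                // Stand-in for the thread-local detach that runs on worker exit.
                // The send fails harmlessly if nobody is waiting any more.
                let _ = tx.send(());
            });
        }
        Pool { stop, done, workers }
    }

    /// Analogue of `shutdown_background`: signal the workers and return at once.
    fn shutdown_background(self) {
        self.stop.store(true, Ordering::SeqCst);
    }

    /// Analogue of `shutdown_timeout`: signal, then wait up to `limit`.
    /// Returns how many workers confirmed exit before the deadline.
    fn shutdown_timeout(self, limit: Duration) -> usize {
        self.stop.store(true, Ordering::SeqCst);
        let deadline = Instant::now() + limit;
        let mut exited = 0;
        while exited < self.workers {
            let remaining = deadline.saturating_duration_since(Instant::now());
            match self.done.recv_timeout(remaining) {
                Ok(()) => exited += 1,
                Err(_) => break, // deadline passed or all senders are gone
            }
        }
        exited
    }
}
```

In this sketch, `Pool::start(4).shutdown_timeout(Duration::from_secs(5))` only returns once the workers have reported exit (or the limit expires), whereas `shutdown_background` gives the caller no signal about when the workers are actually gone.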
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]