This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7be9897237 Fix thread panic when "unreachable" SpawnedTask code is 
reachable. (#12086)
7be9897237 is described below

commit 7be9897237b0003dc66b70831c48f0a8b11c3e56
Author: wiedld <[email protected]>
AuthorDate: Fri Aug 23 09:06:41 2024 -0700

    Fix thread panic when "unreachable" SpawnedTask code is reachable. (#12086)
    
    * test: demonstrate that the unreachable in SpawnedTask is reachable
    
    * chore: use workspace tokio and add feature
    
    * fix(12089): SpawnedTask will no longer panic during shutdown
    
    * chore(12089): add new error type for JoinError
    
    * refactor(12089): handle join error when using SpawnedTask::join_unwind
    
    * Revert "chore: use workspace tokio and add feature"
    
    This reverts commit 3010288d50a7dd3435f14a74bae6cc28b67f91ad.
    
    * refactor(12089): update test to avoid the looping and global (to package 
tests) panic hook manipulation
    
    * refactor(12089): make single conditional for unwind vs no-unwind, and 
update test for cancellation error
---
 datafusion-cli/Cargo.lock                          |  2 ++
 datafusion/common-runtime/Cargo.toml               |  4 +++
 datafusion/common-runtime/src/common.rs            | 42 +++++++++++++++++++---
 datafusion/common/Cargo.toml                       |  1 +
 datafusion/common/src/error.rs                     |  8 +++++
 .../core/src/datasource/file_format/arrow.rs       |  5 ++-
 .../core/src/datasource/file_format/parquet.rs     | 18 +++++++---
 .../datasource/file_format/write/orchestration.rs  |  4 +--
 datafusion/core/src/datasource/stream.rs           |  5 ++-
 9 files changed, 77 insertions(+), 12 deletions(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index e35eb3906b..456dc295d4 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1295,12 +1295,14 @@ dependencies = [
  "parquet",
  "paste",
  "sqlparser",
+ "tokio",
 ]
 
 [[package]]
 name = "datafusion-common-runtime"
 version = "41.0.0"
 dependencies = [
+ "log",
  "tokio",
 ]
 
diff --git a/datafusion/common-runtime/Cargo.toml 
b/datafusion/common-runtime/Cargo.toml
index c104360876..a21c72cd9f 100644
--- a/datafusion/common-runtime/Cargo.toml
+++ b/datafusion/common-runtime/Cargo.toml
@@ -36,4 +36,8 @@ name = "datafusion_common_runtime"
 path = "src/lib.rs"
 
 [dependencies]
+log = { workspace = true }
 tokio = { workspace = true }
+
+[dev-dependencies]
+tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "time"] }
diff --git a/datafusion/common-runtime/src/common.rs 
b/datafusion/common-runtime/src/common.rs
index 2f7ddb972f..698a846b48 100644
--- a/datafusion/common-runtime/src/common.rs
+++ b/datafusion/common-runtime/src/common.rs
@@ -60,8 +60,8 @@ impl<R: 'static> SpawnedTask<R> {
     }
 
     /// Joins the task and unwinds the panic if it happens.
-    pub async fn join_unwind(self) -> R {
-        self.join().await.unwrap_or_else(|e| {
+    pub async fn join_unwind(self) -> Result<R, JoinError> {
+        self.join().await.map_err(|e| {
             // `JoinError` can be caused either by panic or cancellation. We 
have to handle panics:
             if e.is_panic() {
                 std::panic::resume_unwind(e.into_panic());
@@ -69,9 +69,43 @@ impl<R: 'static> SpawnedTask<R> {
                 // Cancellation may be caused by two reasons:
                 // 1. Abort is called, but since we consumed `self`, it's not 
our case (`JoinHandle` not accessible outside).
                 // 2. The runtime is shutting down.
-                // So we consider this branch as unreachable.
-                unreachable!("SpawnedTask was cancelled unexpectedly");
+                log::warn!("SpawnedTask was polled during shutdown");
+                e
             }
         })
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use std::future::{pending, Pending};
+
+    use tokio::runtime::Runtime;
+
+    #[tokio::test]
+    async fn runtime_shutdown() {
+        let rt = Runtime::new().unwrap();
+        let task = rt
+            .spawn(async {
+                SpawnedTask::spawn(async {
+                    let fut: Pending<()> = pending();
+                    fut.await;
+                    unreachable!("should never return");
+                })
+            })
+            .await
+            .unwrap();
+
+        // caller shuts down their DF runtime (e.g. timeout, error in caller, 
etc)
+        rt.shutdown_background();
+
+        // race condition
+        // poll occurs during shutdown (buffered stream poll calls, etc)
+        assert!(matches!(
+            task.join_unwind().await,
+            Err(e) if e.is_cancelled()
+        ));
+    }
+}
diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index 8435d06325..79e20ba121 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -63,6 +63,7 @@ parquet = { workspace = true, optional = true, 
default-features = true }
 paste = "1.0.15"
 pyo3 = { version = "0.21.0", optional = true }
 sqlparser = { workspace = true }
+tokio = { workspace = true }
 
 [target.'cfg(target_family = "wasm")'.dependencies]
 instant = { version = "0.1", features = ["wasm-bindgen"] }
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index 27a25d0c9d..05988d6c6d 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -34,6 +34,7 @@ use arrow::error::ArrowError;
 #[cfg(feature = "parquet")]
 use parquet::errors::ParquetError;
 use sqlparser::parser::ParserError;
+use tokio::task::JoinError;
 
 /// Result type for operations that could result in an [DataFusionError]
 pub type Result<T, E = DataFusionError> = result::Result<T, E>;
@@ -112,6 +113,10 @@ pub enum DataFusionError {
     /// SQL method, opened a CSV file that is broken, or tried to divide an
     /// integer by zero.
     Execution(String),
+    /// [`JoinError`] during execution of the query.
+    ///
     /// This error can occur for unjoined tasks, such as execution shutdown.
+    ExecutionJoin(JoinError),
     /// Error when resources (such as memory or scratch disk space) are 
exhausted.
     ///
     /// This error is thrown when a consumer cannot acquire additional memory
@@ -306,6 +311,7 @@ impl Error for DataFusionError {
             DataFusionError::Plan(_) => None,
             DataFusionError::SchemaError(e, _) => Some(e),
             DataFusionError::Execution(_) => None,
+            DataFusionError::ExecutionJoin(e) => Some(e),
             DataFusionError::ResourcesExhausted(_) => None,
             DataFusionError::External(e) => Some(e.as_ref()),
             DataFusionError::Context(_, e) => Some(e.as_ref()),
@@ -418,6 +424,7 @@ impl DataFusionError {
             DataFusionError::Configuration(_) => "Invalid or Unsupported 
Configuration: ",
             DataFusionError::SchemaError(_, _) => "Schema error: ",
             DataFusionError::Execution(_) => "Execution error: ",
+            DataFusionError::ExecutionJoin(_) => "ExecutionJoin error: ",
             DataFusionError::ResourcesExhausted(_) => "Resources exhausted: ",
             DataFusionError::External(_) => "External error: ",
             DataFusionError::Context(_, _) => "",
@@ -453,6 +460,7 @@ impl DataFusionError {
                 Cow::Owned(format!("{desc}{backtrace}"))
             }
             DataFusionError::Execution(ref desc) => 
Cow::Owned(desc.to_string()),
+            DataFusionError::ExecutionJoin(ref desc) => 
Cow::Owned(desc.to_string()),
             DataFusionError::ResourcesExhausted(ref desc) => 
Cow::Owned(desc.to_string()),
             DataFusionError::External(ref desc) => 
Cow::Owned(desc.to_string()),
             #[cfg(feature = "object_store")]
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs 
b/datafusion/core/src/datasource/file_format/arrow.rs
index 8b6a880011..95f76195e6 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -341,7 +341,10 @@ impl DataSink for ArrowFileSink {
             }
         }
 
-        demux_task.join_unwind().await?;
+        demux_task
+            .join_unwind()
+            .await
+            .map_err(DataFusionError::ExecutionJoin)??;
         Ok(row_count as u64)
     }
 }
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs 
b/datafusion/core/src/datasource/file_format/parquet.rs
index f233f3842c..83f77ca937 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -836,7 +836,10 @@ impl DataSink for ParquetSink {
             }
         }
 
-        demux_task.join_unwind().await?;
+        demux_task
+            .join_unwind()
+            .await
+            .map_err(DataFusionError::ExecutionJoin)??;
 
         Ok(row_count as u64)
     }
@@ -942,7 +945,10 @@ fn spawn_rg_join_and_finalize_task(
         let num_cols = column_writer_tasks.len();
         let mut finalized_rg = Vec::with_capacity(num_cols);
         for task in column_writer_tasks.into_iter() {
-            let (writer, _col_reservation) = task.join_unwind().await?;
+            let (writer, _col_reservation) = task
+                .join_unwind()
+                .await
+                .map_err(DataFusionError::ExecutionJoin)??;
             let encoded_size = writer.get_estimated_total_bytes();
             rg_reservation.grow(encoded_size);
             finalized_rg.push(writer.close()?);
@@ -1070,7 +1076,8 @@ async fn concatenate_parallel_row_groups(
     while let Some(task) = serialize_rx.recv().await {
         let result = task.join_unwind().await;
         let mut rg_out = parquet_writer.next_row_group()?;
-        let (serialized_columns, mut rg_reservation, _cnt) = result?;
+        let (serialized_columns, mut rg_reservation, _cnt) =
+            result.map_err(DataFusionError::ExecutionJoin)??;
         for chunk in serialized_columns {
             chunk.append_to_row_group(&mut rg_out)?;
             rg_reservation.free();
@@ -1134,7 +1141,10 @@ async fn output_single_parquet_file_parallelized(
     )
     .await?;
 
-    launch_serialization_task.join_unwind().await?;
+    launch_serialization_task
+        .join_unwind()
+        .await
+        .map_err(DataFusionError::ExecutionJoin)??;
     Ok(file_metadata)
 }
 
diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs 
b/datafusion/core/src/datasource/file_format/write/orchestration.rs
index 1d32063ee9..6f27e6f388 100644
--- a/datafusion/core/src/datasource/file_format/write/orchestration.rs
+++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -298,8 +298,8 @@ pub(crate) async fn stateless_multipart_put(
         write_coordinator_task.join_unwind(),
         demux_task.join_unwind()
     );
-    r1?;
-    r2?;
+    r1.map_err(DataFusionError::ExecutionJoin)??;
+    r2.map_err(DataFusionError::ExecutionJoin)??;
 
     let total_count = rx_row_cnt.await.map_err(|_| {
         internal_datafusion_err!("Did not receive row count from write 
coordinator")
diff --git a/datafusion/core/src/datasource/stream.rs 
b/datafusion/core/src/datasource/stream.rs
index 682565aea9..b53fe86631 100644
--- a/datafusion/core/src/datasource/stream.rs
+++ b/datafusion/core/src/datasource/stream.rs
@@ -438,6 +438,9 @@ impl DataSink for StreamWrite {
             }
         }
         drop(sender);
-        write_task.join_unwind().await
+        write_task
+            .join_unwind()
+            .await
+            .map_err(DataFusionError::ExecutionJoin)?
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to