orf opened a new issue, #7453:
URL: https://github.com/apache/arrow-datafusion/issues/7453

   ### Describe the bug
   
   In DataFusion 28 and below, UDFs were executed on a separate thread when writing to parquet. The example code below does not fail the assertion in version 28, but does in version 30 and on git main.
   
   If you have a UDF that expects to run on a separate thread, or that does some form of blocking computation, then this change means your previously parallel DataFrame plan becomes serial.
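
   As an illustration (not from the report; `expensive_check` and its fake checksum are hypothetical), this is the kind of UDF meant here: CPU-bound, per-batch work that used to be driven off the calling thread during the parquet write, and that now runs inline on it:

   ```rust
   use std::sync::Arc;
   use datafusion::arrow::array::{Array, ArrayRef, BinaryArray, BooleanArray};
   use datafusion::error::Result;

   /// Hypothetical blocking UDF body; pass it to `make_scalar_function` just like
   /// the reproduction below. The fake per-value checksum stands in for any
   /// CPU-heavy computation.
   fn expensive_check(args: &[ArrayRef]) -> Result<ArrayRef> {
       let input = args[0]
           .as_any()
           .downcast_ref::<BinaryArray>()
           .expect("expected a Binary column");
       let out: BooleanArray = (0..input.len())
           .map(|i| {
               // Simulated blocking / CPU-bound work per value.
               let sum = input
                   .value(i)
                   .iter()
                   .fold(0u64, |acc, b| acc.wrapping_add(*b as u64));
               Some(sum % 2 == 0)
           })
           .collect();
       Ok(Arc::new(out) as ArrayRef)
   }
   ```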
   
   I couldn't spot anything in the release notes about this.
   
   ### To Reproduce
   
   deps:
   ```toml
   [dependencies]
   tokio = { version = "^1.0", features = ["rt-multi-thread", "full"] }
   datafusion = { version = "=28", default-features = false, features = ["parquet", "encoding_expressions", "zstd"] }
   ```
   code:
   ```rust
   #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
   async fn main() {
       use std::sync::Arc;
       use datafusion::arrow::array::{ArrayRef, BooleanArray};
       use datafusion::arrow::datatypes::DataType;
       use datafusion::logical_expr::Volatility;
       use datafusion::parquet::file::properties::WriterProperties;
       use datafusion::physical_plan::functions::make_scalar_function;
       use datafusion::prelude::*;
   
       let config = SessionConfig::default();
       let ctx = SessionContext::with_config(config);
       let df = ctx
           .read_parquet("input.parquet", ParquetReadOptions::default())
           .await
           .unwrap();
   
       let main_thread_id = std::thread::current().id();
   
       let func = create_udf(
           "some_func",
           vec![DataType::Binary],
           Arc::new(DataType::Boolean),
           Volatility::Immutable,
           make_scalar_function(move |args: &[ArrayRef]| {
               let func_thread_id = std::thread::current().id();
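                // Per the report: this assertion passes on DataFusion 28 (the UDF runs
                // off the main thread during write_parquet) but fails on 30 and git main.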
               assert_ne!(main_thread_id, func_thread_id);
                Ok(Arc::new(BooleanArray::from(vec![true; args[0].len()])))
           }));
   
       let df = df
           .select(vec![col("hash"), func.call(vec![col("hash")])])
           .unwrap();
   
       let props = WriterProperties::builder()
           .build();
       df.write_parquet("some_dir/", Some(props)).await.unwrap();
   }
   ```
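
   To make the reproduction self-contained, something like the following can generate a small `input.parquet` first (a sketch; the file name, `hash` column, and sample values simply match what the code above expects):

   ```rust
   use std::fs::File;
   use std::sync::Arc;
   use datafusion::arrow::array::{ArrayRef, BinaryArray};
   use datafusion::arrow::datatypes::{DataType, Field, Schema};
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::parquet::arrow::ArrowWriter;

   fn write_sample_input() -> datafusion::error::Result<()> {
       // One Binary column named "hash", matching the UDF's expected input.
       let schema = Arc::new(Schema::new(vec![Field::new("hash", DataType::Binary, false)]));
       let hashes: ArrayRef =
           Arc::new(BinaryArray::from(vec![&b"aaa"[..], &b"bbb"[..], &b"ccc"[..]]));
       let batch = RecordBatch::try_new(schema.clone(), vec![hashes])?;

       let file = File::create("input.parquet")?;
       let mut writer = ArrowWriter::try_new(file, schema, None)?;
       writer.write(&batch)?;
       writer.close()?;
       Ok(())
   }
   ```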
   
   ### Expected behavior
   
   _No response_
   
   ### Additional context
   
   I thought this might be related to https://github.com/apache/arrow-datafusion/pull/7205, but it doesn't appear to be the culprit.

