xiaobai721 opened a new issue, #9042:
URL: https://github.com/apache/arrow-datafusion/issues/9042

   ### Describe the bug
   
   thread '<unnamed>' panicked at datafusion-physical-plan-33.0.0\src\stream.rs:83:23
   there is no reactor running, must be called from the context of a Tokio 1.x runtime
   stack backtrace:
      0: std::panicking::begin_panic_handler
                at 
/rustc/79e9716c980570bfd1f666e3b16ac583f0168962/library\std\src\panicking.rs:597
      1: core::panicking::panic_fmt
                at 
/rustc/79e9716c980570bfd1f666e3b16ac583f0168962/library\core\src\panicking.rs:72
      2: core::panicking::panic_display<tokio::runtime::handle::TryCurrentError>
                at 
/rustc/79e9716c980570bfd1f666e3b16ac583f0168962\library\core\src\panicking.rs:168
      3: 
tokio::task::spawn::spawn_inner<enum2$<datafusion_physical_plan::stream::impl$1::run_input::async_block_env$0>
 >
                at 
C:\Users\wangwei8\.cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\tokio-1.34.0\src\task\spawn.rs:203
      4: 
tokio::task::spawn::spawn<enum2$<datafusion_physical_plan::stream::impl$1::run_input::async_block_env$0>
 >
                at 
C:\Users\wangwei8\.cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\tokio-1.34.0\src\task\spawn.rs:174
      5: 
tokio::task::join_set::JoinSet<enum2$<core::result::Result<tuple$<>,enum2$<datafusion_common::error::DataFusionError>
 > > 
>::spawn<enum2$<core::result::Result<tuple$<>,enum2$<datafusion_common::error::DataFusionError>
 > >,enum2$<datafusion_physical_plan::
                at 
C:\Users\wangwei8\.cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\tokio-1.34.0\src\task\join_set.rs:139
      6: 
datafusion_physical_plan::stream::ReceiverStreamBuilder<arrow_array::record_batch::RecordBatch>::spawn<arrow_array::record_batch::RecordBatch,enum2$<datafusion_physical_plan::stream::impl$1::run_input::async_block_env$0>
 >
                at 
C:\Users\wangwei8\.cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\datafusion-physical-plan-33.0.0\src\stream.rs:83
      7: 
datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder::run_input
                at 
C:\Users\wangwei8\.cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\datafusion-physical-plan-33.0.0\src\stream.rs:225
      8: datafusion_physical_plan::coalesce_partitions::impl$2::execute
                at 
C:\Users\wangwei8\.cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\datafusion-physical-plan-33.0.0\src\coalesce_partitions.rs:157
      9: datafusion_physical_plan::execute_stream
                at 
C:\Users\wangwei8\.cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\datafusion-physical-plan-33.0.0\src\lib.rs:488
     10: datafusion_physical_plan::collect::async_fn$0
                at 
C:\Users\wangwei8\.cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\datafusion-physical-plan-33.0.0\src\lib.rs:471
     11: datafusion::dataframe::impl$2::collect::async_fn$0
                at 
C:\Users\wangwei8\.cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\datafusion-33.0.0\src\dataframe\mod.rs:759
   
   
   
   
   ### To Reproduce
   
   datafusion = { version = "x", features = ["parquet", "default"] }
   tokio = { version = "1.34.0", features = ["rt", "rt-multi-thread", "macros"] }
   /// Runs a fixed `select *` query against a hard-coded Parquet file and
   /// collects the result.
   ///
   /// The collected result is bound to `results` but never read; the `Ok`
   /// path always returns an empty `Vec<String>`. Panics (via `unwrap`) if
   /// `collect` returns an error.
   async fn reading_from_parquet() -> Result<Vec<String>, UniError> {
       let table_name = "etl_libor_price";
       let file_name = "D:/etl_libor_price.parquet";
       let sql_str = "select * from etl_libor_price";
       let mut df = query_from_ctx(table_name, file_name, sql_str).await;
       // `DataFrame::collect` is frame 11 of the stack trace in this report.
       let results = df.collect().await.unwrap();
       // Placeholder output: nothing from `results` is copied into it.
       let records: Vec<String> = Vec::new();
       Ok(records)
   }
   
   /// Creates a new `SessionContext`, registers `file_name` as a Parquet table
   /// named `table_name` with default read options, and returns the
   /// `DataFrame` produced by `ctx.sql(sql_str)`.
   ///
   /// Panics if registration fails (`expect`) or if `ctx.sql` returns an
   /// error (`unwrap`).
   async fn query_from_ctx(table_name: &str,
                           file_name: &str,
                           sql_str: &str,
   ) -> datafusion::dataframe::DataFrame {
       let ctx = SessionContext::new();
       ctx.register_parquet(table_name, file_name, 
ParquetReadOptions::default())
           .await.expect("register_parquet failed");
       let df: datafusion::dataframe::DataFrame = 
ctx.sql(sql_str).await.unwrap();
       df
   
   }
   
   /// Awaits `reading_from_parquet` and unwraps its result, panicking if it
   /// returned `Err`.
   async fn async_server() -> Vec<String> {
       reading_from_parquet().await.unwrap()
   
   }
   
   /// Drives `async_server` to completion with `block_on`, prints the result,
   /// then sends a fixed notification string on `tx_m`.
   ///
   /// Called from a thread created with `thread::spawn` in `server_task`.
   /// Panics if the send fails (`expect`).
   fn server_worker(tx_m: &Sender<String>) {
       // NOTE(review): the import for `block_on` is not shown; presumably it is
       // `futures::executor::block_on`, not a Tokio runtime method — confirm.
       // The stack trace in this report shows `DataFrame::collect` reaching
       // `tokio::task::spawn`, which panics with "there is no reactor running"
       // when the calling thread has no Tokio runtime context.
       let df = block_on(async_server());
       println!("df = {:?}", df);
       tx_m.send("server_worker_i".to_string()).expect("TODO: panic message");
   }
   
   /// Starts one worker thread and one receiver thread connected by an `mpsc`
   /// channel, then sleeps the calling thread for ten minutes.
   ///
   /// The spawned threads' join handles are discarded, so neither thread is
   /// joined.
   fn server_task() {
       println!("into server task");
       // let rt = tokio::runtime::Runtime::new().unwrap();
       let (tx, rx) = mpsc::channel();
       // Single iteration: spawns exactly one worker, each with its own sender clone.
       for _ in 0..1 {
           let tx= tx.clone();
           thread::spawn(move || server_worker(&tx));
       }
   
       // The receiver is wrapped in `Arc<Mutex<_>>` so it can be cloned into
       // the receiver thread(s).
       let shared_rx = Arc::new(Mutex::new(rx));
   
       // Single iteration: spawns exactly one receiver thread.
       for _ in 0..1 {
           let rx = shared_rx.clone();
           thread::spawn(move ||
               loop {
                   let rx_m = rx.lock().expect("obtain arc lock failed");
                   // Iterating a `Result` yields the `Ok` value once, or nothing
                   // on `Err`; so each pass prints at most one received message.
                   for recv in rx_m.recv() {
                       println!("receive from receiver: {:?}", recv);
                   }
               }
           );
       }
   
       // Blocks the calling thread for 60 * 10 seconds.
       let time_sleep = time::Duration::from_secs(60 * 10);
       thread::sleep(time_sleep);
   }
   
   /// Entry point, wrapped by `#[tokio::main]`.
   ///
   /// Calls the synchronous `server_task` directly and awaits nothing; returns
   /// `Ok(())` once `server_task` returns.
   #[tokio::main]
   async fn main() -> Result<()> {
   server_task();
   Ok(())
   }
   
   ### Expected behavior
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to