xiaobai721 opened a new issue, #9042:
URL: https://github.com/apache/arrow-datafusion/issues/9042
### Describe the bug
thread '<unnamed>' panicked at
datafusion-physical-plan-33.0.0\src\stream.rs:83:23
there is no reactor running, must be called from the context of a Tokio 1.x
runtime
stack backtrace:
0: std::panicking::begin_panic_handler
at
/rustc/79e9716c980570bfd1f666e3b16ac583f0168962/library\std\src\panicking.rs:597
1: core::panicking::panic_fmt
at
/rustc/79e9716c980570bfd1f666e3b16ac583f0168962/library\core\src\panicking.rs:72
2: core::panicking::panic_display<tokio::runtime::handle::TryCurrentError>
at
/rustc/79e9716c980570bfd1f666e3b16ac583f0168962\library\core\src\panicking.rs:168
3:
tokio::task::spawn::spawn_inner<enum2$<datafusion_physical_plan::stream::impl$1::run_input::async_block_env$0>
>
at
C:\Users\wangwei8\.cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\tokio-1.34.0\src\task\spawn.rs:203
4:
tokio::task::spawn::spawn<enum2$<datafusion_physical_plan::stream::impl$1::run_input::async_block_env$0>
>
at
C:\Users\wangwei8\.cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\tokio-1.34.0\src\task\spawn.rs:174
5:
tokio::task::join_set::JoinSet<enum2$<core::result::Result<tuple$<>,enum2$<datafusion_common::error::DataFusionError>
> >
>::spawn<enum2$<core::result::Result<tuple$<>,enum2$<datafusion_common::error::DataFusionError>
> >,enum2$<datafusion_physical_plan::
at
C:\Users\wangwei8\.cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\tokio-1.34.0\src\task\join_set.rs:139
6:
datafusion_physical_plan::stream::ReceiverStreamBuilder<arrow_array::record_batch::RecordBatch>::spawn<arrow_array::record_batch::RecordBatch,enum2$<datafusion_physical_plan::stream::impl$1::run_input::async_block_env$0>
>
at
C:\Users\wangwei8\.cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\datafusion-physical-plan-33.0.0\src\stream.rs:83
7:
datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder::run_input
at
C:\Users\wangwei8\.cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\datafusion-physical-plan-33.0.0\src\stream.rs:225
8: datafusion_physical_plan::coalesce_partitions::impl$2::execute
at
C:\Users\wangwei8\.cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\datafusion-physical-plan-33.0.0\src\coalesce_partitions.rs:157
9: datafusion_physical_plan::execute_stream
at
C:\Users\wangwei8\.cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\datafusion-physical-plan-33.0.0\src\lib.rs:488
10: datafusion_physical_plan::collect::async_fn$0
at
C:\Users\wangwei8\.cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\datafusion-physical-plan-33.0.0\src\lib.rs:471
11: datafusion::dataframe::impl$2::collect::async_fn$0
at
C:\Users\wangwei8\.cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\datafusion-33.0.0\src\dataframe\mod.rs:759
### To Reproduce
datafusion = { version = "x", features = ["parquet", "default"] }
tokio = { version = "1.34.0", features = ["rt", "rt-multi-thread", "macros"]
}
async fn reading_from_parquet() -> Result<Vec<String>, UniError> {
let table_name = "etl_libor_price";
let file_name = "D:/etl_libor_price.parquet";
let sql_str = "select * from etl_libor_price";
let mut df = query_from_ctx(table_name, file_name, sql_str).await;
let results = df.collect().await.unwrap();
let records: Vec<String> = Vec::new();
Ok(records)
}
async fn query_from_ctx(table_name: &str,
file_name: &str,
sql_str: &str,
) -> datafusion::dataframe::DataFrame {
let ctx = SessionContext::new();
ctx.register_parquet(table_name, file_name,
ParquetReadOptions::default())
.await.expect("register_parquet failed");
let df: datafusion::dataframe::DataFrame =
ctx.sql(sql_str).await.unwrap();
df
}
async fn async_server() -> Vec<String> {
reading_from_parquet().await.unwrap()
}
fn server_worker(tx_m: &Sender<String>) {
let df = block_on(async_server());
println!("df = {:?}", df);
tx_m.send("server_worker_i".to_string()).expect("TODO: panic message");
}
fn server_task() {
println!("into server task");
// let rt = tokio::runtime::Runtime::new().unwrap();
let (tx, rx) = mpsc::channel();
for _ in 0..1 {
let tx= tx.clone();
thread::spawn(move || server_worker(&tx));
}
let shared_rx = Arc::new(Mutex::new(rx));
for _ in 0..1 {
let rx = shared_rx.clone();
thread::spawn(move ||
loop {
let rx_m = rx.lock().expect("obtain arc lock failed");
for recv in rx_m.recv() {
println!("receive from receiver: {:?}", recv);
}
}
);
}
let time_sleep = time::Duration::from_secs(60 * 10);
thread::sleep(time_sleep);
}
#[tokio::main]
async fn main() -> Result<()> {
server_task();
Ok(())
}
### Expected behavior
_No response_
### Additional context
_No response_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]