jonmmease opened a new issue #1154:
URL: https://github.com/apache/arrow-datafusion/issues/1154
**Describe the bug**
A compilation error ("future cannot be sent between threads safely") is raised in what I believe is a valid use case of DataFusion.
**To Reproduce**
Define a trait with `#[async_trait]` containing an async method that returns an `Arc<dyn DataFrame>`. Then implement the trait with a method that creates a context (`ctx`) and returns the result of `ctx.read_csv`.
```rust
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::dataframe::DataFrame;
use datafusion::prelude::{CsvReadOptions, ExecutionContext};

#[async_trait]
trait CallReadTrait {
    async fn call(&self) -> Arc<dyn DataFrame>;
}

struct CallRead {}

#[async_trait]
impl CallReadTrait for CallRead {
    async fn call(&self) -> Arc<dyn DataFrame> {
        let mut ctx = ExecutionContext::new();
        ctx.read_csv("dummy.csv", CsvReadOptions::new()).await.unwrap()
    }
}
```
When compiling with the stable 1.55.0 toolchain, the following error is raised:
```
error: future cannot be sent between threads safely
    --> datafusion/src/execution/context.rs:4151:52
     |
4151 |           async fn call(&self) -> Arc<dyn DataFrame> {
     |  ____________________________________________________^
4152 | |             let mut ctx = ExecutionContext::new();
4153 | |             ctx.read_csv("dummy.csv", CsvReadOptions::new()).await.unwrap()
4154 | |         }
     | |_________^ future created by async block is not `Send`
     |
     = help: within `impl futures::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, context::ExecutionContextState>`
note: future is not `Send` as this value is used across an await
    --> datafusion/src/execution/context.rs:341:14
     |
341  |               &LogicalPlanBuilder::scan_csv(
     |  ______________^
342  | |                 object_store,
343  | |                 path,
344  | |                 options,
...    |
347  | |             )
348  | |             .await?
     | |__________________^ first, await occurs here, with `self.state.lock().unwrap()` maybe used later...
note: `self.state.lock().unwrap()` is later dropped here
    --> datafusion/src/execution/context.rs:351:5
     |
346  |                   self.state.lock().unwrap().config.target_partitions,
     |                   -------------------------- has type `std::sync::MutexGuard<'_, context::ExecutionContextState>` which is not `Send`
...
351  |       }
     |       ^
     = note: required for the cast to the object type `dyn futures::Future<Output = std::sync::Arc<(dyn dataframe::DataFrame + 'static)>> + std::marker::Send`
```
**Expected behavior**
I expect this code to compile and run successfully.
**Additional context**
This error is not raised when the same code is called from an async function that is not part of an `#[async_trait]` implementation.
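A possible explanation for why only the `#[async_trait]` version fails: by default the macro rewrites each `async fn` in the trait into a method returning `Pin<Box<dyn Future<Output = T> + Send + '_>>`, so every implementation's future must be `Send`. This matches the "required for the cast to the object type `dyn futures::Future<...> + std::marker::Send`" note in the error, whereas a plain `async fn` carries no such bound. The std-only sketch below hand-writes a simplified version of that desugaring; `CallTrait`, `Doubler`, and the busy-polling `block_on` helper are hypothetical and not part of DataFusion or the `async_trait` crate.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical, simplified version of what `#[async_trait]` generates for
// `async fn call(&self) -> usize;` (the real macro output differs in detail).
trait CallTrait {
    fn call<'a>(&'a self) -> Pin<Box<dyn Future<Output = usize> + Send + 'a>>;
}

struct Doubler {
    value: usize,
}

impl CallTrait for Doubler {
    fn call<'a>(&'a self) -> Pin<Box<dyn Future<Output = usize> + Send + 'a>> {
        // `Box::pin` coerces the async block to `dyn Future + Send`. If the block
        // held a non-`Send` value (e.g. a `std::sync::MutexGuard`) across the
        // `.await`, compilation would fail at this coercion.
        Box::pin(async move {
            std::future::ready(()).await;
            self.value * 2
        })
    }
}

// Waker that does nothing; sufficient for futures that never truly suspend.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Busy-polling executor for demonstration only; not suitable for real I/O.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

fn main() {
    let d = Doubler { value: 21 };
    println!("{}", block_on(d.call())); // prints 42
}
```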
Here is an excerpt of the current `ctx.read_csv` function:
```rust
Ok(Arc::new(DataFrameImpl::new(
    self.state.clone(),
    &LogicalPlanBuilder::scan_csv(
        object_store,
        path,
        options,
        None,
        self.state.lock().unwrap().config.target_partitions,
    )
    .await?
    .build()?,
)))
```
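The compiler notes point at when the temporary guard is dropped. A temporary created inside a call's argument list normally lives until the end of the enclosing statement; for a function's tail expression, such as the `Ok(...)` in this excerpt, that is the end of the function body, so the guard is still alive at the `.await`. The std-only sketch below makes that ordering visible without any async code: the hypothetical `Guard` type stands in for the `MutexGuard` and logs when it is dropped, and the hypothetical `scan` function stands in for the work that includes the `.await`.

```rust
use std::cell::RefCell;

type Log = RefCell<Vec<&'static str>>;

// Hypothetical stand-in for a lock guard that records when it is dropped.
struct Guard<'a> {
    log: &'a Log,
    partitions: usize,
}

impl Drop for Guard<'_> {
    fn drop(&mut self) {
        self.log.borrow_mut().push("guard dropped");
    }
}

// Stand-in for `self.state.lock().unwrap()`.
fn lock(log: &Log) -> Guard<'_> {
    Guard { log, partitions: 8 }
}

// Stand-in for the work that, in `read_csv`, includes the `.await`.
fn scan(log: &Log, partitions: usize) -> usize {
    log.borrow_mut().push("scan ran");
    partitions
}

// Mirrors the excerpt: the guard is a temporary in the argument list, so it is
// still alive while `scan` runs and is dropped at the end of the statement.
fn inline_temporary() -> Vec<&'static str> {
    let log = Log::new(Vec::new());
    scan(&log, lock(&log).partitions);
    log.into_inner()
}

// Mirrors the workaround: only the copied `usize` outlives the `let`,
// so the guard is dropped before `scan` runs.
fn hoisted_local() -> Vec<&'static str> {
    let log = Log::new(Vec::new());
    let partitions = lock(&log).partitions;
    scan(&log, partitions);
    log.into_inner()
}

fn main() {
    println!("{:?}", inline_temporary()); // ["scan ran", "guard dropped"]
    println!("{:?}", hoisted_local()); // ["guard dropped", "scan ran"]
}
```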
I don't fully understand the root cause here, but the compilation error goes
away if the `self.state.lock().unwrap().config.target_partitions` expression is
lifted to a local variable like this:
```rust
// Read the value first so the lock guard is released before the `.await`.
let target_partitions =
    self.state.lock().unwrap().config.target_partitions;
Ok(Arc::new(DataFrameImpl::new(
    self.state.clone(),
    &LogicalPlanBuilder::scan_csv(
        object_store,
        path,
        options,
        None,
        target_partitions,
    )
    .await?
    .build()?,
)))
```
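A std-only miniature of the proposed change, assuming simplified hypothetical types (`State`, `Ctx`, and an async `scan` function) in place of `ExecutionContextState`, `ExecutionContext`, and `LogicalPlanBuilder::scan_csv`. Because only the copied `usize` survives the `let`, the future holds no `MutexGuard`, so passing it to the hypothetical `require_send` helper (which boxes it as `dyn Future + Send`, similar to the coercion `#[async_trait]` performs) compiles. Moving the `lock()` call back into the argument list of `scan(...)` should reproduce the "future cannot be sent between threads safely" error.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical, simplified stand-ins for the DataFusion types in this issue.
struct State {
    target_partitions: usize,
}

struct Ctx {
    state: Arc<Mutex<State>>,
}

// Hypothetical stand-in for `LogicalPlanBuilder::scan_csv(...)`.
async fn scan(path: &str, target_partitions: usize) -> String {
    format!("{} with {} partitions", path, target_partitions)
}

impl Ctx {
    async fn read_csv(&self, path: &str) -> String {
        // The `MutexGuard` is dropped at the end of this `let`, before the
        // `.await` below, so it is not stored in the future's state.
        let target_partitions = self.state.lock().unwrap().target_partitions;
        scan(path, target_partitions).await
    }
}

// Accepts only `Send` futures and boxes them as `dyn Future + Send`.
fn require_send<'a>(
    fut: impl Future<Output = String> + Send + 'a,
) -> Pin<Box<dyn Future<Output = String> + Send + 'a>> {
    Box::pin(fut)
}

// Waker that does nothing; sufficient for futures that never truly suspend.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Busy-polling executor for demonstration only; not suitable for real I/O.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

fn main() {
    let ctx = Ctx {
        state: Arc::new(Mutex::new(State { target_partitions: 4 })),
    };
    // prints "dummy.csv with 4 partitions"
    println!("{}", block_on(require_send(ctx.read_csv("dummy.csv"))));
}
```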
The `read_parquet` and `read_avro` functions have the same problem.
If this change makes sense, I'm happy to open a PR.