This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3bda91afb6 Use tokio only if running from a multi-thread tokio context (#7205)
3bda91afb6 is described below
commit 3bda91afb65167c15b60ceeedebcebfc852f6fed
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sun Aug 6 09:11:17 2023 -0700
Use tokio only if running from a multi-thread tokio context (#7205)
* Use tokio only if running from a multi-thread tokio context
* Fix clippy
---
datafusion/core/src/physical_plan/common.rs | 35 ++++++++++++++++-------------
1 file changed, 19 insertions(+), 16 deletions(-)
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index f6b8fb33c9..46dbc9ef62 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -97,24 +97,27 @@ pub(crate) fn spawn_buffered(
mut input: SendableRecordBatchStream,
buffer: usize,
) -> SendableRecordBatchStream {
- // Use tokio only if running from a tokio context (#2201)
- if tokio::runtime::Handle::try_current().is_err() {
- return input;
- };
-
- let mut builder = RecordBatchReceiverStream::builder(input.schema(), buffer);
-
- let sender = builder.tx();
+ // Use tokio only if running from a multi-thread tokio context
+ match tokio::runtime::Handle::try_current() {
+ Ok(handle)
+ if handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread =>
+ {
+ let mut builder = RecordBatchReceiverStream::builder(input.schema(), buffer);
+
+ let sender = builder.tx();
+
+ builder.spawn(async move {
+ while let Some(item) = input.next().await {
+ if sender.send(item).await.is_err() {
+ return;
+ }
+ }
+ });
- builder.spawn(async move {
- while let Some(item) = input.next().await {
- if sender.send(item).await.is_err() {
- return;
- }
+ builder.build()
}
- });
-
- builder.build()
+ _ => input,
+ }
}
/// Computes the statistics for an in-memory RecordBatch