This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bda91afb6 Use tokio only if running from a multi-thread tokio context 
(#7205)
3bda91afb6 is described below

commit 3bda91afb65167c15b60ceeedebcebfc852f6fed
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sun Aug 6 09:11:17 2023 -0700

    Use tokio only if running from a multi-thread tokio context (#7205)
    
    * Use tokio only if running from a multi-thread tokio context
    
    * Fix clippy
---
 datafusion/core/src/physical_plan/common.rs | 35 ++++++++++++++++-------------
 1 file changed, 19 insertions(+), 16 deletions(-)

diff --git a/datafusion/core/src/physical_plan/common.rs 
b/datafusion/core/src/physical_plan/common.rs
index f6b8fb33c9..46dbc9ef62 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -97,24 +97,27 @@ pub(crate) fn spawn_buffered(
     mut input: SendableRecordBatchStream,
     buffer: usize,
 ) -> SendableRecordBatchStream {
-    // Use tokio only if running from a tokio context (#2201)
-    if tokio::runtime::Handle::try_current().is_err() {
-        return input;
-    };
-
-    let mut builder = RecordBatchReceiverStream::builder(input.schema(), 
buffer);
-
-    let sender = builder.tx();
+    // Use tokio only if running from a multi-thread tokio context
+    match tokio::runtime::Handle::try_current() {
+        Ok(handle)
+            if handle.runtime_flavor() == 
tokio::runtime::RuntimeFlavor::MultiThread =>
+        {
+            let mut builder = 
RecordBatchReceiverStream::builder(input.schema(), buffer);
+
+            let sender = builder.tx();
+
+            builder.spawn(async move {
+                while let Some(item) = input.next().await {
+                    if sender.send(item).await.is_err() {
+                        return;
+                    }
+                }
+            });
 
-    builder.spawn(async move {
-        while let Some(item) = input.next().await {
-            if sender.send(item).await.is_err() {
-                return;
-            }
+            builder.build()
         }
-    });
-
-    builder.build()
+        _ => input,
+    }
 }
 
 /// Computes the statistics for an in-memory RecordBatch

Reply via email to