This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d193508794 Make the sink input aware of its plan (#7610)
d193508794 is described below

commit d193508794bdc6a7b1f8bb0f805e860b970f7f24
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Fri Sep 22 14:59:41 2023 +0300

    Make the sink input aware of its plan (#7610)
    
    Co-authored-by: Mustafa Akur <[email protected]>
---
 datafusion/core/src/datasource/listing/table.rs     | 12 ++++++++++--
 datafusion/core/src/datasource/physical_plan/mod.rs | 16 ++++++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 51844e6928..8360847e1b 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -34,7 +34,9 @@ use futures::{future, stream, StreamExt, TryStreamExt};
 use crate::datasource::file_format::file_compression_type::{
     FileCompressionType, FileTypeExt,
 };
-use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
+use crate::datasource::physical_plan::{
+    is_plan_streaming, FileScanConfig, FileSinkConfig,
+};
 use crate::datasource::{
     file_format::{
         arrow::ArrowFormat, avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
@@ -894,7 +896,13 @@ impl TableProvider for ListingTable {
             output_schema: self.schema(),
             table_partition_cols: self.options.table_partition_cols.clone(),
             writer_mode,
-            unbounded_input: self.options().infinite_source,
+            // A plan can produce finite number of rows even if it has 
unbounded sources, like LIMIT
+            // queries. Thus, we can check if the plan is streaming to ensure 
file sink input is
+            // unbounded. When `unbounded_input` flag is `true` for sink, we 
occasionally call `yield_now`
+            // to consume data at the input. When `unbounded_input` flag is 
`false` (e.g non-streaming data),
+            // all of the data at the input is sink after execution finishes. 
See discussion for rationale:
+            // 
https://github.com/apache/arrow-datafusion/pull/7610#issuecomment-1728979918
+            unbounded_input: is_plan_streaming(&input)?,
             single_file_output: self.options.single_file,
             overwrite,
             file_type_writer_options,
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs 
b/datafusion/core/src/datasource/physical_plan/mod.rs
index 4a9c2c2bc2..d8ae6b3c04 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -63,6 +63,7 @@ use datafusion_common::{file_options::FileTypeWriterOptions, 
plan_err};
 use datafusion_physical_expr::expressions::Column;
 
 use arrow::compute::cast;
+use datafusion_physical_plan::ExecutionPlan;
 use log::debug;
 use object_store::path::Path;
 use object_store::ObjectMeta;
@@ -500,6 +501,21 @@ fn get_projected_output_ordering(
     all_orderings
 }
 
+// Get output (un)boundedness information for the given `plan`.
+pub(crate) fn is_plan_streaming(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> 
{
+    let result = if plan.children().is_empty() {
+        plan.unbounded_output(&[])
+    } else {
+        let children_unbounded_output = plan
+            .children()
+            .iter()
+            .map(is_plan_streaming)
+            .collect::<Result<Vec<_>>>();
+        plan.unbounded_output(&children_unbounded_output?)
+    };
+    result
+}
+
 #[cfg(test)]
 mod tests {
     use arrow_array::cast::AsArray;

Reply via email to