This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d193508794 Make the sink input aware of its plan (#7610)
d193508794 is described below
commit d193508794bdc6a7b1f8bb0f805e860b970f7f24
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Fri Sep 22 14:59:41 2023 +0300
Make the sink input aware of its plan (#7610)
Co-authored-by: Mustafa Akur <[email protected]>
---
datafusion/core/src/datasource/listing/table.rs | 12 ++++++++++--
datafusion/core/src/datasource/physical_plan/mod.rs | 16 ++++++++++++++++
2 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 51844e6928..8360847e1b 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -34,7 +34,9 @@ use futures::{future, stream, StreamExt, TryStreamExt};
use crate::datasource::file_format::file_compression_type::{
FileCompressionType, FileTypeExt,
};
-use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
+use crate::datasource::physical_plan::{
+ is_plan_streaming, FileScanConfig, FileSinkConfig,
+};
use crate::datasource::{
file_format::{
arrow::ArrowFormat, avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
@@ -894,7 +896,13 @@ impl TableProvider for ListingTable {
output_schema: self.schema(),
table_partition_cols: self.options.table_partition_cols.clone(),
writer_mode,
- unbounded_input: self.options().infinite_source,
+ // A plan can produce finite number of rows even if it has unbounded sources, like LIMIT
+ // queries. Thus, we can check if the plan is streaming to ensure file sink input is
+ // unbounded. When `unbounded_input` flag is `true` for sink, we occasionally call `yield_now`
+ // to consume data at the input. When `unbounded_input` flag is `false` (e.g. non-streaming data),
+ // all of the data at the input is sunk after execution finishes. See discussion for rationale:
+ // https://github.com/apache/arrow-datafusion/pull/7610#issuecomment-1728979918
+ unbounded_input: is_plan_streaming(&input)?,
single_file_output: self.options.single_file,
overwrite,
file_type_writer_options,
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 4a9c2c2bc2..d8ae6b3c04 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -63,6 +63,7 @@ use datafusion_common::{file_options::FileTypeWriterOptions, plan_err};
use datafusion_physical_expr::expressions::Column;
use arrow::compute::cast;
+use datafusion_physical_plan::ExecutionPlan;
use log::debug;
use object_store::path::Path;
use object_store::ObjectMeta;
@@ -500,6 +501,21 @@ fn get_projected_output_ordering(
all_orderings
}
+// Get output (un)boundedness information for the given `plan`.
+pub(crate) fn is_plan_streaming(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
+ let result = if plan.children().is_empty() {
+ plan.unbounded_output(&[])
+ } else {
+ let children_unbounded_output = plan
+ .children()
+ .iter()
+ .map(is_plan_streaming)
+ .collect::<Result<Vec<_>>>();
+ plan.unbounded_output(&children_unbounded_output?)
+ };
+ result
+}
+
#[cfg(test)]
mod tests {
use arrow_array::cast::AsArray;