This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new bcdda391ae Change the unbounded_output API default (#7605)
bcdda391ae is described below

commit bcdda391ae6336f75db8cafc8a91b199e5abbdf9
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Wed Sep 20 23:43:22 2023 +0300

    Change the unbounded_output API default (#7605)
---
 datafusion/physical-plan/src/insert.rs | 4 ++++
 datafusion/physical-plan/src/lib.rs    | 8 ++++++--
 datafusion/physical-plan/src/limit.rs  | 8 ++++++++
 3 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/datafusion/physical-plan/src/insert.rs 
b/datafusion/physical-plan/src/insert.rs
index e60afcbcb0..8b467461dd 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -224,6 +224,10 @@ impl ExecutionPlan for FileSinkExec {
         }))
     }
 
+    fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
+        Ok(_children[0])
+    }
+
     /// Execute the plan and return a stream of `RecordBatch`es for
     /// the specified partition.
     fn execute(
diff --git a/datafusion/physical-plan/src/lib.rs 
b/datafusion/physical-plan/src/lib.rs
index c500ecde5d..c27ec200e7 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -23,8 +23,8 @@ use self::metrics::MetricsSet;
 use self::{
     coalesce_partitions::CoalescePartitionsExec, 
display::DisplayableExecutionPlan,
 };
-use datafusion_common::Result;
 pub use datafusion_common::{internal_err, ColumnStatistics, Statistics};
+use datafusion_common::{plan_err, Result};
 use datafusion_physical_expr::PhysicalSortExpr;
 pub use visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};
 
@@ -74,7 +74,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
-        Ok(false)
+        if _children.iter().any(|&x| x) {
+            plan_err!("Plan does not support infinite stream from its 
children")
+        } else {
+            Ok(false)
+        }
     }
 
     /// If the output of this operator within each partition is sorted,
diff --git a/datafusion/physical-plan/src/limit.rs 
b/datafusion/physical-plan/src/limit.rs
index c6d51b7d9c..922c3db0ef 100644
--- a/datafusion/physical-plan/src/limit.rs
+++ b/datafusion/physical-plan/src/limit.rs
@@ -154,6 +154,10 @@ impl ExecutionPlan for GlobalLimitExec {
         )))
     }
 
+    fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
+        Ok(false)
+    }
+
     fn execute(
         &self,
         partition: usize,
@@ -320,6 +324,10 @@ impl ExecutionPlan for LocalLimitExec {
         self.input.ordering_equivalence_properties()
     }
 
+    fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
+        Ok(false)
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,

Reply via email to