This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new bcdda391ae Change the unbounded_output API default (#7605)
bcdda391ae is described below
commit bcdda391ae6336f75db8cafc8a91b199e5abbdf9
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Wed Sep 20 23:43:22 2023 +0300
Change the unbounded_output API default (#7605)
---
datafusion/physical-plan/src/insert.rs | 4 ++++
datafusion/physical-plan/src/lib.rs | 8 ++++++--
datafusion/physical-plan/src/limit.rs | 8 ++++++++
3 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/datafusion/physical-plan/src/insert.rs
b/datafusion/physical-plan/src/insert.rs
index e60afcbcb0..8b467461dd 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -224,6 +224,10 @@ impl ExecutionPlan for FileSinkExec {
}))
}
+ fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
+ Ok(_children[0])
+ }
+
/// Execute the plan and return a stream of `RecordBatch`es for
/// the specified partition.
fn execute(
diff --git a/datafusion/physical-plan/src/lib.rs
b/datafusion/physical-plan/src/lib.rs
index c500ecde5d..c27ec200e7 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -23,8 +23,8 @@ use self::metrics::MetricsSet;
use self::{
coalesce_partitions::CoalescePartitionsExec,
display::DisplayableExecutionPlan,
};
-use datafusion_common::Result;
pub use datafusion_common::{internal_err, ColumnStatistics, Statistics};
+use datafusion_common::{plan_err, Result};
use datafusion_physical_expr::PhysicalSortExpr;
pub use visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};
@@ -74,7 +74,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
- Ok(false)
+ if _children.iter().any(|&x| x) {
+ plan_err!("Plan does not support infinite stream from its
children")
+ } else {
+ Ok(false)
+ }
}
/// If the output of this operator within each partition is sorted,
diff --git a/datafusion/physical-plan/src/limit.rs
b/datafusion/physical-plan/src/limit.rs
index c6d51b7d9c..922c3db0ef 100644
--- a/datafusion/physical-plan/src/limit.rs
+++ b/datafusion/physical-plan/src/limit.rs
@@ -154,6 +154,10 @@ impl ExecutionPlan for GlobalLimitExec {
)))
}
+ fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
+ Ok(false)
+ }
+
fn execute(
&self,
partition: usize,
@@ -320,6 +324,10 @@ impl ExecutionPlan for LocalLimitExec {
self.input.ordering_equivalence_properties()
}
+ fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
+ Ok(false)
+ }
+
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,