This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d091b55be6 Split `EmptyExec` into `PlaceholderRowExec` (#8446)
d091b55be6 is described below
commit d091b55be6a4ce552023ef162b5d081136d3ff6d
Author: Mohammad Razeghi <[email protected]>
AuthorDate: Sat Dec 9 13:23:34 2023 +0100
Split `EmptyExec` into `PlaceholderRowExec` (#8446)
* add PlaceHolderRowExec
* Change produce_one_row=true calls to use PlaceHolderRowExec
* remove produce_one_row from EmptyExec, changes in proto serializer,
working tests
* PlaceHolder => Placeholder
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/src/datasource/empty.rs | 2 +-
datafusion/core/src/datasource/listing/table.rs | 4 +-
.../src/physical_optimizer/aggregate_statistics.rs | 4 +-
.../core/src/physical_optimizer/join_selection.rs | 4 +-
datafusion/core/src/physical_planner.rs | 12 +-
datafusion/core/tests/custom_sources.rs | 12 +-
datafusion/core/tests/sql/explain_analyze.rs | 4 +-
datafusion/expr/src/logical_plan/plan.rs | 2 +-
datafusion/optimizer/README.md | 6 +-
datafusion/physical-plan/src/display.rs | 2 +-
datafusion/physical-plan/src/empty.rs | 93 ++--------------
datafusion/physical-plan/src/lib.rs | 1 +
.../src/{empty.rs => placeholder_row.rs} | 94 +++++-----------
datafusion/proto/proto/datafusion.proto | 8 +-
datafusion/proto/src/generated/pbjson.rs | 123 ++++++++++++++++++---
datafusion/proto/src/generated/prost.rs | 14 ++-
datafusion/proto/src/physical_plan/mod.rs | 19 +++-
.../proto/tests/cases/roundtrip_physical_plan.rs | 63 +++++------
datafusion/sqllogictest/test_files/explain.slt | 6 +-
datafusion/sqllogictest/test_files/join.slt | 2 +-
datafusion/sqllogictest/test_files/limit.slt | 6 +-
datafusion/sqllogictest/test_files/union.slt | 10 +-
datafusion/sqllogictest/test_files/window.slt | 16 +--
23 files changed, 260 insertions(+), 247 deletions(-)
diff --git a/datafusion/core/src/datasource/empty.rs
b/datafusion/core/src/datasource/empty.rs
index 77160aa5d1..5100987520 100644
--- a/datafusion/core/src/datasource/empty.rs
+++ b/datafusion/core/src/datasource/empty.rs
@@ -77,7 +77,7 @@ impl TableProvider for EmptyTable {
// even though there is no data, projections apply
let projected_schema = project_schema(&self.schema, projection)?;
Ok(Arc::new(
- EmptyExec::new(false,
projected_schema).with_partitions(self.partitions),
+ EmptyExec::new(projected_schema).with_partitions(self.partitions),
))
}
}
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 10ec9f8d8d..0ce1b43fe4 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -685,7 +685,7 @@ impl TableProvider for ListingTable {
if partitioned_file_lists.is_empty() {
let schema = self.schema();
let projected_schema = project_schema(&schema, projection)?;
- return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
+ return Ok(Arc::new(EmptyExec::new(projected_schema)));
}
// extract types of partition columns
@@ -713,7 +713,7 @@ impl TableProvider for ListingTable {
let object_store_url = if let Some(url) = self.table_paths.first() {
url.object_store()
} else {
- return Ok(Arc::new(EmptyExec::new(false,
Arc::new(Schema::empty()))));
+ return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
};
// create the execution plan
self.options
diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
index 4265e3ff80..795857b10e 100644
--- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -22,7 +22,6 @@ use super::optimizer::PhysicalOptimizerRule;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_plan::aggregates::AggregateExec;
-use crate::physical_plan::empty::EmptyExec;
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::{expressions, AggregateExpr, ExecutionPlan,
Statistics};
use crate::scalar::ScalarValue;
@@ -30,6 +29,7 @@ use crate::scalar::ScalarValue;
use datafusion_common::stats::Precision;
use datafusion_common::tree_node::TreeNode;
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
+use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
/// Optimizer that uses available statistics for aggregate functions
#[derive(Default)]
@@ -82,7 +82,7 @@ impl PhysicalOptimizerRule for AggregateStatistics {
// input can be entirely removed
Ok(Arc::new(ProjectionExec::try_new(
projections,
- Arc::new(EmptyExec::new(true, plan.schema())),
+ Arc::new(PlaceholderRowExec::new(plan.schema())),
)?))
} else {
plan.map_children(|child| self.optimize(child, _config))
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs
b/datafusion/core/src/physical_optimizer/join_selection.rs
index 0c3ac2d245..6b2fe24acf 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -1623,12 +1623,12 @@ mod hash_join_tests {
let children = vec![
PipelineStatePropagator {
- plan: Arc::new(EmptyExec::new(false,
Arc::new(Schema::empty()))),
+ plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
unbounded: left_unbounded,
children: vec![],
},
PipelineStatePropagator {
- plan: Arc::new(EmptyExec::new(false,
Arc::new(Schema::empty()))),
+ plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
unbounded: right_unbounded,
children: vec![],
},
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 38532002a6..ab38b3ec6d 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -91,6 +91,7 @@ use datafusion_expr::{
WindowFrameBound, WriteOp,
};
use datafusion_physical_expr::expressions::Literal;
+use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_sql::utils::window_expr_common_partition_keys;
use async_trait::async_trait;
@@ -1196,10 +1197,15 @@ impl DefaultPhysicalPlanner {
}
LogicalPlan::Subquery(_) => todo!(),
LogicalPlan::EmptyRelation(EmptyRelation {
- produce_one_row,
+ produce_one_row: false,
schema,
}) => Ok(Arc::new(EmptyExec::new(
- *produce_one_row,
+ SchemaRef::new(schema.as_ref().to_owned().into()),
+ ))),
+ LogicalPlan::EmptyRelation(EmptyRelation {
+ produce_one_row: true,
+ schema,
+ }) => Ok(Arc::new(PlaceholderRowExec::new(
SchemaRef::new(schema.as_ref().to_owned().into()),
))),
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
@@ -2767,7 +2773,7 @@ mod tests {
digraph {
1[shape=box label="ProjectionExec: expr=[id@0 + 2 as employee.id +
Int32(2)]", tooltip=""]
- 2[shape=box label="EmptyExec: produce_one_row=false", tooltip=""]
+ 2[shape=box label="EmptyExec", tooltip=""]
1 -> 2 [arrowhead=none, arrowtail=normal, dir=back]
}
// End DataFusion GraphViz Plan
diff --git a/datafusion/core/tests/custom_sources.rs
b/datafusion/core/tests/custom_sources.rs
index daf1ef41a2..a9ea5cc2a3 100644
--- a/datafusion/core/tests/custom_sources.rs
+++ b/datafusion/core/tests/custom_sources.rs
@@ -30,7 +30,6 @@ use datafusion::execution::context::{SessionContext,
SessionState, TaskContext};
use datafusion::logical_expr::{
col, Expr, LogicalPlan, LogicalPlanBuilder, TableScan, UNNAMED_TABLE,
};
-use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::{
collect, ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan,
Partitioning,
@@ -42,6 +41,7 @@ use datafusion_common::project_schema;
use datafusion_common::stats::Precision;
use async_trait::async_trait;
+use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use futures::stream::Stream;
/// Also run all tests that are found in the `custom_sources_cases` directory
@@ -256,9 +256,9 @@ async fn optimizers_catch_all_statistics() {
let physical_plan = df.create_physical_plan().await.unwrap();
- // when the optimization kicks in, the source is replaced by an EmptyExec
+ // when the optimization kicks in, the source is replaced by a
PlaceholderRowExec
assert!(
- contains_empty_exec(Arc::clone(&physical_plan)),
+ contains_place_holder_exec(Arc::clone(&physical_plan)),
"Expected aggregate_statistics optimizations missing:
{physical_plan:?}"
);
@@ -283,12 +283,12 @@ async fn optimizers_catch_all_statistics() {
assert_eq!(format!("{:?}", actual[0]), format!("{expected:?}"));
}
-fn contains_empty_exec(plan: Arc<dyn ExecutionPlan>) -> bool {
- if plan.as_any().is::<EmptyExec>() {
+fn contains_place_holder_exec(plan: Arc<dyn ExecutionPlan>) -> bool {
+ if plan.as_any().is::<PlaceholderRowExec>() {
true
} else if plan.children().len() != 1 {
false
} else {
- contains_empty_exec(Arc::clone(&plan.children()[0]))
+ contains_place_holder_exec(Arc::clone(&plan.children()[0]))
}
}
diff --git a/datafusion/core/tests/sql/explain_analyze.rs
b/datafusion/core/tests/sql/explain_analyze.rs
index ecb5766a3b..37f8cefc90 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -575,7 +575,7 @@ async fn explain_analyze_runs_optimizers() {
// This happens as an optimization pass where count(*) can be
// answered using statistics only.
- let expected = "EmptyExec: produce_one_row=true";
+ let expected = "PlaceholderRowExec";
let sql = "EXPLAIN SELECT count(*) from alltypes_plain";
let actual = execute_to_batches(&ctx, sql).await;
@@ -806,7 +806,7 @@ async fn explain_physical_plan_only() {
let expected = vec![vec![
"physical_plan",
"ProjectionExec: expr=[2 as COUNT(*)]\
- \n EmptyExec: produce_one_row=true\
+ \n PlaceholderRowExec\
\n",
]];
assert_eq!(expected, actual);
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index dfd4fbf65d..d74015bf09 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1208,7 +1208,7 @@ impl LogicalPlan {
self.with_new_exprs(new_exprs, &new_inputs_with_values)
}
- /// Walk the logical plan, find any `PlaceHolder` tokens, and return a map
of their IDs and DataTypes
+ /// Walk the logical plan, find any `Placeholder` tokens, and return a map
of their IDs and DataTypes
pub fn get_parameter_types(
&self,
) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md
index b8e5b93e66..4f9e0fb985 100644
--- a/datafusion/optimizer/README.md
+++ b/datafusion/optimizer/README.md
@@ -153,7 +153,7 @@ Looking at the `EXPLAIN` output we can see that the
optimizer has effectively re
| logical_plan | Projection: Int64(3) AS Int64(1) + Int64(2) |
| | EmptyRelation |
| physical_plan | ProjectionExec: expr=[3 as Int64(1) + Int64(2)] |
-| | EmptyExec: produce_one_row=true |
+| | PlaceholderRowExec |
| | |
+---------------+-------------------------------------------------+
```
@@ -318,7 +318,7 @@ In the following example, the `type_coercion` and
`simplify_expressions` passes
| logical_plan | Projection:
Utf8("3.2") AS foo |
| | EmptyRelation
|
| initial_physical_plan | ProjectionExec:
expr=[3.2 as foo] |
-| | EmptyExec:
produce_one_row=true |
+| |
PlaceholderRowExec |
| |
|
| physical_plan after aggregate_statistics | SAME TEXT AS
ABOVE |
| physical_plan after join_selection | SAME TEXT AS
ABOVE |
@@ -326,7 +326,7 @@ In the following example, the `type_coercion` and
`simplify_expressions` passes
| physical_plan after repartition | SAME TEXT AS
ABOVE |
| physical_plan after add_merge_exec | SAME TEXT AS
ABOVE |
| physical_plan | ProjectionExec:
expr=[3.2 as foo] |
-| | EmptyExec:
produce_one_row=true |
+| |
PlaceholderRowExec |
| |
|
+------------------------------------------------------------+---------------------------------------------------------------------------+
```
diff --git a/datafusion/physical-plan/src/display.rs
b/datafusion/physical-plan/src/display.rs
index aa368251eb..612e164be0 100644
--- a/datafusion/physical-plan/src/display.rs
+++ b/datafusion/physical-plan/src/display.rs
@@ -132,7 +132,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
/// ```dot
/// strict digraph dot_plan {
// 0[label="ProjectionExec: expr=[id@0 + 2 as employee.id +
Int32(2)]",tooltip=""]
- // 1[label="EmptyExec: produce_one_row=false",tooltip=""]
+ // 1[label="EmptyExec",tooltip=""]
// 0 -> 1
// }
/// ```
diff --git a/datafusion/physical-plan/src/empty.rs
b/datafusion/physical-plan/src/empty.rs
index a3e1fb79ed..41c8dbed14 100644
--- a/datafusion/physical-plan/src/empty.rs
+++ b/datafusion/physical-plan/src/empty.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! EmptyRelation execution plan
+//! EmptyRelation with produce_one_row=false execution plan
use std::any::Any;
use std::sync::Arc;
@@ -24,19 +24,16 @@ use super::expressions::PhysicalSortExpr;
use super::{common, DisplayAs, SendableRecordBatchStream, Statistics};
use crate::{memory::MemoryStream, DisplayFormatType, ExecutionPlan,
Partitioning};
-use arrow::array::{ArrayRef, NullArray};
-use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
+use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use log::trace;
-/// Execution plan for empty relation (produces no rows)
+/// Execution plan for empty relation with produce_one_row=false
#[derive(Debug)]
pub struct EmptyExec {
- /// Specifies whether this exec produces a row or not
- produce_one_row: bool,
/// The schema for the produced row
schema: SchemaRef,
/// Number of partitions
@@ -45,9 +42,8 @@ pub struct EmptyExec {
impl EmptyExec {
/// Create a new EmptyExec
- pub fn new(produce_one_row: bool, schema: SchemaRef) -> Self {
+ pub fn new(schema: SchemaRef) -> Self {
EmptyExec {
- produce_one_row,
schema,
partitions: 1,
}
@@ -59,36 +55,8 @@ impl EmptyExec {
self
}
- /// Specifies whether this exec produces a row or not
- pub fn produce_one_row(&self) -> bool {
- self.produce_one_row
- }
-
fn data(&self) -> Result<Vec<RecordBatch>> {
- let batch = if self.produce_one_row {
- let n_field = self.schema.fields.len();
- // hack for https://github.com/apache/arrow-datafusion/pull/3242
- let n_field = if n_field == 0 { 1 } else { n_field };
- vec![RecordBatch::try_new(
- Arc::new(Schema::new(
- (0..n_field)
- .map(|i| {
- Field::new(format!("placeholder_{i}"),
DataType::Null, true)
- })
- .collect::<Fields>(),
- )),
- (0..n_field)
- .map(|_i| {
- let ret: ArrayRef = Arc::new(NullArray::new(1));
- ret
- })
- .collect(),
- )?]
- } else {
- vec![]
- };
-
- Ok(batch)
+ Ok(vec![])
}
}
@@ -100,7 +68,7 @@ impl DisplayAs for EmptyExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "EmptyExec: produce_one_row={}",
self.produce_one_row)
+ write!(f, "EmptyExec")
}
}
}
@@ -133,10 +101,7 @@ impl ExecutionPlan for EmptyExec {
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(Arc::new(EmptyExec::new(
- self.produce_one_row,
- self.schema.clone(),
- )))
+ Ok(Arc::new(EmptyExec::new(self.schema.clone())))
}
fn execute(
@@ -184,7 +149,7 @@ mod tests {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();
- let empty = EmptyExec::new(false, schema.clone());
+ let empty = EmptyExec::new(schema.clone());
assert_eq!(empty.schema(), schema);
// we should have no results
@@ -198,16 +163,11 @@ mod tests {
#[test]
fn with_new_children() -> Result<()> {
let schema = test::aggr_test_schema();
- let empty = Arc::new(EmptyExec::new(false, schema.clone()));
- let empty_with_row = Arc::new(EmptyExec::new(true, schema));
+ let empty = Arc::new(EmptyExec::new(schema.clone()));
let empty2 = with_new_children_if_necessary(empty.clone(),
vec![])?.into();
assert_eq!(empty.schema(), empty2.schema());
- let empty_with_row_2 =
- with_new_children_if_necessary(empty_with_row.clone(),
vec![])?.into();
- assert_eq!(empty_with_row.schema(), empty_with_row_2.schema());
-
let too_many_kids = vec![empty2];
assert!(
with_new_children_if_necessary(empty, too_many_kids).is_err(),
@@ -220,44 +180,11 @@ mod tests {
async fn invalid_execute() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();
- let empty = EmptyExec::new(false, schema);
+ let empty = EmptyExec::new(schema);
// ask for the wrong partition
assert!(empty.execute(1, task_ctx.clone()).is_err());
assert!(empty.execute(20, task_ctx).is_err());
Ok(())
}
-
- #[tokio::test]
- async fn produce_one_row() -> Result<()> {
- let task_ctx = Arc::new(TaskContext::default());
- let schema = test::aggr_test_schema();
- let empty = EmptyExec::new(true, schema);
-
- let iter = empty.execute(0, task_ctx)?;
- let batches = common::collect(iter).await?;
-
- // should have one item
- assert_eq!(batches.len(), 1);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn produce_one_row_multiple_partition() -> Result<()> {
- let task_ctx = Arc::new(TaskContext::default());
- let schema = test::aggr_test_schema();
- let partitions = 3;
- let empty = EmptyExec::new(true, schema).with_partitions(partitions);
-
- for n in 0..partitions {
- let iter = empty.execute(n, task_ctx.clone())?;
- let batches = common::collect(iter).await?;
-
- // should have one item
- assert_eq!(batches.len(), 1);
- }
-
- Ok(())
- }
}
diff --git a/datafusion/physical-plan/src/lib.rs
b/datafusion/physical-plan/src/lib.rs
index f40911c101..6c9e97e03c 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -59,6 +59,7 @@ pub mod limit;
pub mod memory;
pub mod metrics;
mod ordering;
+pub mod placeholder_row;
pub mod projection;
pub mod repartition;
pub mod sorts;
diff --git a/datafusion/physical-plan/src/empty.rs
b/datafusion/physical-plan/src/placeholder_row.rs
similarity index 66%
copy from datafusion/physical-plan/src/empty.rs
copy to datafusion/physical-plan/src/placeholder_row.rs
index a3e1fb79ed..94f3278853 100644
--- a/datafusion/physical-plan/src/empty.rs
+++ b/datafusion/physical-plan/src/placeholder_row.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! EmptyRelation execution plan
+//! EmptyRelation with produce_one_row=true execution plan
use std::any::Any;
use std::sync::Arc;
@@ -32,40 +32,32 @@ use datafusion_execution::TaskContext;
use log::trace;
-/// Execution plan for empty relation (produces no rows)
+/// Execution plan for empty relation with produce_one_row=true
#[derive(Debug)]
-pub struct EmptyExec {
- /// Specifies whether this exec produces a row or not
- produce_one_row: bool,
+pub struct PlaceholderRowExec {
/// The schema for the produced row
schema: SchemaRef,
/// Number of partitions
partitions: usize,
}
-impl EmptyExec {
- /// Create a new EmptyExec
- pub fn new(produce_one_row: bool, schema: SchemaRef) -> Self {
- EmptyExec {
- produce_one_row,
+impl PlaceholderRowExec {
+ /// Create a new PlaceholderRowExec
+ pub fn new(schema: SchemaRef) -> Self {
+ PlaceholderRowExec {
schema,
partitions: 1,
}
}
- /// Create a new EmptyExec with specified partition number
+ /// Create a new PlaceholderRowExec with specified
partition number
pub fn with_partitions(mut self, partitions: usize) -> Self {
self.partitions = partitions;
self
}
- /// Specifies whether this exec produces a row or not
- pub fn produce_one_row(&self) -> bool {
- self.produce_one_row
- }
-
fn data(&self) -> Result<Vec<RecordBatch>> {
- let batch = if self.produce_one_row {
+ Ok({
let n_field = self.schema.fields.len();
// hack for https://github.com/apache/arrow-datafusion/pull/3242
let n_field = if n_field == 0 { 1 } else { n_field };
@@ -84,15 +76,11 @@ impl EmptyExec {
})
.collect(),
)?]
- } else {
- vec![]
- };
-
- Ok(batch)
+ })
}
}
-impl DisplayAs for EmptyExec {
+impl DisplayAs for PlaceholderRowExec {
fn fmt_as(
&self,
t: DisplayFormatType,
@@ -100,13 +88,13 @@ impl DisplayAs for EmptyExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "EmptyExec: produce_one_row={}",
self.produce_one_row)
+ write!(f, "PlaceholderRowExec")
}
}
}
}
-impl ExecutionPlan for EmptyExec {
+impl ExecutionPlan for PlaceholderRowExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
@@ -133,10 +121,7 @@ impl ExecutionPlan for EmptyExec {
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(Arc::new(EmptyExec::new(
- self.produce_one_row,
- self.schema.clone(),
- )))
+ Ok(Arc::new(PlaceholderRowExec::new(self.schema.clone())))
}
fn execute(
@@ -144,11 +129,11 @@ impl ExecutionPlan for EmptyExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- trace!("Start EmptyExec::execute for partition {} of context
session_id {} and task_id {:?}", partition, context.session_id(),
context.task_id());
+ trace!("Start PlaceholderRowExec::execute for partition {} of context
session_id {} and task_id {:?}", partition, context.session_id(),
context.task_id());
if partition >= self.partitions {
return internal_err!(
- "EmptyExec invalid partition {} (expected less than {})",
+ "PlaceholderRowExec invalid partition {} (expected less than
{})",
partition,
self.partitions
);
@@ -164,7 +149,7 @@ impl ExecutionPlan for EmptyExec {
fn statistics(&self) -> Result<Statistics> {
let batch = self
.data()
- .expect("Create empty RecordBatch should not fail");
+ .expect("Create single row placeholder RecordBatch should not
fail");
Ok(common::compute_record_batch_statistics(
&[batch],
&self.schema,
@@ -179,38 +164,19 @@ mod tests {
use crate::with_new_children_if_necessary;
use crate::{common, test};
- #[tokio::test]
- async fn empty() -> Result<()> {
- let task_ctx = Arc::new(TaskContext::default());
- let schema = test::aggr_test_schema();
-
- let empty = EmptyExec::new(false, schema.clone());
- assert_eq!(empty.schema(), schema);
-
- // we should have no results
- let iter = empty.execute(0, task_ctx)?;
- let batches = common::collect(iter).await?;
- assert!(batches.is_empty());
-
- Ok(())
- }
-
#[test]
fn with_new_children() -> Result<()> {
let schema = test::aggr_test_schema();
- let empty = Arc::new(EmptyExec::new(false, schema.clone()));
- let empty_with_row = Arc::new(EmptyExec::new(true, schema));
- let empty2 = with_new_children_if_necessary(empty.clone(),
vec![])?.into();
- assert_eq!(empty.schema(), empty2.schema());
+ let placeholder = Arc::new(PlaceholderRowExec::new(schema));
- let empty_with_row_2 =
- with_new_children_if_necessary(empty_with_row.clone(),
vec![])?.into();
- assert_eq!(empty_with_row.schema(), empty_with_row_2.schema());
+ let placeholder_2 =
+ with_new_children_if_necessary(placeholder.clone(),
vec![])?.into();
+ assert_eq!(placeholder.schema(), placeholder_2.schema());
- let too_many_kids = vec![empty2];
+ let too_many_kids = vec![placeholder_2];
assert!(
- with_new_children_if_necessary(empty, too_many_kids).is_err(),
+ with_new_children_if_necessary(placeholder,
too_many_kids).is_err(),
"expected error when providing list of kids"
);
Ok(())
@@ -220,11 +186,11 @@ mod tests {
async fn invalid_execute() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();
- let empty = EmptyExec::new(false, schema);
+ let placeholder = PlaceholderRowExec::new(schema);
// ask for the wrong partition
- assert!(empty.execute(1, task_ctx.clone()).is_err());
- assert!(empty.execute(20, task_ctx).is_err());
+ assert!(placeholder.execute(1, task_ctx.clone()).is_err());
+ assert!(placeholder.execute(20, task_ctx).is_err());
Ok(())
}
@@ -232,9 +198,9 @@ mod tests {
async fn produce_one_row() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();
- let empty = EmptyExec::new(true, schema);
+ let placeholder = PlaceholderRowExec::new(schema);
- let iter = empty.execute(0, task_ctx)?;
+ let iter = placeholder.execute(0, task_ctx)?;
let batches = common::collect(iter).await?;
// should have one item
@@ -248,10 +214,10 @@ mod tests {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();
let partitions = 3;
- let empty = EmptyExec::new(true, schema).with_partitions(partitions);
+ let placeholder =
PlaceholderRowExec::new(schema).with_partitions(partitions);
for n in 0..partitions {
- let iter = empty.execute(n, task_ctx.clone())?;
+ let iter = placeholder.execute(n, task_ctx.clone())?;
let batches = common::collect(iter).await?;
// should have one item
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index 13a54f2a56..f391592dfe 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1165,6 +1165,7 @@ message PhysicalPlanNode {
JsonSinkExecNode json_sink = 24;
SymmetricHashJoinExecNode symmetric_hash_join = 25;
InterleaveExecNode interleave = 26;
+ PlaceholderRowExecNode placeholder_row = 27;
}
}
@@ -1495,8 +1496,11 @@ message JoinOn {
}
message EmptyExecNode {
- bool produce_one_row = 1;
- Schema schema = 2;
+ Schema schema = 1;
+}
+
+message PlaceholderRowExecNode {
+ Schema schema = 1;
}
message ProjectionExecNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 0d013c72d3..d506b5dcce 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -6389,16 +6389,10 @@ impl serde::Serialize for EmptyExecNode {
{
use serde::ser::SerializeStruct;
let mut len = 0;
- if self.produce_one_row {
- len += 1;
- }
if self.schema.is_some() {
len += 1;
}
let mut struct_ser =
serializer.serialize_struct("datafusion.EmptyExecNode", len)?;
- if self.produce_one_row {
- struct_ser.serialize_field("produceOneRow",
&self.produce_one_row)?;
- }
if let Some(v) = self.schema.as_ref() {
struct_ser.serialize_field("schema", v)?;
}
@@ -6412,14 +6406,11 @@ impl<'de> serde::Deserialize<'de> for EmptyExecNode {
D: serde::Deserializer<'de>,
{
const FIELDS: &[&str] = &[
- "produce_one_row",
- "produceOneRow",
"schema",
];
#[allow(clippy::enum_variant_names)]
enum GeneratedField {
- ProduceOneRow,
Schema,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
@@ -6442,7 +6433,6 @@ impl<'de> serde::Deserialize<'de> for EmptyExecNode {
E: serde::de::Error,
{
match value {
- "produceOneRow" | "produce_one_row" =>
Ok(GeneratedField::ProduceOneRow),
"schema" => Ok(GeneratedField::Schema),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
@@ -6463,16 +6453,9 @@ impl<'de> serde::Deserialize<'de> for EmptyExecNode {
where
V: serde::de::MapAccess<'de>,
{
- let mut produce_one_row__ = None;
let mut schema__ = None;
while let Some(k) = map_.next_key()? {
match k {
- GeneratedField::ProduceOneRow => {
- if produce_one_row__.is_some() {
- return
Err(serde::de::Error::duplicate_field("produceOneRow"));
- }
- produce_one_row__ = Some(map_.next_value()?);
- }
GeneratedField::Schema => {
if schema__.is_some() {
return
Err(serde::de::Error::duplicate_field("schema"));
@@ -6482,7 +6465,6 @@ impl<'de> serde::Deserialize<'de> for EmptyExecNode {
}
}
Ok(EmptyExecNode {
- produce_one_row: produce_one_row__.unwrap_or_default(),
schema: schema__,
})
}
@@ -18020,6 +18002,9 @@ impl serde::Serialize for PhysicalPlanNode {
physical_plan_node::PhysicalPlanType::Interleave(v) => {
struct_ser.serialize_field("interleave", v)?;
}
+ physical_plan_node::PhysicalPlanType::PlaceholderRow(v) => {
+ struct_ser.serialize_field("placeholderRow", v)?;
+ }
}
}
struct_ser.end()
@@ -18069,6 +18054,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode {
"symmetric_hash_join",
"symmetricHashJoin",
"interleave",
+ "placeholder_row",
+ "placeholderRow",
];
#[allow(clippy::enum_variant_names)]
@@ -18098,6 +18085,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode {
JsonSink,
SymmetricHashJoin,
Interleave,
+ PlaceholderRow,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -18144,6 +18132,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode {
"jsonSink" | "json_sink" =>
Ok(GeneratedField::JsonSink),
"symmetricHashJoin" | "symmetric_hash_join" =>
Ok(GeneratedField::SymmetricHashJoin),
"interleave" => Ok(GeneratedField::Interleave),
+ "placeholderRow" | "placeholder_row" =>
Ok(GeneratedField::PlaceholderRow),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -18339,6 +18328,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode
{
return
Err(serde::de::Error::duplicate_field("interleave"));
}
physical_plan_type__ =
map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::Interleave)
+;
+ }
+ GeneratedField::PlaceholderRow => {
+ if physical_plan_type__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("placeholderRow"));
+ }
+ physical_plan_type__ =
map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::PlaceholderRow)
;
}
}
@@ -19369,6 +19365,97 @@ impl<'de> serde::Deserialize<'de> for PlaceholderNode {
deserializer.deserialize_struct("datafusion.PlaceholderNode", FIELDS,
GeneratedVisitor)
}
}
+impl serde::Serialize for PlaceholderRowExecNode {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if self.schema.is_some() {
+ len += 1;
+ }
+ let mut struct_ser =
serializer.serialize_struct("datafusion.PlaceholderRowExecNode", len)?;
+ if let Some(v) = self.schema.as_ref() {
+ struct_ser.serialize_field("schema", v)?;
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for PlaceholderRowExecNode {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "schema",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ Schema,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut
std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) ->
std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "schema" => Ok(GeneratedField::Schema),
+ _ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = PlaceholderRowExecNode;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
+ formatter.write_str("struct datafusion.PlaceholderRowExecNode")
+ }
+
+ fn visit_map<V>(self, mut map_: V) ->
std::result::Result<PlaceholderRowExecNode, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut schema__ = None;
+ while let Some(k) = map_.next_key()? {
+ match k {
+ GeneratedField::Schema => {
+ if schema__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("schema"));
+ }
+ schema__ = map_.next_value()?;
+ }
+ }
+ }
+ Ok(PlaceholderRowExecNode {
+ schema: schema__,
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.PlaceholderRowExecNode",
FIELDS, GeneratedVisitor)
+ }
+}
impl serde::Serialize for PlanType {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index d4b62d4b3f..8aadc96349 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1525,7 +1525,7 @@ pub mod owned_table_reference {
pub struct PhysicalPlanNode {
#[prost(
oneof = "physical_plan_node::PhysicalPlanType",
- tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
19, 20, 21, 22, 23, 24, 25, 26"
+ tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
19, 20, 21, 22, 23, 24, 25, 26, 27"
)]
pub physical_plan_type:
::core::option::Option<physical_plan_node::PhysicalPlanType>,
}
@@ -1586,6 +1586,8 @@ pub mod physical_plan_node {
SymmetricHashJoin(::prost::alloc::boxed::Box<super::SymmetricHashJoinExecNode>),
#[prost(message, tag = "26")]
Interleave(super::InterleaveExecNode),
+ #[prost(message, tag = "27")]
+ PlaceholderRow(super::PlaceholderRowExecNode),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
@@ -2103,9 +2105,13 @@ pub struct JoinOn {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct EmptyExecNode {
- #[prost(bool, tag = "1")]
- pub produce_one_row: bool,
- #[prost(message, optional, tag = "2")]
+ #[prost(message, optional, tag = "1")]
+ pub schema: ::core::option::Option<Schema>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PlaceholderRowExecNode {
+ #[prost(message, optional, tag = "1")]
pub schema: ::core::option::Option<Schema>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
diff --git a/datafusion/proto/src/physical_plan/mod.rs
b/datafusion/proto/src/physical_plan/mod.rs
index 878a5bcb7f..73091a6fce 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -44,6 +44,7 @@ use datafusion::physical_plan::joins::{
};
use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
@@ -721,7 +722,11 @@ impl AsExecutionPlan for PhysicalPlanNode {
}
PhysicalPlanType::Empty(empty) => {
let schema = Arc::new(convert_required!(empty.schema)?);
- Ok(Arc::new(EmptyExec::new(empty.produce_one_row, schema)))
+ Ok(Arc::new(EmptyExec::new(schema)))
+ }
+ PhysicalPlanType::PlaceholderRow(placeholder) => {
+ let schema = Arc::new(convert_required!(placeholder.schema)?);
+ Ok(Arc::new(PlaceholderRowExec::new(schema)))
}
PhysicalPlanType::Sort(sort) => {
let input: Arc<dyn ExecutionPlan> =
@@ -1307,7 +1312,17 @@ impl AsExecutionPlan for PhysicalPlanNode {
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Empty(
protobuf::EmptyExecNode {
- produce_one_row: empty.produce_one_row(),
+ schema: Some(schema),
+ },
+ )),
+ });
+ }
+
+ if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
+ let schema = empty.schema().as_ref().try_into()?;
+ return Ok(protobuf::PhysicalPlanNode {
+ physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
+ protobuf::PlaceholderRowExecNode {
schema: Some(schema),
},
)),
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index f46a29447d..da76209dbb 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -49,6 +49,7 @@ use datafusion::physical_plan::joins::{
HashJoinExec, NestedLoopJoinExec, PartitionMode, StreamJoinPartitionMode,
};
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
@@ -104,7 +105,7 @@ fn roundtrip_test_with_context(
#[test]
fn roundtrip_empty() -> Result<()> {
- roundtrip_test(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))))
+ roundtrip_test(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))))
}
#[test]
@@ -117,7 +118,7 @@ fn roundtrip_date_time_interval() -> Result<()> {
false,
),
]);
- let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone())));
+ let input = Arc::new(EmptyExec::new(Arc::new(schema.clone())));
let date_expr = col("some_date", &schema)?;
let literal_expr = col("some_interval", &schema)?;
let date_time_interval_expr =
@@ -132,7 +133,7 @@ fn roundtrip_date_time_interval() -> Result<()> {
#[test]
fn roundtrip_local_limit() -> Result<()> {
roundtrip_test(Arc::new(LocalLimitExec::new(
- Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
+ Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
25,
)))
}
@@ -140,7 +141,7 @@ fn roundtrip_local_limit() -> Result<()> {
#[test]
fn roundtrip_global_limit() -> Result<()> {
roundtrip_test(Arc::new(GlobalLimitExec::new(
- Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
+ Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
0,
Some(25),
)))
@@ -149,7 +150,7 @@ fn roundtrip_global_limit() -> Result<()> {
#[test]
fn roundtrip_global_skip_no_limit() -> Result<()> {
roundtrip_test(Arc::new(GlobalLimitExec::new(
- Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
+ Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
10,
None, // no limit
)))
@@ -179,8 +180,8 @@ fn roundtrip_hash_join() -> Result<()> {
] {
for partition_mode in &[PartitionMode::Partitioned,
PartitionMode::CollectLeft] {
roundtrip_test(Arc::new(HashJoinExec::try_new(
- Arc::new(EmptyExec::new(false, schema_left.clone())),
- Arc::new(EmptyExec::new(false, schema_right.clone())),
+ Arc::new(EmptyExec::new(schema_left.clone())),
+ Arc::new(EmptyExec::new(schema_right.clone())),
on.clone(),
None,
join_type,
@@ -211,8 +212,8 @@ fn roundtrip_nested_loop_join() -> Result<()> {
JoinType::RightSemi,
] {
roundtrip_test(Arc::new(NestedLoopJoinExec::try_new(
- Arc::new(EmptyExec::new(false, schema_left.clone())),
- Arc::new(EmptyExec::new(false, schema_right.clone())),
+ Arc::new(EmptyExec::new(schema_left.clone())),
+ Arc::new(EmptyExec::new(schema_right.clone())),
None,
join_type,
)?))?;
@@ -277,7 +278,7 @@ fn roundtrip_window() -> Result<()> {
Arc::new(window_frame),
));
- let input = Arc::new(EmptyExec::new(false, schema.clone()));
+ let input = Arc::new(EmptyExec::new(schema.clone()));
roundtrip_test(Arc::new(WindowAggExec::try_new(
vec![
@@ -311,7 +312,7 @@ fn rountrip_aggregate() -> Result<()> {
aggregates.clone(),
vec![None],
vec![None],
- Arc::new(EmptyExec::new(false, schema.clone())),
+ Arc::new(EmptyExec::new(schema.clone())),
schema,
)?))
}
@@ -379,7 +380,7 @@ fn roundtrip_aggregate_udaf() -> Result<()> {
aggregates.clone(),
vec![None],
vec![None],
- Arc::new(EmptyExec::new(false, schema.clone())),
+ Arc::new(EmptyExec::new(schema.clone())),
schema,
)?),
ctx,
@@ -405,7 +406,7 @@ fn roundtrip_filter_with_not_and_in_list() -> Result<()> {
let and = binary(not, Operator::And, in_list, &schema)?;
roundtrip_test(Arc::new(FilterExec::try_new(
and,
- Arc::new(EmptyExec::new(false, schema.clone())),
+ Arc::new(EmptyExec::new(schema.clone())),
)?))
}
@@ -432,7 +433,7 @@ fn roundtrip_sort() -> Result<()> {
];
roundtrip_test(Arc::new(SortExec::new(
sort_exprs,
- Arc::new(EmptyExec::new(false, schema)),
+ Arc::new(EmptyExec::new(schema)),
)))
}
@@ -460,11 +461,11 @@ fn roundtrip_sort_preserve_partitioning() -> Result<()> {
roundtrip_test(Arc::new(SortExec::new(
sort_exprs.clone(),
- Arc::new(EmptyExec::new(false, schema.clone())),
+ Arc::new(EmptyExec::new(schema.clone())),
)))?;
roundtrip_test(Arc::new(
- SortExec::new(sort_exprs, Arc::new(EmptyExec::new(false, schema)))
+ SortExec::new(sort_exprs, Arc::new(EmptyExec::new(schema)))
.with_preserve_partitioning(true),
))
}
@@ -514,7 +515,7 @@ fn roundtrip_builtin_scalar_function() -> Result<()> {
let field_b = Field::new("b", DataType::Int64, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
- let input = Arc::new(EmptyExec::new(false, schema.clone()));
+ let input = Arc::new(EmptyExec::new(schema.clone()));
let execution_props = ExecutionProps::new();
@@ -541,7 +542,7 @@ fn roundtrip_scalar_udf() -> Result<()> {
let field_b = Field::new("b", DataType::Int64, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
- let input = Arc::new(EmptyExec::new(false, schema.clone()));
+ let input = Arc::new(EmptyExec::new(schema.clone()));
let fn_impl = |args: &[ArrayRef]| Ok(Arc::new(args[0].clone()) as
ArrayRef);
@@ -594,7 +595,7 @@ fn roundtrip_distinct_count() -> Result<()> {
aggregates.clone(),
vec![None],
vec![None],
- Arc::new(EmptyExec::new(false, schema.clone())),
+ Arc::new(EmptyExec::new(schema.clone())),
schema,
)?))
}
@@ -605,7 +606,7 @@ fn roundtrip_like() -> Result<()> {
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
]);
- let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone())));
+ let input = Arc::new(EmptyExec::new(Arc::new(schema.clone())));
let like_expr = like(
false,
false,
@@ -632,7 +633,7 @@ fn roundtrip_get_indexed_field_named_struct_field() ->
Result<()> {
];
let schema = Schema::new(fields);
- let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone())));
+ let input = Arc::new(EmptyExec::new(Arc::new(schema.clone())));
let col_arg = col("arg", &schema)?;
let get_indexed_field_expr = Arc::new(GetIndexedFieldExpr::new(
@@ -659,7 +660,7 @@ fn roundtrip_get_indexed_field_list_index() -> Result<()> {
];
let schema = Schema::new(fields);
- let input = Arc::new(EmptyExec::new(true, Arc::new(schema.clone())));
+ let input = Arc::new(PlaceholderRowExec::new(Arc::new(schema.clone())));
let col_arg = col("arg", &schema)?;
let col_key = col("key", &schema)?;
@@ -686,7 +687,7 @@ fn roundtrip_get_indexed_field_list_range() -> Result<()> {
];
let schema = Schema::new(fields);
- let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone())));
+ let input = Arc::new(EmptyExec::new(Arc::new(schema.clone())));
let col_arg = col("arg", &schema)?;
let col_start = col("start", &schema)?;
@@ -712,7 +713,7 @@ fn roundtrip_analyze() -> Result<()> {
let field_a = Field::new("plan_type", DataType::Utf8, false);
let field_b = Field::new("plan", DataType::Utf8, false);
let schema = Schema::new(vec![field_a, field_b]);
- let input = Arc::new(EmptyExec::new(true, Arc::new(schema.clone())));
+ let input = Arc::new(PlaceholderRowExec::new(Arc::new(schema.clone())));
roundtrip_test(Arc::new(AnalyzeExec::new(
false,
@@ -727,7 +728,7 @@ fn roundtrip_json_sink() -> Result<()> {
let field_a = Field::new("plan_type", DataType::Utf8, false);
let field_b = Field::new("plan", DataType::Utf8, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
- let input = Arc::new(EmptyExec::new(true, schema.clone()));
+ let input = Arc::new(PlaceholderRowExec::new(schema.clone()));
let file_sink_config = FileSinkConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
@@ -787,8 +788,8 @@ fn roundtrip_sym_hash_join() -> Result<()> {
] {
roundtrip_test(Arc::new(
datafusion::physical_plan::joins::SymmetricHashJoinExec::try_new(
- Arc::new(EmptyExec::new(false, schema_left.clone())),
- Arc::new(EmptyExec::new(false, schema_right.clone())),
+ Arc::new(EmptyExec::new(schema_left.clone())),
+ Arc::new(EmptyExec::new(schema_right.clone())),
on.clone(),
None,
join_type,
@@ -806,8 +807,8 @@ fn roundtrip_union() -> Result<()> {
let field_a = Field::new("col", DataType::Int64, false);
let schema_left = Schema::new(vec![field_a.clone()]);
let schema_right = Schema::new(vec![field_a]);
- let left = EmptyExec::new(false, Arc::new(schema_left));
- let right = EmptyExec::new(false, Arc::new(schema_right));
+ let left = EmptyExec::new(Arc::new(schema_left));
+ let right = EmptyExec::new(Arc::new(schema_right));
let inputs: Vec<Arc<dyn ExecutionPlan>> = vec![Arc::new(left),
Arc::new(right)];
let union = UnionExec::new(inputs);
roundtrip_test(Arc::new(union))
@@ -820,11 +821,11 @@ fn roundtrip_interleave() -> Result<()> {
let schema_right = Schema::new(vec![field_a]);
let partition = Partitioning::Hash(vec![], 3);
let left = RepartitionExec::try_new(
- Arc::new(EmptyExec::new(false, Arc::new(schema_left))),
+ Arc::new(EmptyExec::new(Arc::new(schema_left))),
partition.clone(),
)?;
let right = RepartitionExec::try_new(
- Arc::new(EmptyExec::new(false, Arc::new(schema_right))),
+ Arc::new(EmptyExec::new(Arc::new(schema_right))),
partition.clone(),
)?;
let inputs: Vec<Arc<dyn ExecutionPlan>> = vec![Arc::new(left),
Arc::new(right)];
diff --git a/datafusion/sqllogictest/test_files/explain.slt
b/datafusion/sqllogictest/test_files/explain.slt
index 18792735ff..4583ef319b 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -94,7 +94,7 @@ EXPLAIN select count(*) from (values ('a', 1, 100), ('a', 2,
150)) as t (c1,c2,c
----
physical_plan
ProjectionExec: expr=[2 as COUNT(*)]
---EmptyExec: produce_one_row=true
+--PlaceholderRowExec
statement ok
set datafusion.explain.physical_plan_only = false
@@ -368,7 +368,7 @@ Projection: List([[1, 2, 3], [4, 5, 6]]) AS
make_array(make_array(Int64(1),Int64
--EmptyRelation
physical_plan
ProjectionExec: expr=[[[1, 2, 3], [4, 5, 6]] as
make_array(make_array(Int64(1),Int64(2),Int64(3)),make_array(Int64(4),Int64(5),Int64(6)))]
---EmptyExec: produce_one_row=true
+--PlaceholderRowExec
query TT
explain select [[1, 2, 3], [4, 5, 6]];
@@ -378,4 +378,4 @@ Projection: List([[1, 2, 3], [4, 5, 6]]) AS
make_array(make_array(Int64(1),Int64
--EmptyRelation
physical_plan
ProjectionExec: expr=[[[1, 2, 3], [4, 5, 6]] as
make_array(make_array(Int64(1),Int64(2),Int64(3)),make_array(Int64(4),Int64(5),Int64(6)))]
---EmptyExec: produce_one_row=true
+--PlaceholderRowExec
diff --git a/datafusion/sqllogictest/test_files/join.slt
b/datafusion/sqllogictest/test_files/join.slt
index 874d849e9a..386ffe766b 100644
--- a/datafusion/sqllogictest/test_files/join.slt
+++ b/datafusion/sqllogictest/test_files/join.slt
@@ -556,7 +556,7 @@ query TT
explain select * from t1 join t2 on false;
----
logical_plan EmptyRelation
-physical_plan EmptyExec: produce_one_row=false
+physical_plan EmptyExec
# Make batch size smaller than table row number. to introduce parallelism to
the plan.
statement ok
diff --git a/datafusion/sqllogictest/test_files/limit.slt
b/datafusion/sqllogictest/test_files/limit.slt
index 182195112e..e063d6e896 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -312,7 +312,7 @@ Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS
COUNT(*)]]
----TableScan: t1 projection=[], fetch=14
physical_plan
ProjectionExec: expr=[0 as COUNT(*)]
---EmptyExec: produce_one_row=true
+--PlaceholderRowExec
query I
SELECT COUNT(*) FROM (SELECT a FROM t1 LIMIT 3 OFFSET 11);
@@ -330,7 +330,7 @@ Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS
COUNT(*)]]
----TableScan: t1 projection=[], fetch=11
physical_plan
ProjectionExec: expr=[2 as COUNT(*)]
---EmptyExec: produce_one_row=true
+--PlaceholderRowExec
query I
SELECT COUNT(*) FROM (SELECT a FROM t1 LIMIT 3 OFFSET 8);
@@ -348,7 +348,7 @@ Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS
COUNT(*)]]
----TableScan: t1 projection=[]
physical_plan
ProjectionExec: expr=[2 as COUNT(*)]
---EmptyExec: produce_one_row=true
+--PlaceholderRowExec
query I
SELECT COUNT(*) FROM (SELECT a FROM t1 LIMIT 3 OFFSET 8);
diff --git a/datafusion/sqllogictest/test_files/union.slt
b/datafusion/sqllogictest/test_files/union.slt
index 2c8970a139..b4e338875e 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -551,11 +551,11 @@ UnionExec
------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([Int64(1)@0], 4), input_partitions=1
----------AggregateExec: mode=Partial, gby=[1 as Int64(1)], aggr=[]
-------------EmptyExec: produce_one_row=true
+------------PlaceholderRowExec
--ProjectionExec: expr=[2 as a]
-----EmptyExec: produce_one_row=true
+----PlaceholderRowExec
--ProjectionExec: expr=[3 as a]
-----EmptyExec: produce_one_row=true
+----PlaceholderRowExec
# test UNION ALL aliases correctly with aliased subquery
query TT
@@ -583,7 +583,7 @@ UnionExec
--------RepartitionExec: partitioning=Hash([n@0], 4), input_partitions=1
----------AggregateExec: mode=Partial, gby=[n@0 as n], aggr=[COUNT(*)]
------------ProjectionExec: expr=[5 as n]
---------------EmptyExec: produce_one_row=true
+--------------PlaceholderRowExec
--ProjectionExec: expr=[1 as count, MAX(Int64(10))@0 as n]
----AggregateExec: mode=Single, gby=[], aggr=[MAX(Int64(10))]
-------EmptyExec: produce_one_row=true
+------PlaceholderRowExec
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index 7846bb001a..f3de5b54fc 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -279,13 +279,13 @@ SortPreservingMergeExec: [b@0 ASC NULLS LAST]
------------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[MAX(d.a)]
--------------UnionExec
----------------ProjectionExec: expr=[1 as a, aa as b]
-------------------EmptyExec: produce_one_row=true
+------------------PlaceholderRowExec
----------------ProjectionExec: expr=[3 as a, aa as b]
-------------------EmptyExec: produce_one_row=true
+------------------PlaceholderRowExec
----------------ProjectionExec: expr=[5 as a, bb as b]
-------------------EmptyExec: produce_one_row=true
+------------------PlaceholderRowExec
----------------ProjectionExec: expr=[7 as a, bb as b]
-------------------EmptyExec: produce_one_row=true
+------------------PlaceholderRowExec
# Check actual result:
query TI
@@ -365,13 +365,13 @@ SortPreservingMergeExec: [b@0 ASC NULLS LAST]
--------------RepartitionExec: partitioning=Hash([b@1], 4), input_partitions=4
----------------UnionExec
------------------ProjectionExec: expr=[1 as a, aa as b]
---------------------EmptyExec: produce_one_row=true
+--------------------PlaceholderRowExec
------------------ProjectionExec: expr=[3 as a, aa as b]
---------------------EmptyExec: produce_one_row=true
+--------------------PlaceholderRowExec
------------------ProjectionExec: expr=[5 as a, bb as b]
---------------------EmptyExec: produce_one_row=true
+--------------------PlaceholderRowExec
------------------ProjectionExec: expr=[7 as a, bb as b]
---------------------EmptyExec: produce_one_row=true
+--------------------PlaceholderRowExec
# check actual result