This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d091b55be6 Split `EmptyExec` into `PlaceholderRowExec` (#8446)
d091b55be6 is described below

commit d091b55be6a4ce552023ef162b5d081136d3ff6d
Author: Mohammad Razeghi <[email protected]>
AuthorDate: Sat Dec 9 13:23:34 2023 +0100

    Split `EmptyExec` into `PlaceholderRowExec` (#8446)
    
    * add PlaceHolderRowExec
    
    * Change produce_one_row=true calls to use PlaceHolderRowExec
    
    * remove produce_one_row from EmptyExec, changes in proto serializer, 
working tests
    
    * PlaceHolder => Placeholder
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/src/datasource/empty.rs            |   2 +-
 datafusion/core/src/datasource/listing/table.rs    |   4 +-
 .../src/physical_optimizer/aggregate_statistics.rs |   4 +-
 .../core/src/physical_optimizer/join_selection.rs  |   4 +-
 datafusion/core/src/physical_planner.rs            |  12 +-
 datafusion/core/tests/custom_sources.rs            |  12 +-
 datafusion/core/tests/sql/explain_analyze.rs       |   4 +-
 datafusion/expr/src/logical_plan/plan.rs           |   2 +-
 datafusion/optimizer/README.md                     |   6 +-
 datafusion/physical-plan/src/display.rs            |   2 +-
 datafusion/physical-plan/src/empty.rs              |  93 ++--------------
 datafusion/physical-plan/src/lib.rs                |   1 +
 .../src/{empty.rs => placeholder_row.rs}           |  94 +++++-----------
 datafusion/proto/proto/datafusion.proto            |   8 +-
 datafusion/proto/src/generated/pbjson.rs           | 123 ++++++++++++++++++---
 datafusion/proto/src/generated/prost.rs            |  14 ++-
 datafusion/proto/src/physical_plan/mod.rs          |  19 +++-
 .../proto/tests/cases/roundtrip_physical_plan.rs   |  63 +++++------
 datafusion/sqllogictest/test_files/explain.slt     |   6 +-
 datafusion/sqllogictest/test_files/join.slt        |   2 +-
 datafusion/sqllogictest/test_files/limit.slt       |   6 +-
 datafusion/sqllogictest/test_files/union.slt       |  10 +-
 datafusion/sqllogictest/test_files/window.slt      |  16 +--
 23 files changed, 260 insertions(+), 247 deletions(-)

diff --git a/datafusion/core/src/datasource/empty.rs 
b/datafusion/core/src/datasource/empty.rs
index 77160aa5d1..5100987520 100644
--- a/datafusion/core/src/datasource/empty.rs
+++ b/datafusion/core/src/datasource/empty.rs
@@ -77,7 +77,7 @@ impl TableProvider for EmptyTable {
         // even though there is no data, projections apply
         let projected_schema = project_schema(&self.schema, projection)?;
         Ok(Arc::new(
-            EmptyExec::new(false, 
projected_schema).with_partitions(self.partitions),
+            EmptyExec::new(projected_schema).with_partitions(self.partitions),
         ))
     }
 }
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 10ec9f8d8d..0ce1b43fe4 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -685,7 +685,7 @@ impl TableProvider for ListingTable {
         if partitioned_file_lists.is_empty() {
             let schema = self.schema();
             let projected_schema = project_schema(&schema, projection)?;
-            return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
+            return Ok(Arc::new(EmptyExec::new(projected_schema)));
         }
 
         // extract types of partition columns
@@ -713,7 +713,7 @@ impl TableProvider for ListingTable {
         let object_store_url = if let Some(url) = self.table_paths.first() {
             url.object_store()
         } else {
-            return Ok(Arc::new(EmptyExec::new(false, 
Arc::new(Schema::empty()))));
+            return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
         };
         // create the execution plan
         self.options
diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs 
b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
index 4265e3ff80..795857b10e 100644
--- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -22,7 +22,6 @@ use super::optimizer::PhysicalOptimizerRule;
 use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_plan::aggregates::AggregateExec;
-use crate::physical_plan::empty::EmptyExec;
 use crate::physical_plan::projection::ProjectionExec;
 use crate::physical_plan::{expressions, AggregateExpr, ExecutionPlan, 
Statistics};
 use crate::scalar::ScalarValue;
@@ -30,6 +29,7 @@ use crate::scalar::ScalarValue;
 use datafusion_common::stats::Precision;
 use datafusion_common::tree_node::TreeNode;
 use datafusion_expr::utils::COUNT_STAR_EXPANSION;
+use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
 
 /// Optimizer that uses available statistics for aggregate functions
 #[derive(Default)]
@@ -82,7 +82,7 @@ impl PhysicalOptimizerRule for AggregateStatistics {
                 // input can be entirely removed
                 Ok(Arc::new(ProjectionExec::try_new(
                     projections,
-                    Arc::new(EmptyExec::new(true, plan.schema())),
+                    Arc::new(PlaceholderRowExec::new(plan.schema())),
                 )?))
             } else {
                 plan.map_children(|child| self.optimize(child, _config))
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs 
b/datafusion/core/src/physical_optimizer/join_selection.rs
index 0c3ac2d245..6b2fe24acf 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -1623,12 +1623,12 @@ mod hash_join_tests {
 
         let children = vec![
             PipelineStatePropagator {
-                plan: Arc::new(EmptyExec::new(false, 
Arc::new(Schema::empty()))),
+                plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
                 unbounded: left_unbounded,
                 children: vec![],
             },
             PipelineStatePropagator {
-                plan: Arc::new(EmptyExec::new(false, 
Arc::new(Schema::empty()))),
+                plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
                 unbounded: right_unbounded,
                 children: vec![],
             },
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index 38532002a6..ab38b3ec6d 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -91,6 +91,7 @@ use datafusion_expr::{
     WindowFrameBound, WriteOp,
 };
 use datafusion_physical_expr::expressions::Literal;
+use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
 use datafusion_sql::utils::window_expr_common_partition_keys;
 
 use async_trait::async_trait;
@@ -1196,10 +1197,15 @@ impl DefaultPhysicalPlanner {
                 }
                 LogicalPlan::Subquery(_) => todo!(),
                 LogicalPlan::EmptyRelation(EmptyRelation {
-                    produce_one_row,
+                    produce_one_row: false,
                     schema,
                 }) => Ok(Arc::new(EmptyExec::new(
-                    *produce_one_row,
+                    SchemaRef::new(schema.as_ref().to_owned().into()),
+                ))),
+                LogicalPlan::EmptyRelation(EmptyRelation {
+                    produce_one_row: true,
+                    schema,
+                }) => Ok(Arc::new(PlaceholderRowExec::new(
                     SchemaRef::new(schema.as_ref().to_owned().into()),
                 ))),
                 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
@@ -2767,7 +2773,7 @@ mod tests {
 
 digraph {
     1[shape=box label="ProjectionExec: expr=[id@0 + 2 as employee.id + 
Int32(2)]", tooltip=""]
-    2[shape=box label="EmptyExec: produce_one_row=false", tooltip=""]
+    2[shape=box label="EmptyExec", tooltip=""]
     1 -> 2 [arrowhead=none, arrowtail=normal, dir=back]
 }
 // End DataFusion GraphViz Plan
diff --git a/datafusion/core/tests/custom_sources.rs 
b/datafusion/core/tests/custom_sources.rs
index daf1ef41a2..a9ea5cc2a3 100644
--- a/datafusion/core/tests/custom_sources.rs
+++ b/datafusion/core/tests/custom_sources.rs
@@ -30,7 +30,6 @@ use datafusion::execution::context::{SessionContext, 
SessionState, TaskContext};
 use datafusion::logical_expr::{
     col, Expr, LogicalPlan, LogicalPlanBuilder, TableScan, UNNAMED_TABLE,
 };
-use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::{
     collect, ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, 
Partitioning,
@@ -42,6 +41,7 @@ use datafusion_common::project_schema;
 use datafusion_common::stats::Precision;
 
 use async_trait::async_trait;
+use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
 use futures::stream::Stream;
 
 /// Also run all tests that are found in the `custom_sources_cases` directory
@@ -256,9 +256,9 @@ async fn optimizers_catch_all_statistics() {
 
     let physical_plan = df.create_physical_plan().await.unwrap();
 
-    // when the optimization kicks in, the source is replaced by an EmptyExec
+    // when the optimization kicks in, the source is replaced by an 
PlaceholderRowExec
     assert!(
-        contains_empty_exec(Arc::clone(&physical_plan)),
+        contains_place_holder_exec(Arc::clone(&physical_plan)),
         "Expected aggregate_statistics optimizations missing: 
{physical_plan:?}"
     );
 
@@ -283,12 +283,12 @@ async fn optimizers_catch_all_statistics() {
     assert_eq!(format!("{:?}", actual[0]), format!("{expected:?}"));
 }
 
-fn contains_empty_exec(plan: Arc<dyn ExecutionPlan>) -> bool {
-    if plan.as_any().is::<EmptyExec>() {
+fn contains_place_holder_exec(plan: Arc<dyn ExecutionPlan>) -> bool {
+    if plan.as_any().is::<PlaceholderRowExec>() {
         true
     } else if plan.children().len() != 1 {
         false
     } else {
-        contains_empty_exec(Arc::clone(&plan.children()[0]))
+        contains_place_holder_exec(Arc::clone(&plan.children()[0]))
     }
 }
diff --git a/datafusion/core/tests/sql/explain_analyze.rs 
b/datafusion/core/tests/sql/explain_analyze.rs
index ecb5766a3b..37f8cefc90 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -575,7 +575,7 @@ async fn explain_analyze_runs_optimizers() {
 
     // This happens as an optimization pass where count(*) can be
     // answered using statistics only.
-    let expected = "EmptyExec: produce_one_row=true";
+    let expected = "PlaceholderRowExec";
 
     let sql = "EXPLAIN SELECT count(*) from alltypes_plain";
     let actual = execute_to_batches(&ctx, sql).await;
@@ -806,7 +806,7 @@ async fn explain_physical_plan_only() {
     let expected = vec![vec![
         "physical_plan",
         "ProjectionExec: expr=[2 as COUNT(*)]\
-        \n  EmptyExec: produce_one_row=true\
+        \n  PlaceholderRowExec\
         \n",
     ]];
     assert_eq!(expected, actual);
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index dfd4fbf65d..d74015bf09 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1208,7 +1208,7 @@ impl LogicalPlan {
         self.with_new_exprs(new_exprs, &new_inputs_with_values)
     }
 
-    /// Walk the logical plan, find any `PlaceHolder` tokens, and return a map 
of their IDs and DataTypes
+    /// Walk the logical plan, find any `Placeholder` tokens, and return a map 
of their IDs and DataTypes
     pub fn get_parameter_types(
         &self,
     ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md
index b8e5b93e66..4f9e0fb985 100644
--- a/datafusion/optimizer/README.md
+++ b/datafusion/optimizer/README.md
@@ -153,7 +153,7 @@ Looking at the `EXPLAIN` output we can see that the 
optimizer has effectively re
 | logical_plan  | Projection: Int64(3) AS Int64(1) + Int64(2)     |
 |               |   EmptyRelation                                 |
 | physical_plan | ProjectionExec: expr=[3 as Int64(1) + Int64(2)] |
-|               |   EmptyExec: produce_one_row=true               |
+|               |   PlaceholderRowExec                            |
 |               |                                                 |
 +---------------+-------------------------------------------------+
 ```
@@ -318,7 +318,7 @@ In the following example, the `type_coercion` and 
`simplify_expressions` passes
 | logical_plan                                               | Projection: 
Utf8("3.2") AS foo                                            |
 |                                                            |   EmptyRelation 
                                                          |
 | initial_physical_plan                                      | ProjectionExec: 
expr=[3.2 as foo]                                         |
-|                                                            |   EmptyExec: 
produce_one_row=true                                         |
+|                                                            |   
PlaceholderRowExec                                                      |
 |                                                            |                 
                                                          |
 | physical_plan after aggregate_statistics                   | SAME TEXT AS 
ABOVE                                                        |
 | physical_plan after join_selection                         | SAME TEXT AS 
ABOVE                                                        |
@@ -326,7 +326,7 @@ In the following example, the `type_coercion` and 
`simplify_expressions` passes
 | physical_plan after repartition                            | SAME TEXT AS 
ABOVE                                                        |
 | physical_plan after add_merge_exec                         | SAME TEXT AS 
ABOVE                                                        |
 | physical_plan                                              | ProjectionExec: 
expr=[3.2 as foo]                                         |
-|                                                            |   EmptyExec: 
produce_one_row=true                                         |
+|                                                            |   
PlaceholderRowExec                                                      |
 |                                                            |                 
                                                          |
 
+------------------------------------------------------------+---------------------------------------------------------------------------+
 ```
diff --git a/datafusion/physical-plan/src/display.rs 
b/datafusion/physical-plan/src/display.rs
index aa368251eb..612e164be0 100644
--- a/datafusion/physical-plan/src/display.rs
+++ b/datafusion/physical-plan/src/display.rs
@@ -132,7 +132,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
     /// ```dot
     /// strict digraph dot_plan {
     //     0[label="ProjectionExec: expr=[id@0 + 2 as employee.id + 
Int32(2)]",tooltip=""]
-    //     1[label="EmptyExec: produce_one_row=false",tooltip=""]
+    //     1[label="EmptyExec",tooltip=""]
     //     0 -> 1
     // }
     /// ```
diff --git a/datafusion/physical-plan/src/empty.rs 
b/datafusion/physical-plan/src/empty.rs
index a3e1fb79ed..41c8dbed14 100644
--- a/datafusion/physical-plan/src/empty.rs
+++ b/datafusion/physical-plan/src/empty.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! EmptyRelation execution plan
+//! EmptyRelation with produce_one_row=false execution plan
 
 use std::any::Any;
 use std::sync::Arc;
@@ -24,19 +24,16 @@ use super::expressions::PhysicalSortExpr;
 use super::{common, DisplayAs, SendableRecordBatchStream, Statistics};
 use crate::{memory::MemoryStream, DisplayFormatType, ExecutionPlan, 
Partitioning};
 
-use arrow::array::{ArrayRef, NullArray};
-use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
+use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{internal_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
 
 use log::trace;
 
-/// Execution plan for empty relation (produces no rows)
+/// Execution plan for empty relation with produce_one_row=false
 #[derive(Debug)]
 pub struct EmptyExec {
-    /// Specifies whether this exec produces a row or not
-    produce_one_row: bool,
     /// The schema for the produced row
     schema: SchemaRef,
     /// Number of partitions
@@ -45,9 +42,8 @@ pub struct EmptyExec {
 
 impl EmptyExec {
     /// Create a new EmptyExec
-    pub fn new(produce_one_row: bool, schema: SchemaRef) -> Self {
+    pub fn new(schema: SchemaRef) -> Self {
         EmptyExec {
-            produce_one_row,
             schema,
             partitions: 1,
         }
@@ -59,36 +55,8 @@ impl EmptyExec {
         self
     }
 
-    /// Specifies whether this exec produces a row or not
-    pub fn produce_one_row(&self) -> bool {
-        self.produce_one_row
-    }
-
     fn data(&self) -> Result<Vec<RecordBatch>> {
-        let batch = if self.produce_one_row {
-            let n_field = self.schema.fields.len();
-            // hack for https://github.com/apache/arrow-datafusion/pull/3242
-            let n_field = if n_field == 0 { 1 } else { n_field };
-            vec![RecordBatch::try_new(
-                Arc::new(Schema::new(
-                    (0..n_field)
-                        .map(|i| {
-                            Field::new(format!("placeholder_{i}"), 
DataType::Null, true)
-                        })
-                        .collect::<Fields>(),
-                )),
-                (0..n_field)
-                    .map(|_i| {
-                        let ret: ArrayRef = Arc::new(NullArray::new(1));
-                        ret
-                    })
-                    .collect(),
-            )?]
-        } else {
-            vec![]
-        };
-
-        Ok(batch)
+        Ok(vec![])
     }
 }
 
@@ -100,7 +68,7 @@ impl DisplayAs for EmptyExec {
     ) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "EmptyExec: produce_one_row={}", 
self.produce_one_row)
+                write!(f, "EmptyExec")
             }
         }
     }
@@ -133,10 +101,7 @@ impl ExecutionPlan for EmptyExec {
         self: Arc<Self>,
         _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(EmptyExec::new(
-            self.produce_one_row,
-            self.schema.clone(),
-        )))
+        Ok(Arc::new(EmptyExec::new(self.schema.clone())))
     }
 
     fn execute(
@@ -184,7 +149,7 @@ mod tests {
         let task_ctx = Arc::new(TaskContext::default());
         let schema = test::aggr_test_schema();
 
-        let empty = EmptyExec::new(false, schema.clone());
+        let empty = EmptyExec::new(schema.clone());
         assert_eq!(empty.schema(), schema);
 
         // we should have no results
@@ -198,16 +163,11 @@ mod tests {
     #[test]
     fn with_new_children() -> Result<()> {
         let schema = test::aggr_test_schema();
-        let empty = Arc::new(EmptyExec::new(false, schema.clone()));
-        let empty_with_row = Arc::new(EmptyExec::new(true, schema));
+        let empty = Arc::new(EmptyExec::new(schema.clone()));
 
         let empty2 = with_new_children_if_necessary(empty.clone(), 
vec![])?.into();
         assert_eq!(empty.schema(), empty2.schema());
 
-        let empty_with_row_2 =
-            with_new_children_if_necessary(empty_with_row.clone(), 
vec![])?.into();
-        assert_eq!(empty_with_row.schema(), empty_with_row_2.schema());
-
         let too_many_kids = vec![empty2];
         assert!(
             with_new_children_if_necessary(empty, too_many_kids).is_err(),
@@ -220,44 +180,11 @@ mod tests {
     async fn invalid_execute() -> Result<()> {
         let task_ctx = Arc::new(TaskContext::default());
         let schema = test::aggr_test_schema();
-        let empty = EmptyExec::new(false, schema);
+        let empty = EmptyExec::new(schema);
 
         // ask for the wrong partition
         assert!(empty.execute(1, task_ctx.clone()).is_err());
         assert!(empty.execute(20, task_ctx).is_err());
         Ok(())
     }
-
-    #[tokio::test]
-    async fn produce_one_row() -> Result<()> {
-        let task_ctx = Arc::new(TaskContext::default());
-        let schema = test::aggr_test_schema();
-        let empty = EmptyExec::new(true, schema);
-
-        let iter = empty.execute(0, task_ctx)?;
-        let batches = common::collect(iter).await?;
-
-        // should have one item
-        assert_eq!(batches.len(), 1);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn produce_one_row_multiple_partition() -> Result<()> {
-        let task_ctx = Arc::new(TaskContext::default());
-        let schema = test::aggr_test_schema();
-        let partitions = 3;
-        let empty = EmptyExec::new(true, schema).with_partitions(partitions);
-
-        for n in 0..partitions {
-            let iter = empty.execute(n, task_ctx.clone())?;
-            let batches = common::collect(iter).await?;
-
-            // should have one item
-            assert_eq!(batches.len(), 1);
-        }
-
-        Ok(())
-    }
 }
diff --git a/datafusion/physical-plan/src/lib.rs 
b/datafusion/physical-plan/src/lib.rs
index f40911c101..6c9e97e03c 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -59,6 +59,7 @@ pub mod limit;
 pub mod memory;
 pub mod metrics;
 mod ordering;
+pub mod placeholder_row;
 pub mod projection;
 pub mod repartition;
 pub mod sorts;
diff --git a/datafusion/physical-plan/src/empty.rs 
b/datafusion/physical-plan/src/placeholder_row.rs
similarity index 66%
copy from datafusion/physical-plan/src/empty.rs
copy to datafusion/physical-plan/src/placeholder_row.rs
index a3e1fb79ed..94f3278853 100644
--- a/datafusion/physical-plan/src/empty.rs
+++ b/datafusion/physical-plan/src/placeholder_row.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! EmptyRelation execution plan
+//! EmptyRelation produce_one_row=true execution plan
 
 use std::any::Any;
 use std::sync::Arc;
@@ -32,40 +32,32 @@ use datafusion_execution::TaskContext;
 
 use log::trace;
 
-/// Execution plan for empty relation (produces no rows)
+/// Execution plan for empty relation with produce_one_row=true
 #[derive(Debug)]
-pub struct EmptyExec {
-    /// Specifies whether this exec produces a row or not
-    produce_one_row: bool,
+pub struct PlaceholderRowExec {
     /// The schema for the produced row
     schema: SchemaRef,
     /// Number of partitions
     partitions: usize,
 }
 
-impl EmptyExec {
-    /// Create a new EmptyExec
-    pub fn new(produce_one_row: bool, schema: SchemaRef) -> Self {
-        EmptyExec {
-            produce_one_row,
+impl PlaceholderRowExec {
+    /// Create a new PlaceholderRowExec
+    pub fn new(schema: SchemaRef) -> Self {
+        PlaceholderRowExec {
             schema,
             partitions: 1,
         }
     }
 
-    /// Create a new EmptyExec with specified partition number
+    /// Create a new PlaceholderRowExecPlaceholderRowExec with specified 
partition number
     pub fn with_partitions(mut self, partitions: usize) -> Self {
         self.partitions = partitions;
         self
     }
 
-    /// Specifies whether this exec produces a row or not
-    pub fn produce_one_row(&self) -> bool {
-        self.produce_one_row
-    }
-
     fn data(&self) -> Result<Vec<RecordBatch>> {
-        let batch = if self.produce_one_row {
+        Ok({
             let n_field = self.schema.fields.len();
             // hack for https://github.com/apache/arrow-datafusion/pull/3242
             let n_field = if n_field == 0 { 1 } else { n_field };
@@ -84,15 +76,11 @@ impl EmptyExec {
                     })
                     .collect(),
             )?]
-        } else {
-            vec![]
-        };
-
-        Ok(batch)
+        })
     }
 }
 
-impl DisplayAs for EmptyExec {
+impl DisplayAs for PlaceholderRowExec {
     fn fmt_as(
         &self,
         t: DisplayFormatType,
@@ -100,13 +88,13 @@ impl DisplayAs for EmptyExec {
     ) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "EmptyExec: produce_one_row={}", 
self.produce_one_row)
+                write!(f, "PlaceholderRowExec")
             }
         }
     }
 }
 
-impl ExecutionPlan for EmptyExec {
+impl ExecutionPlan for PlaceholderRowExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
         self
@@ -133,10 +121,7 @@ impl ExecutionPlan for EmptyExec {
         self: Arc<Self>,
         _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(EmptyExec::new(
-            self.produce_one_row,
-            self.schema.clone(),
-        )))
+        Ok(Arc::new(PlaceholderRowExec::new(self.schema.clone())))
     }
 
     fn execute(
@@ -144,11 +129,11 @@ impl ExecutionPlan for EmptyExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        trace!("Start EmptyExec::execute for partition {} of context 
session_id {} and task_id {:?}", partition, context.session_id(), 
context.task_id());
+        trace!("Start PlaceholderRowExec::execute for partition {} of context 
session_id {} and task_id {:?}", partition, context.session_id(), 
context.task_id());
 
         if partition >= self.partitions {
             return internal_err!(
-                "EmptyExec invalid partition {} (expected less than {})",
+                "PlaceholderRowExec invalid partition {} (expected less than 
{})",
                 partition,
                 self.partitions
             );
@@ -164,7 +149,7 @@ impl ExecutionPlan for EmptyExec {
     fn statistics(&self) -> Result<Statistics> {
         let batch = self
             .data()
-            .expect("Create empty RecordBatch should not fail");
+            .expect("Create single row placeholder RecordBatch should not 
fail");
         Ok(common::compute_record_batch_statistics(
             &[batch],
             &self.schema,
@@ -179,38 +164,19 @@ mod tests {
     use crate::with_new_children_if_necessary;
     use crate::{common, test};
 
-    #[tokio::test]
-    async fn empty() -> Result<()> {
-        let task_ctx = Arc::new(TaskContext::default());
-        let schema = test::aggr_test_schema();
-
-        let empty = EmptyExec::new(false, schema.clone());
-        assert_eq!(empty.schema(), schema);
-
-        // we should have no results
-        let iter = empty.execute(0, task_ctx)?;
-        let batches = common::collect(iter).await?;
-        assert!(batches.is_empty());
-
-        Ok(())
-    }
-
     #[test]
     fn with_new_children() -> Result<()> {
         let schema = test::aggr_test_schema();
-        let empty = Arc::new(EmptyExec::new(false, schema.clone()));
-        let empty_with_row = Arc::new(EmptyExec::new(true, schema));
 
-        let empty2 = with_new_children_if_necessary(empty.clone(), 
vec![])?.into();
-        assert_eq!(empty.schema(), empty2.schema());
+        let placeholder = Arc::new(PlaceholderRowExec::new(schema));
 
-        let empty_with_row_2 =
-            with_new_children_if_necessary(empty_with_row.clone(), 
vec![])?.into();
-        assert_eq!(empty_with_row.schema(), empty_with_row_2.schema());
+        let placeholder_2 =
+            with_new_children_if_necessary(placeholder.clone(), 
vec![])?.into();
+        assert_eq!(placeholder.schema(), placeholder_2.schema());
 
-        let too_many_kids = vec![empty2];
+        let too_many_kids = vec![placeholder_2];
         assert!(
-            with_new_children_if_necessary(empty, too_many_kids).is_err(),
+            with_new_children_if_necessary(placeholder, 
too_many_kids).is_err(),
             "expected error when providing list of kids"
         );
         Ok(())
@@ -220,11 +186,11 @@ mod tests {
     async fn invalid_execute() -> Result<()> {
         let task_ctx = Arc::new(TaskContext::default());
         let schema = test::aggr_test_schema();
-        let empty = EmptyExec::new(false, schema);
+        let placeholder = PlaceholderRowExec::new(schema);
 
         // ask for the wrong partition
-        assert!(empty.execute(1, task_ctx.clone()).is_err());
-        assert!(empty.execute(20, task_ctx).is_err());
+        assert!(placeholder.execute(1, task_ctx.clone()).is_err());
+        assert!(placeholder.execute(20, task_ctx).is_err());
         Ok(())
     }
 
@@ -232,9 +198,9 @@ mod tests {
     async fn produce_one_row() -> Result<()> {
         let task_ctx = Arc::new(TaskContext::default());
         let schema = test::aggr_test_schema();
-        let empty = EmptyExec::new(true, schema);
+        let placeholder = PlaceholderRowExec::new(schema);
 
-        let iter = empty.execute(0, task_ctx)?;
+        let iter = placeholder.execute(0, task_ctx)?;
         let batches = common::collect(iter).await?;
 
         // should have one item
@@ -248,10 +214,10 @@ mod tests {
         let task_ctx = Arc::new(TaskContext::default());
         let schema = test::aggr_test_schema();
         let partitions = 3;
-        let empty = EmptyExec::new(true, schema).with_partitions(partitions);
+        let placeholder = 
PlaceholderRowExec::new(schema).with_partitions(partitions);
 
         for n in 0..partitions {
-            let iter = empty.execute(n, task_ctx.clone())?;
+            let iter = placeholder.execute(n, task_ctx.clone())?;
             let batches = common::collect(iter).await?;
 
             // should have one item
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index 13a54f2a56..f391592dfe 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1165,6 +1165,7 @@ message PhysicalPlanNode {
     JsonSinkExecNode json_sink = 24;
     SymmetricHashJoinExecNode symmetric_hash_join = 25;
     InterleaveExecNode  interleave = 26;
+    PlaceholderRowExecNode placeholder_row = 27;
   }
 }
 
@@ -1495,8 +1496,11 @@ message JoinOn {
 }
 
 message EmptyExecNode {
-  bool produce_one_row = 1;
-  Schema schema = 2;
+  Schema schema = 1;
+}
+
+message PlaceholderRowExecNode {
+  Schema schema = 1;
 }
 
 message ProjectionExecNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index 0d013c72d3..d506b5dcce 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -6389,16 +6389,10 @@ impl serde::Serialize for EmptyExecNode {
     {
         use serde::ser::SerializeStruct;
         let mut len = 0;
-        if self.produce_one_row {
-            len += 1;
-        }
         if self.schema.is_some() {
             len += 1;
         }
         let mut struct_ser = 
serializer.serialize_struct("datafusion.EmptyExecNode", len)?;
-        if self.produce_one_row {
-            struct_ser.serialize_field("produceOneRow", 
&self.produce_one_row)?;
-        }
         if let Some(v) = self.schema.as_ref() {
             struct_ser.serialize_field("schema", v)?;
         }
@@ -6412,14 +6406,11 @@ impl<'de> serde::Deserialize<'de> for EmptyExecNode {
         D: serde::Deserializer<'de>,
     {
         const FIELDS: &[&str] = &[
-            "produce_one_row",
-            "produceOneRow",
             "schema",
         ];
 
         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
-            ProduceOneRow,
             Schema,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
@@ -6442,7 +6433,6 @@ impl<'de> serde::Deserialize<'de> for EmptyExecNode {
                         E: serde::de::Error,
                     {
                         match value {
-                            "produceOneRow" | "produce_one_row" => 
Ok(GeneratedField::ProduceOneRow),
                             "schema" => Ok(GeneratedField::Schema),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
@@ -6463,16 +6453,9 @@ impl<'de> serde::Deserialize<'de> for EmptyExecNode {
                 where
                     V: serde::de::MapAccess<'de>,
             {
-                let mut produce_one_row__ = None;
                 let mut schema__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
-                        GeneratedField::ProduceOneRow => {
-                            if produce_one_row__.is_some() {
-                                return 
Err(serde::de::Error::duplicate_field("produceOneRow"));
-                            }
-                            produce_one_row__ = Some(map_.next_value()?);
-                        }
                         GeneratedField::Schema => {
                             if schema__.is_some() {
                                 return 
Err(serde::de::Error::duplicate_field("schema"));
@@ -6482,7 +6465,6 @@ impl<'de> serde::Deserialize<'de> for EmptyExecNode {
                     }
                 }
                 Ok(EmptyExecNode {
-                    produce_one_row: produce_one_row__.unwrap_or_default(),
                     schema: schema__,
                 })
             }
@@ -18020,6 +18002,9 @@ impl serde::Serialize for PhysicalPlanNode {
                 physical_plan_node::PhysicalPlanType::Interleave(v) => {
                     struct_ser.serialize_field("interleave", v)?;
                 }
+                physical_plan_node::PhysicalPlanType::PlaceholderRow(v) => {
+                    struct_ser.serialize_field("placeholderRow", v)?;
+                }
             }
         }
         struct_ser.end()
@@ -18069,6 +18054,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode {
             "symmetric_hash_join",
             "symmetricHashJoin",
             "interleave",
+            "placeholder_row",
+            "placeholderRow",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -18098,6 +18085,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode {
             JsonSink,
             SymmetricHashJoin,
             Interleave,
+            PlaceholderRow,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -18144,6 +18132,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode {
                             "jsonSink" | "json_sink" => 
Ok(GeneratedField::JsonSink),
                             "symmetricHashJoin" | "symmetric_hash_join" => 
Ok(GeneratedField::SymmetricHashJoin),
                             "interleave" => Ok(GeneratedField::Interleave),
+                            "placeholderRow" | "placeholder_row" => 
Ok(GeneratedField::PlaceholderRow),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -18339,6 +18328,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode 
{
                                 return 
Err(serde::de::Error::duplicate_field("interleave"));
                             }
                             physical_plan_type__ = 
map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::Interleave)
+;
+                        }
+                        GeneratedField::PlaceholderRow => {
+                            if physical_plan_type__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("placeholderRow"));
+                            }
+                            physical_plan_type__ = 
map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::PlaceholderRow)
 ;
                         }
                     }
@@ -19369,6 +19365,97 @@ impl<'de> serde::Deserialize<'de> for PlaceholderNode {
         deserializer.deserialize_struct("datafusion.PlaceholderNode", FIELDS, 
GeneratedVisitor)
     }
 }
+impl serde::Serialize for PlaceholderRowExecNode {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if self.schema.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = 
serializer.serialize_struct("datafusion.PlaceholderRowExecNode", len)?;
+        if let Some(v) = self.schema.as_ref() {
+            struct_ser.serialize_field("schema", v)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for PlaceholderRowExecNode {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "schema",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Schema,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "schema" => Ok(GeneratedField::Schema),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = PlaceholderRowExecNode;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion.PlaceholderRowExecNode")
+            }
+
+            fn visit_map<V>(self, mut map_: V) -> 
std::result::Result<PlaceholderRowExecNode, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut schema__ = None;
+                while let Some(k) = map_.next_key()? {
+                    match k {
+                        GeneratedField::Schema => {
+                            if schema__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("schema"));
+                            }
+                            schema__ = map_.next_value()?;
+                        }
+                    }
+                }
+                Ok(PlaceholderRowExecNode {
+                    schema: schema__,
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.PlaceholderRowExecNode", 
FIELDS, GeneratedVisitor)
+    }
+}
 impl serde::Serialize for PlanType {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index d4b62d4b3f..8aadc96349 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1525,7 +1525,7 @@ pub mod owned_table_reference {
 pub struct PhysicalPlanNode {
     #[prost(
         oneof = "physical_plan_node::PhysicalPlanType",
-        tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
19, 20, 21, 22, 23, 24, 25, 26"
+        tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
19, 20, 21, 22, 23, 24, 25, 26, 27"
     )]
     pub physical_plan_type: 
::core::option::Option<physical_plan_node::PhysicalPlanType>,
 }
@@ -1586,6 +1586,8 @@ pub mod physical_plan_node {
         
SymmetricHashJoin(::prost::alloc::boxed::Box<super::SymmetricHashJoinExecNode>),
         #[prost(message, tag = "26")]
         Interleave(super::InterleaveExecNode),
+        #[prost(message, tag = "27")]
+        PlaceholderRow(super::PlaceholderRowExecNode),
     }
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
@@ -2103,9 +2105,13 @@ pub struct JoinOn {
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct EmptyExecNode {
-    #[prost(bool, tag = "1")]
-    pub produce_one_row: bool,
-    #[prost(message, optional, tag = "2")]
+    #[prost(message, optional, tag = "1")]
+    pub schema: ::core::option::Option<Schema>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PlaceholderRowExecNode {
+    #[prost(message, optional, tag = "1")]
     pub schema: ::core::option::Option<Schema>,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
diff --git a/datafusion/proto/src/physical_plan/mod.rs 
b/datafusion/proto/src/physical_plan/mod.rs
index 878a5bcb7f..73091a6fce 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -44,6 +44,7 @@ use datafusion::physical_plan::joins::{
 };
 use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
 use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
@@ -721,7 +722,11 @@ impl AsExecutionPlan for PhysicalPlanNode {
             }
             PhysicalPlanType::Empty(empty) => {
                 let schema = Arc::new(convert_required!(empty.schema)?);
-                Ok(Arc::new(EmptyExec::new(empty.produce_one_row, schema)))
+                Ok(Arc::new(EmptyExec::new(schema)))
+            }
+            PhysicalPlanType::PlaceholderRow(placeholder) => {
+                let schema = Arc::new(convert_required!(placeholder.schema)?);
+                Ok(Arc::new(PlaceholderRowExec::new(schema)))
             }
             PhysicalPlanType::Sort(sort) => {
                 let input: Arc<dyn ExecutionPlan> =
@@ -1307,7 +1312,17 @@ impl AsExecutionPlan for PhysicalPlanNode {
             return Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::Empty(
                     protobuf::EmptyExecNode {
-                        produce_one_row: empty.produce_one_row(),
+                        schema: Some(schema),
+                    },
+                )),
+            });
+        }
+
+        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
+            let schema = empty.schema().as_ref().try_into()?;
+            return Ok(protobuf::PhysicalPlanNode {
+                physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
+                    protobuf::PlaceholderRowExecNode {
                         schema: Some(schema),
                     },
                 )),
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index f46a29447d..da76209dbb 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -49,6 +49,7 @@ use datafusion::physical_plan::joins::{
     HashJoinExec, NestedLoopJoinExec, PartitionMode, StreamJoinPartitionMode,
 };
 use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
@@ -104,7 +105,7 @@ fn roundtrip_test_with_context(
 
 #[test]
 fn roundtrip_empty() -> Result<()> {
-    roundtrip_test(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))))
+    roundtrip_test(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))))
 }
 
 #[test]
@@ -117,7 +118,7 @@ fn roundtrip_date_time_interval() -> Result<()> {
             false,
         ),
     ]);
-    let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone())));
+    let input = Arc::new(EmptyExec::new(Arc::new(schema.clone())));
     let date_expr = col("some_date", &schema)?;
     let literal_expr = col("some_interval", &schema)?;
     let date_time_interval_expr =
@@ -132,7 +133,7 @@ fn roundtrip_date_time_interval() -> Result<()> {
 #[test]
 fn roundtrip_local_limit() -> Result<()> {
     roundtrip_test(Arc::new(LocalLimitExec::new(
-        Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
+        Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
         25,
     )))
 }
@@ -140,7 +141,7 @@ fn roundtrip_local_limit() -> Result<()> {
 #[test]
 fn roundtrip_global_limit() -> Result<()> {
     roundtrip_test(Arc::new(GlobalLimitExec::new(
-        Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
+        Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
         0,
         Some(25),
     )))
@@ -149,7 +150,7 @@ fn roundtrip_global_limit() -> Result<()> {
 #[test]
 fn roundtrip_global_skip_no_limit() -> Result<()> {
     roundtrip_test(Arc::new(GlobalLimitExec::new(
-        Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
+        Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
         10,
         None, // no limit
     )))
@@ -179,8 +180,8 @@ fn roundtrip_hash_join() -> Result<()> {
     ] {
         for partition_mode in &[PartitionMode::Partitioned, 
PartitionMode::CollectLeft] {
             roundtrip_test(Arc::new(HashJoinExec::try_new(
-                Arc::new(EmptyExec::new(false, schema_left.clone())),
-                Arc::new(EmptyExec::new(false, schema_right.clone())),
+                Arc::new(EmptyExec::new(schema_left.clone())),
+                Arc::new(EmptyExec::new(schema_right.clone())),
                 on.clone(),
                 None,
                 join_type,
@@ -211,8 +212,8 @@ fn roundtrip_nested_loop_join() -> Result<()> {
         JoinType::RightSemi,
     ] {
         roundtrip_test(Arc::new(NestedLoopJoinExec::try_new(
-            Arc::new(EmptyExec::new(false, schema_left.clone())),
-            Arc::new(EmptyExec::new(false, schema_right.clone())),
+            Arc::new(EmptyExec::new(schema_left.clone())),
+            Arc::new(EmptyExec::new(schema_right.clone())),
             None,
             join_type,
         )?))?;
@@ -277,7 +278,7 @@ fn roundtrip_window() -> Result<()> {
         Arc::new(window_frame),
     ));
 
-    let input = Arc::new(EmptyExec::new(false, schema.clone()));
+    let input = Arc::new(EmptyExec::new(schema.clone()));
 
     roundtrip_test(Arc::new(WindowAggExec::try_new(
         vec![
@@ -311,7 +312,7 @@ fn rountrip_aggregate() -> Result<()> {
         aggregates.clone(),
         vec![None],
         vec![None],
-        Arc::new(EmptyExec::new(false, schema.clone())),
+        Arc::new(EmptyExec::new(schema.clone())),
         schema,
     )?))
 }
@@ -379,7 +380,7 @@ fn roundtrip_aggregate_udaf() -> Result<()> {
             aggregates.clone(),
             vec![None],
             vec![None],
-            Arc::new(EmptyExec::new(false, schema.clone())),
+            Arc::new(EmptyExec::new(schema.clone())),
             schema,
         )?),
         ctx,
@@ -405,7 +406,7 @@ fn roundtrip_filter_with_not_and_in_list() -> Result<()> {
     let and = binary(not, Operator::And, in_list, &schema)?;
     roundtrip_test(Arc::new(FilterExec::try_new(
         and,
-        Arc::new(EmptyExec::new(false, schema.clone())),
+        Arc::new(EmptyExec::new(schema.clone())),
     )?))
 }
 
@@ -432,7 +433,7 @@ fn roundtrip_sort() -> Result<()> {
     ];
     roundtrip_test(Arc::new(SortExec::new(
         sort_exprs,
-        Arc::new(EmptyExec::new(false, schema)),
+        Arc::new(EmptyExec::new(schema)),
     )))
 }
 
@@ -460,11 +461,11 @@ fn roundtrip_sort_preserve_partitioning() -> Result<()> {
 
     roundtrip_test(Arc::new(SortExec::new(
         sort_exprs.clone(),
-        Arc::new(EmptyExec::new(false, schema.clone())),
+        Arc::new(EmptyExec::new(schema.clone())),
     )))?;
 
     roundtrip_test(Arc::new(
-        SortExec::new(sort_exprs, Arc::new(EmptyExec::new(false, schema)))
+        SortExec::new(sort_exprs, Arc::new(EmptyExec::new(schema)))
             .with_preserve_partitioning(true),
     ))
 }
@@ -514,7 +515,7 @@ fn roundtrip_builtin_scalar_function() -> Result<()> {
     let field_b = Field::new("b", DataType::Int64, false);
     let schema = Arc::new(Schema::new(vec![field_a, field_b]));
 
-    let input = Arc::new(EmptyExec::new(false, schema.clone()));
+    let input = Arc::new(EmptyExec::new(schema.clone()));
 
     let execution_props = ExecutionProps::new();
 
@@ -541,7 +542,7 @@ fn roundtrip_scalar_udf() -> Result<()> {
     let field_b = Field::new("b", DataType::Int64, false);
     let schema = Arc::new(Schema::new(vec![field_a, field_b]));
 
-    let input = Arc::new(EmptyExec::new(false, schema.clone()));
+    let input = Arc::new(EmptyExec::new(schema.clone()));
 
     let fn_impl = |args: &[ArrayRef]| Ok(Arc::new(args[0].clone()) as 
ArrayRef);
 
@@ -594,7 +595,7 @@ fn roundtrip_distinct_count() -> Result<()> {
         aggregates.clone(),
         vec![None],
         vec![None],
-        Arc::new(EmptyExec::new(false, schema.clone())),
+        Arc::new(EmptyExec::new(schema.clone())),
         schema,
     )?))
 }
@@ -605,7 +606,7 @@ fn roundtrip_like() -> Result<()> {
         Field::new("a", DataType::Utf8, false),
         Field::new("b", DataType::Utf8, false),
     ]);
-    let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone())));
+    let input = Arc::new(EmptyExec::new(Arc::new(schema.clone())));
     let like_expr = like(
         false,
         false,
@@ -632,7 +633,7 @@ fn roundtrip_get_indexed_field_named_struct_field() -> 
Result<()> {
     ];
 
     let schema = Schema::new(fields);
-    let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone())));
+    let input = Arc::new(EmptyExec::new(Arc::new(schema.clone())));
 
     let col_arg = col("arg", &schema)?;
     let get_indexed_field_expr = Arc::new(GetIndexedFieldExpr::new(
@@ -659,7 +660,7 @@ fn roundtrip_get_indexed_field_list_index() -> Result<()> {
     ];
 
     let schema = Schema::new(fields);
-    let input = Arc::new(EmptyExec::new(true, Arc::new(schema.clone())));
+    let input = Arc::new(PlaceholderRowExec::new(Arc::new(schema.clone())));
 
     let col_arg = col("arg", &schema)?;
     let col_key = col("key", &schema)?;
@@ -686,7 +687,7 @@ fn roundtrip_get_indexed_field_list_range() -> Result<()> {
     ];
 
     let schema = Schema::new(fields);
-    let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone())));
+    let input = Arc::new(EmptyExec::new(Arc::new(schema.clone())));
 
     let col_arg = col("arg", &schema)?;
     let col_start = col("start", &schema)?;
@@ -712,7 +713,7 @@ fn roundtrip_analyze() -> Result<()> {
     let field_a = Field::new("plan_type", DataType::Utf8, false);
     let field_b = Field::new("plan", DataType::Utf8, false);
     let schema = Schema::new(vec![field_a, field_b]);
-    let input = Arc::new(EmptyExec::new(true, Arc::new(schema.clone())));
+    let input = Arc::new(PlaceholderRowExec::new(Arc::new(schema.clone())));
 
     roundtrip_test(Arc::new(AnalyzeExec::new(
         false,
@@ -727,7 +728,7 @@ fn roundtrip_json_sink() -> Result<()> {
     let field_a = Field::new("plan_type", DataType::Utf8, false);
     let field_b = Field::new("plan", DataType::Utf8, false);
     let schema = Arc::new(Schema::new(vec![field_a, field_b]));
-    let input = Arc::new(EmptyExec::new(true, schema.clone()));
+    let input = Arc::new(PlaceholderRowExec::new(schema.clone()));
 
     let file_sink_config = FileSinkConfig {
         object_store_url: ObjectStoreUrl::local_filesystem(),
@@ -787,8 +788,8 @@ fn roundtrip_sym_hash_join() -> Result<()> {
         ] {
             roundtrip_test(Arc::new(
                 
datafusion::physical_plan::joins::SymmetricHashJoinExec::try_new(
-                    Arc::new(EmptyExec::new(false, schema_left.clone())),
-                    Arc::new(EmptyExec::new(false, schema_right.clone())),
+                    Arc::new(EmptyExec::new(schema_left.clone())),
+                    Arc::new(EmptyExec::new(schema_right.clone())),
                     on.clone(),
                     None,
                     join_type,
@@ -806,8 +807,8 @@ fn roundtrip_union() -> Result<()> {
     let field_a = Field::new("col", DataType::Int64, false);
     let schema_left = Schema::new(vec![field_a.clone()]);
     let schema_right = Schema::new(vec![field_a]);
-    let left = EmptyExec::new(false, Arc::new(schema_left));
-    let right = EmptyExec::new(false, Arc::new(schema_right));
+    let left = EmptyExec::new(Arc::new(schema_left));
+    let right = EmptyExec::new(Arc::new(schema_right));
     let inputs: Vec<Arc<dyn ExecutionPlan>> = vec![Arc::new(left), 
Arc::new(right)];
     let union = UnionExec::new(inputs);
     roundtrip_test(Arc::new(union))
@@ -820,11 +821,11 @@ fn roundtrip_interleave() -> Result<()> {
     let schema_right = Schema::new(vec![field_a]);
     let partition = Partitioning::Hash(vec![], 3);
     let left = RepartitionExec::try_new(
-        Arc::new(EmptyExec::new(false, Arc::new(schema_left))),
+        Arc::new(EmptyExec::new(Arc::new(schema_left))),
         partition.clone(),
     )?;
     let right = RepartitionExec::try_new(
-        Arc::new(EmptyExec::new(false, Arc::new(schema_right))),
+        Arc::new(EmptyExec::new(Arc::new(schema_right))),
         partition.clone(),
     )?;
     let inputs: Vec<Arc<dyn ExecutionPlan>> = vec![Arc::new(left), 
Arc::new(right)];
diff --git a/datafusion/sqllogictest/test_files/explain.slt 
b/datafusion/sqllogictest/test_files/explain.slt
index 18792735ff..4583ef319b 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -94,7 +94,7 @@ EXPLAIN select count(*) from (values ('a', 1, 100), ('a', 2, 
150)) as t (c1,c2,c
 ----
 physical_plan
 ProjectionExec: expr=[2 as COUNT(*)]
---EmptyExec: produce_one_row=true
+--PlaceholderRowExec
 
 statement ok
 set datafusion.explain.physical_plan_only = false
@@ -368,7 +368,7 @@ Projection: List([[1, 2, 3], [4, 5, 6]]) AS 
make_array(make_array(Int64(1),Int64
 --EmptyRelation
 physical_plan
 ProjectionExec: expr=[[[1, 2, 3], [4, 5, 6]] as 
make_array(make_array(Int64(1),Int64(2),Int64(3)),make_array(Int64(4),Int64(5),Int64(6)))]
---EmptyExec: produce_one_row=true
+--PlaceholderRowExec
 
 query TT
 explain select [[1, 2, 3], [4, 5, 6]];
@@ -378,4 +378,4 @@ Projection: List([[1, 2, 3], [4, 5, 6]]) AS 
make_array(make_array(Int64(1),Int64
 --EmptyRelation
 physical_plan
 ProjectionExec: expr=[[[1, 2, 3], [4, 5, 6]] as 
make_array(make_array(Int64(1),Int64(2),Int64(3)),make_array(Int64(4),Int64(5),Int64(6)))]
---EmptyExec: produce_one_row=true
+--PlaceholderRowExec
diff --git a/datafusion/sqllogictest/test_files/join.slt 
b/datafusion/sqllogictest/test_files/join.slt
index 874d849e9a..386ffe766b 100644
--- a/datafusion/sqllogictest/test_files/join.slt
+++ b/datafusion/sqllogictest/test_files/join.slt
@@ -556,7 +556,7 @@ query TT
 explain select * from t1 join t2 on false;
 ----
 logical_plan EmptyRelation
-physical_plan EmptyExec: produce_one_row=false
+physical_plan EmptyExec
 
 # Make batch size smaller than table row number. to introduce parallelism to 
the plan.
 statement ok
diff --git a/datafusion/sqllogictest/test_files/limit.slt 
b/datafusion/sqllogictest/test_files/limit.slt
index 182195112e..e063d6e896 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -312,7 +312,7 @@ Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS 
COUNT(*)]]
 ----TableScan: t1 projection=[], fetch=14
 physical_plan
 ProjectionExec: expr=[0 as COUNT(*)]
---EmptyExec: produce_one_row=true
+--PlaceholderRowExec
 
 query I
 SELECT COUNT(*) FROM (SELECT a FROM t1 LIMIT 3 OFFSET 11);
@@ -330,7 +330,7 @@ Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS 
COUNT(*)]]
 ----TableScan: t1 projection=[], fetch=11
 physical_plan
 ProjectionExec: expr=[2 as COUNT(*)]
---EmptyExec: produce_one_row=true
+--PlaceholderRowExec
 
 query I
 SELECT COUNT(*) FROM (SELECT a FROM t1 LIMIT 3 OFFSET 8);
@@ -348,7 +348,7 @@ Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS 
COUNT(*)]]
 ----TableScan: t1 projection=[]
 physical_plan
 ProjectionExec: expr=[2 as COUNT(*)]
---EmptyExec: produce_one_row=true
+--PlaceholderRowExec
 
 query I
 SELECT COUNT(*) FROM (SELECT a FROM t1 LIMIT 3 OFFSET 8);
diff --git a/datafusion/sqllogictest/test_files/union.slt 
b/datafusion/sqllogictest/test_files/union.slt
index 2c8970a139..b4e338875e 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -551,11 +551,11 @@ UnionExec
 ------CoalesceBatchesExec: target_batch_size=2
 --------RepartitionExec: partitioning=Hash([Int64(1)@0], 4), input_partitions=1
 ----------AggregateExec: mode=Partial, gby=[1 as Int64(1)], aggr=[]
-------------EmptyExec: produce_one_row=true
+------------PlaceholderRowExec
 --ProjectionExec: expr=[2 as a]
-----EmptyExec: produce_one_row=true
+----PlaceholderRowExec
 --ProjectionExec: expr=[3 as a]
-----EmptyExec: produce_one_row=true
+----PlaceholderRowExec
 
 # test UNION ALL aliases correctly with aliased subquery
 query TT
@@ -583,7 +583,7 @@ UnionExec
 --------RepartitionExec: partitioning=Hash([n@0], 4), input_partitions=1
 ----------AggregateExec: mode=Partial, gby=[n@0 as n], aggr=[COUNT(*)]
 ------------ProjectionExec: expr=[5 as n]
---------------EmptyExec: produce_one_row=true
+--------------PlaceholderRowExec
 --ProjectionExec: expr=[1 as count, MAX(Int64(10))@0 as n]
 ----AggregateExec: mode=Single, gby=[], aggr=[MAX(Int64(10))]
-------EmptyExec: produce_one_row=true
+------PlaceholderRowExec
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index 7846bb001a..f3de5b54fc 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -279,13 +279,13 @@ SortPreservingMergeExec: [b@0 ASC NULLS LAST]
 ------------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[MAX(d.a)]
 --------------UnionExec
 ----------------ProjectionExec: expr=[1 as a, aa as b]
-------------------EmptyExec: produce_one_row=true
+------------------PlaceholderRowExec
 ----------------ProjectionExec: expr=[3 as a, aa as b]
-------------------EmptyExec: produce_one_row=true
+------------------PlaceholderRowExec
 ----------------ProjectionExec: expr=[5 as a, bb as b]
-------------------EmptyExec: produce_one_row=true
+------------------PlaceholderRowExec
 ----------------ProjectionExec: expr=[7 as a, bb as b]
-------------------EmptyExec: produce_one_row=true
+------------------PlaceholderRowExec
 
 # Check actual result:
 query TI
@@ -365,13 +365,13 @@ SortPreservingMergeExec: [b@0 ASC NULLS LAST]
 --------------RepartitionExec: partitioning=Hash([b@1], 4), input_partitions=4
 ----------------UnionExec
 ------------------ProjectionExec: expr=[1 as a, aa as b]
---------------------EmptyExec: produce_one_row=true
+--------------------PlaceholderRowExec
 ------------------ProjectionExec: expr=[3 as a, aa as b]
---------------------EmptyExec: produce_one_row=true
+--------------------PlaceholderRowExec
 ------------------ProjectionExec: expr=[5 as a, bb as b]
---------------------EmptyExec: produce_one_row=true
+--------------------PlaceholderRowExec
 ------------------ProjectionExec: expr=[7 as a, bb as b]
---------------------EmptyExec: produce_one_row=true
+--------------------PlaceholderRowExec
 
 
 # check actual result

Reply via email to