This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f3b1141d0f Replace `execution_mode` with `emission_type` and `boundedness` (#13823)
f3b1141d0f is described below

commit f3b1141d0f417e9d9e6c0ada03592c9d9ec60cd4
Author: Jay Zhan <[email protected]>
AuthorDate: Fri Dec 20 17:51:06 2024 +0800

    Replace `execution_mode` with `emission_type` and `boundedness` (#13823)
    
    * feat: update execution modes and add bitflags dependency
    
    - Introduced `Incremental` execution mode alongside existing modes in the 
DataFusion execution plan.
    - Updated various execution plans to utilize the new `Incremental` mode 
where applicable, enhancing streaming capabilities.
    - Added `bitflags` dependency to `Cargo.toml` for better management of 
execution modes.
    - Adjusted execution mode handling in multiple files to ensure 
compatibility with the new structure.
    
    * add exec API
    
    Signed-off-by: Jay Zhan <[email protected]>
    
    * replace done but has stackoverflow
    
    Signed-off-by: Jay Zhan <[email protected]>
    
    * exec API done
    
    Signed-off-by: Jay Zhan <[email protected]>
    
    * Refactor execution plan properties to remove execution mode
    
    - Removed the `ExecutionMode` parameter from `PlanProperties` across 
multiple physical plan implementations.
    - Updated related functions to utilize the new structure, ensuring 
compatibility with the changes.
    - Adjusted comments and cleaned up imports to reflect the removal of 
execution mode handling.
    
    This refactor simplifies the execution plan properties and enhances 
maintainability.
    
    * Refactor execution plan to remove `ExecutionMode` and introduce `EmissionType`
    
    - Removed the `ExecutionMode` parameter from `PlanProperties` and related 
implementations across multiple files.
    - Introduced `EmissionType` to better represent the output characteristics 
of execution plans.
    - Updated functions and tests to reflect the new structure, ensuring 
compatibility and enhancing maintainability.
    - Cleaned up imports and adjusted comments accordingly.
    
    This refactor simplifies the execution plan properties and improves the 
clarity of memory handling in execution plans.
    
    * fix test
    
    Signed-off-by: Jay Zhan <[email protected]>
    
    * Refactor join handling and emission type logic
    
    - Updated test cases in `sanity_checker.rs` to reflect changes in expected 
outcomes for bounded and unbounded joins, ensuring accurate test coverage.
    - Simplified the `is_pipeline_breaking` method in `execution_plan.rs` to 
clarify the conditions under which a plan is considered pipeline-breaking.
    - Enhanced the emission type determination logic in `execution_plan.rs` to 
prioritize `Final` over `Both` and `Incremental`, improving clarity in 
execution plan behavior.
    - Adjusted join type handling in `hash_join.rs` to classify `Right` joins 
as `Incremental`, allowing for immediate row emission.
    
    These changes improve the accuracy of tests and the clarity of execution 
plan properties.
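
The priority rule described above (`Final` over `Both`, `Both` over `Incremental`) can be sketched as follows. This is a minimal illustration, not the code from `execution_plan.rs`: the local `EmissionType` enum is a hypothetical stand-in so the snippet compiles without the `datafusion` crate, and the function takes plain enum values rather than child plans. Returning `Incremental` for a node without children is an assumption of this sketch.

```rust
/// Hypothetical stand-in for DataFusion's `EmissionType`, defined locally so
/// the sketch has no external dependencies.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmissionType {
    /// Rows are emitted as input arrives.
    Incremental,
    /// Rows are emitted only after all input has been consumed.
    Final,
    /// Some rows are emitted incrementally, the rest at the end.
    Both,
}

/// Combine the emission types of a node's children.
/// Assumed rule (taken from the commit message): `Final` wins over `Both`,
/// and `Both` wins over `Incremental`.
pub fn emission_type_from_children(
    children: impl IntoIterator<Item = EmissionType>,
) -> EmissionType {
    let mut combined = EmissionType::Incremental;
    for child in children {
        match child {
            // A single `Final` child decides the result; stop early.
            EmissionType::Final => return EmissionType::Final,
            EmissionType::Both => combined = EmissionType::Both,
            EmissionType::Incremental => {}
        }
    }
    combined
}
```

With this rule, children `[Incremental, Both]` combine to `Both`, and any list containing `Final` combines to `Final`.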
    
    * Implement emission type for execution plans
    
    - Updated multiple execution plan implementations to replace 
`unimplemented!()` with `EmissionType::Incremental`, ensuring that the emission 
type is correctly defined for various plans.
    - This change enhances the clarity and functionality of the execution plans 
by explicitly specifying their emission behavior.
    
    These updates contribute to a more robust execution plan framework within 
the DataFusion project.
    
    * Enhance join type documentation and refine emission type logic
    
    - Updated the `JoinType` enum in `join_type.rs` to include detailed 
descriptions for each join type, improving clarity on their behavior and 
expected results.
    - Modified the emission type logic in `hash_join.rs` to ensure that `Right` 
and `RightAnti` joins are classified as `Incremental`, allowing for immediate 
row emission when applicable.
    
    These changes improve the documentation and functionality of join 
operations within the DataFusion project.
    
    * Refactor emission type logic in join and sort execution plans
    
    - Updated the emission type determination in `SortMergeJoinExec` and 
`SymmetricHashJoinExec` to utilize the `emission_type_from_children` function, 
enhancing the accuracy of emission behavior based on input characteristics.
    - Clarified comments in `sort.rs` regarding the conditions under which 
results are emitted, emphasizing the relationship between input sorting and 
emission type.
    - These changes improve the clarity and functionality of the execution 
plans within the DataFusion project, ensuring more robust handling of emission 
types.
    
    * Refactor emission type handling in execution plans
    
    - Updated the `emission_type_from_children` function to accept an iterator 
instead of a slice, enhancing flexibility in how child execution plans are 
passed.
    - Modified the `SymmetricHashJoinExec` implementation to utilize the new 
function signature, improving code clarity and maintainability.
    
    These changes streamline the emission type determination process within the 
DataFusion project, contributing to a more robust execution plan framework.
    
    * Enhance execution plan properties with boundedness and emission type
    
    - Introduced `boundedness` and `pipeline_behavior` methods to the 
`ExecutionPlanProperties` trait, improving the handling of execution plan 
characteristics.
    - Updated the `CsvExec`, `SortExec`, and related implementations to utilize 
the new methods for determining boundedness and emission behavior.
    - Refactored the `ensure_distribution` function to use the new boundedness 
logic, enhancing clarity in distribution decisions.
    - These changes contribute to a more robust and maintainable execution plan 
framework within the DataFusion project.
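
To illustrate how a caller might combine the two new properties, the sketch below mirrors the decision made in the `datafusion-cli/src/exec.rs` hunk of this commit: a bounded plan is collected, an unbounded plan is streamed, and an unbounded plan that only emits at the end is rejected. The enums and the `RunStrategy` and `choose_strategy` names are hypothetical stand-ins defined locally; the real `Boundedness` type carries more detail about memory requirements, which this sketch omits.

```rust
/// Hypothetical, simplified stand-in for DataFusion's `Boundedness`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Boundedness {
    Bounded,
    Unbounded,
}

impl Boundedness {
    pub fn is_unbounded(&self) -> bool {
        matches!(self, Boundedness::Unbounded)
    }
}

/// Hypothetical stand-in for DataFusion's `EmissionType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmissionType {
    Incremental,
    Final,
    Both,
}

/// Illustrative outcome type; not part of DataFusion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunStrategy {
    /// Bounded input: gather all batches, then print.
    Collect,
    /// Unbounded input that can emit as it goes: print batches as they arrive.
    Stream,
    /// Unbounded input that emits only once the source finishes: no result
    /// can ever be produced.
    Reject,
}

/// Decide how to run a plan from its boundedness and emission type.
pub fn choose_strategy(boundedness: Boundedness, emission: EmissionType) -> RunStrategy {
    if !boundedness.is_unbounded() {
        return RunStrategy::Collect;
    }
    if emission == EmissionType::Final {
        RunStrategy::Reject
    } else {
        RunStrategy::Stream
    }
}
```

Separating the two properties is what makes the `Reject` case expressible: the old single `execution_mode` value could say that a plan was unbounded, but not whether it could still emit rows before its input ended.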
    
    * Refactor execution plans to enhance boundedness and emission type handling
    
    - Updated multiple execution plan implementations to incorporate 
`Boundedness` and `EmissionType`, improving the clarity and functionality of 
execution plans.
    - Replaced instances of `unimplemented!()` with appropriate emission types, 
ensuring that plans correctly define their output behavior.
    - Refactored the `PlanProperties` structure to utilize the new boundedness 
logic, enhancing decision-making in execution plans.
    - These changes contribute to a more robust and maintainable execution plan 
framework within the DataFusion project.
    
    * Refactor memory handling in execution plans
    
    - Updated the condition for checking memory requirements in execution plans 
from `has_finite_memory()` to `boundedness().requires_finite_memory()`, 
improving clarity in memory management.
    - This change enhances the robustness of execution plans within the 
DataFusion project by ensuring more accurate assessments of memory constraints.
    
    * Refactor boundedness checks in execution plans
    
    - Updated conditions for checking boundedness in various execution plans to 
use `is_unbounded()` instead of `requires_finite_memory()`, enhancing clarity 
in memory management.
    - Adjusted the `PlanProperties` structure to reflect these changes, 
ensuring more accurate assessments of memory constraints across the DataFusion 
project.
    - These modifications contribute to a more robust and maintainable 
execution plan framework, improving the handling of boundedness in execution 
strategies.
    
    * Remove TODO comment regarding unbounded execution plans in `UnboundedExec` implementation
    
    - Eliminated the outdated comment suggesting a switch to unbounded 
execution with finite memory, streamlining the code and improving clarity.
    - This change contributes to a cleaner and more maintainable codebase 
within the DataFusion project.
    
    * Refactor execution plan boundedness and emission type handling
    
    - Updated the `is_pipeline_breaking` method to use 
`requires_finite_memory()` for improved clarity in determining pipeline 
behavior.
    - Enhanced the `Boundedness` enum to include detailed documentation on 
memory requirements for unbounded streams.
    - Refactored `compute_properties` methods in `GlobalLimitExec` and 
`LocalLimitExec` to directly use the input's boundedness, simplifying the logic.
    - Adjusted emission type determination in `NestedLoopJoinExec` to utilize 
the `emission_type_from_children` function, ensuring accurate output behavior 
based on input characteristics.
    
    These changes contribute to a more robust and maintainable execution plan 
framework within the DataFusion project, improving clarity and functionality in 
handling boundedness and emission types.
    
    * Refactor emission type and boundedness handling in execution plans
    
    - Removed the `OptionalEmissionType` struct from `plan_properties.rs`, 
simplifying the codebase.
    - Updated the `is_pipeline_breaking` function in `execution_plan.rs` for 
improved readability by formatting the condition across multiple lines.
    - Adjusted the `GlobalLimitExec` implementation in `limit.rs` to directly 
use the input's boundedness, enhancing clarity in memory management.
    
    These changes contribute to a more streamlined and maintainable execution 
plan framework within the DataFusion project, improving the handling of 
emission types and boundedness.
    
    * Refactor GlobalLimitExec and LocalLimitExec to enhance boundedness handling
    
    - Updated the `compute_properties` methods in both `GlobalLimitExec` and 
`LocalLimitExec` to replace `EmissionType::Final` with `Boundedness::Bounded`, 
reflecting that limit operations always produce a finite number of rows.
    - Changed the input's boundedness reference to `pipeline_behavior()` for 
improved clarity in execution plan properties.
    
    These changes contribute to a more streamlined and maintainable execution 
plan framework within the DataFusion project, enhancing the handling of 
boundedness in limit operations.
    
    * Review Part1
    
    * Update sanity_checker.rs
    
    * addressing reviews
    
    * Review Part 1
    
    * Update datafusion/physical-plan/src/execution_plan.rs
    
    * Update datafusion/physical-plan/src/execution_plan.rs
    
    * Shorten imports
    
    * Enhance documentation for JoinType and Boundedness enums
    
    - Improved descriptions for the Inner and Full join types in join_type.rs 
to clarify their behavior and examples.
    - Added explanations regarding the boundedness of output streams and memory 
requirements in execution_plan.rs, including specific examples for operators 
like Median and Min/Max.
    
    ---------
    
    Signed-off-by: Jay Zhan <[email protected]>
    Co-authored-by: berkaysynnada <[email protected]>
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 datafusion-cli/src/exec.rs                         |  14 +-
 datafusion-examples/examples/custom_datasource.rs  |   8 +-
 datafusion/common/src/join_type.rs                 |  25 +-
 .../src/datasource/physical_plan/arrow_file.rs     |   6 +-
 .../core/src/datasource/physical_plan/avro.rs      |   8 +-
 .../core/src/datasource/physical_plan/csv.rs       |   8 +-
 .../core/src/datasource/physical_plan/json.rs      |   8 +-
 .../src/datasource/physical_plan/parquet/mod.rs    |  11 +-
 .../src/physical_optimizer/enforce_distribution.rs |  15 +-
 .../core/src/physical_optimizer/enforce_sorting.rs |   2 +-
 .../core/src/physical_optimizer/join_selection.rs  |  28 ++-
 .../replace_with_order_preserving_variants.rs      |   6 +-
 .../core/src/physical_optimizer/sanity_checker.rs  |  19 +-
 datafusion/core/src/physical_planner.rs            |  11 +-
 datafusion/core/src/test/mod.rs                    |  13 +-
 datafusion/core/src/test_util/mod.rs               |  23 +-
 datafusion/core/tests/custom_sources_cases/mod.rs  |  12 +-
 .../provider_filter_pushdown.rs                    |  13 +-
 .../core/tests/custom_sources_cases/statistics.rs  |  14 +-
 .../core/tests/user_defined/insert_operation.rs    |  18 +-
 .../core/tests/user_defined/user_defined_plan.rs   |  19 +-
 datafusion/ffi/src/execution_plan.rs               |  11 +-
 datafusion/ffi/src/plan_properties.rs              | 107 ++++++---
 .../physical-optimizer/src/output_requirements.rs  |   3 +-
 datafusion/physical-plan/src/aggregates/mod.rs     |  36 +--
 datafusion/physical-plan/src/analyze.rs            |  10 +-
 datafusion/physical-plan/src/coalesce_batches.rs   |   3 +-
 .../physical-plan/src/coalesce_partitions.rs       |   3 +-
 datafusion/physical-plan/src/empty.rs              |  20 +-
 datafusion/physical-plan/src/execution_plan.rs     | 253 ++++++++++++++-------
 datafusion/physical-plan/src/explain.rs            |   9 +-
 datafusion/physical-plan/src/filter.rs             |   4 +-
 datafusion/physical-plan/src/insert.rs             |   8 +-
 datafusion/physical-plan/src/joins/cross_join.rs   |  20 +-
 datafusion/physical-plan/src/joins/hash_join.rs    |  48 ++--
 .../physical-plan/src/joins/nested_loop_join.rs    |  37 ++-
 .../physical-plan/src/joins/sort_merge_join.rs     |  21 +-
 .../physical-plan/src/joins/symmetric_hash_join.rs |  12 +-
 datafusion/physical-plan/src/lib.rs                |   3 +-
 datafusion/physical-plan/src/limit.rs              |  12 +-
 datafusion/physical-plan/src/memory.rs             |  16 +-
 datafusion/physical-plan/src/placeholder_row.rs    |  17 +-
 datafusion/physical-plan/src/projection.rs         |   3 +-
 datafusion/physical-plan/src/recursive_query.rs    |   6 +-
 datafusion/physical-plan/src/repartition/mod.rs    |  10 +-
 datafusion/physical-plan/src/sorts/partial_sort.rs |  10 +-
 datafusion/physical-plan/src/sorts/sort.rs         |  68 ++++--
 .../src/sorts/sort_preserving_merge.rs             |  16 +-
 datafusion/physical-plan/src/streaming.rs          |  23 +-
 datafusion/physical-plan/src/test/exec.rs          |  51 ++---
 datafusion/physical-plan/src/union.rs              |  19 +-
 datafusion/physical-plan/src/unnest.rs             |   9 +-
 datafusion/physical-plan/src/values.rs             |  13 +-
 .../src/windows/bounded_window_agg_exec.rs         |   8 +-
 datafusion/physical-plan/src/windows/mod.rs        |   2 +-
 .../physical-plan/src/windows/window_agg_exec.rs   |  23 +-
 datafusion/physical-plan/src/work_table.rs         |  10 +-
 57 files changed, 748 insertions(+), 457 deletions(-)

diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index 1890653669..a4f154b2de 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -33,11 +33,12 @@ use crate::{
 };
 
 use datafusion::common::instant::Instant;
-use datafusion::common::plan_datafusion_err;
+use datafusion::common::{plan_datafusion_err, plan_err};
 use datafusion::config::ConfigFileType;
 use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_expr::{DdlStatement, LogicalPlan};
+use datafusion::physical_plan::execution_plan::EmissionType;
 use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
 use datafusion::sql::parser::{DFParser, Statement};
 use datafusion::sql::sqlparser::dialect::dialect_from_str;
@@ -234,10 +235,19 @@ pub(super) async fn exec_and_print(
         let df = ctx.execute_logical_plan(plan).await?;
         let physical_plan = df.create_physical_plan().await?;
 
-        if physical_plan.execution_mode().is_unbounded() {
+        if physical_plan.boundedness().is_unbounded() {
+            if physical_plan.pipeline_behavior() == EmissionType::Final {
+                return plan_err!(
+                    "The given query can generate a valid result only once \
+                    the source finishes, but the source is unbounded"
+                );
+            }
+            // As the input stream comes, we can generate results.
+            // However, memory safety is not guaranteed.
             let stream = execute_stream(physical_plan, task_ctx.clone())?;
             print_options.print_stream(stream, now).await?;
         } else {
+            // Bounded stream; collected results are printed after all input consumed.
             let schema = physical_plan.schema();
             let results = collect(physical_plan, task_ctx.clone()).await?;
             adjusted.into_inner().print_batches(schema, &results, now)?;
diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs
index 90e9d2c7a6..bc865fac5a 100644
--- a/datafusion-examples/examples/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_datasource.rs
@@ -30,10 +30,11 @@ use datafusion::error::Result;
 use datafusion::execution::context::TaskContext;
 use datafusion::logical_expr::LogicalPlanBuilder;
 use datafusion::physical_expr::EquivalenceProperties;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::{
-    project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
-    Partitioning, PlanProperties, SendableRecordBatchStream,
+    project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+    PlanProperties, SendableRecordBatchStream,
 };
 use datafusion::prelude::*;
 
@@ -214,7 +215,8 @@ impl CustomExec {
         PlanProperties::new(
             eq_properties,
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
diff --git a/datafusion/common/src/join_type.rs b/datafusion/common/src/join_type.rs
index e98f34199b..bdca253c5f 100644
--- a/datafusion/common/src/join_type.rs
+++ b/datafusion/common/src/join_type.rs
@@ -28,21 +28,30 @@ use crate::{DataFusionError, Result};
 /// Join type
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
 pub enum JoinType {
-    /// Inner Join
+    /// Inner Join - Returns only rows where there is a matching value in both tables based on the join condition.
+    /// For example, if joining table A and B on A.id = B.id, only rows where A.id equals B.id will be included.
+    /// All columns from both tables are returned for the matching rows. Non-matching rows are excluded entirely.
     Inner,
-    /// Left Join
+    /// Left Join - Returns all rows from the left table and matching rows from the right table.
+    /// If no match, NULL values are returned for columns from the right table.
     Left,
-    /// Right Join
+    /// Right Join - Returns all rows from the right table and matching rows from the left table.
+    /// If no match, NULL values are returned for columns from the left table.
     Right,
-    /// Full Join
+    /// Full Join (also called Full Outer Join) - Returns all rows from both tables, matching rows where possible.
+    /// When a row from either table has no match in the other table, the missing columns are filled with NULL values.
+    /// For example, if table A has row X with no match in table B, the result will contain row X with NULL values for all of table B's columns.
+    /// This join type preserves all records from both tables, making it useful when you need to see all data regardless of matches.
     Full,
-    /// Left Semi Join
+    /// Left Semi Join - Returns rows from the left table that have matching rows in the right table.
+    /// Only columns from the left table are returned.
     LeftSemi,
-    /// Right Semi Join
+    /// Right Semi Join - Returns rows from the right table that have matching rows in the left table.
+    /// Only columns from the right table are returned.
     RightSemi,
-    /// Left Anti Join
+    /// Left Anti Join - Returns rows from the left table that do not have a matching row in the right table.
     LeftAnti,
-    /// Right Anti Join
+    /// Right Anti Join - Returns rows from the right table that do not have a matching row in the left table.
     RightAnti,
     /// Left Mark join
     ///
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 8df5ef82cd..4e76b087ab 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -38,7 +38,8 @@ use datafusion_common::config::ConfigOptions;
 use datafusion_common::Statistics;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
-use datafusion_physical_plan::{ExecutionMode, PlanProperties};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion_physical_plan::PlanProperties;
 
 use futures::StreamExt;
 use itertools::Itertools;
@@ -97,7 +98,8 @@ impl ArrowExec {
         PlanProperties::new(
             eq_properties,
             Self::output_partitioning_helper(file_scan_config), // Output Partitioning
-            ExecutionMode::Bounded,                             // Execution Mode
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
index def6504189..fb36179c3c 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -24,13 +24,14 @@ use super::FileScanConfig;
 use crate::error::Result;
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
-    PlanProperties, SendableRecordBatchStream, Statistics,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+    SendableRecordBatchStream, Statistics,
 };
 
 use arrow::datatypes::SchemaRef;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 
 /// Execution plan for scanning Avro data source
 #[derive(Debug, Clone)]
@@ -81,7 +82,8 @@ impl AvroExec {
         PlanProperties::new(
             eq_properties,
             Partitioning::UnknownPartitioning(n_partitions), // Output Partitioning
-            ExecutionMode::Bounded,                          // Execution Mode
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index c54c663dca..a00e74cf4f 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -33,8 +33,8 @@ use crate::datasource::physical_plan::FileMeta;
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
-    Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
+    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
+    PlanProperties, SendableRecordBatchStream, Statistics,
 };
 
 use arrow::csv;
@@ -43,6 +43,7 @@ use datafusion_common::config::ConfigOptions;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
 
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 use futures::{StreamExt, TryStreamExt};
 use object_store::buffered::BufWriter;
 use object_store::{GetOptions, GetResultPayload, ObjectStore};
@@ -327,7 +328,8 @@ impl CsvExec {
         PlanProperties::new(
             eq_properties,
             Self::output_partitioning_helper(file_scan_config), // Output Partitioning
-            ExecutionMode::Bounded,                             // Execution Mode
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 5c70968fbb..879c9817a3 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -33,14 +33,15 @@ use crate::datasource::physical_plan::FileMeta;
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
-    Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
+    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
+    PlanProperties, SendableRecordBatchStream, Statistics,
 };
 
 use arrow::json::ReaderBuilder;
 use arrow::{datatypes::SchemaRef, json};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 
 use futures::{StreamExt, TryStreamExt};
 use object_store::buffered::BufWriter;
@@ -107,7 +108,8 @@ impl NdJsonExec {
         PlanProperties::new(
             eq_properties,
             Self::output_partitioning_helper(file_scan_config), // Output Partitioning
-            ExecutionMode::Bounded,                             // Execution Mode
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 446d5f5311..cb79055ce3 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -34,13 +34,14 @@ use crate::{
     physical_optimizer::pruning::PruningPredicate,
     physical_plan::{
         metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
-        DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
+        DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
         SendableRecordBatchStream, Statistics,
     },
 };
 
 use arrow::datatypes::SchemaRef;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 
 use itertools::Itertools;
 use log::debug;
@@ -654,13 +655,11 @@ impl ParquetExec {
         orderings: &[LexOrdering],
         file_config: &FileScanConfig,
     ) -> PlanProperties {
-        // Equivalence Properties
-        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
-
         PlanProperties::new(
-            eq_properties,
+            EquivalenceProperties::new_with_orderings(schema, orderings),
             Self::output_partitioning_helper(file_config), // Output Partitioning
-            ExecutionMode::Bounded,                        // Execution Mode
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 27323eaedc..76c4d668d7 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -52,12 +52,13 @@ use datafusion_physical_expr::utils::map_columns_before_projection;
 use datafusion_physical_expr::{
     physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef,
 };
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::execution_plan::EmissionType;
 use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
 use datafusion_physical_plan::ExecutionPlanProperties;
 
-use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use itertools::izip;
 
 /// The `EnforceDistribution` rule ensures that distribution requirements are
@@ -1161,12 +1162,17 @@ fn ensure_distribution(
     let should_use_estimates = config
         .execution
         .use_row_number_estimates_to_optimize_partitioning;
-    let is_unbounded = dist_context.plan.execution_mode().is_unbounded();
+    let unbounded_and_pipeline_friendly = dist_context.plan.boundedness().is_unbounded()
+        && matches!(
+            dist_context.plan.pipeline_behavior(),
+            EmissionType::Incremental | EmissionType::Both
+        );
     // Use order preserving variants either of the conditions true
     // - it is desired according to config
     // - when plan is unbounded
+    // - when it is pipeline friendly (can incrementally produce results)
     let order_preserving_variants_desirable =
-        is_unbounded || config.optimizer.prefer_existing_sort;
+        unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;
 
     // Remove unnecessary repartition from the physical plan if any
     let DistributionContext {
@@ -1459,7 +1465,8 @@ pub(crate) mod tests {
             PlanProperties::new(
                 input.equivalence_properties().clone(), // Equivalence Properties
                 input.output_partitioning().clone(),    // Output Partitioning
-                input.execution_mode(),                 // Execution Mode
+                input.pipeline_behavior(),              // Pipeline Behavior
+                input.boundedness(),                    // Boundedness
             )
         }
     }
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index cfc08562f7..85fe9ecfcd 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -214,7 +214,7 @@ fn replace_with_partial_sort(
     let plan_any = plan.as_any();
     if let Some(sort_plan) = plan_any.downcast_ref::<SortExec>() {
         let child = Arc::clone(sort_plan.children()[0]);
-        if !child.execution_mode().is_unbounded() {
+        if !child.boundedness().is_unbounded() {
             return Ok(plan);
         }
 
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index 9d65c6ded4..009757f3a9 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -43,6 +43,7 @@ use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::execution_plan::EmissionType;
 
 /// The [`JoinSelection`] rule tries to modify a given plan so that it can
 /// accommodate infinite sources and optimize joins in the plan according to
@@ -516,7 +517,8 @@ fn statistical_join_selection_subrule(
 pub type PipelineFixerSubrule =
     dyn Fn(Arc<dyn ExecutionPlan>, &ConfigOptions) -> Result<Arc<dyn ExecutionPlan>>;
 
-/// Converts a hash join to a symmetric hash join in the case of infinite inputs on both sides.
+/// Converts a hash join to a symmetric hash join if both its inputs are
+/// unbounded and incremental.
 ///
 /// This subrule checks if a hash join can be replaced with a symmetric hash 
join when dealing
 /// with unbounded (infinite) inputs on both sides. This replacement avoids 
pipeline breaking and
@@ -537,10 +539,18 @@ fn hash_join_convert_symmetric_subrule(
 ) -> Result<Arc<dyn ExecutionPlan>> {
     // Check if the current plan node is a HashJoinExec.
     if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() {
-        let left_unbounded = hash_join.left.execution_mode().is_unbounded();
-        let right_unbounded = hash_join.right.execution_mode().is_unbounded();
-        // Process only if both left and right sides are unbounded.
-        if left_unbounded && right_unbounded {
+        let left_unbounded = hash_join.left.boundedness().is_unbounded();
+        let left_incremental = matches!(
+            hash_join.left.pipeline_behavior(),
+            EmissionType::Incremental | EmissionType::Both
+        );
+        let right_unbounded = hash_join.right.boundedness().is_unbounded();
+        let right_incremental = matches!(
+            hash_join.right.pipeline_behavior(),
+            EmissionType::Incremental | EmissionType::Both
+        );
+        // Process only if both left and right sides are unbounded and emit 
incrementally.
+        if left_unbounded && right_unbounded && left_incremental && 
right_incremental {
             // Determine the partition mode based on configuration.
             let mode = if config_options.optimizer.repartition_joins {
                 StreamJoinPartitionMode::Partitioned
@@ -669,8 +679,8 @@ fn hash_join_swap_subrule(
     _config_options: &ConfigOptions,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() {
-        if hash_join.left.execution_mode().is_unbounded()
-            && !hash_join.right.execution_mode().is_unbounded()
+        if hash_join.left.boundedness().is_unbounded()
+            && !hash_join.right.boundedness().is_unbounded()
             && matches!(
                 *hash_join.join_type(),
                 JoinType::Inner
@@ -2025,12 +2035,12 @@ mod hash_join_tests {
             assert_eq!(
                 (
                     t.case.as_str(),
-                    if left.execution_mode().is_unbounded() {
+                    if left.boundedness().is_unbounded() {
                         SourceType::Unbounded
                     } else {
                         SourceType::Bounded
                     },
-                    if right.execution_mode().is_unbounded() {
+                    if right.boundedness().is_unbounded() {
                         SourceType::Unbounded
                     } else {
                         SourceType::Bounded
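
Note on the join_selection.rs change above: the symmetric hash join
conversion now requires each input to be unbounded and to emit
incrementally (`Incremental` or `Both`). A minimal sketch of that
condition follows. The two enums are local stand-ins for the DataFusion
types named in the diff, not the real definitions, and both helper
functions are hypothetical names introduced only for illustration.

```rust
// Local stand-ins for the types named in the diff (assumption: simplified,
// not the real DataFusion definitions).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Boundedness {
    Bounded,
    Unbounded { requires_infinite_memory: bool },
}

impl Boundedness {
    fn is_unbounded(&self) -> bool {
        matches!(self, Boundedness::Unbounded { .. })
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EmissionType {
    Incremental,
    Final,
    Both,
}

/// Hypothetical helper: true when an input is unbounded and produces rows
/// before its own input is exhausted.
fn streams_incrementally(boundedness: Boundedness, emission: EmissionType) -> bool {
    boundedness.is_unbounded()
        && matches!(emission, EmissionType::Incremental | EmissionType::Both)
}

/// Hypothetical helper mirroring the condition checked in
/// `hash_join_convert_symmetric_subrule`: both sides must qualify.
fn can_convert_to_symmetric(
    left: (Boundedness, EmissionType),
    right: (Boundedness, EmissionType),
) -> bool {
    streams_incrementally(left.0, left.1) && streams_incrementally(right.0, right.1)
}
```

Under this sketch, an unbounded input that only emits at the end
(`Final`) blocks the conversion, which the old `is_unbounded()` check
alone did not express.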
diff --git 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 2f6b7a51ee..96b2454fa3 100644
--- 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -29,11 +29,12 @@ use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::Transformed;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion_physical_plan::execution_plan::EmissionType;
 use datafusion_physical_plan::tree_node::PlanContext;
 use datafusion_physical_plan::ExecutionPlanProperties;
 
-use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use itertools::izip;
 
 /// For a given `plan`, this object carries the information one needs from its
@@ -246,7 +247,8 @@ pub(crate) fn replace_with_order_preserving_variants(
     // For unbounded cases, we replace with the order-preserving variant in any
     // case, as doing so helps fix the pipeline. Also replace if config allows.
     let use_order_preserving_variant = config.optimizer.prefer_existing_sort
-        || !requirements.plan.execution_mode().pipeline_friendly();
+        || (requirements.plan.boundedness().is_unbounded()
+            && requirements.plan.pipeline_behavior() == EmissionType::Final);
 
     // Create an alternate plan with order-preserving variants:
     let mut alternate_plan = plan_with_order_preserving_variants(
diff --git a/datafusion/core/src/physical_optimizer/sanity_checker.rs 
b/datafusion/core/src/physical_optimizer/sanity_checker.rs
index 99bd1cab3e..b6d22320d0 100644
--- a/datafusion/core/src/physical_optimizer/sanity_checker.rs
+++ b/datafusion/core/src/physical_optimizer/sanity_checker.rs
@@ -30,6 +30,7 @@ use datafusion_common::config::{ConfigOptions, 
OptimizerOptions};
 use datafusion_common::plan_err;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_physical_expr::intervals::utils::{check_support, 
is_datatype_supported};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion_physical_plan::joins::SymmetricHashJoinExec;
 use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
 
@@ -85,7 +86,15 @@ pub fn check_finiteness_requirements(
                               the 'allow_symmetric_joins_without_pruning' 
configuration flag");
         }
     }
-    if !input.execution_mode().pipeline_friendly() {
+
+    if matches!(
+        input.boundedness(),
+        Boundedness::Unbounded {
+            requires_infinite_memory: true
+        }
+    ) || (input.boundedness().is_unbounded()
+        && input.pipeline_behavior() == EmissionType::Final)
+    {
         plan_err!(
             "Cannot execute pipeline breaking queries, operator: {:?}",
             input
@@ -215,7 +224,9 @@ mod tests {
 
         let test2 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Unbounded),
-            expect_fail: true,
+            // Left join for bounded build side and unbounded probe side can 
generate
+            // both incremental matched rows and final non-matched rows.
+            expect_fail: false,
         };
         let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Bounded),
@@ -290,7 +301,9 @@ mod tests {
         };
         let test2 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Unbounded),
-            expect_fail: true,
+            // Full join for bounded build side and unbounded probe side can 
generate
+            // both incremental matched rows and final non-matched rows.
+            expect_fail: false,
         };
         let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Bounded),
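
Note on the sanity_checker.rs change above: the old
`!pipeline_friendly()` test is replaced by a two-part condition. The
sketch below restates it as a standalone predicate. The enums are local
stand-ins rather than the real DataFusion types, and
`is_pipeline_breaking` is a hypothetical name used only here.

```rust
// Local stand-ins for the types named in the diff (assumption: simplified,
// not the real DataFusion definitions).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Boundedness {
    Bounded,
    Unbounded { requires_infinite_memory: bool },
}

impl Boundedness {
    fn is_unbounded(&self) -> bool {
        matches!(self, Boundedness::Unbounded { .. })
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EmissionType {
    Incremental,
    Final,
    Both,
}

/// Hypothetical predicate mirroring the new check in
/// `check_finiteness_requirements`: a plan is rejected if it needs infinite
/// memory, or if it is unbounded and only emits once its input ends.
fn is_pipeline_breaking(boundedness: Boundedness, emission: EmissionType) -> bool {
    matches!(
        boundedness,
        Boundedness::Unbounded {
            requires_infinite_memory: true
        }
    ) || (boundedness.is_unbounded() && emission == EmissionType::Final)
}
```

This is consistent with the updated expectations in the tests above:
an unbounded operator that reports `Both` (presumably the case for the
left and full joins with a bounded build side) is no longer rejected,
because only `Final` emission on an unbounded input counts as breaking
the pipeline.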
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index 44537c951f..47b31d2f4e 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -2017,7 +2017,7 @@ mod tests {
     use crate::datasource::file_format::options::CsvReadOptions;
     use crate::datasource::MemTable;
     use crate::physical_plan::{
-        expressions, DisplayAs, DisplayFormatType, ExecutionMode, 
PlanProperties,
+        expressions, DisplayAs, DisplayFormatType, PlanProperties,
         SendableRecordBatchStream,
     };
     use crate::prelude::{SessionConfig, SessionContext};
@@ -2032,6 +2032,7 @@ mod tests {
     use datafusion_expr::{col, lit, LogicalPlanBuilder, 
UserDefinedLogicalNodeCore};
     use datafusion_functions_aggregate::expr_fn::sum;
     use datafusion_physical_expr::EquivalenceProperties;
+    use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 
     fn make_session_state() -> SessionState {
         let runtime = Arc::new(RuntimeEnv::default());
@@ -2619,13 +2620,11 @@ mod tests {
 
         /// This function creates the cache object that stores the plan 
properties such as schema, equivalence properties, ordering, partitioning, etc.
         fn compute_properties(schema: SchemaRef) -> PlanProperties {
-            let eq_properties = EquivalenceProperties::new(schema);
             PlanProperties::new(
-                eq_properties,
-                // Output Partitioning
+                EquivalenceProperties::new(schema),
                 Partitioning::UnknownPartitioning(1),
-                // Execution Mode
-                ExecutionMode::Bounded,
+                EmissionType::Incremental,
+                Boundedness::Bounded,
             )
         }
     }
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index d8304c2f0a..e5ce28e738 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -45,10 +45,9 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::{DataFusionError, Statistics};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning, 
PhysicalSortExpr};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
-use datafusion_physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionMode, PlanProperties,
-};
+use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};
 
 #[cfg(feature = "compression")]
 use bzip2::write::BzEncoder;
@@ -386,13 +385,11 @@ impl StatisticsExec {
 
     /// This function creates the cache object that stores the plan properties 
such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(schema: SchemaRef) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
         PlanProperties::new(
-            eq_properties,
-            // Output Partitioning
+            EquivalenceProperties::new(schema),
             Partitioning::UnknownPartitioning(2),
-            // Execution Mode
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
diff --git a/datafusion/core/src/test_util/mod.rs 
b/datafusion/core/src/test_util/mod.rs
index 09608887c0..aa134f28fe 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -39,22 +39,23 @@ use crate::error::Result;
 use crate::execution::context::TaskContext;
 use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
 use crate::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
-    PlanProperties, RecordBatchStream, SendableRecordBatchStream,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+    RecordBatchStream, SendableRecordBatchStream,
 };
 use crate::prelude::{CsvReadOptions, SessionContext};
 
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
+use datafusion_catalog::Session;
 use datafusion_common::TableReference;
 use datafusion_expr::utils::COUNT_STAR_EXPANSION;
 use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
 use datafusion_functions_aggregate::count::count_udaf;
+use datafusion_physical_expr::aggregate::{AggregateExprBuilder, 
AggregateFunctionExpr};
 use datafusion_physical_expr::{expressions, EquivalenceProperties, 
PhysicalExpr};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 
 use async_trait::async_trait;
-use datafusion_catalog::Session;
-use datafusion_physical_expr::aggregate::{AggregateExprBuilder, 
AggregateFunctionExpr};
 use futures::Stream;
 use tempfile::TempDir;
 // backwards compatibility
@@ -259,16 +260,18 @@ impl UnboundedExec {
         batch_produce: Option<usize>,
         n_partitions: usize,
     ) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
-        let mode = if batch_produce.is_none() {
-            ExecutionMode::Unbounded
+        let boundedness = if batch_produce.is_none() {
+            Boundedness::Unbounded {
+                requires_infinite_memory: false,
+            }
         } else {
-            ExecutionMode::Bounded
+            Boundedness::Bounded
         };
         PlanProperties::new(
-            eq_properties,
+            EquivalenceProperties::new(schema),
             Partitioning::UnknownPartitioning(n_partitions),
-            mode,
+            EmissionType::Incremental,
+            boundedness,
         )
     }
 }
diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs 
b/datafusion/core/tests/custom_sources_cases/mod.rs
index e1bd14105e..aafefac04e 100644
--- a/datafusion/core/tests/custom_sources_cases/mod.rs
+++ b/datafusion/core/tests/custom_sources_cases/mod.rs
@@ -35,15 +35,16 @@ use datafusion::physical_plan::{
     RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 use datafusion::scalar::ScalarValue;
+use datafusion_catalog::Session;
 use datafusion_common::cast::as_primitive_array;
 use datafusion_common::project_schema;
 use datafusion_common::stats::Precision;
 use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
-use datafusion_physical_plan::{ExecutionMode, PlanProperties};
+use datafusion_physical_plan::PlanProperties;
 
 use async_trait::async_trait;
-use datafusion_catalog::Session;
 use futures::stream::Stream;
 
 mod provider_filter_pushdown;
@@ -91,12 +92,11 @@ impl CustomExecutionPlan {
 
     /// This function creates the cache object that stores the plan properties 
such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(schema: SchemaRef) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
         PlanProperties::new(
-            eq_properties,
-            // Output Partitioning
+            EquivalenceProperties::new(schema),
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
diff --git 
a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs 
b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
index 09f7265d63..af0506a505 100644
--- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
@@ -28,19 +28,20 @@ use datafusion::execution::context::TaskContext;
 use datafusion::logical_expr::TableProviderFilterPushDown;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
-    PlanProperties, SendableRecordBatchStream, Statistics,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+    SendableRecordBatchStream, Statistics,
 };
 use datafusion::prelude::*;
 use datafusion::scalar::ScalarValue;
+use datafusion_catalog::Session;
 use datafusion_common::cast::as_primitive_array;
 use datafusion_common::{internal_err, not_impl_err};
 use datafusion_expr::expr::{BinaryExpr, Cast};
 use datafusion_functions_aggregate::expr_fn::count;
 use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 
 use async_trait::async_trait;
-use datafusion_catalog::Session;
 
 fn create_batch(value: i32, num_rows: usize) -> Result<RecordBatch> {
     let mut builder = Int32Builder::with_capacity(num_rows);
@@ -72,11 +73,11 @@ impl CustomPlan {
 
     /// This function creates the cache object that stores the plan properties 
such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(schema: SchemaRef) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
         PlanProperties::new(
-            eq_properties,
+            EquivalenceProperties::new(schema),
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs 
b/datafusion/core/tests/custom_sources_cases/statistics.rs
index 41d182a376..9d3bd594a9 100644
--- a/datafusion/core/tests/custom_sources_cases/statistics.rs
+++ b/datafusion/core/tests/custom_sources_cases/statistics.rs
@@ -26,17 +26,18 @@ use datafusion::{
     error::Result,
     logical_expr::Expr,
     physical_plan::{
-        ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionMode, 
ExecutionPlan,
-        Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
+        ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, 
Partitioning,
+        PlanProperties, SendableRecordBatchStream, Statistics,
     },
     prelude::SessionContext,
     scalar::ScalarValue,
 };
+use datafusion_catalog::Session;
 use datafusion_common::{project_schema, stats::Precision};
 use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 
 use async_trait::async_trait;
-use datafusion_catalog::Session;
 
 /// This is a testing structure for statistics
 /// It will act both as a table provider and execution plan
@@ -64,12 +65,11 @@ impl StatisticsValidation {
 
     /// This function creates the cache object that stores the plan properties 
such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(schema: SchemaRef) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
-
         PlanProperties::new(
-            eq_properties,
+            EquivalenceProperties::new(schema),
             Partitioning::UnknownPartitioning(2),
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
diff --git a/datafusion/core/tests/user_defined/insert_operation.rs 
b/datafusion/core/tests/user_defined/insert_operation.rs
index ff14fa0be3..aa531632c6 100644
--- a/datafusion/core/tests/user_defined/insert_operation.rs
+++ b/datafusion/core/tests/user_defined/insert_operation.rs
@@ -26,7 +26,10 @@ use datafusion::{
 use datafusion_catalog::{Session, TableProvider};
 use datafusion_expr::{dml::InsertOp, Expr, TableType};
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
-use datafusion_physical_plan::{DisplayAs, ExecutionMode, ExecutionPlan, 
PlanProperties};
+use datafusion_physical_plan::{
+    execution_plan::{Boundedness, EmissionType},
+    DisplayAs, ExecutionPlan, PlanProperties,
+};
 
 #[tokio::test]
 async fn insert_operation_is_passed_correctly_to_table_provider() {
@@ -122,15 +125,14 @@ struct TestInsertExec {
 
 impl TestInsertExec {
     fn new(op: InsertOp) -> Self {
-        let eq_properties = EquivalenceProperties::new(make_count_schema());
-        let plan_properties = PlanProperties::new(
-            eq_properties,
-            Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
-        );
         Self {
             op,
-            plan_properties,
+            plan_properties: PlanProperties::new(
+                EquivalenceProperties::new(make_count_schema()),
+                Partitioning::UnknownPartitioning(1),
+                EmissionType::Incremental,
+                Boundedness::Bounded,
+            ),
         }
     }
 }
diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs 
b/datafusion/core/tests/user_defined/user_defined_plan.rs
index 520a91aeb4..77753290c3 100644
--- a/datafusion/core/tests/user_defined/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined/user_defined_plan.rs
@@ -68,9 +68,6 @@ use arrow::{
     record_batch::RecordBatch,
     util::pretty::pretty_format_batches,
 };
-use async_trait::async_trait;
-use futures::{Stream, StreamExt};
-
 use datafusion::execution::session_state::SessionStateBuilder;
 use datafusion::{
     common::cast::{as_int64_array, as_string_array},
@@ -87,9 +84,8 @@ use datafusion::{
     optimizer::{OptimizerConfig, OptimizerRule},
     physical_expr::EquivalenceProperties,
     physical_plan::{
-        DisplayAs, DisplayFormatType, Distribution, ExecutionMode, 
ExecutionPlan,
-        Partitioning, PlanProperties, RecordBatchStream, 
SendableRecordBatchStream,
-        Statistics,
+        DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, 
Partitioning,
+        PlanProperties, RecordBatchStream, SendableRecordBatchStream, 
Statistics,
     },
     physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, 
PhysicalPlanner},
     prelude::{SessionConfig, SessionContext},
@@ -100,6 +96,10 @@ use datafusion_common::ScalarValue;
 use datafusion_expr::{FetchType, Projection, SortExpr};
 use datafusion_optimizer::optimizer::ApplyOrder;
 use datafusion_optimizer::AnalyzerRule;
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
+
+use async_trait::async_trait;
+use futures::{Stream, StreamExt};
 
 /// Execute the specified sql and return the resulting record batches
 /// pretty printed as a String.
@@ -495,12 +495,11 @@ impl TopKExec {
 
     /// This function creates the cache object that stores the plan properties 
such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(schema: SchemaRef) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
-
         PlanProperties::new(
-            eq_properties,
+            EquivalenceProperties::new(schema),
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
diff --git a/datafusion/ffi/src/execution_plan.rs 
b/datafusion/ffi/src/execution_plan.rs
index d10eda8990..5ab321cc01 100644
--- a/datafusion/ffi/src/execution_plan.rs
+++ b/datafusion/ffi/src/execution_plan.rs
@@ -272,7 +272,13 @@ impl ExecutionPlan for ForeignExecutionPlan {
 
 #[cfg(test)]
 mod tests {
-    use datafusion::{physical_plan::Partitioning, prelude::SessionContext};
+    use datafusion::{
+        physical_plan::{
+            execution_plan::{Boundedness, EmissionType},
+            Partitioning,
+        },
+        prelude::SessionContext,
+    };
 
     use super::*;
 
@@ -287,7 +293,8 @@ mod tests {
                 props: PlanProperties::new(
                     
datafusion::physical_expr::EquivalenceProperties::new(schema),
                     Partitioning::UnknownPartitioning(3),
-                    datafusion::physical_plan::ExecutionMode::Unbounded,
+                    EmissionType::Incremental,
+                    Boundedness::Bounded,
                 ),
             }
         }
diff --git a/datafusion/ffi/src/plan_properties.rs 
b/datafusion/ffi/src/plan_properties.rs
index 722681ae4a..3c7bc886ae 100644
--- a/datafusion/ffi/src/plan_properties.rs
+++ b/datafusion/ffi/src/plan_properties.rs
@@ -28,7 +28,10 @@ use arrow::datatypes::SchemaRef;
 use datafusion::{
     error::{DataFusionError, Result},
     physical_expr::EquivalenceProperties,
-    physical_plan::{ExecutionMode, PlanProperties},
+    physical_plan::{
+        execution_plan::{Boundedness, EmissionType},
+        PlanProperties,
+    },
     prelude::SessionContext,
 };
 use datafusion_proto::{
@@ -53,8 +56,11 @@ pub struct FFI_PlanProperties {
     pub output_partitioning:
         unsafe extern "C" fn(plan: &Self) -> RResult<RVec<u8>, RStr<'static>>,
 
-    /// Return the execution mode of the plan.
-    pub execution_mode: unsafe extern "C" fn(plan: &Self) -> FFI_ExecutionMode,
+    /// Return the emission type of the plan.
+    pub emission_type: unsafe extern "C" fn(plan: &Self) -> FFI_EmissionType,
+
+    /// Indicate boundedness of the plan and its memory requirements.
+    pub boundedness: unsafe extern "C" fn(plan: &Self) -> FFI_Boundedness,
 
     /// The output ordering is a [`PhysicalSortExprNodeCollection`] protobuf 
message
     /// serialized into bytes to pass across the FFI boundary.
@@ -98,12 +104,20 @@ unsafe extern "C" fn output_partitioning_fn_wrapper(
     ROk(output_partitioning.into())
 }
 
-unsafe extern "C" fn execution_mode_fn_wrapper(
+unsafe extern "C" fn emission_type_fn_wrapper(
     properties: &FFI_PlanProperties,
-) -> FFI_ExecutionMode {
+) -> FFI_EmissionType {
     let private_data = properties.private_data as *const 
PlanPropertiesPrivateData;
     let props = &(*private_data).props;
-    props.execution_mode().into()
+    props.emission_type.into()
+}
+
+unsafe extern "C" fn boundedness_fn_wrapper(
+    properties: &FFI_PlanProperties,
+) -> FFI_Boundedness {
+    let private_data = properties.private_data as *const 
PlanPropertiesPrivateData;
+    let props = &(*private_data).props;
+    props.boundedness.into()
 }
 
 unsafe extern "C" fn output_ordering_fn_wrapper(
@@ -164,7 +178,8 @@ impl From<&PlanProperties> for FFI_PlanProperties {
 
         FFI_PlanProperties {
             output_partitioning: output_partitioning_fn_wrapper,
-            execution_mode: execution_mode_fn_wrapper,
+            emission_type: emission_type_fn_wrapper,
+            boundedness: boundedness_fn_wrapper,
             output_ordering: output_ordering_fn_wrapper,
             schema: schema_fn_wrapper,
             release: release_fn_wrapper,
@@ -220,9 +235,6 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
             RErr(e) => Err(DataFusionError::Plan(e.to_string())),
         }?;
 
-        let execution_mode: ExecutionMode =
-            unsafe { (ffi_props.execution_mode)(&ffi_props).into() };
-
         let eq_properties = match orderings {
             Some(ordering) => {
                 EquivalenceProperties::new_with_orderings(Arc::new(schema), 
&[ordering])
@@ -230,40 +242,82 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
             None => EquivalenceProperties::new(Arc::new(schema)),
         };
 
+        let emission_type: EmissionType =
+            unsafe { (ffi_props.emission_type)(&ffi_props).into() };
+
+        let boundedness: Boundedness =
+            unsafe { (ffi_props.boundedness)(&ffi_props).into() };
+
         Ok(PlanProperties::new(
             eq_properties,
             partitioning,
-            execution_mode,
+            emission_type,
+            boundedness,
         ))
     }
 }
 
-/// FFI safe version of [`ExecutionMode`].
+/// FFI safe version of [`Boundedness`].
 #[repr(C)]
 #[allow(non_camel_case_types)]
 #[derive(Clone, StableAbi)]
-pub enum FFI_ExecutionMode {
+pub enum FFI_Boundedness {
     Bounded,
-    Unbounded,
-    PipelineBreaking,
+    Unbounded { requires_infinite_memory: bool },
+}
+
+impl From<Boundedness> for FFI_Boundedness {
+    fn from(value: Boundedness) -> Self {
+        match value {
+            Boundedness::Bounded => FFI_Boundedness::Bounded,
+            Boundedness::Unbounded {
+                requires_infinite_memory,
+            } => FFI_Boundedness::Unbounded {
+                requires_infinite_memory,
+            },
+        }
+    }
+}
+
+impl From<FFI_Boundedness> for Boundedness {
+    fn from(value: FFI_Boundedness) -> Self {
+        match value {
+            FFI_Boundedness::Bounded => Boundedness::Bounded,
+            FFI_Boundedness::Unbounded {
+                requires_infinite_memory,
+            } => Boundedness::Unbounded {
+                requires_infinite_memory,
+            },
+        }
+    }
+}
+
+/// FFI safe version of [`EmissionType`].
+#[repr(C)]
+#[allow(non_camel_case_types)]
+#[derive(Clone, StableAbi)]
+pub enum FFI_EmissionType {
+    Incremental,
+    Final,
+    Both,
 }
 
-impl From<ExecutionMode> for FFI_ExecutionMode {
-    fn from(value: ExecutionMode) -> Self {
+impl From<EmissionType> for FFI_EmissionType {
+    fn from(value: EmissionType) -> Self {
         match value {
-            ExecutionMode::Bounded => FFI_ExecutionMode::Bounded,
-            ExecutionMode::Unbounded => FFI_ExecutionMode::Unbounded,
-            ExecutionMode::PipelineBreaking => 
FFI_ExecutionMode::PipelineBreaking,
+            EmissionType::Incremental => FFI_EmissionType::Incremental,
+            EmissionType::Final => FFI_EmissionType::Final,
+            EmissionType::Both => FFI_EmissionType::Both,
         }
     }
 }
 
-impl From<FFI_ExecutionMode> for ExecutionMode {
-    fn from(value: FFI_ExecutionMode) -> Self {
+impl From<FFI_EmissionType> for EmissionType {
+    fn from(value: FFI_EmissionType) -> Self {
         match value {
-            FFI_ExecutionMode::Bounded => ExecutionMode::Bounded,
-            FFI_ExecutionMode::Unbounded => ExecutionMode::Unbounded,
-            FFI_ExecutionMode::PipelineBreaking => 
ExecutionMode::PipelineBreaking,
+            FFI_EmissionType::Incremental => EmissionType::Incremental,
+            FFI_EmissionType::Final => EmissionType::Final,
+            FFI_EmissionType::Both => EmissionType::Both,
         }
     }
 }
@@ -283,7 +337,8 @@ mod tests {
         let original_props = PlanProperties::new(
             EquivalenceProperties::new(schema),
             Partitioning::UnknownPartitioning(3),
-            ExecutionMode::Unbounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         );
 
         let local_props_ptr = FFI_PlanProperties::from(&original_props);
diff --git a/datafusion/physical-optimizer/src/output_requirements.rs 
b/datafusion/physical-optimizer/src/output_requirements.rs
index 6c8e76bff8..d5ffaad6d8 100644
--- a/datafusion/physical-optimizer/src/output_requirements.rs
+++ b/datafusion/physical-optimizer/src/output_requirements.rs
@@ -121,7 +121,8 @@ impl OutputRequirementExec {
         PlanProperties::new(
             input.equivalence_properties().clone(), // Equivalence Properties
             input.output_partitioning().clone(),    // Output Partitioning
-            input.execution_mode(),                 // Execution Mode
+            input.pipeline_behavior(),              // Pipeline Behavior
+            input.boundedness(),                    // Boundedness
         )
     }
 }
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs 
b/datafusion/physical-plan/src/aggregates/mod.rs
index feca5eb1db..2e0103defd 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -20,11 +20,12 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use super::{DisplayAs, ExecutionMode, ExecutionPlanProperties, PlanProperties};
+use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
 use crate::aggregates::{
     no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream,
     topk_stream::GroupedTopKAggregateStream,
 };
+use crate::execution_plan::{CardinalityEffect, EmissionType};
 use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::projection::get_field_metadata;
 use crate::windows::get_ordered_partition_by_indices;
@@ -41,6 +42,7 @@ use datafusion_common::stats::Precision;
 use datafusion_common::{internal_err, not_impl_err, Result};
 use datafusion_execution::TaskContext;
 use datafusion_expr::{Accumulator, Aggregate};
+use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
 use datafusion_physical_expr::{
     equivalence::{collapse_lex_req, ProjectionMapping},
     expressions::Column,
@@ -48,8 +50,6 @@ use datafusion_physical_expr::{
     PhysicalExpr, PhysicalSortRequirement,
 };
 
-use crate::execution_plan::CardinalityEffect;
-use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
 use itertools::Itertools;
 
 pub(crate) mod group_values;
@@ -663,16 +663,19 @@ impl AggregateExec {
             input_partitioning.clone()
         };
 
-        // Determine execution mode:
-        let mut exec_mode = input.execution_mode();
-        if exec_mode == ExecutionMode::Unbounded
-            && *input_order_mode == InputOrderMode::Linear
-        {
-            // Cannot run without breaking the pipeline
-            exec_mode = ExecutionMode::PipelineBreaking;
-        }
+        // TODO: Emission type and boundedness information can be enhanced here
+        let emission_type = if *input_order_mode == InputOrderMode::Linear {
+            EmissionType::Final
+        } else {
+            input.pipeline_behavior()
+        };
 
-        PlanProperties::new(eq_properties, output_partitioning, exec_mode)
+        PlanProperties::new(
+            eq_properties,
+            output_partitioning,
+            emission_type,
+            input.boundedness(),
+        )
     }
 
     pub fn input_order_mode(&self) -> &InputOrderMode {
@@ -1298,6 +1301,7 @@ mod tests {
     use crate::coalesce_batches::CoalesceBatchesExec;
     use crate::coalesce_partitions::CoalescePartitionsExec;
     use crate::common;
+    use crate::execution_plan::Boundedness;
     use crate::expressions::col;
     use crate::memory::MemoryExec;
     use crate::test::assert_is_pending;
@@ -1730,13 +1734,11 @@ mod tests {
 
         /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
         fn compute_properties(schema: SchemaRef) -> PlanProperties {
-            let eq_properties = EquivalenceProperties::new(schema);
             PlanProperties::new(
-                eq_properties,
-                // Output Partitioning
+                EquivalenceProperties::new(schema),
                 Partitioning::UnknownPartitioning(1),
-                // Execution Mode
-                ExecutionMode::Bounded,
+                EmissionType::Incremental,
+                Boundedness::Bounded,
             )
         }
     }
diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs
index c8b329fabd..1fc3280ceb 100644
--- a/datafusion/physical-plan/src/analyze.rs
+++ b/datafusion/physical-plan/src/analyze.rs
@@ -89,10 +89,12 @@ impl AnalyzeExec {
         input: &Arc<dyn ExecutionPlan>,
         schema: SchemaRef,
     ) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
-        let output_partitioning = Partitioning::UnknownPartitioning(1);
-        let exec_mode = input.execution_mode();
-        PlanProperties::new(eq_properties, output_partitioning, exec_mode)
+        PlanProperties::new(
+            EquivalenceProperties::new(schema),
+            Partitioning::UnknownPartitioning(1),
+            input.pipeline_behavior(),
+            input.boundedness(),
+        )
     }
 }
 
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs
index 11678e7a46..fa8d125d62 100644
--- a/datafusion/physical-plan/src/coalesce_batches.rs
+++ b/datafusion/physical-plan/src/coalesce_batches.rs
@@ -97,7 +97,8 @@ impl CoalesceBatchesExec {
         PlanProperties::new(
             input.equivalence_properties().clone(), // Equivalence Properties
             input.output_partitioning().clone(),    // Output Partitioning
-            input.execution_mode(),                 // Execution Mode
+            input.pipeline_behavior(),
+            input.boundedness(),
         )
     }
 }
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index 3da101d609..7c1bdba2f3 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -70,7 +70,8 @@ impl CoalescePartitionsExec {
         PlanProperties::new(
             eq_properties,                        // Equivalence Properties
             Partitioning::UnknownPartitioning(1), // Output Partitioning
-            input.execution_mode(),               // Execution Mode
+            input.pipeline_behavior(),
+            input.boundedness(),
         )
     }
 }
diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs
index 192619f69f..5168c3cc10 100644
--- a/datafusion/physical-plan/src/empty.rs
+++ b/datafusion/physical-plan/src/empty.rs
@@ -20,11 +20,12 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use super::{
-    common, DisplayAs, ExecutionMode, PlanProperties, SendableRecordBatchStream,
-    Statistics,
+use super::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
+use crate::{
+    execution_plan::{Boundedness, EmissionType},
+    memory::MemoryStream,
+    DisplayFormatType, ExecutionPlan, Partitioning,
 };
-use crate::{memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning};
 
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
@@ -74,14 +75,11 @@ impl EmptyExec {
 
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
-        let output_partitioning = Self::output_partitioning_helper(n_partitions);
         PlanProperties::new(
-            eq_properties,
-            // Output Partitioning
-            output_partitioning,
-            // Execution Mode
-            ExecutionMode::Bounded,
+            EquivalenceProperties::new(schema),
+            Self::output_partitioning_helper(n_partitions),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index ba9e4b0697..09bb807344 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -431,11 +431,6 @@ pub trait ExecutionPlanProperties {
     /// partitions.
     fn output_partitioning(&self) -> &Partitioning;
 
-    /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but its input(s) are
-    /// infinite, returns [`ExecutionMode::PipelineBreaking`] to indicate this.
-    fn execution_mode(&self) -> ExecutionMode;
-
     /// If the output of this `ExecutionPlan` within each partition is sorted,
     /// returns `Some(keys)` describing the ordering. A `None` return value
     /// indicates no assumptions should be made on the output ordering.
@@ -445,6 +440,14 @@ pub trait ExecutionPlanProperties {
     /// output if its input is sorted as it does not reorder the input rows.
     fn output_ordering(&self) -> Option<&LexOrdering>;
 
+    /// Boundedness information of the stream corresponding to this `ExecutionPlan`.
+    /// For more details, see [`Boundedness`].
+    fn boundedness(&self) -> Boundedness;
+
+    /// Indicates how the stream of this `ExecutionPlan` emits its results.
+    /// For more details, see [`EmissionType`].
+    fn pipeline_behavior(&self) -> EmissionType;
+
     /// Get the [`EquivalenceProperties`] within the plan.
     ///
     /// Equivalence properties tell DataFusion what columns are known to be
@@ -470,14 +473,18 @@ impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
         self.properties().output_partitioning()
     }
 
-    fn execution_mode(&self) -> ExecutionMode {
-        self.properties().execution_mode()
-    }
-
     fn output_ordering(&self) -> Option<&LexOrdering> {
         self.properties().output_ordering()
     }
 
+    fn boundedness(&self) -> Boundedness {
+        self.properties().boundedness
+    }
+
+    fn pipeline_behavior(&self) -> EmissionType {
+        self.properties().emission_type
+    }
+
     fn equivalence_properties(&self) -> &EquivalenceProperties {
         self.properties().equivalence_properties()
     }
@@ -488,95 +495,159 @@ impl ExecutionPlanProperties for &dyn ExecutionPlan {
         self.properties().output_partitioning()
     }
 
-    fn execution_mode(&self) -> ExecutionMode {
-        self.properties().execution_mode()
-    }
-
     fn output_ordering(&self) -> Option<&LexOrdering> {
         self.properties().output_ordering()
     }
 
+    fn boundedness(&self) -> Boundedness {
+        self.properties().boundedness
+    }
+
+    fn pipeline_behavior(&self) -> EmissionType {
+        self.properties().emission_type
+    }
+
     fn equivalence_properties(&self) -> &EquivalenceProperties {
         self.properties().equivalence_properties()
     }
 }
 
-/// Describes the execution mode of the result of calling
-/// [`ExecutionPlan::execute`] with respect to its size and behavior.
+/// Represents whether a stream of data **generated** by an operator is bounded (finite)
+/// or unbounded (infinite).
 ///
-/// The mode of the execution plan is determined by the mode of its input
-/// execution plans and the details of the operator itself. For example, a
-/// `FilterExec` operator will have the same execution mode as its input, but a
-/// `SortExec` operator may have a different execution mode than its input,
-/// depending on how the input stream is sorted.
+/// This is used to determine whether an execution plan will eventually complete
+/// processing all its data (bounded) or could potentially run forever (unbounded).
 ///
-/// There are three possible execution modes: `Bounded`, `Unbounded` and
-/// `PipelineBreaking`.
-#[derive(Clone, Copy, PartialEq, Debug)]
-pub enum ExecutionMode {
-    /// The stream is bounded / finite.
-    ///
-    /// In this case the stream will eventually return `None` to indicate that
-    /// there are no more records to process.
+/// For unbounded streams, it also tracks whether the operator requires finite memory
+/// to process the stream or if memory usage could grow unbounded.
+///
+/// Boundedness of the output stream is based on the boundedness of the input stream and the nature of
+/// the operator. For example, limit or topk with fetch operator can convert an unbounded stream to a bounded stream.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Boundedness {
+    /// The data stream is bounded (finite) and will eventually complete
     Bounded,
-    /// The stream is unbounded / infinite.
-    ///
-    /// In this case, the stream will never be done (never return `None`),
-    /// except in case of error.
-    ///
-    /// This mode is often used in "Steaming" use cases where data is
-    /// incrementally processed as it arrives.
-    ///
-    /// Note that even though the operator generates an unbounded stream of
-    /// results, it can execute with bounded memory and incrementally produces
-    /// output.
-    Unbounded,
-    /// Some of the operator's input stream(s) are unbounded, but the operator
-    /// cannot generate streaming results from these streaming inputs.
-    ///
-    /// In this case, the execution mode will be pipeline breaking, e.g. the
-    /// operator requires unbounded memory to generate results. This
-    /// information is used by the planner when performing sanity checks
-    /// on plans processings unbounded data sources.
-    PipelineBreaking,
+    /// The data stream is unbounded (infinite) and could run forever
+    Unbounded {
+        /// Whether this operator requires infinite memory to process the unbounded stream.
+        /// If false, the operator can process an infinite stream with bounded memory.
+        /// If true, memory usage may grow unbounded while processing the stream.
+        ///
+        /// For example, `Median` requires infinite memory to compute the median of an unbounded stream.
+        /// `Min/Max` requires infinite memory if the stream is unordered, but can be computed with bounded memory if the stream is ordered.
+        requires_infinite_memory: bool,
+    },
 }
 
-impl ExecutionMode {
-    /// Check whether the execution mode is unbounded or not.
+impl Boundedness {
     pub fn is_unbounded(&self) -> bool {
-        matches!(self, ExecutionMode::Unbounded)
+        matches!(self, Boundedness::Unbounded { .. })
     }
+}
 
-    /// Check whether the execution is pipeline friendly. If so, operator can
-    /// execute safely.
-    pub fn pipeline_friendly(&self) -> bool {
-        matches!(self, ExecutionMode::Bounded | ExecutionMode::Unbounded)
-    }
+/// Represents how an operator emits its output records.
+///
+/// This is used to determine whether an operator emits records incrementally as they arrive,
+/// only emits a final result at the end, or can do both. Note that it generates the output -- record batch with `batch_size` rows
+/// but it may still buffer data internally until it has enough data to emit a record batch or the source is exhausted.
+///
+/// For example, in the following plan:
+/// ```text
+///   SortExec [EmissionType::Final]
+///     |_ on: [col1 ASC]
+///     FilterExec [EmissionType::Incremental]
+///       |_ pred: col2 > 100
+///       CsvExec [EmissionType::Incremental]
+///         |_ file: "data.csv"
+/// ```
+/// - CsvExec emits records incrementally as it reads from the file
+/// - FilterExec processes and emits filtered records incrementally as they arrive
+/// - SortExec must wait for all input records before it can emit the sorted result,
+///   since it needs to see all values to determine their final order
+///
+/// Left joins can emit both incrementally and finally:
+/// - Incrementally emit matches as they are found
+/// - Finally emit non-matches after all input is processed
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum EmissionType {
+    /// Records are emitted incrementally as they arrive and are processed
+    Incremental,
+    /// Records are only emitted once all input has been processed
+    Final,
+    /// Records can be emitted both incrementally and as a final result
+    Both,
 }
 
-/// Conservatively "combines" execution modes of a given collection of operators.
-pub(crate) fn execution_mode_from_children<'a>(
+/// Utility to determine an operator's boundedness based on its children's boundedness.
+///
+/// Assumes boundedness can be inferred from child operators:
+/// - Unbounded (requires_infinite_memory: true) takes precedence.
+/// - Unbounded (requires_infinite_memory: false) is considered next.
+/// - Otherwise, the operator is bounded.
+///
+/// **Note:** This is a general-purpose utility and may not apply to
+/// all multi-child operators. Ensure your operator's behavior aligns
+/// with these assumptions before using.
+pub(crate) fn boundedness_from_children<'a>(
     children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
-) -> ExecutionMode {
-    let mut result = ExecutionMode::Bounded;
-    for mode in children.into_iter().map(|child| child.execution_mode()) {
-        match (mode, result) {
-            (ExecutionMode::PipelineBreaking, _)
-            | (_, ExecutionMode::PipelineBreaking) => {
-                // If any of the modes is `PipelineBreaking`, so is the result:
-                return ExecutionMode::PipelineBreaking;
-            }
-            (ExecutionMode::Unbounded, _) | (_, ExecutionMode::Unbounded) => {
-                // Unbounded mode eats up bounded mode:
-                result = ExecutionMode::Unbounded;
+) -> Boundedness {
+    let mut unbounded_with_finite_mem = false;
+
+    for child in children {
+        match child.boundedness() {
+            Boundedness::Unbounded {
+                requires_infinite_memory: true,
+            } => {
+                return Boundedness::Unbounded {
+                    requires_infinite_memory: true,
+                }
             }
-            (ExecutionMode::Bounded, ExecutionMode::Bounded) => {
-                // When both modes are bounded, so is the result:
-                result = ExecutionMode::Bounded;
+            Boundedness::Unbounded {
+                requires_infinite_memory: false,
+            } => {
+                unbounded_with_finite_mem = true;
             }
+            Boundedness::Bounded => {}
         }
     }
-    result
+
+    if unbounded_with_finite_mem {
+        Boundedness::Unbounded {
+            requires_infinite_memory: false,
+        }
+    } else {
+        Boundedness::Bounded
+    }
+}
+
+/// Determines the emission type of an operator based on its children's pipeline behavior.
+///
+/// The precedence of emission types is:
+/// - `Final` has the highest precedence.
+/// - `Both` is next: if any child emits both incremental and final results, the parent inherits this behavior unless a `Final` is present.
+/// - `Incremental` is the default if all children emit incremental results.
+///
+/// **Note:** This is a general-purpose utility and may not apply to
+/// all multi-child operators. Verify your operator's behavior aligns
+/// with these assumptions.
+pub(crate) fn emission_type_from_children<'a>(
+    children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
+) -> EmissionType {
+    let mut inc_and_final = false;
+
+    for child in children {
+        match child.pipeline_behavior() {
+            EmissionType::Final => return EmissionType::Final,
+            EmissionType::Both => inc_and_final = true,
+            EmissionType::Incremental => continue,
+        }
+    }
+
+    if inc_and_final {
+        EmissionType::Both
+    } else {
+        EmissionType::Incremental
+    }
 }
 
 /// Stores certain, often expensive to compute, plan properties used in query
@@ -591,8 +662,10 @@ pub struct PlanProperties {
     pub eq_properties: EquivalenceProperties,
     /// See [ExecutionPlanProperties::output_partitioning]
     pub partitioning: Partitioning,
-    /// See [ExecutionPlanProperties::execution_mode]
-    pub execution_mode: ExecutionMode,
+    /// See [ExecutionPlanProperties::pipeline_behavior]
+    pub emission_type: EmissionType,
+    /// See [ExecutionPlanProperties::boundedness]
+    pub boundedness: Boundedness,
     /// See [ExecutionPlanProperties::output_ordering]
     output_ordering: Option<LexOrdering>,
 }
@@ -602,14 +675,16 @@ impl PlanProperties {
     pub fn new(
         eq_properties: EquivalenceProperties,
         partitioning: Partitioning,
-        execution_mode: ExecutionMode,
+        emission_type: EmissionType,
+        boundedness: Boundedness,
     ) -> Self {
         // Output ordering can be derived from `eq_properties`.
         let output_ordering = eq_properties.output_ordering();
         Self {
             eq_properties,
             partitioning,
-            execution_mode,
+            emission_type,
+            boundedness,
             output_ordering,
         }
     }
@@ -620,12 +695,6 @@ impl PlanProperties {
         self
     }
 
-    /// Overwrite the execution Mode with its new value.
-    pub fn with_execution_mode(mut self, execution_mode: ExecutionMode) -> Self {
-        self.execution_mode = execution_mode;
-        self
-    }
-
     /// Overwrite equivalence properties with its new value.
     pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self {
         // Changing equivalence properties also changes output ordering, so
@@ -635,6 +704,18 @@ impl PlanProperties {
         self
     }
 
+    /// Overwrite boundedness with its new value.
+    pub fn with_boundedness(mut self, boundedness: Boundedness) -> Self {
+        self.boundedness = boundedness;
+        self
+    }
+
+    /// Overwrite emission type with its new value.
+    pub fn with_emission_type(mut self, emission_type: EmissionType) -> Self {
+        self.emission_type = emission_type;
+        self
+    }
+
     pub fn equivalence_properties(&self) -> &EquivalenceProperties {
         &self.eq_properties
     }
@@ -647,10 +728,6 @@ impl PlanProperties {
         self.output_ordering.as_ref()
     }
 
-    pub fn execution_mode(&self) -> ExecutionMode {
-        self.execution_mode
-    }
-
     /// Get schema of the node.
     fn schema(&self) -> &SchemaRef {
         self.eq_properties.schema()
diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs
index cc42e05871..cb00958cec 100644
--- a/datafusion/physical-plan/src/explain.rs
+++ b/datafusion/physical-plan/src/explain.rs
@@ -20,7 +20,8 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use super::{DisplayAs, ExecutionMode, PlanProperties, SendableRecordBatchStream};
+use super::{DisplayAs, PlanProperties, SendableRecordBatchStream};
+use crate::execution_plan::{Boundedness, EmissionType};
 use crate::stream::RecordBatchStreamAdapter;
 use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
 
@@ -74,11 +75,11 @@ impl ExplainExec {
 
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(schema: SchemaRef) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
         PlanProperties::new(
-            eq_properties,
+            EquivalenceProperties::new(schema),
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Final,
+            Boundedness::Bounded,
         )
     }
 }
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 07898e8d22..901907cf38 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -272,10 +272,12 @@ impl FilterExec {
                 output_partitioning.project(&projection_mapping, &eq_properties);
             eq_properties = eq_properties.project(&projection_mapping, out_schema);
         }
+
         Ok(PlanProperties::new(
             eq_properties,
             output_partitioning,
-            input.execution_mode(),
+            input.pipeline_behavior(),
+            input.boundedness(),
         ))
     }
 }
diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs
index ae8a2acce6..e848640386 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -23,11 +23,12 @@ use std::fmt::Debug;
 use std::sync::Arc;
 
 use super::{
-    execute_input_stream, DisplayAs, DisplayFormatType, ExecutionPlan,
-    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
+    execute_input_stream, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+    PlanProperties, SendableRecordBatchStream,
 };
 use crate::metrics::MetricsSet;
 use crate::stream::RecordBatchStreamAdapter;
+use crate::ExecutionPlanProperties;
 
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
@@ -140,7 +141,8 @@ impl DataSinkExec {
         PlanProperties::new(
             eq_properties,
             Partitioning::UnknownPartitioning(1),
-            input.execution_mode(),
+            input.pipeline_behavior(),
+            input.boundedness(),
         )
     }
 }
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index 8bf675e873..b70eeb313b 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -24,11 +24,11 @@ use super::utils::{
     StatefulStreamResult,
 };
 use crate::coalesce_partitions::CoalescePartitionsExec;
+use crate::execution_plan::{boundedness_from_children, EmissionType};
 use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::{
-    execution_mode_from_children, handle_state, ColumnStatistics, DisplayAs,
-    DisplayFormatType, Distribution, ExecutionMode, ExecutionPlan,
-    ExecutionPlanProperties, PlanProperties, RecordBatchStream,
+    handle_state, ColumnStatistics, DisplayAs, DisplayFormatType, Distribution,
+    ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
 use arrow::compute::concat_batches;
@@ -161,14 +161,12 @@ impl CrossJoinExec {
             left.schema().fields.len(),
         );
 
-        // Determine the execution mode:
-        let mut mode = execution_mode_from_children([left, right]);
-        if mode.is_unbounded() {
-            // If any of the inputs is unbounded, cross join breaks the pipeline.
-            mode = ExecutionMode::PipelineBreaking;
-        }
-
-        PlanProperties::new(eq_properties, output_partitioning, mode)
+        PlanProperties::new(
+            eq_properties,
+            output_partitioning,
+            EmissionType::Final,
+            boundedness_from_children([left, right]),
+        )
     }
 }
 
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 532a91da75..ef70392a01 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -29,11 +29,12 @@ use super::{
     utils::{OnceAsync, OnceFut},
     PartitionMode,
 };
+use crate::execution_plan::{boundedness_from_children, EmissionType};
 use crate::ExecutionPlanProperties;
 use crate::{
     coalesce_partitions::CoalescePartitionsExec,
     common::can_project,
-    execution_mode_from_children, handle_state,
+    handle_state,
     hash_utils::create_hashes,
     joins::utils::{
         adjust_indices_by_join_type, apply_join_filter_to_indices,
@@ -44,9 +45,8 @@ use crate::{
         JoinHashMapType, JoinOn, JoinOnRef, StatefulStreamResult,
     },
     metrics::{ExecutionPlanMetricsSet, MetricsSet},
-    DisplayAs, DisplayFormatType, Distribution, ExecutionMode, ExecutionPlan,
-    Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
-    Statistics,
+    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 
 use arrow::array::{
@@ -526,24 +526,26 @@ impl HashJoinExec {
             }
         };
 
-        // Determine execution mode by checking whether this join is pipeline
-        // breaking. This happens when the left side is unbounded, or the right
-        // side is unbounded with `Left`, `Full`, `LeftAnti` or `LeftSemi` join types.
-        let pipeline_breaking = left.execution_mode().is_unbounded()
-            || (right.execution_mode().is_unbounded()
-                && matches!(
-                    join_type,
-                    JoinType::Left
-                        | JoinType::Full
-                        | JoinType::LeftAnti
-                        | JoinType::LeftSemi
-                        | JoinType::LeftMark
-                ));
-
-        let mode = if pipeline_breaking {
-            ExecutionMode::PipelineBreaking
+        let emission_type = if left.boundedness().is_unbounded() {
+            EmissionType::Final
+        } else if right.pipeline_behavior() == EmissionType::Incremental {
+            match join_type {
+                // If we only need to generate matched rows from the probe side,
+                // we can emit rows incrementally.
+                JoinType::Inner
+                | JoinType::LeftSemi
+                | JoinType::RightSemi
+                | JoinType::Right
+                | JoinType::RightAnti => EmissionType::Incremental,
+                // If we need to generate unmatched rows from the *build side*,
+                // we need to emit them at the end.
+                JoinType::Left
+                | JoinType::LeftAnti
+                | JoinType::LeftMark
+                | JoinType::Full => EmissionType::Both,
+            }
         } else {
-            execution_mode_from_children([left, right])
+            right.pipeline_behavior()
         };
 
         // If contains projection, update the PlanProperties.
@@ -556,10 +558,12 @@ impl HashJoinExec {
                 output_partitioning.project(&projection_mapping, &eq_properties);
             eq_properties = eq_properties.project(&projection_mapping, out_schema);
         }
+
         Ok(PlanProperties::new(
             eq_properties,
             output_partitioning,
-            mode,
+            emission_type,
+            boundedness_from_children([left, right]),
         ))
     }
 }
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index d174564178..8caf5d9b5d 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -28,6 +28,7 @@ use super::utils::{
     BatchTransformer, NoopBatchTransformer, StatefulStreamResult,
 };
 use crate::coalesce_partitions::CoalescePartitionsExec;
+use crate::execution_plan::{boundedness_from_children, EmissionType};
 use crate::joins::utils::{
     adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices,
     build_join_schema, check_join_is_valid, estimate_join_statistics,
@@ -36,9 +37,9 @@ use crate::joins::utils::{
 };
 use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::{
-    execution_mode_from_children, handle_state, DisplayAs, DisplayFormatType,
-    Distribution, ExecutionMode, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
-    RecordBatchStream, SendableRecordBatchStream,
+    handle_state, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
+    ExecutionPlanProperties, PlanProperties, RecordBatchStream,
+    SendableRecordBatchStream,
 };
 
 use arrow::array::{BooleanBufferBuilder, UInt32Array, UInt64Array};
@@ -241,14 +242,34 @@ impl NestedLoopJoinExec {
         let output_partitioning =
             asymmetric_join_output_partitioning(left, right, &join_type);
 
-        // Determine execution mode:
-        let mode = if left.execution_mode().is_unbounded() {
-            ExecutionMode::PipelineBreaking
+        let emission_type = if left.boundedness().is_unbounded() {
+            EmissionType::Final
+        } else if right.pipeline_behavior() == EmissionType::Incremental {
+            match join_type {
+                // If we only need to generate matched rows from the probe side,
+                // we can emit rows incrementally.
+                JoinType::Inner
+                | JoinType::LeftSemi
+                | JoinType::RightSemi
+                | JoinType::Right
+                | JoinType::RightAnti => EmissionType::Incremental,
+                // If we need to generate unmatched rows from the *build side*,
+                // we need to emit them at the end.
+                JoinType::Left
+                | JoinType::LeftAnti
+                | JoinType::LeftMark
+                | JoinType::Full => EmissionType::Both,
+            }
         } else {
-            execution_mode_from_children([left, right])
+            right.pipeline_behavior()
         };
 
-        PlanProperties::new(eq_properties, output_partitioning, mode)
+        PlanProperties::new(
+            eq_properties,
+            output_partitioning,
+            emission_type,
+            boundedness_from_children([left, right]),
+        )
     }
 
     /// Returns a vector indicating whether the left and right inputs maintain their order.
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 5e387409da..f17b99d81d 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -53,8 +53,8 @@ use datafusion_execution::TaskContext;
 use datafusion_physical_expr::equivalence::join_equivalence_properties;
 use datafusion_physical_expr::PhysicalExprRef;
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
-use futures::{Stream, StreamExt};
 
+use crate::execution_plan::{boundedness_from_children, EmissionType};
 use crate::expressions::PhysicalSortExpr;
 use crate::joins::utils::{
     build_join_schema, check_join_is_valid, estimate_join_statistics,
@@ -63,11 +63,13 @@ use crate::joins::utils::{
 use crate::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use crate::spill::spill_record_batches;
 use crate::{
-    execution_mode_from_children, metrics, DisplayAs, DisplayFormatType, Distribution,
-    ExecutionPlan, ExecutionPlanProperties, PhysicalExpr, PlanProperties,
-    RecordBatchStream, SendableRecordBatchStream, Statistics,
+    metrics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
+    ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream,
+    SendableRecordBatchStream, Statistics,
 };
 
+use futures::{Stream, StreamExt};
+
 /// Join execution plan that executes equi-join predicates on multiple partitions using Sort-Merge
 /// join algorithm and applies an optional filter post join. Can be used to join arbitrarily large
 /// inputs where one or both of the inputs don't fit in the available memory.
@@ -302,10 +304,13 @@ impl SortMergeJoinExec {
         let output_partitioning =
             symmetric_join_output_partitioning(left, right, &join_type);
 
-        // Determine execution mode:
-        let mode = execution_mode_from_children([left, right]);
-
-        PlanProperties::new(eq_properties, output_partitioning, mode)
+        // TODO: Emission type may be incremental if the input is sorted
+        PlanProperties::new(
+            eq_properties,
+            output_partitioning,
+            EmissionType::Final,
+            boundedness_from_children([left, right]),
+        )
     }
 }
 
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 94ef4d5bc3..72fd5a0feb 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -33,6 +33,7 @@ use std::task::{Context, Poll};
 use std::vec;
 
 use crate::common::SharedMemoryReservation;
+use crate::execution_plan::{boundedness_from_children, emission_type_from_children};
 use crate::joins::hash_join::{equal_rows_arr, update_hash};
 use crate::joins::stream_join_utils::{
     calculate_filter_expr_intervals, combine_two_batches,
@@ -47,7 +48,6 @@ use crate::joins::utils::{
     NoopBatchTransformer, StatefulStreamResult,
 };
 use crate::{
-    execution_mode_from_children,
     joins::StreamJoinPartitionMode,
     metrics::{ExecutionPlanMetricsSet, MetricsSet},
     DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
@@ -275,10 +275,12 @@ impl SymmetricHashJoinExec {
         let output_partitioning =
             symmetric_join_output_partitioning(left, right, &join_type);
 
-        // Determine execution mode:
-        let mode = execution_mode_from_children([left, right]);
-
-        PlanProperties::new(eq_properties, output_partitioning, mode)
+        PlanProperties::new(
+            eq_properties,
+            output_partitioning,
+            emission_type_from_children([left, right]),
+            boundedness_from_children([left, right]),
+        )
     }
 
     /// left stream
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index 845a74eaea..5ad37f0b1a 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -35,11 +35,10 @@ pub use datafusion_physical_expr::{
 };
 
 pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
-pub(crate) use crate::execution_plan::execution_mode_from_children;
 pub use crate::execution_plan::{
     collect, collect_partitioned, displayable, execute_input_stream, execute_stream,
     execute_stream_partitioned, get_plan_string, with_new_children_if_necessary,
-    ExecutionMode, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
+    ExecutionPlan, ExecutionPlanProperties, PlanProperties,
 };
 pub use crate::metrics::Metric;
 pub use crate::ordering::InputOrderMode;
diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs
index ab1e6cb37b..9665a09e42 100644
--- a/datafusion/physical-plan/src/limit.rs
+++ b/datafusion/physical-plan/src/limit.rs
@@ -24,9 +24,10 @@ use std::task::{Context, Poll};
 
 use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use super::{
-    DisplayAs, ExecutionMode, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
+    DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
+use crate::execution_plan::{Boundedness, CardinalityEffect};
 use crate::{DisplayFormatType, Distribution, ExecutionPlan, Partitioning};
 
 use arrow::datatypes::SchemaRef;
@@ -34,7 +35,6 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::{internal_err, Result};
 use datafusion_execution::TaskContext;
 
-use crate::execution_plan::CardinalityEffect;
 use futures::stream::{Stream, StreamExt};
 use log::trace;
 
@@ -86,7 +86,9 @@ impl GlobalLimitExec {
         PlanProperties::new(
             input.equivalence_properties().clone(), // Equivalence Properties
             Partitioning::UnknownPartitioning(1),   // Output Partitioning
-            ExecutionMode::Bounded,                 // Execution Mode
+            input.pipeline_behavior(),
+            // Limit operations are always bounded since they output a finite number of rows
+            Boundedness::Bounded,
         )
     }
 }
@@ -242,7 +244,9 @@ impl LocalLimitExec {
         PlanProperties::new(
             input.equivalence_properties().clone(), // Equivalence Properties
             input.output_partitioning().clone(),    // Output Partitioning
-            ExecutionMode::Bounded,                 // Execution Mode
+            input.pipeline_behavior(),
+            // Limit operations are always bounded since they output a finite number of rows
+            Boundedness::Bounded,
         )
     }
 }
diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs
index bf6294f5a5..521008ce9b 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -24,9 +24,10 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use super::{
-    common, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
-    PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
+    common, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+    RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
+use crate::execution_plan::{Boundedness, EmissionType};
 
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
@@ -285,11 +286,11 @@ impl MemoryExec {
         orderings: &[LexOrdering],
         partitions: &[Vec<RecordBatch>],
     ) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
         PlanProperties::new(
-            eq_properties,                                       // Equivalence Properties
-            Partitioning::UnknownPartitioning(partitions.len()), // Output Partitioning
-            ExecutionMode::Bounded,                              // Execution Mode
+            EquivalenceProperties::new_with_orderings(schema, orderings),
+            Partitioning::UnknownPartitioning(partitions.len()),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
@@ -393,7 +394,8 @@ impl LazyMemoryExec {
         let cache = PlanProperties::new(
             EquivalenceProperties::new(Arc::clone(&schema)),
             Partitioning::RoundRobinBatch(generators.len()),
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         );
         Ok(Self {
             schema,
diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs
index f9437f46f8..355e51070f 100644
--- a/datafusion/physical-plan/src/placeholder_row.rs
+++ b/datafusion/physical-plan/src/placeholder_row.rs
@@ -20,10 +20,8 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use super::{
-    common, DisplayAs, ExecutionMode, PlanProperties, SendableRecordBatchStream,
-    Statistics,
-};
+use super::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
+use crate::execution_plan::{Boundedness, EmissionType};
 use crate::{memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning};
 
 use arrow::array::{ArrayRef, NullArray};
@@ -96,11 +94,12 @@ impl PlaceholderRowExec {
 
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
-        // Get output partitioning:
-        let output_partitioning = Self::output_partitioning_helper(n_partitions);
-
-        PlanProperties::new(eq_properties, output_partitioning, ExecutionMode::Bounded)
+        PlanProperties::new(
+            EquivalenceProperties::new(schema),
+            Self::output_partitioning_helper(n_partitions),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        )
     }
 }
 
diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs
index c1d3f36836..e37a6b0dfb 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -132,7 +132,8 @@ impl ProjectionExec {
         Ok(PlanProperties::new(
             eq_properties,
             output_partitioning,
-            input.execution_mode(),
+            input.pipeline_behavior(),
+            input.boundedness(),
         ))
     }
 }
diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs
index 0137e5d52f..0e49a791cb 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -26,7 +26,8 @@ use super::{
     work_table::{ReservedBatches, WorkTable, WorkTableExec},
     PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
-use crate::{DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan};
+use crate::execution_plan::{Boundedness, EmissionType};
+use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};
 
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
@@ -122,7 +123,8 @@ impl RecursiveQueryExec {
         PlanProperties::new(
             eq_properties,
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 0a80dcd34e..963ccc6fd8 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -726,13 +726,11 @@ impl RepartitionExec {
         partitioning: Partitioning,
         preserve_order: bool,
     ) -> PlanProperties {
-        // Equivalence Properties
-        let eq_properties = Self::eq_properties_helper(input, preserve_order);
-
         PlanProperties::new(
-            eq_properties,          // Equivalence Properties
-            partitioning,           // Output Partitioning
-            input.execution_mode(), // Execution Mode
+            Self::eq_properties_helper(input, preserve_order),
+            partitioning,
+            input.pipeline_behavior(),
+            input.boundedness(),
         )
     }
 
diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs
index 77636f9e49..f14ba6606e 100644
--- a/datafusion/physical-plan/src/sorts/partial_sort.rs
+++ b/datafusion/physical-plan/src/sorts/partial_sort.rs
@@ -201,10 +201,12 @@ impl PartialSortExec {
         let output_partitioning =
             Self::output_partitioning_helper(input, preserve_partitioning);
 
-        // Determine execution mode:
-        let mode = input.execution_mode();
-
-        PlanProperties::new(eq_properties, output_partitioning, mode)
+        PlanProperties::new(
+            eq_properties,
+            output_partitioning,
+            input.pipeline_behavior(),
+            input.boundedness(),
+        )
     }
 }
 
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index c2d8b093a9..8d8a5c5f70 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -25,6 +25,7 @@ use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 
 use crate::common::spawn_buffered;
+use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
 use crate::expressions::PhysicalSortExpr;
 use crate::limit::LimitStream;
 use crate::metrics::{
@@ -37,9 +38,9 @@ use crate::spill::{
 use crate::stream::RecordBatchStreamAdapter;
 use crate::topk::TopK;
 use crate::{
-    DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionMode,
-    ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties,
-    SendableRecordBatchStream, Statistics,
+    DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
+    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
+    Statistics,
 };
 
 use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
@@ -56,7 +57,6 @@ use datafusion_execution::TaskContext;
 use datafusion_physical_expr::LexOrdering;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
 
-use crate::execution_plan::CardinalityEffect;
 use futures::{StreamExt, TryStreamExt};
 use log::{debug, trace};
 
@@ -763,11 +763,15 @@ impl SortExec {
     /// can be dropped.
     pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
         let mut cache = self.cache.clone();
-        if fetch.is_some() && self.cache.execution_mode == ExecutionMode::Unbounded {
-            // When a theoretically unnecessary sort becomes a top-K (which
-            // sometimes arises as an intermediate state before full removal),
-            // its execution mode should become `Bounded`.
-            cache.execution_mode = ExecutionMode::Bounded;
+        // If the SortExec can emit incrementally (that means the sort requirements
+        // and properties of the input match), the SortExec can generate its result
+        // without scanning the entire input when a fetch value exists.
+        let is_pipeline_friendly = matches!(
+            self.cache.emission_type,
+            EmissionType::Incremental | EmissionType::Both
+        );
+        if fetch.is_some() && is_pipeline_friendly {
+            cache = cache.with_boundedness(Boundedness::Bounded);
         }
         SortExec {
             input: Arc::clone(&self.input),
@@ -817,10 +821,30 @@ impl SortExec {
         let sort_satisfied = input
             .equivalence_properties()
             .ordering_satisfy_requirement(&requirement);
-        let mode = match input.execution_mode() {
-            ExecutionMode::Unbounded if sort_satisfied => ExecutionMode::Unbounded,
-            ExecutionMode::Bounded => ExecutionMode::Bounded,
-            _ => ExecutionMode::PipelineBreaking,
+
+        // The emission type depends on whether the input is already sorted:
+        // - If already sorted, we can emit results in the same way as the input
+        // - If not sorted, we must wait until all data is processed to emit results (Final)
+        let emission_type = if sort_satisfied {
+            input.pipeline_behavior()
+        } else {
+            EmissionType::Final
+        };
+
+        // The boundedness depends on whether the input is already sorted:
+        // - If already sorted, we have the same property as the input
+        // - If not sorted and input is unbounded, we require infinite memory and generates
+        //   unbounded data (not practical).
+        // - If not sorted and input is bounded, then the SortExec is bounded, too.
+        let boundedness = if sort_satisfied {
+            input.boundedness()
+        } else {
+            match input.boundedness() {
+                Boundedness::Unbounded { .. } => Boundedness::Unbounded {
+                    requires_infinite_memory: true,
+                },
+                bounded => bounded,
+            }
         };
 
         // Calculate equivalence properties; i.e. reset the ordering equivalence
@@ -835,7 +859,12 @@ impl SortExec {
         let output_partitioning =
             Self::output_partitioning_helper(input, preserve_partitioning);
 
-        PlanProperties::new(eq_properties, output_partitioning, mode)
+        PlanProperties::new(
+            eq_properties,
+            output_partitioning,
+            emission_type,
+            boundedness,
+        )
     }
 }
 
@@ -1006,6 +1035,7 @@ mod tests {
     use super::*;
     use crate::coalesce_partitions::CoalescePartitionsExec;
     use crate::collect;
+    use crate::execution_plan::Boundedness;
     use crate::expressions::col;
     use crate::memory::MemoryExec;
     use crate::test;
@@ -1049,8 +1079,14 @@ mod tests {
             eq_properties.add_new_orderings(vec![LexOrdering::new(vec![
                 PhysicalSortExpr::new_default(Arc::new(Column::new("c1", 0))),
             ])]);
-            let mode = ExecutionMode::Unbounded;
-            PlanProperties::new(eq_properties, Partitioning::UnknownPartitioning(1), mode)
+            PlanProperties::new(
+                eq_properties,
+                Partitioning::UnknownPartitioning(1),
+                EmissionType::Final,
+                Boundedness::Unbounded {
+                    requires_infinite_memory: false,
+                },
+            )
         }
     }
 
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index 906164f21b..21597fb856 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -139,7 +139,8 @@ impl SortPreservingMergeExec {
         PlanProperties::new(
             eq_properties,                        // Equivalence Properties
             Partitioning::UnknownPartitioning(1), // Output Partitioning
-            input.execution_mode(),               // Execution Mode
+            input.pipeline_behavior(),            // Pipeline Behavior
+            input.boundedness(),                  // Boundedness
         )
     }
 }
@@ -323,6 +324,7 @@ mod tests {
     use super::*;
     use crate::coalesce_batches::CoalesceBatchesExec;
     use crate::coalesce_partitions::CoalescePartitionsExec;
+    use crate::execution_plan::{Boundedness, EmissionType};
     use crate::expressions::col;
     use crate::memory::MemoryExec;
     use crate::metrics::{MetricValue, Timestamp};
@@ -331,7 +333,7 @@ mod tests {
     use crate::stream::RecordBatchReceiverStream;
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending, make_partition};
-    use crate::{collect, common, ExecutionMode};
+    use crate::{collect, common};
 
     use arrow::array::{ArrayRef, Int32Array, StringArray, TimestampNanosecondArray};
     use arrow::compute::SortOptions;
@@ -1268,8 +1270,14 @@ mod tests {
                 .iter()
                 .map(|expr| PhysicalSortExpr::new_default(Arc::clone(expr)))
                 .collect::<LexOrdering>()]);
-            let mode = ExecutionMode::Unbounded;
-            PlanProperties::new(eq_properties, Partitioning::Hash(columns, 3), mode)
+            PlanProperties::new(
+                eq_properties,
+                Partitioning::Hash(columns, 3),
+                EmissionType::Incremental,
+                Boundedness::Unbounded {
+                    requires_infinite_memory: false,
+                },
+            )
         }
     }
 
diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs
index 7ccef32480..da8b0e877d 100644
--- a/datafusion/physical-plan/src/streaming.rs
+++ b/datafusion/physical-plan/src/streaming.rs
@@ -21,8 +21,9 @@ use std::any::Any;
 use std::fmt::Debug;
 use std::sync::Arc;
 
-use super::{DisplayAs, DisplayFormatType, ExecutionMode, PlanProperties};
+use super::{DisplayAs, DisplayFormatType, PlanProperties};
 use crate::display::{display_orderings, ProjectSchemaDisplay};
+use crate::execution_plan::{Boundedness, EmissionType};
 use crate::stream::RecordBatchStreamAdapter;
 use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
 
@@ -145,22 +146,26 @@ impl StreamingTableExec {
         schema: SchemaRef,
         orderings: &[LexOrdering],
         partitions: &[Arc<dyn PartitionStream>],
-        is_infinite: bool,
+        infinite: bool,
     ) -> PlanProperties {
         // Calculate equivalence properties:
         let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
 
         // Get output partitioning:
         let output_partitioning = Partitioning::UnknownPartitioning(partitions.len());
-
-        // Determine execution mode:
-        let mode = if is_infinite {
-            ExecutionMode::Unbounded
+        let boundedness = if infinite {
+            Boundedness::Unbounded {
+                requires_infinite_memory: false,
+            }
         } else {
-            ExecutionMode::Bounded
+            Boundedness::Bounded
         };
-
-        PlanProperties::new(eq_properties, output_partitioning, mode)
+        PlanProperties::new(
+            eq_properties,
+            output_partitioning,
+            EmissionType::Incremental,
+            boundedness,
+        )
     }
 }
 
diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs
index cc0a7cbd9b..b31a53e55e 100644
--- a/datafusion/physical-plan/src/test/exec.rs
+++ b/datafusion/physical-plan/src/test/exec.rs
@@ -24,10 +24,14 @@ use std::{
     task::{Context, Poll},
 };
 
-use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
 use crate::{
-    common, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
-    PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
+    common, execution_plan::Boundedness, DisplayAs, DisplayFormatType, ExecutionPlan,
+    Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
+    Statistics,
+};
+use crate::{
+    execution_plan::EmissionType,
+    stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter},
 };
 
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -152,12 +156,11 @@ impl MockExec {
 
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(schema: SchemaRef) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
-
         PlanProperties::new(
-            eq_properties,
+            EquivalenceProperties::new(schema),
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
@@ -315,11 +318,11 @@ impl BarrierExec {
         schema: SchemaRef,
         data: &[Vec<RecordBatch>],
     ) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
         PlanProperties::new(
-            eq_properties,
+            EquivalenceProperties::new(schema),
             Partitioning::UnknownPartitioning(data.len()),
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
@@ -427,12 +430,11 @@ impl ErrorExec {
 
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(schema: SchemaRef) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
-
         PlanProperties::new(
-            eq_properties,
+            EquivalenceProperties::new(schema),
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
@@ -509,12 +511,11 @@ impl StatisticsExec {
 
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(schema: SchemaRef) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
-
         PlanProperties::new(
-            eq_properties,
+            EquivalenceProperties::new(schema),
             Partitioning::UnknownPartitioning(2),
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
@@ -610,12 +611,11 @@ impl BlockingExec {
 
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
-
         PlanProperties::new(
-            eq_properties,
+            EquivalenceProperties::new(schema),
             Partitioning::UnknownPartitioning(n_partitions),
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
@@ -752,13 +752,12 @@ impl PanicExec {
         schema: SchemaRef,
         batches_until_panics: &[usize],
     ) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
         let num_partitions = batches_until_panics.len();
-
         PlanProperties::new(
-            eq_properties,
+            EquivalenceProperties::new(schema),
             Partitioning::UnknownPartitioning(num_partitions),
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index bd36753880..6e768a3d87 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -27,12 +27,12 @@ use std::task::{Context, Poll};
 use std::{any::Any, sync::Arc};
 
 use super::{
-    execution_mode_from_children,
     metrics::{ExecutionPlanMetricsSet, MetricsSet},
     ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan,
     ExecutionPlanProperties, Partitioning, PlanProperties, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
+use crate::execution_plan::{boundedness_from_children, emission_type_from_children};
 use crate::metrics::BaselineMetrics;
 use crate::stream::ObservedStream;
 
@@ -135,14 +135,11 @@ impl UnionExec {
             .map(|plan| plan.output_partitioning().partition_count())
             .sum();
         let output_partitioning = Partitioning::UnknownPartitioning(num_partitions);
-
-        // Determine execution mode:
-        let mode = execution_mode_from_children(inputs.iter());
-
         Ok(PlanProperties::new(
             eq_properties,
             output_partitioning,
-            mode,
+            emission_type_from_children(inputs),
+            boundedness_from_children(inputs),
         ))
     }
 }
@@ -335,10 +332,12 @@ impl InterleaveExec {
         let eq_properties = EquivalenceProperties::new(schema);
         // Get output partitioning:
         let output_partitioning = inputs[0].output_partitioning().clone();
-        // Determine execution mode:
-        let mode = execution_mode_from_children(inputs.iter());
-
-        PlanProperties::new(eq_properties, output_partitioning, mode)
+        PlanProperties::new(
+            eq_properties,
+            output_partitioning,
+            emission_type_from_children(inputs),
+            boundedness_from_children(inputs),
+        )
     }
 }
 
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index 19b1b46953..19a090ca28 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -100,12 +100,11 @@ impl UnnestExec {
         input: &Arc<dyn ExecutionPlan>,
         schema: SchemaRef,
     ) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
-
         PlanProperties::new(
-            eq_properties,
-            input.output_partitioning().clone(),
-            input.execution_mode(),
+            EquivalenceProperties::new(schema),
+            input.output_partitioning().to_owned(),
+            input.pipeline_behavior(),
+            input.boundedness(),
         )
     }
 
diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs
index edadf98cb1..5089b1e626 100644
--- a/datafusion/physical-plan/src/values.rs
+++ b/datafusion/physical-plan/src/values.rs
@@ -20,10 +20,8 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use super::{
-    common, DisplayAs, ExecutionMode, PlanProperties, SendableRecordBatchStream,
-    Statistics,
-};
+use super::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
+use crate::execution_plan::{Boundedness, EmissionType};
 use crate::{
     memory::MemoryStream, ColumnarValue, DisplayFormatType, ExecutionPlan, Partitioning,
     PhysicalExpr,
@@ -133,12 +131,11 @@ impl ValuesExec {
 
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(schema: SchemaRef) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
-
         PlanProperties::new(
-            eq_properties,
+            EquivalenceProperties::new(schema),
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index c6003fe0a8..b66147bf74 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -202,9 +202,11 @@ impl BoundedWindowAggExec {
 
         // Construct properties cache
         PlanProperties::new(
-            eq_properties,          // Equivalence Properties
-            output_partitioning,    // Output Partitioning
-            input.execution_mode(), // Execution Mode
+            eq_properties,
+            output_partitioning,
+            // TODO: Emission type and boundedness information can be enhanced here
+            input.pipeline_behavior(),
+            input.boundedness(),
         )
     }
 }
diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs
index 222a8bb71a..36c4b9f18d 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -383,7 +383,7 @@ pub fn get_best_fitting_window(
         } else {
             return Ok(None);
         };
-    let is_unbounded = input.execution_mode().is_unbounded();
+    let is_unbounded = input.boundedness().is_unbounded();
     if !is_unbounded && input_order_mode != InputOrderMode::Sorted {
         // Executor has bounded input and `input_order_mode` is not `InputOrderMode::Sorted`
         // in this case removing the sort is not helpful, return:
diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs
index c0ac96d22e..b132c32470 100644
--- a/datafusion/physical-plan/src/windows/window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs
@@ -23,15 +23,16 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use super::utils::create_schema;
+use crate::execution_plan::EmissionType;
 use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use crate::windows::{
     calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs,
     window_equivalence_properties,
 };
 use crate::{
-    ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionMode,
-    ExecutionPlan, ExecutionPlanProperties, PhysicalExpr, PlanProperties,
-    RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
+    ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
+    ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream,
+    SendableRecordBatchStream, Statistics, WindowExpr,
 };
 use arrow::array::ArrayRef;
 use arrow::compute::{concat, concat_batches};
@@ -127,16 +128,14 @@ impl WindowAggExec {
         // would be either 1 or more than 1 depending on the presence of repartitioning.
         let output_partitioning = input.output_partitioning().clone();
 
-        // Determine execution mode:
-        let mode = match input.execution_mode() {
-            ExecutionMode::Bounded => ExecutionMode::Bounded,
-            ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => {
-                ExecutionMode::PipelineBreaking
-            }
-        };
-
         // Construct properties cache:
-        PlanProperties::new(eq_properties, output_partitioning, mode)
+        PlanProperties::new(
+            eq_properties,
+            output_partitioning,
+            // TODO: Emission type and boundedness information can be enhanced here
+            EmissionType::Final,
+            input.boundedness(),
+        )
     }
 }
 
diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs
index add3863192..b1dd4d9308 100644
--- a/datafusion/physical-plan/src/work_table.rs
+++ b/datafusion/physical-plan/src/work_table.rs
@@ -24,8 +24,9 @@ use super::{
     metrics::{ExecutionPlanMetricsSet, MetricsSet},
     SendableRecordBatchStream, Statistics,
 };
+use crate::execution_plan::{Boundedness, EmissionType};
 use crate::memory::MemoryStream;
-use crate::{DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties};
+use crate::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
 
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
@@ -142,12 +143,11 @@ impl WorkTableExec {
 
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(schema: SchemaRef) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
-
         PlanProperties::new(
-            eq_properties,
+            EquivalenceProperties::new(schema),
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }

