This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new f3b1141d0f Replace `execution_mode` with `emission_type` and `boundedness` (#13823)
f3b1141d0f is described below
commit f3b1141d0f417e9d9e6c0ada03592c9d9ec60cd4
Author: Jay Zhan <[email protected]>
AuthorDate: Fri Dec 20 17:51:06 2024 +0800
Replace `execution_mode` with `emission_type` and `boundedness` (#13823)
* feat: update execution modes and add bitflags dependency
- Introduced `Incremental` execution mode alongside existing modes in the
DataFusion execution plan.
- Updated various execution plans to utilize the new `Incremental` mode
where applicable, enhancing streaming capabilities.
- Added `bitflags` dependency to `Cargo.toml` for better management of
execution modes.
- Adjusted execution mode handling in multiple files to ensure
compatibility with the new structure.
* add exec API
Signed-off-by: Jay Zhan <[email protected]>
* replace done but has stack overflow
Signed-off-by: Jay Zhan <[email protected]>
* exec API done
Signed-off-by: Jay Zhan <[email protected]>
* Refactor execution plan properties to remove execution mode
- Removed the `ExecutionMode` parameter from `PlanProperties` across
multiple physical plan implementations.
- Updated related functions to utilize the new structure, ensuring
compatibility with the changes.
- Adjusted comments and cleaned up imports to reflect the removal of
execution mode handling.
This refactor simplifies the execution plan properties and enhances
maintainability.
* Refactor execution plan to remove `ExecutionMode` and introduce
`EmissionType`
- Removed the `ExecutionMode` parameter from `PlanProperties` and related
implementations across multiple files.
- Introduced `EmissionType` to better represent the output characteristics
of execution plans.
- Updated functions and tests to reflect the new structure, ensuring
compatibility and enhancing maintainability.
- Cleaned up imports and adjusted comments accordingly.
This refactor simplifies the execution plan properties and improves the
clarity of memory handling in execution plans.
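The new `EmissionType` is named throughout this log but never defined in the message itself. A minimal sketch of the enum, assuming the three variants referenced in the commits below (`Incremental`, `Final`, `Both`); the doc comments are our reading, not the project's:

```rust
/// Sketch: when an operator emits output relative to consuming its input.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmissionType {
    /// Emits results as input batches arrive (e.g. a filter or projection).
    Incremental,
    /// Emits results only after all input has been consumed (e.g. a full sort).
    Final,
    /// Emits some results early and a final batch at the end
    /// (e.g. a join that must flush unmatched rows once input ends).
    Both,
}

/// Hypothetical helper mirroring the `matches!` checks in the diff below:
/// an operator is pipeline-friendly if it can emit before input is exhausted.
pub fn is_pipeline_friendly(emission: EmissionType) -> bool {
    matches!(emission, EmissionType::Incremental | EmissionType::Both)
}
```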
* fix test
Signed-off-by: Jay Zhan <[email protected]>
* Refactor join handling and emission type logic
- Updated test cases in `sanity_checker.rs` to reflect changes in expected
outcomes for bounded and unbounded joins, ensuring accurate test coverage.
- Simplified the `is_pipeline_breaking` method in `execution_plan.rs` to
clarify the conditions under which a plan is considered pipeline-breaking.
- Enhanced the emission type determination logic in `execution_plan.rs` to
prioritize `Final` over `Both` and `Incremental`, improving clarity in
execution plan behavior.
- Adjusted join type handling in `hash_join.rs` to classify `Right` joins
as `Incremental`, allowing for immediate row emission.
These changes improve the accuracy of tests and the clarity of execution
plan properties.
* Implement emission type for execution plans
- Updated multiple execution plan implementations to replace
`unimplemented!()` with `EmissionType::Incremental`, ensuring that the emission
type is correctly defined for various plans.
- This change enhances the clarity and functionality of the execution plans
by explicitly specifying their emission behavior.
These updates contribute to a more robust execution plan framework within
the DataFusion project.
* Enhance join type documentation and refine emission type logic
- Updated the `JoinType` enum in `join_type.rs` to include detailed
descriptions for each join type, improving clarity on their behavior and
expected results.
- Modified the emission type logic in `hash_join.rs` to ensure that `Right`
and `RightAnti` joins are classified as `Incremental`, allowing for immediate
row emission when applicable.
These changes improve the documentation and functionality of join
operations within the DataFusion project.
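The probe-side reasoning above can be sketched as a standalone classification function. This is a simplification under our own assumptions: the real logic in `hash_join.rs` also consults the inputs' boundedness and emission types, and the exact bucketing of the left-side variants may differ:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
    Inner, Left, Right, Full, LeftSemi, RightSemi, LeftAnti, RightAnti,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmissionType { Incremental, Final, Both }

/// Joins that only emit matched probe-side rows can stream results as
/// probe batches arrive; joins that must also emit unmatched build-side
/// (left) rows can only finish emitting after the probe input ends.
pub fn hash_join_emission(join_type: JoinType) -> EmissionType {
    match join_type {
        // Matched rows are known immediately: emit them right away.
        JoinType::Inner
        | JoinType::Right
        | JoinType::RightSemi
        | JoinType::RightAnti => EmissionType::Incremental,
        // Unmatched build-side rows are only known at the end of input.
        JoinType::Left
        | JoinType::Full
        | JoinType::LeftSemi
        | JoinType::LeftAnti => EmissionType::Both,
    }
}
```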
* Refactor emission type logic in join and sort execution plans
- Updated the emission type determination in `SortMergeJoinExec` and
`SymmetricHashJoinExec` to utilize the `emission_type_from_children` function,
enhancing the accuracy of emission behavior based on input characteristics.
- Clarified comments in `sort.rs` regarding the conditions under which
results are emitted, emphasizing the relationship between input sorting and
emission type.
- These changes improve the clarity and functionality of the execution
plans within the DataFusion project, ensuring more robust handling of emission
types.
* Refactor emission type handling in execution plans
- Updated the `emission_type_from_children` function to accept an iterator
instead of a slice, enhancing flexibility in how child execution plans are
passed.
- Modified the `SymmetricHashJoinExec` implementation to utilize the new
function signature, improving code clarity and maintainability.
These changes streamline the emission type determination process within the
DataFusion project, contributing to a more robust execution plan framework.
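A sketch of what `emission_type_from_children` plausibly computes, written here over plain `EmissionType` values rather than child plans (the iterator-based signature matches the change described above; the combining rule follows the earlier commit's priority of `Final` over `Both` and `Incremental`):

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmissionType { Incremental, Final, Both }

/// Combine the emission behavior of child operators pessimistically:
/// any blocking (`Final`) child makes the parent blocking; otherwise a
/// partially blocking (`Both`) child makes the parent partially blocking.
pub fn emission_type_from_children(
    children: impl IntoIterator<Item = EmissionType>,
) -> EmissionType {
    let mut saw_both = false;
    for child in children {
        match child {
            EmissionType::Final => return EmissionType::Final,
            EmissionType::Both => saw_both = true,
            EmissionType::Incremental => {}
        }
    }
    if saw_both {
        EmissionType::Both
    } else {
        EmissionType::Incremental
    }
}
```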
* Enhance execution plan properties with boundedness and emission type
- Introduced `boundedness` and `pipeline_behavior` methods to the
`ExecutionPlanProperties` trait, improving the handling of execution plan
characteristics.
- Updated the `CsvExec`, `SortExec`, and related implementations to utilize
the new methods for determining boundedness and emission behavior.
- Refactored the `ensure_distribution` function to use the new boundedness
logic, enhancing clarity in distribution decisions.
- These changes contribute to a more robust and maintainable execution plan
framework within the DataFusion project.
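The two new accessors can be sketched as a trait. Method names and the `Boundedness` variants match the diff below, while the trait shape here is a reduced stand-in (the real `ExecutionPlanProperties` also exposes partitioning and ordering information):

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmissionType { Incremental, Final, Both }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Boundedness {
    Bounded,
    Unbounded { requires_infinite_memory: bool },
}

/// Reduced stand-in for the trait: one accessor per orthogonal property,
/// replacing the old combined `execution_mode()`.
pub trait ExecutionPlanProperties {
    fn boundedness(&self) -> Boundedness;
    fn pipeline_behavior(&self) -> EmissionType;
}

/// Toy file scan: a finite file, streamed out batch by batch.
pub struct CsvScan;

impl ExecutionPlanProperties for CsvScan {
    fn boundedness(&self) -> Boundedness {
        Boundedness::Bounded
    }
    fn pipeline_behavior(&self) -> EmissionType {
        EmissionType::Incremental
    }
}
```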
* Refactor execution plans to enhance boundedness and emission type handling
- Updated multiple execution plan implementations to incorporate
`Boundedness` and `EmissionType`, improving the clarity and functionality of
execution plans.
- Replaced instances of `unimplemented!()` with appropriate emission types,
ensuring that plans correctly define their output behavior.
- Refactored the `PlanProperties` structure to utilize the new boundedness
logic, enhancing decision-making in execution plans.
- These changes contribute to a more robust and maintainable execution plan
framework within the DataFusion project.
* Refactor memory handling in execution plans
- Updated the condition for checking memory requirements in execution plans
from `has_finite_memory()` to `boundedness().requires_finite_memory()`,
improving clarity in memory management.
- This change enhances the robustness of execution plans within the
DataFusion project by ensuring more accurate assessments of memory constraints.
* Refactor boundedness checks in execution plans
- Updated conditions for checking boundedness in various execution plans to
use `is_unbounded()` instead of `requires_finite_memory()`, enhancing clarity
in memory management.
- Adjusted the `PlanProperties` structure to reflect these changes,
ensuring more accurate assessments of memory constraints across the DataFusion
project.
- These modifications contribute to a more robust and maintainable
execution plan framework, improving the handling of boundedness in execution
strategies.
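The distinction these commits keep adjusting, unboundedness versus memory requirements, can be made concrete with a sketch of the `Boundedness` enum. The `Unbounded { requires_infinite_memory }` shape appears verbatim in the `sanity_checker.rs` hunk at the end of this diff; the helper methods are our assumptions about the API:

```rust
/// Sketch: whether an operator's output stream ever ends.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Boundedness {
    /// The stream is finite.
    Bounded,
    /// The stream never ends; some such operators (e.g. an exact median
    /// over an infinite input) would also need unbounded state.
    Unbounded { requires_infinite_memory: bool },
}

impl Boundedness {
    pub fn is_unbounded(&self) -> bool {
        matches!(self, Boundedness::Unbounded { .. })
    }

    /// An unbounded stream is still executable if it can run in finite
    /// memory; only this combination is inherently infeasible.
    pub fn requires_infinite_memory(&self) -> bool {
        matches!(
            self,
            Boundedness::Unbounded {
                requires_infinite_memory: true
            }
        )
    }
}
```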
* Remove TODO comment regarding unbounded execution plans in
`UnboundedExec` implementation
- Eliminated the outdated comment suggesting a switch to unbounded
execution with finite memory, streamlining the code and improving clarity.
- This change contributes to a cleaner and more maintainable codebase
within the DataFusion project.
* Refactor execution plan boundedness and emission type handling
- Updated the `is_pipeline_breaking` method to use
`requires_finite_memory()` for improved clarity in determining pipeline
behavior.
- Enhanced the `Boundedness` enum to include detailed documentation on
memory requirements for unbounded streams.
- Refactored `compute_properties` methods in `GlobalLimitExec` and
`LocalLimitExec` to directly use the input's boundedness, simplifying the logic.
- Adjusted emission type determination in `NestedLoopJoinExec` to utilize
the `emission_type_from_children` function, ensuring accurate output behavior
based on input characteristics.
These changes contribute to a more robust and maintainable execution plan
framework within the DataFusion project, improving clarity and functionality in
handling boundedness and emission types.
* Refactor emission type and boundedness handling in execution plans
- Removed the `OptionalEmissionType` struct from `plan_properties.rs`,
simplifying the codebase.
- Updated the `is_pipeline_breaking` function in `execution_plan.rs` for
improved readability by formatting the condition across multiple lines.
- Adjusted the `GlobalLimitExec` implementation in `limit.rs` to directly
use the input's boundedness, enhancing clarity in memory management.
These changes contribute to a more streamlined and maintainable execution
plan framework within the DataFusion project, improving the handling of
emission types and boundedness.
* Refactor GlobalLimitExec and LocalLimitExec to enhance boundedness
handling
- Updated the `compute_properties` methods in both `GlobalLimitExec` and
`LocalLimitExec` to replace `EmissionType::Final` with `Boundedness::Bounded`,
reflecting that limit operations always produce a finite number of rows.
- Changed the input's boundedness reference to `pipeline_behavior()` for
improved clarity in execution plan properties.
These changes contribute to a more streamlined and maintainable execution
plan framework within the DataFusion project, enhancing the handling of
boundedness in limit operations.
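The limit rule is easy to state in isolation. A sketch (the function below is a hypothetical stand-in for the boundedness part of `compute_properties` in `GlobalLimitExec`/`LocalLimitExec`): a `LIMIT` emits at most a fixed number of rows, so its output is `Bounded` even over an unbounded input:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Boundedness {
    Bounded,
    Unbounded { requires_infinite_memory: bool },
}

/// A limit caps output at `fetch` rows, so the result is always finite,
/// regardless of whether the input stream ever ends.
pub fn limit_boundedness(_input: Boundedness, _fetch: usize) -> Boundedness {
    Boundedness::Bounded
}
```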
* Review Part1
* Update sanity_checker.rs
* addressing reviews
* Review Part 1
* Update datafusion/physical-plan/src/execution_plan.rs
* Update datafusion/physical-plan/src/execution_plan.rs
* Shorten imports
* Enhance documentation for JoinType and Boundedness enums
- Improved descriptions for the Inner and Full join types in join_type.rs
to clarify their behavior and examples.
- Added explanations regarding the boundedness of output streams and memory
requirements in execution_plan.rs, including specific examples for operators
like Median and Min/Max.
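The Median versus Min/Max example can be illustrated with toy aggregators (ours, not DataFusion code): a running minimum keeps O(1) state and can be maintained over an unbounded stream, whereas an exact median must retain every value seen, which is why an unbounded exact median implies infinite memory:

```rust
/// O(1)-state aggregator: safe over unbounded input.
pub struct RunningMin {
    min: Option<i64>,
}

impl RunningMin {
    pub fn new() -> Self {
        RunningMin { min: None }
    }
    pub fn update(&mut self, value: i64) {
        self.min = Some(self.min.map_or(value, |m| m.min(value)));
    }
    pub fn current(&self) -> Option<i64> {
        self.min
    }
}

/// Exact median: must buffer the whole input before answering.
pub fn exact_median(values: &mut Vec<i64>) -> Option<i64> {
    if values.is_empty() {
        return None;
    }
    values.sort_unstable();
    Some(values[values.len() / 2])
}
```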
---------
Signed-off-by: Jay Zhan <[email protected]>
Co-authored-by: berkaysynnada <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
datafusion-cli/src/exec.rs | 14 +-
datafusion-examples/examples/custom_datasource.rs | 8 +-
datafusion/common/src/join_type.rs | 25 +-
.../src/datasource/physical_plan/arrow_file.rs | 6 +-
.../core/src/datasource/physical_plan/avro.rs | 8 +-
.../core/src/datasource/physical_plan/csv.rs | 8 +-
.../core/src/datasource/physical_plan/json.rs | 8 +-
.../src/datasource/physical_plan/parquet/mod.rs | 11 +-
.../src/physical_optimizer/enforce_distribution.rs | 15 +-
.../core/src/physical_optimizer/enforce_sorting.rs | 2 +-
.../core/src/physical_optimizer/join_selection.rs | 28 ++-
.../replace_with_order_preserving_variants.rs | 6 +-
.../core/src/physical_optimizer/sanity_checker.rs | 19 +-
datafusion/core/src/physical_planner.rs | 11 +-
datafusion/core/src/test/mod.rs | 13 +-
datafusion/core/src/test_util/mod.rs | 23 +-
datafusion/core/tests/custom_sources_cases/mod.rs | 12 +-
.../provider_filter_pushdown.rs | 13 +-
.../core/tests/custom_sources_cases/statistics.rs | 14 +-
.../core/tests/user_defined/insert_operation.rs | 18 +-
.../core/tests/user_defined/user_defined_plan.rs | 19 +-
datafusion/ffi/src/execution_plan.rs | 11 +-
datafusion/ffi/src/plan_properties.rs | 107 ++++++---
.../physical-optimizer/src/output_requirements.rs | 3 +-
datafusion/physical-plan/src/aggregates/mod.rs | 36 +--
datafusion/physical-plan/src/analyze.rs | 10 +-
datafusion/physical-plan/src/coalesce_batches.rs | 3 +-
.../physical-plan/src/coalesce_partitions.rs | 3 +-
datafusion/physical-plan/src/empty.rs | 20 +-
datafusion/physical-plan/src/execution_plan.rs | 253 ++++++++++++++-------
datafusion/physical-plan/src/explain.rs | 9 +-
datafusion/physical-plan/src/filter.rs | 4 +-
datafusion/physical-plan/src/insert.rs | 8 +-
datafusion/physical-plan/src/joins/cross_join.rs | 20 +-
datafusion/physical-plan/src/joins/hash_join.rs | 48 ++--
.../physical-plan/src/joins/nested_loop_join.rs | 37 ++-
.../physical-plan/src/joins/sort_merge_join.rs | 21 +-
.../physical-plan/src/joins/symmetric_hash_join.rs | 12 +-
datafusion/physical-plan/src/lib.rs | 3 +-
datafusion/physical-plan/src/limit.rs | 12 +-
datafusion/physical-plan/src/memory.rs | 16 +-
datafusion/physical-plan/src/placeholder_row.rs | 17 +-
datafusion/physical-plan/src/projection.rs | 3 +-
datafusion/physical-plan/src/recursive_query.rs | 6 +-
datafusion/physical-plan/src/repartition/mod.rs | 10 +-
datafusion/physical-plan/src/sorts/partial_sort.rs | 10 +-
datafusion/physical-plan/src/sorts/sort.rs | 68 ++++--
.../src/sorts/sort_preserving_merge.rs | 16 +-
datafusion/physical-plan/src/streaming.rs | 23 +-
datafusion/physical-plan/src/test/exec.rs | 51 ++---
datafusion/physical-plan/src/union.rs | 19 +-
datafusion/physical-plan/src/unnest.rs | 9 +-
datafusion/physical-plan/src/values.rs | 13 +-
.../src/windows/bounded_window_agg_exec.rs | 8 +-
datafusion/physical-plan/src/windows/mod.rs | 2 +-
.../physical-plan/src/windows/window_agg_exec.rs | 23 +-
datafusion/physical-plan/src/work_table.rs | 10 +-
57 files changed, 748 insertions(+), 457 deletions(-)
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index 1890653669..a4f154b2de 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -33,11 +33,12 @@ use crate::{
};
use datafusion::common::instant::Instant;
-use datafusion::common::plan_datafusion_err;
+use datafusion::common::{plan_datafusion_err, plan_err};
use datafusion::config::ConfigFileType;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{DdlStatement, LogicalPlan};
+use datafusion::physical_plan::execution_plan::EmissionType;
use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;
@@ -234,10 +235,19 @@ pub(super) async fn exec_and_print(
let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;
- if physical_plan.execution_mode().is_unbounded() {
+ if physical_plan.boundedness().is_unbounded() {
+ if physical_plan.pipeline_behavior() == EmissionType::Final {
+ return plan_err!(
+ "The given query can generate a valid result only once \
+ the source finishes, but the source is unbounded"
+ );
+ }
+ // As the input stream arrives, we can generate results incrementally.
+ // However, bounded memory usage is not guaranteed.
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options.print_stream(stream, now).await?;
} else {
+ // Bounded stream; collected results are printed after all input is consumed.
let schema = physical_plan.schema();
let results = collect(physical_plan, task_ctx.clone()).await?;
adjusted.into_inner().print_batches(schema, &results, now)?;
diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs
index 90e9d2c7a6..bc865fac5a 100644
--- a/datafusion-examples/examples/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_datasource.rs
@@ -30,10 +30,11 @@ use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::physical_expr::EquivalenceProperties;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{
- project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
- Partitioning, PlanProperties, SendableRecordBatchStream,
+ project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+ PlanProperties, SendableRecordBatchStream,
};
use datafusion::prelude::*;
@@ -214,7 +215,8 @@ impl CustomExec {
PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(1),
- ExecutionMode::Bounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
diff --git a/datafusion/common/src/join_type.rs b/datafusion/common/src/join_type.rs
index e98f34199b..bdca253c5f 100644
--- a/datafusion/common/src/join_type.rs
+++ b/datafusion/common/src/join_type.rs
@@ -28,21 +28,30 @@ use crate::{DataFusionError, Result};
/// Join type
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum JoinType {
- /// Inner Join
+ /// Inner Join - Returns only rows where there is a matching value in both tables based on the join condition.
+ /// For example, if joining table A and B on A.id = B.id, only rows where A.id equals B.id will be included.
+ /// All columns from both tables are returned for the matching rows. Non-matching rows are excluded entirely.
Inner,
- /// Left Join
+ /// Left Join - Returns all rows from the left table and matching rows from the right table.
+ /// If no match, NULL values are returned for columns from the right table.
Left,
- /// Right Join
+ /// Right Join - Returns all rows from the right table and matching rows from the left table.
+ /// If no match, NULL values are returned for columns from the left table.
Right,
- /// Full Join
+ /// Full Join (also called Full Outer Join) - Returns all rows from both tables, matching rows where possible.
+ /// When a row from either table has no match in the other table, the missing columns are filled with NULL values.
+ /// For example, if table A has row X with no match in table B, the result will contain row X with NULL values for all of table B's columns.
+ /// This join type preserves all records from both tables, making it useful when you need to see all data regardless of matches.
Full,
- /// Left Semi Join
+ /// Left Semi Join - Returns rows from the left table that have matching rows in the right table.
+ /// Only columns from the left table are returned.
LeftSemi,
- /// Right Semi Join
+ /// Right Semi Join - Returns rows from the right table that have matching rows in the left table.
+ /// Only columns from the right table are returned.
RightSemi,
- /// Left Anti Join
+ /// Left Anti Join - Returns rows from the left table that do not have a matching row in the right table.
LeftAnti,
- /// Right Anti Join
+ /// Right Anti Join - Returns rows from the right table that do not have a matching row in the left table.
RightAnti,
/// Left Mark join
///
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 8df5ef82cd..4e76b087ab 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -38,7 +38,8 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::Statistics;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
-use datafusion_physical_plan::{ExecutionMode, PlanProperties};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion_physical_plan::PlanProperties;
use futures::StreamExt;
use itertools::Itertools;
@@ -97,7 +98,8 @@ impl ArrowExec {
PlanProperties::new(
eq_properties,
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
- ExecutionMode::Bounded, // Execution Mode
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
index def6504189..fb36179c3c 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -24,13 +24,14 @@ use super::FileScanConfig;
use crate::error::Result;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
- DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
- PlanProperties, SendableRecordBatchStream, Statistics,
+ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+ SendableRecordBatchStream, Statistics,
};
use arrow::datatypes::SchemaRef;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
/// Execution plan for scanning Avro data source
#[derive(Debug, Clone)]
@@ -81,7 +82,8 @@ impl AvroExec {
PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(n_partitions), // Output Partitioning
- ExecutionMode::Bounded, // Execution Mode
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index c54c663dca..a00e74cf4f 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -33,8 +33,8 @@ use crate::datasource::physical_plan::FileMeta;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
- DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
- Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
+ DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
+ PlanProperties, SendableRecordBatchStream, Statistics,
};
use arrow::csv;
@@ -43,6 +43,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
@@ -327,7 +328,8 @@ impl CsvExec {
PlanProperties::new(
eq_properties,
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
- ExecutionMode::Bounded, // Execution Mode
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 5c70968fbb..879c9817a3 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -33,14 +33,15 @@ use crate::datasource::physical_plan::FileMeta;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
- DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
- Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
+ DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
+ PlanProperties, SendableRecordBatchStream, Statistics,
};
use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
@@ -107,7 +108,8 @@ impl NdJsonExec {
PlanProperties::new(
eq_properties,
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
- ExecutionMode::Bounded, // Execution Mode
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 446d5f5311..cb79055ce3 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -34,13 +34,14 @@ use crate::{
physical_optimizer::pruning::PruningPredicate,
physical_plan::{
metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
- DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
+ DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
SendableRecordBatchStream, Statistics,
},
};
use arrow::datatypes::SchemaRef;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use itertools::Itertools;
use log::debug;
@@ -654,13 +655,11 @@ impl ParquetExec {
orderings: &[LexOrdering],
file_config: &FileScanConfig,
) -> PlanProperties {
- // Equivalence Properties
- let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
-
PlanProperties::new(
- eq_properties,
+ EquivalenceProperties::new_with_orderings(schema, orderings),
Self::output_partitioning_helper(file_config), // Output Partitioning
- ExecutionMode::Bounded, // Execution Mode
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 27323eaedc..76c4d668d7 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -52,12 +52,13 @@ use datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef,
};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
use datafusion_physical_plan::ExecutionPlanProperties;
-use datafusion_physical_expr_common::sort_expr::LexOrdering;
use itertools::izip;
/// The `EnforceDistribution` rule ensures that distribution requirements are
@@ -1161,12 +1162,17 @@ fn ensure_distribution(
let should_use_estimates = config
.execution
.use_row_number_estimates_to_optimize_partitioning;
- let is_unbounded = dist_context.plan.execution_mode().is_unbounded();
+ let unbounded_and_pipeline_friendly = dist_context.plan.boundedness().is_unbounded()
+ && matches!(
+ dist_context.plan.pipeline_behavior(),
+ EmissionType::Incremental | EmissionType::Both
+ );
// Use order-preserving variants if any of the following conditions is true:
// - it is desired according to config
// - when plan is unbounded
+ // - when it is pipeline friendly (can incrementally produce results)
let order_preserving_variants_desirable =
- is_unbounded || config.optimizer.prefer_existing_sort;
+ unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;
// Remove unnecessary repartition from the physical plan if any
let DistributionContext {
@@ -1459,7 +1465,8 @@ pub(crate) mod tests {
PlanProperties::new(
input.equivalence_properties().clone(), // Equivalence Properties
input.output_partitioning().clone(), // Output Partitioning
- input.execution_mode(), // Execution Mode
+ input.pipeline_behavior(), // Pipeline Behavior
+ input.boundedness(), // Boundedness
)
}
}
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index cfc08562f7..85fe9ecfcd 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -214,7 +214,7 @@ fn replace_with_partial_sort(
let plan_any = plan.as_any();
if let Some(sort_plan) = plan_any.downcast_ref::<SortExec>() {
let child = Arc::clone(sort_plan.children()[0]);
- if !child.execution_mode().is_unbounded() {
+ if !child.boundedness().is_unbounded() {
return Ok(plan);
}
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index 9d65c6ded4..009757f3a9 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -43,6 +43,7 @@ use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::execution_plan::EmissionType;
/// The [`JoinSelection`] rule tries to modify a given plan so that it can
/// accommodate infinite sources and optimize joins in the plan according to
@@ -516,7 +517,8 @@ fn statistical_join_selection_subrule(
pub type PipelineFixerSubrule =
dyn Fn(Arc<dyn ExecutionPlan>, &ConfigOptions) -> Result<Arc<dyn ExecutionPlan>>;
-/// Converts a hash join to a symmetric hash join in the case of infinite inputs on both sides.
+/// Converts a hash join to a symmetric hash join if both its inputs are
+/// unbounded and incremental.
///
/// This subrule checks if a hash join can be replaced with a symmetric hash join when dealing
/// with unbounded (infinite) inputs on both sides. This replacement avoids pipeline breaking and
@@ -537,10 +539,18 @@ fn hash_join_convert_symmetric_subrule(
) -> Result<Arc<dyn ExecutionPlan>> {
// Check if the current plan node is a HashJoinExec.
if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() {
- let left_unbounded = hash_join.left.execution_mode().is_unbounded();
- let right_unbounded = hash_join.right.execution_mode().is_unbounded();
- // Process only if both left and right sides are unbounded.
- if left_unbounded && right_unbounded {
+ let left_unbounded = hash_join.left.boundedness().is_unbounded();
+ let left_incremental = matches!(
+ hash_join.left.pipeline_behavior(),
+ EmissionType::Incremental | EmissionType::Both
+ );
+ let right_unbounded = hash_join.right.boundedness().is_unbounded();
+ let right_incremental = matches!(
+ hash_join.right.pipeline_behavior(),
+ EmissionType::Incremental | EmissionType::Both
+ );
+ // Process only if both left and right sides are unbounded and emit incrementally.
+ if left_unbounded && right_unbounded && left_incremental && right_incremental {
// Determine the partition mode based on configuration.
let mode = if config_options.optimizer.repartition_joins {
StreamJoinPartitionMode::Partitioned
@@ -669,8 +679,8 @@ fn hash_join_swap_subrule(
_config_options: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() {
- if hash_join.left.execution_mode().is_unbounded()
- && !hash_join.right.execution_mode().is_unbounded()
+ if hash_join.left.boundedness().is_unbounded()
+ && !hash_join.right.boundedness().is_unbounded()
&& matches!(
*hash_join.join_type(),
JoinType::Inner
@@ -2025,12 +2035,12 @@ mod hash_join_tests {
assert_eq!(
(
t.case.as_str(),
- if left.execution_mode().is_unbounded() {
+ if left.boundedness().is_unbounded() {
SourceType::Unbounded
} else {
SourceType::Bounded
},
- if right.execution_mode().is_unbounded() {
+ if right.boundedness().is_unbounded() {
SourceType::Unbounded
} else {
SourceType::Bounded
diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 2f6b7a51ee..96b2454fa3 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -29,11 +29,12 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::Transformed;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::tree_node::PlanContext;
use datafusion_physical_plan::ExecutionPlanProperties;
-use datafusion_physical_expr_common::sort_expr::LexOrdering;
use itertools::izip;
/// For a given `plan`, this object carries the information one needs from its
@@ -246,7 +247,8 @@ pub(crate) fn replace_with_order_preserving_variants(
// For unbounded cases, we replace with the order-preserving variant in any
// case, as doing so helps fix the pipeline. Also replace if config allows.
let use_order_preserving_variant = config.optimizer.prefer_existing_sort
- || !requirements.plan.execution_mode().pipeline_friendly();
+ || (requirements.plan.boundedness().is_unbounded()
+ && requirements.plan.pipeline_behavior() == EmissionType::Final);
// Create an alternate plan with order-preserving variants:
let mut alternate_plan = plan_with_order_preserving_variants(
diff --git a/datafusion/core/src/physical_optimizer/sanity_checker.rs b/datafusion/core/src/physical_optimizer/sanity_checker.rs
index 99bd1cab3e..b6d22320d0 100644
--- a/datafusion/core/src/physical_optimizer/sanity_checker.rs
+++ b/datafusion/core/src/physical_optimizer/sanity_checker.rs
@@ -30,6 +30,7 @@ use datafusion_common::config::{ConfigOptions, OptimizerOptions};
use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::joins::SymmetricHashJoinExec;
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
@@ -85,7 +86,15 @@ pub fn check_finiteness_requirements(
                        the 'allow_symmetric_joins_without_pruning' configuration flag");
}
}
- if !input.execution_mode().pipeline_friendly() {
+
+ if matches!(
+ input.boundedness(),
+ Boundedness::Unbounded {
+ requires_infinite_memory: true
+ }
+ ) || (input.boundedness().is_unbounded()
+ && input.pipeline_behavior() == EmissionType::Final)
+ {
plan_err!(
"Cannot execute pipeline breaking queries, operator: {:?}",
input
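The replacement for the old `pipeline_friendly()` check above can be read as: a plan is pipeline-breaking if it is unbounded and needs infinite memory, or if it is unbounded but only emits once all input is consumed. A self-contained sketch of that predicate with hypothetical local enums mirroring the shape of the new types (not the crate's actual definitions):

```rust
// Hypothetical local copies of the new property enums, for illustration only.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Boundedness {
    Bounded,
    Unbounded { requires_infinite_memory: bool },
}

#[derive(Clone, Copy, PartialEq, Debug)]
enum EmissionType {
    Incremental,
    Final,
    Both,
}

/// Mirrors the check in `check_finiteness_requirements`: reject plans that
/// either need unbounded memory, or buffer an unbounded input until the end.
fn is_pipeline_breaking(b: Boundedness, e: EmissionType) -> bool {
    matches!(b, Boundedness::Unbounded { requires_infinite_memory: true })
        || (matches!(b, Boundedness::Unbounded { .. }) && e == EmissionType::Final)
}

fn main() {
    // A bounded sort is fine: it emits at the end, but it does finish.
    assert!(!is_pipeline_breaking(Boundedness::Bounded, EmissionType::Final));
    // An unbounded operator that only emits on input exhaustion never emits.
    assert!(is_pipeline_breaking(
        Boundedness::Unbounded { requires_infinite_memory: false },
        EmissionType::Final,
    ));
}
```

This is exactly why the `expect_fail` flags in the tests below flip to `false`: a join that emits matched rows incrementally and non-matched rows at the end (`EmissionType::Both`) no longer counts as pipeline-breaking.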
@@ -215,7 +224,9 @@ mod tests {
let test2 = BinaryTestCase {
source_types: (SourceType::Bounded, SourceType::Unbounded),
- expect_fail: true,
+            // Left join for bounded build side and unbounded probe side can generate
+ // both incremental matched rows and final non-matched rows.
+ expect_fail: false,
};
let test3 = BinaryTestCase {
source_types: (SourceType::Bounded, SourceType::Bounded),
@@ -290,7 +301,9 @@ mod tests {
};
let test2 = BinaryTestCase {
source_types: (SourceType::Bounded, SourceType::Unbounded),
- expect_fail: true,
+            // Full join for bounded build side and unbounded probe side can generate
+ // both incremental matched rows and final non-matched rows.
+ expect_fail: false,
};
let test3 = BinaryTestCase {
source_types: (SourceType::Bounded, SourceType::Bounded),
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 44537c951f..47b31d2f4e 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -2017,7 +2017,7 @@ mod tests {
use crate::datasource::file_format::options::CsvReadOptions;
use crate::datasource::MemTable;
use crate::physical_plan::{
-        expressions, DisplayAs, DisplayFormatType, ExecutionMode, PlanProperties,
+ expressions, DisplayAs, DisplayFormatType, PlanProperties,
SendableRecordBatchStream,
};
use crate::prelude::{SessionConfig, SessionContext};
@@ -2032,6 +2032,7 @@ mod tests {
    use datafusion_expr::{col, lit, LogicalPlanBuilder, UserDefinedLogicalNodeCore};
use datafusion_functions_aggregate::expr_fn::sum;
use datafusion_physical_expr::EquivalenceProperties;
+ use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
fn make_session_state() -> SessionState {
let runtime = Arc::new(RuntimeEnv::default());
@@ -2619,13 +2620,11 @@ mod tests {
    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
PlanProperties::new(
- eq_properties,
- // Output Partitioning
+ EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(1),
- // Execution Mode
- ExecutionMode::Bounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index d8304c2f0a..e5ce28e738 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -45,10 +45,9 @@ use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
-use datafusion_physical_plan::{
- DisplayAs, DisplayFormatType, ExecutionMode, PlanProperties,
-};
+use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};
#[cfg(feature = "compression")]
use bzip2::write::BzEncoder;
@@ -386,13 +385,11 @@ impl StatisticsExec {
    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
PlanProperties::new(
- eq_properties,
- // Output Partitioning
+ EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(2),
- // Execution Mode
- ExecutionMode::Bounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index 09608887c0..aa134f28fe 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -39,22 +39,23 @@ use crate::error::Result;
use crate::execution::context::TaskContext;
use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
use crate::physical_plan::{
- DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
- PlanProperties, RecordBatchStream, SendableRecordBatchStream,
+ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+ RecordBatchStream, SendableRecordBatchStream,
};
use crate::prelude::{CsvReadOptions, SessionContext};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
+use datafusion_catalog::Session;
use datafusion_common::TableReference;
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
use datafusion_functions_aggregate::count::count_udaf;
+use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::{expressions, EquivalenceProperties, PhysicalExpr};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use async_trait::async_trait;
-use datafusion_catalog::Session;
-use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use futures::Stream;
use tempfile::TempDir;
// backwards compatibility
@@ -259,16 +260,18 @@ impl UnboundedExec {
batch_produce: Option<usize>,
n_partitions: usize,
) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
- let mode = if batch_produce.is_none() {
- ExecutionMode::Unbounded
+ let boundedness = if batch_produce.is_none() {
+ Boundedness::Unbounded {
+ requires_infinite_memory: false,
+ }
} else {
- ExecutionMode::Bounded
+ Boundedness::Bounded
};
PlanProperties::new(
- eq_properties,
+ EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(n_partitions),
- mode,
+ EmissionType::Incremental,
+ boundedness,
)
}
}
diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs
index e1bd14105e..aafefac04e 100644
--- a/datafusion/core/tests/custom_sources_cases/mod.rs
+++ b/datafusion/core/tests/custom_sources_cases/mod.rs
@@ -35,15 +35,16 @@ use datafusion::physical_plan::{
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use datafusion::scalar::ScalarValue;
+use datafusion_catalog::Session;
use datafusion_common::cast::as_primitive_array;
use datafusion_common::project_schema;
use datafusion_common::stats::Precision;
use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
-use datafusion_physical_plan::{ExecutionMode, PlanProperties};
+use datafusion_physical_plan::PlanProperties;
use async_trait::async_trait;
-use datafusion_catalog::Session;
use futures::stream::Stream;
mod provider_filter_pushdown;
@@ -91,12 +92,11 @@ impl CustomExecutionPlan {
    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
PlanProperties::new(
- eq_properties,
- // Output Partitioning
+ EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(1),
- ExecutionMode::Bounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
index 09f7265d63..af0506a505 100644
--- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
@@ -28,19 +28,20 @@ use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::TableProviderFilterPushDown;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
- DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
- PlanProperties, SendableRecordBatchStream, Statistics,
+ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+ SendableRecordBatchStream, Statistics,
};
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
+use datafusion_catalog::Session;
use datafusion_common::cast::as_primitive_array;
use datafusion_common::{internal_err, not_impl_err};
use datafusion_expr::expr::{BinaryExpr, Cast};
use datafusion_functions_aggregate::expr_fn::count;
use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use async_trait::async_trait;
-use datafusion_catalog::Session;
fn create_batch(value: i32, num_rows: usize) -> Result<RecordBatch> {
let mut builder = Int32Builder::with_capacity(num_rows);
@@ -72,11 +73,11 @@ impl CustomPlan {
    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
PlanProperties::new(
- eq_properties,
+ EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(1),
- ExecutionMode::Bounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs
index 41d182a376..9d3bd594a9 100644
--- a/datafusion/core/tests/custom_sources_cases/statistics.rs
+++ b/datafusion/core/tests/custom_sources_cases/statistics.rs
@@ -26,17 +26,18 @@ use datafusion::{
error::Result,
logical_expr::Expr,
physical_plan::{
-        ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
-        Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
+        ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+        PlanProperties, SendableRecordBatchStream, Statistics,
},
prelude::SessionContext,
scalar::ScalarValue,
};
+use datafusion_catalog::Session;
use datafusion_common::{project_schema, stats::Precision};
use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use async_trait::async_trait;
-use datafusion_catalog::Session;
/// This is a testing structure for statistics
/// It will act both as a table provider and execution plan
@@ -64,12 +65,11 @@ impl StatisticsValidation {
    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
-
PlanProperties::new(
- eq_properties,
+ EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(2),
- ExecutionMode::Bounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
diff --git a/datafusion/core/tests/user_defined/insert_operation.rs b/datafusion/core/tests/user_defined/insert_operation.rs
index ff14fa0be3..aa531632c6 100644
--- a/datafusion/core/tests/user_defined/insert_operation.rs
+++ b/datafusion/core/tests/user_defined/insert_operation.rs
@@ -26,7 +26,10 @@ use datafusion::{
use datafusion_catalog::{Session, TableProvider};
use datafusion_expr::{dml::InsertOp, Expr, TableType};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
-use datafusion_physical_plan::{DisplayAs, ExecutionMode, ExecutionPlan, PlanProperties};
+use datafusion_physical_plan::{
+ execution_plan::{Boundedness, EmissionType},
+ DisplayAs, ExecutionPlan, PlanProperties,
+};
#[tokio::test]
async fn insert_operation_is_passed_correctly_to_table_provider() {
@@ -122,15 +125,14 @@ struct TestInsertExec {
impl TestInsertExec {
fn new(op: InsertOp) -> Self {
- let eq_properties = EquivalenceProperties::new(make_count_schema());
- let plan_properties = PlanProperties::new(
- eq_properties,
- Partitioning::UnknownPartitioning(1),
- ExecutionMode::Bounded,
- );
Self {
op,
- plan_properties,
+ plan_properties: PlanProperties::new(
+ EquivalenceProperties::new(make_count_schema()),
+ Partitioning::UnknownPartitioning(1),
+ EmissionType::Incremental,
+ Boundedness::Bounded,
+ ),
}
}
}
diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs
index 520a91aeb4..77753290c3 100644
--- a/datafusion/core/tests/user_defined/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined/user_defined_plan.rs
@@ -68,9 +68,6 @@ use arrow::{
record_batch::RecordBatch,
util::pretty::pretty_format_batches,
};
-use async_trait::async_trait;
-use futures::{Stream, StreamExt};
-
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::{
common::cast::{as_int64_array, as_string_array},
@@ -87,9 +84,8 @@ use datafusion::{
optimizer::{OptimizerConfig, OptimizerRule},
physical_expr::EquivalenceProperties,
physical_plan::{
-        DisplayAs, DisplayFormatType, Distribution, ExecutionMode, ExecutionPlan,
-        Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
-        Statistics,
+        DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+        PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
},
    physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner},
prelude::{SessionConfig, SessionContext},
@@ -100,6 +96,10 @@ use datafusion_common::ScalarValue;
use datafusion_expr::{FetchType, Projection, SortExpr};
use datafusion_optimizer::optimizer::ApplyOrder;
use datafusion_optimizer::AnalyzerRule;
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
+
+use async_trait::async_trait;
+use futures::{Stream, StreamExt};
/// Execute the specified sql and return the resulting record batches
/// pretty printed as a String.
@@ -495,12 +495,11 @@ impl TopKExec {
    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
-
PlanProperties::new(
- eq_properties,
+ EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(1),
- ExecutionMode::Bounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs
index d10eda8990..5ab321cc01 100644
--- a/datafusion/ffi/src/execution_plan.rs
+++ b/datafusion/ffi/src/execution_plan.rs
@@ -272,7 +272,13 @@ impl ExecutionPlan for ForeignExecutionPlan {
#[cfg(test)]
mod tests {
- use datafusion::{physical_plan::Partitioning, prelude::SessionContext};
+ use datafusion::{
+ physical_plan::{
+ execution_plan::{Boundedness, EmissionType},
+ Partitioning,
+ },
+ prelude::SessionContext,
+ };
use super::*;
@@ -287,7 +293,8 @@ mod tests {
props: PlanProperties::new(
datafusion::physical_expr::EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(3),
- datafusion::physical_plan::ExecutionMode::Unbounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
),
}
}
diff --git a/datafusion/ffi/src/plan_properties.rs b/datafusion/ffi/src/plan_properties.rs
index 722681ae4a..3c7bc886ae 100644
--- a/datafusion/ffi/src/plan_properties.rs
+++ b/datafusion/ffi/src/plan_properties.rs
@@ -28,7 +28,10 @@ use arrow::datatypes::SchemaRef;
use datafusion::{
error::{DataFusionError, Result},
physical_expr::EquivalenceProperties,
- physical_plan::{ExecutionMode, PlanProperties},
+ physical_plan::{
+ execution_plan::{Boundedness, EmissionType},
+ PlanProperties,
+ },
prelude::SessionContext,
};
use datafusion_proto::{
@@ -53,8 +56,11 @@ pub struct FFI_PlanProperties {
pub output_partitioning:
unsafe extern "C" fn(plan: &Self) -> RResult<RVec<u8>, RStr<'static>>,
- /// Return the execution mode of the plan.
- pub execution_mode: unsafe extern "C" fn(plan: &Self) -> FFI_ExecutionMode,
+ /// Return the emission type of the plan.
+ pub emission_type: unsafe extern "C" fn(plan: &Self) -> FFI_EmissionType,
+
+ /// Indicate boundedness of the plan and its memory requirements.
+ pub boundedness: unsafe extern "C" fn(plan: &Self) -> FFI_Boundedness,
    /// The output ordering is a [`PhysicalSortExprNodeCollection`] protobuf message
/// serialized into bytes to pass across the FFI boundary.
@@ -98,12 +104,20 @@ unsafe extern "C" fn output_partitioning_fn_wrapper(
ROk(output_partitioning.into())
}
-unsafe extern "C" fn execution_mode_fn_wrapper(
+unsafe extern "C" fn emission_type_fn_wrapper(
properties: &FFI_PlanProperties,
-) -> FFI_ExecutionMode {
+) -> FFI_EmissionType {
    let private_data = properties.private_data as *const PlanPropertiesPrivateData;
let props = &(*private_data).props;
- props.execution_mode().into()
+ props.emission_type.into()
+}
+
+unsafe extern "C" fn boundedness_fn_wrapper(
+ properties: &FFI_PlanProperties,
+) -> FFI_Boundedness {
+    let private_data = properties.private_data as *const PlanPropertiesPrivateData;
+ let props = &(*private_data).props;
+ props.boundedness.into()
}
unsafe extern "C" fn output_ordering_fn_wrapper(
@@ -164,7 +178,8 @@ impl From<&PlanProperties> for FFI_PlanProperties {
FFI_PlanProperties {
output_partitioning: output_partitioning_fn_wrapper,
- execution_mode: execution_mode_fn_wrapper,
+ emission_type: emission_type_fn_wrapper,
+ boundedness: boundedness_fn_wrapper,
output_ordering: output_ordering_fn_wrapper,
schema: schema_fn_wrapper,
release: release_fn_wrapper,
@@ -220,9 +235,6 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
RErr(e) => Err(DataFusionError::Plan(e.to_string())),
}?;
- let execution_mode: ExecutionMode =
- unsafe { (ffi_props.execution_mode)(&ffi_props).into() };
-
let eq_properties = match orderings {
Some(ordering) => {
                EquivalenceProperties::new_with_orderings(Arc::new(schema), &[ordering])
@@ -230,40 +242,82 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
None => EquivalenceProperties::new(Arc::new(schema)),
};
+ let emission_type: EmissionType =
+ unsafe { (ffi_props.emission_type)(&ffi_props).into() };
+
+ let boundedness: Boundedness =
+ unsafe { (ffi_props.boundedness)(&ffi_props).into() };
+
Ok(PlanProperties::new(
eq_properties,
partitioning,
- execution_mode,
+ emission_type,
+ boundedness,
))
}
}
-/// FFI safe version of [`ExecutionMode`].
+/// FFI safe version of [`Boundedness`].
#[repr(C)]
#[allow(non_camel_case_types)]
#[derive(Clone, StableAbi)]
-pub enum FFI_ExecutionMode {
+pub enum FFI_Boundedness {
Bounded,
- Unbounded,
- PipelineBreaking,
+ Unbounded { requires_infinite_memory: bool },
+}
+
+impl From<Boundedness> for FFI_Boundedness {
+ fn from(value: Boundedness) -> Self {
+ match value {
+ Boundedness::Bounded => FFI_Boundedness::Bounded,
+ Boundedness::Unbounded {
+ requires_infinite_memory,
+ } => FFI_Boundedness::Unbounded {
+ requires_infinite_memory,
+ },
+ }
+ }
+}
+
+impl From<FFI_Boundedness> for Boundedness {
+ fn from(value: FFI_Boundedness) -> Self {
+ match value {
+ FFI_Boundedness::Bounded => Boundedness::Bounded,
+ FFI_Boundedness::Unbounded {
+ requires_infinite_memory,
+ } => Boundedness::Unbounded {
+ requires_infinite_memory,
+ },
+ }
+ }
+}
+
+/// FFI safe version of [`EmissionType`].
+#[repr(C)]
+#[allow(non_camel_case_types)]
+#[derive(Clone, StableAbi)]
+pub enum FFI_EmissionType {
+ Incremental,
+ Final,
+ Both,
}
-impl From<ExecutionMode> for FFI_ExecutionMode {
- fn from(value: ExecutionMode) -> Self {
+impl From<EmissionType> for FFI_EmissionType {
+ fn from(value: EmissionType) -> Self {
match value {
- ExecutionMode::Bounded => FFI_ExecutionMode::Bounded,
- ExecutionMode::Unbounded => FFI_ExecutionMode::Unbounded,
-            ExecutionMode::PipelineBreaking => FFI_ExecutionMode::PipelineBreaking,
+ EmissionType::Incremental => FFI_EmissionType::Incremental,
+ EmissionType::Final => FFI_EmissionType::Final,
+ EmissionType::Both => FFI_EmissionType::Both,
}
}
}
-impl From<FFI_ExecutionMode> for ExecutionMode {
- fn from(value: FFI_ExecutionMode) -> Self {
+impl From<FFI_EmissionType> for EmissionType {
+ fn from(value: FFI_EmissionType) -> Self {
match value {
- FFI_ExecutionMode::Bounded => ExecutionMode::Bounded,
- FFI_ExecutionMode::Unbounded => ExecutionMode::Unbounded,
-            FFI_ExecutionMode::PipelineBreaking => ExecutionMode::PipelineBreaking,
+ FFI_EmissionType::Incremental => EmissionType::Incremental,
+ FFI_EmissionType::Final => EmissionType::Final,
+ FFI_EmissionType::Both => EmissionType::Both,
}
}
}
@@ -283,7 +337,8 @@ mod tests {
let original_props = PlanProperties::new(
EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(3),
- ExecutionMode::Unbounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
);
let local_props_ptr = FFI_PlanProperties::from(&original_props);
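The paired `From` impls above follow the usual FFI mirroring pattern: a `#[repr(C)]` twin for each enum plus lossless conversions in both directions. A self-contained sketch of the round-trip property, with hypothetical local copies of the `Boundedness` pair (the real code additionally derives `StableAbi` from the `abi_stable` crate and lives behind `FFI_PlanProperties` function pointers):

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum Boundedness {
    Bounded,
    Unbounded { requires_infinite_memory: bool },
}

// FFI-safe twin with a stable C layout.
#[repr(C)]
#[derive(Clone, Copy, PartialEq, Debug)]
enum FfiBoundedness {
    Bounded,
    Unbounded { requires_infinite_memory: bool },
}

impl From<Boundedness> for FfiBoundedness {
    fn from(value: Boundedness) -> Self {
        match value {
            Boundedness::Bounded => FfiBoundedness::Bounded,
            Boundedness::Unbounded { requires_infinite_memory } => {
                FfiBoundedness::Unbounded { requires_infinite_memory }
            }
        }
    }
}

impl From<FfiBoundedness> for Boundedness {
    fn from(value: FfiBoundedness) -> Self {
        match value {
            FfiBoundedness::Bounded => Boundedness::Bounded,
            FfiBoundedness::Unbounded { requires_infinite_memory } => {
                Boundedness::Unbounded { requires_infinite_memory }
            }
        }
    }
}

fn main() {
    // Conversion across the FFI boundary must be lossless in both directions,
    // including the `requires_infinite_memory` payload.
    for b in [
        Boundedness::Bounded,
        Boundedness::Unbounded { requires_infinite_memory: false },
        Boundedness::Unbounded { requires_infinite_memory: true },
    ] {
        let round_trip: Boundedness = FfiBoundedness::from(b).into();
        assert_eq!(round_trip, b);
    }
}
```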
diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs
index 6c8e76bff8..d5ffaad6d8 100644
--- a/datafusion/physical-optimizer/src/output_requirements.rs
+++ b/datafusion/physical-optimizer/src/output_requirements.rs
@@ -121,7 +121,8 @@ impl OutputRequirementExec {
PlanProperties::new(
input.equivalence_properties().clone(), // Equivalence Properties
input.output_partitioning().clone(), // Output Partitioning
- input.execution_mode(), // Execution Mode
+ input.pipeline_behavior(), // Pipeline Behavior
+ input.boundedness(), // Boundedness
)
}
}
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index feca5eb1db..2e0103defd 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -20,11 +20,12 @@
use std::any::Any;
use std::sync::Arc;
-use super::{DisplayAs, ExecutionMode, ExecutionPlanProperties, PlanProperties};
+use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
use crate::aggregates::{
no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream,
topk_stream::GroupedTopKAggregateStream,
};
+use crate::execution_plan::{CardinalityEffect, EmissionType};
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::projection::get_field_metadata;
use crate::windows::get_ordered_partition_by_indices;
@@ -41,6 +42,7 @@ use datafusion_common::stats::Precision;
use datafusion_common::{internal_err, not_impl_err, Result};
use datafusion_execution::TaskContext;
use datafusion_expr::{Accumulator, Aggregate};
+use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use datafusion_physical_expr::{
equivalence::{collapse_lex_req, ProjectionMapping},
expressions::Column,
@@ -48,8 +50,6 @@ use datafusion_physical_expr::{
PhysicalExpr, PhysicalSortRequirement,
};
-use crate::execution_plan::CardinalityEffect;
-use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use itertools::Itertools;
pub(crate) mod group_values;
@@ -663,16 +663,19 @@ impl AggregateExec {
input_partitioning.clone()
};
- // Determine execution mode:
- let mut exec_mode = input.execution_mode();
- if exec_mode == ExecutionMode::Unbounded
- && *input_order_mode == InputOrderMode::Linear
- {
- // Cannot run without breaking the pipeline
- exec_mode = ExecutionMode::PipelineBreaking;
- }
+ // TODO: Emission type and boundedness information can be enhanced here
+ let emission_type = if *input_order_mode == InputOrderMode::Linear {
+ EmissionType::Final
+ } else {
+ input.pipeline_behavior()
+ };
- PlanProperties::new(eq_properties, output_partitioning, exec_mode)
+ PlanProperties::new(
+ eq_properties,
+ output_partitioning,
+ emission_type,
+ input.boundedness(),
+ )
}
pub fn input_order_mode(&self) -> &InputOrderMode {
@@ -1298,6 +1301,7 @@ mod tests {
use crate::coalesce_batches::CoalesceBatchesExec;
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::common;
+ use crate::execution_plan::Boundedness;
use crate::expressions::col;
use crate::memory::MemoryExec;
use crate::test::assert_is_pending;
@@ -1730,13 +1734,11 @@ mod tests {
    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
PlanProperties::new(
- eq_properties,
- // Output Partitioning
+ EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(1),
- // Execution Mode
- ExecutionMode::Bounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs
index c8b329fabd..1fc3280ceb 100644
--- a/datafusion/physical-plan/src/analyze.rs
+++ b/datafusion/physical-plan/src/analyze.rs
@@ -89,10 +89,12 @@ impl AnalyzeExec {
input: &Arc<dyn ExecutionPlan>,
schema: SchemaRef,
) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
- let output_partitioning = Partitioning::UnknownPartitioning(1);
- let exec_mode = input.execution_mode();
- PlanProperties::new(eq_properties, output_partitioning, exec_mode)
+ PlanProperties::new(
+ EquivalenceProperties::new(schema),
+ Partitioning::UnknownPartitioning(1),
+ input.pipeline_behavior(),
+ input.boundedness(),
+ )
}
}
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs
index 11678e7a46..fa8d125d62 100644
--- a/datafusion/physical-plan/src/coalesce_batches.rs
+++ b/datafusion/physical-plan/src/coalesce_batches.rs
@@ -97,7 +97,8 @@ impl CoalesceBatchesExec {
PlanProperties::new(
input.equivalence_properties().clone(), // Equivalence Properties
input.output_partitioning().clone(), // Output Partitioning
- input.execution_mode(), // Execution Mode
+ input.pipeline_behavior(),
+ input.boundedness(),
)
}
}
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index 3da101d609..7c1bdba2f3 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -70,7 +70,8 @@ impl CoalescePartitionsExec {
PlanProperties::new(
eq_properties, // Equivalence Properties
Partitioning::UnknownPartitioning(1), // Output Partitioning
- input.execution_mode(), // Execution Mode
+ input.pipeline_behavior(),
+ input.boundedness(),
)
}
}
diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs
index 192619f69f..5168c3cc10 100644
--- a/datafusion/physical-plan/src/empty.rs
+++ b/datafusion/physical-plan/src/empty.rs
@@ -20,11 +20,12 @@
use std::any::Any;
use std::sync::Arc;
-use super::{
-    common, DisplayAs, ExecutionMode, PlanProperties, SendableRecordBatchStream,
-    Statistics,
+use super::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
+use crate::{
+    execution_plan::{Boundedness, EmissionType},
+    memory::MemoryStream,
+    DisplayFormatType, ExecutionPlan, Partitioning,
};
-use crate::{memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
@@ -74,14 +75,11 @@ impl EmptyExec {
    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
-        let output_partitioning = Self::output_partitioning_helper(n_partitions);
PlanProperties::new(
- eq_properties,
- // Output Partitioning
- output_partitioning,
- // Execution Mode
- ExecutionMode::Bounded,
+ EquivalenceProperties::new(schema),
+ Self::output_partitioning_helper(n_partitions),
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index ba9e4b0697..09bb807344 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -431,11 +431,6 @@ pub trait ExecutionPlanProperties {
/// partitions.
fn output_partitioning(&self) -> &Partitioning;
- /// Specifies whether this plan generates an infinite stream of records.
- /// If the plan does not support pipelining, but its input(s) are
- /// infinite, returns [`ExecutionMode::PipelineBreaking`] to indicate this.
- fn execution_mode(&self) -> ExecutionMode;
-
/// If the output of this `ExecutionPlan` within each partition is sorted,
/// returns `Some(keys)` describing the ordering. A `None` return value
/// indicates no assumptions should be made on the output ordering.
@@ -445,6 +440,14 @@ pub trait ExecutionPlanProperties {
/// output if its input is sorted as it does not reorder the input rows.
fn output_ordering(&self) -> Option<&LexOrdering>;
+    /// Boundedness information of the stream corresponding to this `ExecutionPlan`.
+ /// For more details, see [`Boundedness`].
+ fn boundedness(&self) -> Boundedness;
+
+ /// Indicates how the stream of this `ExecutionPlan` emits its results.
+ /// For more details, see [`EmissionType`].
+ fn pipeline_behavior(&self) -> EmissionType;
+
/// Get the [`EquivalenceProperties`] within the plan.
///
/// Equivalence properties tell DataFusion what columns are known to be
@@ -470,14 +473,18 @@ impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
self.properties().output_partitioning()
}
- fn execution_mode(&self) -> ExecutionMode {
- self.properties().execution_mode()
- }
-
fn output_ordering(&self) -> Option<&LexOrdering> {
self.properties().output_ordering()
}
+ fn boundedness(&self) -> Boundedness {
+ self.properties().boundedness
+ }
+
+ fn pipeline_behavior(&self) -> EmissionType {
+ self.properties().emission_type
+ }
+
fn equivalence_properties(&self) -> &EquivalenceProperties {
self.properties().equivalence_properties()
}
@@ -488,95 +495,159 @@ impl ExecutionPlanProperties for &dyn ExecutionPlan {
self.properties().output_partitioning()
}
- fn execution_mode(&self) -> ExecutionMode {
- self.properties().execution_mode()
- }
-
fn output_ordering(&self) -> Option<&LexOrdering> {
self.properties().output_ordering()
}
+ fn boundedness(&self) -> Boundedness {
+ self.properties().boundedness
+ }
+
+ fn pipeline_behavior(&self) -> EmissionType {
+ self.properties().emission_type
+ }
+
fn equivalence_properties(&self) -> &EquivalenceProperties {
self.properties().equivalence_properties()
}
}
-/// Describes the execution mode of the result of calling
-/// [`ExecutionPlan::execute`] with respect to its size and behavior.
+/// Represents whether a stream of data **generated** by an operator is bounded (finite)
+/// or unbounded (infinite).
///
-/// The mode of the execution plan is determined by the mode of its input
-/// execution plans and the details of the operator itself. For example, a
-/// `FilterExec` operator will have the same execution mode as its input, but a
-/// `SortExec` operator may have a different execution mode than its input,
-/// depending on how the input stream is sorted.
+/// This is used to determine whether an execution plan will eventually complete
+/// processing all its data (bounded) or could potentially run forever (unbounded).
///
-/// There are three possible execution modes: `Bounded`, `Unbounded` and
-/// `PipelineBreaking`.
-#[derive(Clone, Copy, PartialEq, Debug)]
-pub enum ExecutionMode {
- /// The stream is bounded / finite.
- ///
- /// In this case the stream will eventually return `None` to indicate that
- /// there are no more records to process.
+/// For unbounded streams, it also tracks whether the operator requires finite memory
+/// to process the stream or if memory usage could grow unbounded.
+///
+/// Boundedness of the output stream is based on the boundedness of the input stream and
+/// the nature of the operator. For example, a limit or top-k (sort with fetch) operator
+/// can convert an unbounded stream into a bounded one.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Boundedness {
+ /// The data stream is bounded (finite) and will eventually complete
Bounded,
- /// The stream is unbounded / infinite.
- ///
- /// In this case, the stream will never be done (never return `None`),
- /// except in case of error.
- ///
- /// This mode is often used in "Steaming" use cases where data is
- /// incrementally processed as it arrives.
- ///
- /// Note that even though the operator generates an unbounded stream of
- /// results, it can execute with bounded memory and incrementally produces
- /// output.
- Unbounded,
- /// Some of the operator's input stream(s) are unbounded, but the operator
- /// cannot generate streaming results from these streaming inputs.
- ///
- /// In this case, the execution mode will be pipeline breaking, e.g. the
- /// operator requires unbounded memory to generate results. This
- /// information is used by the planner when performing sanity checks
- /// on plans processings unbounded data sources.
- PipelineBreaking,
+ /// The data stream is unbounded (infinite) and could run forever
+ Unbounded {
+ /// Whether this operator requires infinite memory to process the unbounded stream.
+ /// If false, the operator can process an infinite stream with bounded memory.
+ /// If true, memory usage may grow unbounded while processing the stream.
+ ///
+ /// For example, `Median` requires infinite memory to compute the median of an unbounded stream.
+ /// `Min/Max` requires infinite memory if the stream is unordered, but can be computed with bounded memory if the stream is ordered.
+ requires_infinite_memory: bool,
+ },
}
-impl ExecutionMode {
- /// Check whether the execution mode is unbounded or not.
+impl Boundedness {
pub fn is_unbounded(&self) -> bool {
- matches!(self, ExecutionMode::Unbounded)
+ matches!(self, Boundedness::Unbounded { .. })
}
+}
- /// Check whether the execution is pipeline friendly. If so, operator can
- /// execute safely.
- pub fn pipeline_friendly(&self) -> bool {
- matches!(self, ExecutionMode::Bounded | ExecutionMode::Unbounded)
- }
+/// Represents how an operator emits its output records.
+///
+/// This is used to determine whether an operator emits records incrementally as they
+/// arrive, only emits a final result at the end, or can do both. Note that the operator
+/// still produces output as record batches of up to `batch_size` rows; it may buffer
+/// data internally until it has enough data to emit a record batch or the source is
+/// exhausted.
+///
+/// For example, in the following plan:
+/// ```text
+/// SortExec [EmissionType::Final]
+/// |_ on: [col1 ASC]
+/// FilterExec [EmissionType::Incremental]
+/// |_ pred: col2 > 100
+/// CsvExec [EmissionType::Incremental]
+/// |_ file: "data.csv"
+/// ```
+/// - CsvExec emits records incrementally as it reads from the file
+/// - FilterExec processes and emits filtered records incrementally as they arrive
+/// - SortExec must wait for all input records before it can emit the sorted result,
+/// since it needs to see all values to determine their final order
+///
+/// Left joins can emit both incrementally and finally:
+/// - Incrementally emit matches as they are found
+/// - Finally emit non-matches after all input is processed
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum EmissionType {
+ /// Records are emitted incrementally as they arrive and are processed
+ Incremental,
+ /// Records are only emitted once all input has been processed
+ Final,
+ /// Records can be emitted both incrementally and as a final result
+ Both,
}
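As a standalone sketch (plain Rust, with the enum re-declared here so the snippet compiles on its own), the three operators in the example plan above classify as follows. The `example_emission` helper is hypothetical; real DataFusion derives this from each operator's `PlanProperties`:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EmissionType {
    Incremental,
    Final,
    Both,
}

// Hypothetical classifier for the operators in the example plan.
fn example_emission(op: &str) -> EmissionType {
    match op {
        // Sorting must see every input row before emitting anything.
        "SortExec" => EmissionType::Final,
        // Scans and filters pass batches through as they arrive.
        "CsvExec" | "FilterExec" => EmissionType::Incremental,
        // Placeholder for operators (e.g. left joins) that do both.
        _ => EmissionType::Both,
    }
}
```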
-/// Conservatively "combines" execution modes of a given collection of operators.
-pub(crate) fn execution_mode_from_children<'a>(
+/// Utility to determine an operator's boundedness based on its children's boundedness.
+///
+/// Assumes boundedness can be inferred from child operators:
+/// - Unbounded (requires_infinite_memory: true) takes precedence.
+/// - Unbounded (requires_infinite_memory: false) is considered next.
+/// - Otherwise, the operator is bounded.
+///
+/// **Note:** This is a general-purpose utility and may not apply to
+/// all multi-child operators. Ensure your operator's behavior aligns
+/// with these assumptions before using.
+pub(crate) fn boundedness_from_children<'a>(
children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
-) -> ExecutionMode {
- let mut result = ExecutionMode::Bounded;
- for mode in children.into_iter().map(|child| child.execution_mode()) {
- match (mode, result) {
- (ExecutionMode::PipelineBreaking, _)
- | (_, ExecutionMode::PipelineBreaking) => {
- // If any of the modes is `PipelineBreaking`, so is the result:
- return ExecutionMode::PipelineBreaking;
- }
- (ExecutionMode::Unbounded, _) | (_, ExecutionMode::Unbounded) => {
- // Unbounded mode eats up bounded mode:
- result = ExecutionMode::Unbounded;
+) -> Boundedness {
+ let mut unbounded_with_finite_mem = false;
+
+ for child in children {
+ match child.boundedness() {
+ Boundedness::Unbounded {
+ requires_infinite_memory: true,
+ } => {
+ return Boundedness::Unbounded {
+ requires_infinite_memory: true,
+ }
}
- (ExecutionMode::Bounded, ExecutionMode::Bounded) => {
- // When both modes are bounded, so is the result:
- result = ExecutionMode::Bounded;
+ Boundedness::Unbounded {
+ requires_infinite_memory: false,
+ } => {
+ unbounded_with_finite_mem = true;
}
+ Boundedness::Bounded => {}
}
}
- result
+
+ if unbounded_with_finite_mem {
+ Boundedness::Unbounded {
+ requires_infinite_memory: false,
+ }
+ } else {
+ Boundedness::Bounded
+ }
+}
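The precedence rule implemented by `boundedness_from_children` can be exercised as a standalone sketch; the enum is re-declared here and the children are passed as a plain slice rather than `ExecutionPlan` references:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Boundedness {
    Bounded,
    Unbounded { requires_infinite_memory: bool },
}

// Same precedence as the utility above: an infinite-memory unbounded child
// wins immediately, then bounded-memory unbounded, otherwise bounded.
fn boundedness_from_children(children: &[Boundedness]) -> Boundedness {
    let mut unbounded_with_finite_mem = false;
    for child in children {
        match child {
            Boundedness::Unbounded { requires_infinite_memory: true } => {
                return Boundedness::Unbounded { requires_infinite_memory: true }
            }
            Boundedness::Unbounded { requires_infinite_memory: false } => {
                unbounded_with_finite_mem = true;
            }
            Boundedness::Bounded => {}
        }
    }
    if unbounded_with_finite_mem {
        Boundedness::Unbounded { requires_infinite_memory: false }
    } else {
        Boundedness::Bounded
    }
}
```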
+
+/// Determines the emission type of an operator based on its children's pipeline behavior.
+///
+/// The precedence of emission types is:
+/// - `Final` has the highest precedence.
+/// - `Both` is next: if any child emits both incremental and final results, the parent inherits this behavior unless a `Final` is present.
+/// - `Incremental` is the default if all children emit incremental results.
+///
+/// **Note:** This is a general-purpose utility and may not apply to
+/// all multi-child operators. Verify your operator's behavior aligns
+/// with these assumptions.
+pub(crate) fn emission_type_from_children<'a>(
+ children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
+) -> EmissionType {
+ let mut inc_and_final = false;
+
+ for child in children {
+ match child.pipeline_behavior() {
+ EmissionType::Final => return EmissionType::Final,
+ EmissionType::Both => inc_and_final = true,
+ EmissionType::Incremental => continue,
+ }
+ }
+
+ if inc_and_final {
+ EmissionType::Both
+ } else {
+ EmissionType::Incremental
+ }
}
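The matching precedence for emission types can likewise be sketched on its own, with the enum re-declared and children passed as a slice:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EmissionType {
    Incremental,
    Final,
    Both,
}

// Same precedence as the utility above: any `Final` child forces `Final`,
// otherwise any `Both` child yields `Both`, otherwise `Incremental`.
fn emission_type_from_children(children: &[EmissionType]) -> EmissionType {
    let mut inc_and_final = false;
    for child in children {
        match child {
            EmissionType::Final => return EmissionType::Final,
            EmissionType::Both => inc_and_final = true,
            EmissionType::Incremental => {}
        }
    }
    if inc_and_final {
        EmissionType::Both
    } else {
        EmissionType::Incremental
    }
}
```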
/// Stores certain, often expensive to compute, plan properties used in query
@@ -591,8 +662,10 @@ pub struct PlanProperties {
pub eq_properties: EquivalenceProperties,
/// See [ExecutionPlanProperties::output_partitioning]
pub partitioning: Partitioning,
- /// See [ExecutionPlanProperties::execution_mode]
- pub execution_mode: ExecutionMode,
+ /// See [ExecutionPlanProperties::pipeline_behavior]
+ pub emission_type: EmissionType,
+ /// See [ExecutionPlanProperties::boundedness]
+ pub boundedness: Boundedness,
/// See [ExecutionPlanProperties::output_ordering]
output_ordering: Option<LexOrdering>,
}
@@ -602,14 +675,16 @@ impl PlanProperties {
pub fn new(
eq_properties: EquivalenceProperties,
partitioning: Partitioning,
- execution_mode: ExecutionMode,
+ emission_type: EmissionType,
+ boundedness: Boundedness,
) -> Self {
// Output ordering can be derived from `eq_properties`.
let output_ordering = eq_properties.output_ordering();
Self {
eq_properties,
partitioning,
- execution_mode,
+ emission_type,
+ boundedness,
output_ordering,
}
}
@@ -620,12 +695,6 @@ impl PlanProperties {
self
}
- /// Overwrite the execution Mode with its new value.
- pub fn with_execution_mode(mut self, execution_mode: ExecutionMode) -> Self {
- self.execution_mode = execution_mode;
- self
- }
-
/// Overwrite equivalence properties with its new value.
pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self {
// Changing equivalence properties also changes output ordering, so
@@ -635,6 +704,18 @@ impl PlanProperties {
self
}
+ /// Overwrite boundedness with its new value.
+ pub fn with_boundedness(mut self, boundedness: Boundedness) -> Self {
+ self.boundedness = boundedness;
+ self
+ }
+
+ /// Overwrite emission type with its new value.
+ pub fn with_emission_type(mut self, emission_type: EmissionType) -> Self {
+ self.emission_type = emission_type;
+ self
+ }
+
pub fn equivalence_properties(&self) -> &EquivalenceProperties {
&self.eq_properties
}
@@ -647,10 +728,6 @@ impl PlanProperties {
self.output_ordering.as_ref()
}
- pub fn execution_mode(&self) -> ExecutionMode {
- self.execution_mode
- }
-
/// Get schema of the node.
fn schema(&self) -> &SchemaRef {
self.eq_properties.schema()
diff --git a/datafusion/physical-plan/src/explain.rs
b/datafusion/physical-plan/src/explain.rs
index cc42e05871..cb00958cec 100644
--- a/datafusion/physical-plan/src/explain.rs
+++ b/datafusion/physical-plan/src/explain.rs
@@ -20,7 +20,8 @@
use std::any::Any;
use std::sync::Arc;
-use super::{DisplayAs, ExecutionMode, PlanProperties, SendableRecordBatchStream};
+use super::{DisplayAs, PlanProperties, SendableRecordBatchStream};
+use crate::execution_plan::{Boundedness, EmissionType};
use crate::stream::RecordBatchStreamAdapter;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
@@ -74,11 +75,11 @@ impl ExplainExec {
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
PlanProperties::new(
- eq_properties,
+ EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(1),
- ExecutionMode::Bounded,
+ EmissionType::Final,
+ Boundedness::Bounded,
)
}
}
diff --git a/datafusion/physical-plan/src/filter.rs
b/datafusion/physical-plan/src/filter.rs
index 07898e8d22..901907cf38 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -272,10 +272,12 @@ impl FilterExec {
output_partitioning.project(&projection_mapping, &eq_properties);
eq_properties = eq_properties.project(&projection_mapping, out_schema);
}
+
Ok(PlanProperties::new(
eq_properties,
output_partitioning,
- input.execution_mode(),
+ input.pipeline_behavior(),
+ input.boundedness(),
))
}
}
diff --git a/datafusion/physical-plan/src/insert.rs
b/datafusion/physical-plan/src/insert.rs
index ae8a2acce6..e848640386 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -23,11 +23,12 @@ use std::fmt::Debug;
use std::sync::Arc;
use super::{
- execute_input_stream, DisplayAs, DisplayFormatType, ExecutionPlan,
- ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
+ execute_input_stream, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+ PlanProperties, SendableRecordBatchStream,
};
use crate::metrics::MetricsSet;
use crate::stream::RecordBatchStreamAdapter;
+use crate::ExecutionPlanProperties;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
@@ -140,7 +141,8 @@ impl DataSinkExec {
PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(1),
- input.execution_mode(),
+ input.pipeline_behavior(),
+ input.boundedness(),
)
}
}
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs
b/datafusion/physical-plan/src/joins/cross_join.rs
index 8bf675e873..b70eeb313b 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -24,11 +24,11 @@ use super::utils::{
StatefulStreamResult,
};
use crate::coalesce_partitions::CoalescePartitionsExec;
+use crate::execution_plan::{boundedness_from_children, EmissionType};
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::{
- execution_mode_from_children, handle_state, ColumnStatistics, DisplayAs,
- DisplayFormatType, Distribution, ExecutionMode, ExecutionPlan,
- ExecutionPlanProperties, PlanProperties, RecordBatchStream,
+ handle_state, ColumnStatistics, DisplayAs, DisplayFormatType, Distribution,
+ ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use arrow::compute::concat_batches;
@@ -161,14 +161,12 @@ impl CrossJoinExec {
left.schema().fields.len(),
);
- // Determine the execution mode:
- let mut mode = execution_mode_from_children([left, right]);
- if mode.is_unbounded() {
- // If any of the inputs is unbounded, cross join breaks the pipeline.
- mode = ExecutionMode::PipelineBreaking;
- }
-
- PlanProperties::new(eq_properties, output_partitioning, mode)
+ PlanProperties::new(
+ eq_properties,
+ output_partitioning,
+ EmissionType::Final,
+ boundedness_from_children([left, right]),
+ )
}
}
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs
b/datafusion/physical-plan/src/joins/hash_join.rs
index 532a91da75..ef70392a01 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -29,11 +29,12 @@ use super::{
utils::{OnceAsync, OnceFut},
PartitionMode,
};
+use crate::execution_plan::{boundedness_from_children, EmissionType};
use crate::ExecutionPlanProperties;
use crate::{
coalesce_partitions::CoalescePartitionsExec,
common::can_project,
- execution_mode_from_children, handle_state,
+ handle_state,
hash_utils::create_hashes,
joins::utils::{
adjust_indices_by_join_type, apply_join_filter_to_indices,
@@ -44,9 +45,8 @@ use crate::{
JoinHashMapType, JoinOn, JoinOnRef, StatefulStreamResult,
},
metrics::{ExecutionPlanMetricsSet, MetricsSet},
- DisplayAs, DisplayFormatType, Distribution, ExecutionMode, ExecutionPlan,
- Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
- Statistics,
+ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use arrow::array::{
@@ -526,24 +526,26 @@ impl HashJoinExec {
}
};
- // Determine execution mode by checking whether this join is pipeline
- // breaking. This happens when the left side is unbounded, or the right
- // side is unbounded with `Left`, `Full`, `LeftAnti` or `LeftSemi` join types.
- let pipeline_breaking = left.execution_mode().is_unbounded()
- || (right.execution_mode().is_unbounded()
- && matches!(
- join_type,
- JoinType::Left
- | JoinType::Full
- | JoinType::LeftAnti
- | JoinType::LeftSemi
- | JoinType::LeftMark
- ));
-
- let mode = if pipeline_breaking {
- ExecutionMode::PipelineBreaking
+ let emission_type = if left.boundedness().is_unbounded() {
+ EmissionType::Final
+ } else if right.pipeline_behavior() == EmissionType::Incremental {
+ match join_type {
+ // If we only need to generate matched rows from the probe side,
+ // we can emit rows incrementally.
+ JoinType::Inner
+ | JoinType::LeftSemi
+ | JoinType::RightSemi
+ | JoinType::Right
+ | JoinType::RightAnti => EmissionType::Incremental,
+ // If we need to generate unmatched rows from the *build side*,
+ // we need to emit them at the end.
+ JoinType::Left
+ | JoinType::LeftAnti
+ | JoinType::LeftMark
+ | JoinType::Full => EmissionType::Both,
+ }
} else {
- execution_mode_from_children([left, right])
+ right.pipeline_behavior()
};
// If contains projection, update the PlanProperties.
@@ -556,10 +558,12 @@ impl HashJoinExec {
output_partitioning.project(&projection_mapping, &eq_properties);
eq_properties = eq_properties.project(&projection_mapping, out_schema);
}
+
Ok(PlanProperties::new(
eq_properties,
output_partitioning,
- mode,
+ emission_type,
+ boundedness_from_children([left, right]),
))
}
}
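The new branching in `HashJoinExec` (and the identical logic in `NestedLoopJoinExec` below) can be condensed into a small decision function. This is a sketch over simplified stand-in enums, not the actual DataFusion types; `left_unbounded` stands for the build side's boundedness and `right_behavior` for the probe side's emission type:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EmissionType { Incremental, Final, Both }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JoinType { Inner, Left, Right, Full, LeftSemi, RightSemi, LeftAnti, RightAnti, LeftMark }

fn hash_join_emission(
    left_unbounded: bool,
    right_behavior: EmissionType,
    join_type: JoinType,
) -> EmissionType {
    if left_unbounded {
        // An unbounded build side can never finish building: final only.
        EmissionType::Final
    } else if right_behavior == EmissionType::Incremental {
        match join_type {
            // Only matched probe-side rows are produced: fully incremental.
            JoinType::Inner | JoinType::LeftSemi | JoinType::RightSemi
            | JoinType::Right | JoinType::RightAnti => EmissionType::Incremental,
            // Unmatched build-side rows must also be emitted at the end.
            JoinType::Left | JoinType::LeftAnti | JoinType::LeftMark
            | JoinType::Full => EmissionType::Both,
        }
    } else {
        // Otherwise inherit the probe side's behavior.
        right_behavior
    }
}
```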
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs
b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index d174564178..8caf5d9b5d 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -28,6 +28,7 @@ use super::utils::{
BatchTransformer, NoopBatchTransformer, StatefulStreamResult,
};
use crate::coalesce_partitions::CoalescePartitionsExec;
+use crate::execution_plan::{boundedness_from_children, EmissionType};
use crate::joins::utils::{
adjust_indices_by_join_type, apply_join_filter_to_indices,
build_batch_from_indices,
build_join_schema, check_join_is_valid, estimate_join_statistics,
@@ -36,9 +37,9 @@ use crate::joins::utils::{
};
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::{
- execution_mode_from_children, handle_state, DisplayAs, DisplayFormatType,
- Distribution, ExecutionMode, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
- RecordBatchStream, SendableRecordBatchStream,
+ handle_state, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
+ ExecutionPlanProperties, PlanProperties, RecordBatchStream,
+ SendableRecordBatchStream,
};
use arrow::array::{BooleanBufferBuilder, UInt32Array, UInt64Array};
@@ -241,14 +242,34 @@ impl NestedLoopJoinExec {
let output_partitioning =
asymmetric_join_output_partitioning(left, right, &join_type);
- // Determine execution mode:
- let mode = if left.execution_mode().is_unbounded() {
- ExecutionMode::PipelineBreaking
+ let emission_type = if left.boundedness().is_unbounded() {
+ EmissionType::Final
+ } else if right.pipeline_behavior() == EmissionType::Incremental {
+ match join_type {
+ // If we only need to generate matched rows from the probe side,
+ // we can emit rows incrementally.
+ JoinType::Inner
+ | JoinType::LeftSemi
+ | JoinType::RightSemi
+ | JoinType::Right
+ | JoinType::RightAnti => EmissionType::Incremental,
+ // If we need to generate unmatched rows from the *build side*,
+ // we need to emit them at the end.
+ JoinType::Left
+ | JoinType::LeftAnti
+ | JoinType::LeftMark
+ | JoinType::Full => EmissionType::Both,
+ }
} else {
- execution_mode_from_children([left, right])
+ right.pipeline_behavior()
};
- PlanProperties::new(eq_properties, output_partitioning, mode)
+ PlanProperties::new(
+ eq_properties,
+ output_partitioning,
+ emission_type,
+ boundedness_from_children([left, right]),
+ )
}
/// Returns a vector indicating whether the left and right inputs maintain their order.
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs
b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 5e387409da..f17b99d81d 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -53,8 +53,8 @@ use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::join_equivalence_properties;
use datafusion_physical_expr::PhysicalExprRef;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
-use futures::{Stream, StreamExt};
+use crate::execution_plan::{boundedness_from_children, EmissionType};
use crate::expressions::PhysicalSortExpr;
use crate::joins::utils::{
build_join_schema, check_join_is_valid, estimate_join_statistics,
@@ -63,11 +63,13 @@ use crate::joins::utils::{
use crate::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use crate::spill::spill_record_batches;
use crate::{
- execution_mode_from_children, metrics, DisplayAs, DisplayFormatType, Distribution,
- ExecutionPlan, ExecutionPlanProperties, PhysicalExpr, PlanProperties,
- RecordBatchStream, SendableRecordBatchStream, Statistics,
+ metrics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
+ ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream,
+ SendableRecordBatchStream, Statistics,
};
+use futures::{Stream, StreamExt};
+
/// Join execution plan that executes equi-join predicates on multiple partitions using Sort-Merge
/// join algorithm and applies an optional filter post join. Can be used to join arbitrarily large
/// inputs where one or both of the inputs don't fit in the available memory.
@@ -302,10 +304,13 @@ impl SortMergeJoinExec {
let output_partitioning =
symmetric_join_output_partitioning(left, right, &join_type);
- // Determine execution mode:
- let mode = execution_mode_from_children([left, right]);
-
- PlanProperties::new(eq_properties, output_partitioning, mode)
+ // TODO: Emission type may be incremental if the input is sorted
+ PlanProperties::new(
+ eq_properties,
+ output_partitioning,
+ EmissionType::Final,
+ boundedness_from_children([left, right]),
+ )
}
}
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 94ef4d5bc3..72fd5a0feb 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -33,6 +33,7 @@ use std::task::{Context, Poll};
use std::vec;
use crate::common::SharedMemoryReservation;
+use crate::execution_plan::{boundedness_from_children, emission_type_from_children};
use crate::joins::hash_join::{equal_rows_arr, update_hash};
use crate::joins::stream_join_utils::{
calculate_filter_expr_intervals, combine_two_batches,
@@ -47,7 +48,6 @@ use crate::joins::utils::{
NoopBatchTransformer, StatefulStreamResult,
};
use crate::{
- execution_mode_from_children,
joins::StreamJoinPartitionMode,
metrics::{ExecutionPlanMetricsSet, MetricsSet},
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
@@ -275,10 +275,12 @@ impl SymmetricHashJoinExec {
let output_partitioning =
symmetric_join_output_partitioning(left, right, &join_type);
- // Determine execution mode:
- let mode = execution_mode_from_children([left, right]);
-
- PlanProperties::new(eq_properties, output_partitioning, mode)
+ PlanProperties::new(
+ eq_properties,
+ output_partitioning,
+ emission_type_from_children([left, right]),
+ boundedness_from_children([left, right]),
+ )
}
/// left stream
diff --git a/datafusion/physical-plan/src/lib.rs
b/datafusion/physical-plan/src/lib.rs
index 845a74eaea..5ad37f0b1a 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -35,11 +35,10 @@ pub use datafusion_physical_expr::{
};
pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
-pub(crate) use crate::execution_plan::execution_mode_from_children;
pub use crate::execution_plan::{
collect, collect_partitioned, displayable, execute_input_stream, execute_stream,
execute_stream_partitioned, get_plan_string, with_new_children_if_necessary,
- ExecutionMode, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
+ ExecutionPlan, ExecutionPlanProperties, PlanProperties,
};
pub use crate::metrics::Metric;
pub use crate::ordering::InputOrderMode;
diff --git a/datafusion/physical-plan/src/limit.rs
b/datafusion/physical-plan/src/limit.rs
index ab1e6cb37b..9665a09e42 100644
--- a/datafusion/physical-plan/src/limit.rs
+++ b/datafusion/physical-plan/src/limit.rs
@@ -24,9 +24,10 @@ use std::task::{Context, Poll};
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{
- DisplayAs, ExecutionMode, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
+ DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
+use crate::execution_plan::{Boundedness, CardinalityEffect};
use crate::{DisplayFormatType, Distribution, ExecutionPlan, Partitioning};
use arrow::datatypes::SchemaRef;
@@ -34,7 +35,6 @@ use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;
-use crate::execution_plan::CardinalityEffect;
use futures::stream::{Stream, StreamExt};
use log::trace;
@@ -86,7 +86,9 @@ impl GlobalLimitExec {
PlanProperties::new(
input.equivalence_properties().clone(), // Equivalence Properties
Partitioning::UnknownPartitioning(1), // Output Partitioning
- ExecutionMode::Bounded, // Execution Mode
+ input.pipeline_behavior(),
+ // Limit operations are always bounded since they output a finite number of rows
+ Boundedness::Bounded,
)
}
}
@@ -242,7 +244,9 @@ impl LocalLimitExec {
PlanProperties::new(
input.equivalence_properties().clone(), // Equivalence Properties
input.output_partitioning().clone(), // Output Partitioning
- ExecutionMode::Bounded, // Execution Mode
+ input.pipeline_behavior(),
+ // Limit operations are always bounded since they output a finite number of rows
+ Boundedness::Bounded,
)
}
}
diff --git a/datafusion/physical-plan/src/memory.rs
b/datafusion/physical-plan/src/memory.rs
index bf6294f5a5..521008ce9b 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -24,9 +24,10 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use super::{
- common, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
- PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
+ common, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+ RecordBatchStream, SendableRecordBatchStream, Statistics,
};
+use crate::execution_plan::{Boundedness, EmissionType};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
@@ -285,11 +286,11 @@ impl MemoryExec {
orderings: &[LexOrdering],
partitions: &[Vec<RecordBatch>],
) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
PlanProperties::new(
- eq_properties, // Equivalence Properties
- Partitioning::UnknownPartitioning(partitions.len()), // Output Partitioning
- ExecutionMode::Bounded, // Execution Mode
+ EquivalenceProperties::new_with_orderings(schema, orderings),
+ Partitioning::UnknownPartitioning(partitions.len()),
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
@@ -393,7 +394,8 @@ impl LazyMemoryExec {
let cache = PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&schema)),
Partitioning::RoundRobinBatch(generators.len()),
- ExecutionMode::Bounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
);
Ok(Self {
schema,
diff --git a/datafusion/physical-plan/src/placeholder_row.rs
b/datafusion/physical-plan/src/placeholder_row.rs
index f9437f46f8..355e51070f 100644
--- a/datafusion/physical-plan/src/placeholder_row.rs
+++ b/datafusion/physical-plan/src/placeholder_row.rs
@@ -20,10 +20,8 @@
use std::any::Any;
use std::sync::Arc;
-use super::{
- common, DisplayAs, ExecutionMode, PlanProperties, SendableRecordBatchStream,
- Statistics,
-};
+use super::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
+use crate::execution_plan::{Boundedness, EmissionType};
use crate::{memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning};
use arrow::array::{ArrayRef, NullArray};
@@ -96,11 +94,12 @@ impl PlaceholderRowExec {
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
- // Get output partitioning:
- let output_partitioning = Self::output_partitioning_helper(n_partitions);
-
- PlanProperties::new(eq_properties, output_partitioning, ExecutionMode::Bounded)
+ PlanProperties::new(
+ EquivalenceProperties::new(schema),
+ Self::output_partitioning_helper(n_partitions),
+ EmissionType::Incremental,
+ Boundedness::Bounded,
+ )
}
}
diff --git a/datafusion/physical-plan/src/projection.rs
b/datafusion/physical-plan/src/projection.rs
index c1d3f36836..e37a6b0dfb 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -132,7 +132,8 @@ impl ProjectionExec {
Ok(PlanProperties::new(
eq_properties,
output_partitioning,
- input.execution_mode(),
+ input.pipeline_behavior(),
+ input.boundedness(),
))
}
}
diff --git a/datafusion/physical-plan/src/recursive_query.rs
b/datafusion/physical-plan/src/recursive_query.rs
index 0137e5d52f..0e49a791cb 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -26,7 +26,8 @@ use super::{
work_table::{ReservedBatches, WorkTable, WorkTableExec},
PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
-use crate::{DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan};
+use crate::execution_plan::{Boundedness, EmissionType};
+use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
@@ -122,7 +123,8 @@ impl RecursiveQueryExec {
PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(1),
- ExecutionMode::Bounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
diff --git a/datafusion/physical-plan/src/repartition/mod.rs
b/datafusion/physical-plan/src/repartition/mod.rs
index 0a80dcd34e..963ccc6fd8 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -726,13 +726,11 @@ impl RepartitionExec {
partitioning: Partitioning,
preserve_order: bool,
) -> PlanProperties {
- // Equivalence Properties
- let eq_properties = Self::eq_properties_helper(input, preserve_order);
-
PlanProperties::new(
- eq_properties, // Equivalence Properties
- partitioning, // Output Partitioning
- input.execution_mode(), // Execution Mode
+ Self::eq_properties_helper(input, preserve_order),
+ partitioning,
+ input.pipeline_behavior(),
+ input.boundedness(),
)
}
diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs
b/datafusion/physical-plan/src/sorts/partial_sort.rs
index 77636f9e49..f14ba6606e 100644
--- a/datafusion/physical-plan/src/sorts/partial_sort.rs
+++ b/datafusion/physical-plan/src/sorts/partial_sort.rs
@@ -201,10 +201,12 @@ impl PartialSortExec {
let output_partitioning =
Self::output_partitioning_helper(input, preserve_partitioning);
- // Determine execution mode:
- let mode = input.execution_mode();
-
- PlanProperties::new(eq_properties, output_partitioning, mode)
+ PlanProperties::new(
+ eq_properties,
+ output_partitioning,
+ input.pipeline_behavior(),
+ input.boundedness(),
+ )
}
}
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index c2d8b093a9..8d8a5c5f70 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -25,6 +25,7 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use crate::common::spawn_buffered;
+use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
use crate::expressions::PhysicalSortExpr;
use crate::limit::LimitStream;
use crate::metrics::{
@@ -37,9 +38,9 @@ use crate::spill::{
use crate::stream::RecordBatchStreamAdapter;
use crate::topk::TopK;
use crate::{
-    DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionMode,
-    ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties,
-    SendableRecordBatchStream, Statistics,
+    DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
+    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
+    Statistics,
};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
@@ -56,7 +57,6 @@ use datafusion_execution::TaskContext;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
-use crate::execution_plan::CardinalityEffect;
use futures::{StreamExt, TryStreamExt};
use log::{debug, trace};
@@ -763,11 +763,15 @@ impl SortExec {
/// can be dropped.
pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
let mut cache = self.cache.clone();
-        if fetch.is_some() && self.cache.execution_mode == ExecutionMode::Unbounded {
- // When a theoretically unnecessary sort becomes a top-K (which
- // sometimes arises as an intermediate state before full removal),
- // its execution mode should become `Bounded`.
- cache.execution_mode = ExecutionMode::Bounded;
+        // If the SortExec can emit incrementally (that means the sort requirements
+        // and properties of the input match), the SortExec can generate its result
+        // without scanning the entire input when a fetch value exists.
+ let is_pipeline_friendly = matches!(
+ self.cache.emission_type,
+ EmissionType::Incremental | EmissionType::Both
+ );
+ if fetch.is_some() && is_pipeline_friendly {
+ cache = cache.with_boundedness(Boundedness::Bounded);
}
SortExec {
input: Arc::clone(&self.input),
@@ -817,10 +821,30 @@ impl SortExec {
let sort_satisfied = input
.equivalence_properties()
.ordering_satisfy_requirement(&requirement);
- let mode = match input.execution_mode() {
-            ExecutionMode::Unbounded if sort_satisfied => ExecutionMode::Unbounded,
- ExecutionMode::Bounded => ExecutionMode::Bounded,
- _ => ExecutionMode::PipelineBreaking,
+
+ // The emission type depends on whether the input is already sorted:
+        // - If already sorted, we can emit results in the same way as the input
+        // - If not sorted, we must wait until all data is processed to emit results (Final)
+ let emission_type = if sort_satisfied {
+ input.pipeline_behavior()
+ } else {
+ EmissionType::Final
+ };
+
+        // The boundedness depends on whether the input is already sorted:
+        // - If already sorted, we have the same property as the input
+        // - If not sorted and input is unbounded, we require infinite memory and
+        //   generate unbounded data (not practical).
+        // - If not sorted and input is bounded, then the SortExec is bounded, too.
+ let boundedness = if sort_satisfied {
+ input.boundedness()
+ } else {
+ match input.boundedness() {
+ Boundedness::Unbounded { .. } => Boundedness::Unbounded {
+ requires_infinite_memory: true,
+ },
+ bounded => bounded,
+ }
};
        // Calculate equivalence properties; i.e. reset the ordering equivalence
@@ -835,7 +859,12 @@ impl SortExec {
let output_partitioning =
Self::output_partitioning_helper(input, preserve_partitioning);
- PlanProperties::new(eq_properties, output_partitioning, mode)
+ PlanProperties::new(
+ eq_properties,
+ output_partitioning,
+ emission_type,
+ boundedness,
+ )
}
}
@@ -1006,6 +1035,7 @@ mod tests {
use super::*;
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::collect;
+ use crate::execution_plan::Boundedness;
use crate::expressions::col;
use crate::memory::MemoryExec;
use crate::test;
@@ -1049,8 +1079,14 @@ mod tests {
eq_properties.add_new_orderings(vec![LexOrdering::new(vec![
PhysicalSortExpr::new_default(Arc::new(Column::new("c1", 0))),
])]);
- let mode = ExecutionMode::Unbounded;
-        PlanProperties::new(eq_properties, Partitioning::UnknownPartitioning(1), mode)
+ PlanProperties::new(
+ eq_properties,
+ Partitioning::UnknownPartitioning(1),
+ EmissionType::Final,
+ Boundedness::Unbounded {
+ requires_infinite_memory: false,
+ },
+ )
}
}
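The two decisions in `SortExec::compute_properties` above can be read as one pure function of the input's properties: an already-satisfied sort passes both properties through; otherwise the sort buffers everything (`Final`), and an unbounded input would then need infinite memory. A sketch under the assumed enum shapes from this diff (`sort_properties` is an illustrative helper, not a DataFusion API):

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EmissionType { Incremental, Final, Both }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Boundedness { Bounded, Unbounded { requires_infinite_memory: bool } }

/// Mirrors the logic in the hunk above: if the required ordering is already
/// satisfied, the input's properties pass through unchanged; otherwise the
/// sort must consume all input before emitting, and sorting an unbounded
/// input would require infinite memory.
fn sort_properties(
    sort_satisfied: bool,
    input_emission: EmissionType,
    input_boundedness: Boundedness,
) -> (EmissionType, Boundedness) {
    if sort_satisfied {
        (input_emission, input_boundedness)
    } else {
        let boundedness = match input_boundedness {
            Boundedness::Unbounded { .. } => Boundedness::Unbounded {
                requires_infinite_memory: true,
            },
            bounded => bounded,
        };
        (EmissionType::Final, boundedness)
    }
}

fn main() {
    // Already sorted: properties pass through unchanged.
    assert_eq!(
        sort_properties(true, EmissionType::Incremental, Boundedness::Bounded),
        (EmissionType::Incremental, Boundedness::Bounded)
    );
    // Not sorted + unbounded input: infinite memory would be required.
    let (e, b) = sort_properties(
        false,
        EmissionType::Incremental,
        Boundedness::Unbounded { requires_infinite_memory: false },
    );
    assert_eq!(e, EmissionType::Final);
    assert_eq!(b, Boundedness::Unbounded { requires_infinite_memory: true });
}
```

This also makes the `with_fetch` change legible: a fetch on a pipeline-friendly (`Incremental`/`Both`) sort caps the output, so boundedness can flip to `Bounded`.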
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index 906164f21b..21597fb856 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -139,7 +139,8 @@ impl SortPreservingMergeExec {
PlanProperties::new(
eq_properties, // Equivalence Properties
Partitioning::UnknownPartitioning(1), // Output Partitioning
- input.execution_mode(), // Execution Mode
+ input.pipeline_behavior(), // Pipeline Behavior
+ input.boundedness(), // Boundedness
)
}
}
@@ -323,6 +324,7 @@ mod tests {
use super::*;
use crate::coalesce_batches::CoalesceBatchesExec;
use crate::coalesce_partitions::CoalescePartitionsExec;
+ use crate::execution_plan::{Boundedness, EmissionType};
use crate::expressions::col;
use crate::memory::MemoryExec;
use crate::metrics::{MetricValue, Timestamp};
@@ -331,7 +333,7 @@ mod tests {
use crate::stream::RecordBatchReceiverStream;
    use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
use crate::test::{self, assert_is_pending, make_partition};
- use crate::{collect, common, ExecutionMode};
+ use crate::{collect, common};
    use arrow::array::{ArrayRef, Int32Array, StringArray, TimestampNanosecondArray};
use arrow::compute::SortOptions;
@@ -1268,8 +1270,14 @@ mod tests {
.iter()
.map(|expr| PhysicalSortExpr::new_default(Arc::clone(expr)))
.collect::<LexOrdering>()]);
- let mode = ExecutionMode::Unbounded;
-        PlanProperties::new(eq_properties, Partitioning::Hash(columns, 3), mode)
+ PlanProperties::new(
+ eq_properties,
+ Partitioning::Hash(columns, 3),
+ EmissionType::Incremental,
+ Boundedness::Unbounded {
+ requires_infinite_memory: false,
+ },
+ )
}
}
diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs
index 7ccef32480..da8b0e877d 100644
--- a/datafusion/physical-plan/src/streaming.rs
+++ b/datafusion/physical-plan/src/streaming.rs
@@ -21,8 +21,9 @@ use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;
-use super::{DisplayAs, DisplayFormatType, ExecutionMode, PlanProperties};
+use super::{DisplayAs, DisplayFormatType, PlanProperties};
use crate::display::{display_orderings, ProjectSchemaDisplay};
+use crate::execution_plan::{Boundedness, EmissionType};
use crate::stream::RecordBatchStreamAdapter;
use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
@@ -145,22 +146,26 @@ impl StreamingTableExec {
schema: SchemaRef,
orderings: &[LexOrdering],
partitions: &[Arc<dyn PartitionStream>],
- is_infinite: bool,
+ infinite: bool,
) -> PlanProperties {
// Calculate equivalence properties:
        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
// Get output partitioning:
        let output_partitioning = Partitioning::UnknownPartitioning(partitions.len());
-
- // Determine execution mode:
- let mode = if is_infinite {
- ExecutionMode::Unbounded
+ let boundedness = if infinite {
+ Boundedness::Unbounded {
+ requires_infinite_memory: false,
+ }
} else {
- ExecutionMode::Bounded
+ Boundedness::Bounded
};
-
- PlanProperties::new(eq_properties, output_partitioning, mode)
+ PlanProperties::new(
+ eq_properties,
+ output_partitioning,
+ EmissionType::Incremental,
+ boundedness,
+ )
}
}
diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs
index cc0a7cbd9b..b31a53e55e 100644
--- a/datafusion/physical-plan/src/test/exec.rs
+++ b/datafusion/physical-plan/src/test/exec.rs
@@ -24,10 +24,14 @@ use std::{
task::{Context, Poll},
};
-use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use crate::{
-    common, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
-    PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
+    common, execution_plan::Boundedness, DisplayAs, DisplayFormatType, ExecutionPlan,
+    Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
+ Statistics,
+};
+use crate::{
+ execution_plan::EmissionType,
+ stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter},
};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -152,12 +156,11 @@ impl MockExec {
    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
-
PlanProperties::new(
- eq_properties,
+ EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(1),
- ExecutionMode::Bounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
@@ -315,11 +318,11 @@ impl BarrierExec {
schema: SchemaRef,
data: &[Vec<RecordBatch>],
) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
PlanProperties::new(
- eq_properties,
+ EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(data.len()),
- ExecutionMode::Bounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
@@ -427,12 +430,11 @@ impl ErrorExec {
    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
-
PlanProperties::new(
- eq_properties,
+ EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(1),
- ExecutionMode::Bounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
@@ -509,12 +511,11 @@ impl StatisticsExec {
    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
-
PlanProperties::new(
- eq_properties,
+ EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(2),
- ExecutionMode::Bounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
@@ -610,12 +611,11 @@ impl BlockingExec {
    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
-
PlanProperties::new(
- eq_properties,
+ EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(n_partitions),
- ExecutionMode::Bounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
@@ -752,13 +752,12 @@ impl PanicExec {
schema: SchemaRef,
batches_until_panics: &[usize],
) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
let num_partitions = batches_until_panics.len();
-
PlanProperties::new(
- eq_properties,
+ EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(num_partitions),
- ExecutionMode::Bounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index bd36753880..6e768a3d87 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -27,12 +27,12 @@ use std::task::{Context, Poll};
use std::{any::Any, sync::Arc};
use super::{
- execution_mode_from_children,
metrics::{ExecutionPlanMetricsSet, MetricsSet},
ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan,
ExecutionPlanProperties, Partitioning, PlanProperties, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
+use crate::execution_plan::{boundedness_from_children, emission_type_from_children};
use crate::metrics::BaselineMetrics;
use crate::stream::ObservedStream;
@@ -135,14 +135,11 @@ impl UnionExec {
.map(|plan| plan.output_partitioning().partition_count())
.sum();
        let output_partitioning = Partitioning::UnknownPartitioning(num_partitions);
-
- // Determine execution mode:
- let mode = execution_mode_from_children(inputs.iter());
-
Ok(PlanProperties::new(
eq_properties,
output_partitioning,
- mode,
+ emission_type_from_children(inputs),
+ boundedness_from_children(inputs),
))
}
}
@@ -335,10 +332,12 @@ impl InterleaveExec {
let eq_properties = EquivalenceProperties::new(schema);
// Get output partitioning:
let output_partitioning = inputs[0].output_partitioning().clone();
- // Determine execution mode:
- let mode = execution_mode_from_children(inputs.iter());
-
- PlanProperties::new(eq_properties, output_partitioning, mode)
+ PlanProperties::new(
+ eq_properties,
+ output_partitioning,
+ emission_type_from_children(inputs),
+ boundedness_from_children(inputs),
+ )
}
}
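`UnionExec` and `InterleaveExec` above derive their properties from all children via `emission_type_from_children` and `boundedness_from_children`. The diff does not show those helpers' bodies, so the following is a hedged sketch of plausible combination rules under the assumed enum shapes; the real implementations in `execution_plan.rs` may differ in detail:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EmissionType { Incremental, Final, Both }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Boundedness { Bounded, Unbounded { requires_infinite_memory: bool } }

/// Plausible rule: the union is unbounded if any child is, and needs
/// infinite memory if any unbounded child does.
fn boundedness_from_children(children: &[Boundedness]) -> Boundedness {
    let mut unbounded = false;
    let mut infinite_mem = false;
    for c in children.iter().copied() {
        if let Boundedness::Unbounded { requires_infinite_memory } = c {
            unbounded = true;
            infinite_mem |= requires_infinite_memory;
        }
    }
    if unbounded {
        Boundedness::Unbounded { requires_infinite_memory: infinite_mem }
    } else {
        Boundedness::Bounded
    }
}

/// Plausible rule: the union emits incrementally only if every child can.
fn emission_type_from_children(children: &[EmissionType]) -> EmissionType {
    let all_incremental = children
        .iter()
        .all(|c| matches!(c, EmissionType::Incremental | EmissionType::Both));
    if all_incremental {
        EmissionType::Incremental
    } else {
        EmissionType::Final
    }
}

fn main() {
    // One unbounded child makes the union unbounded.
    let b = boundedness_from_children(&[
        Boundedness::Bounded,
        Boundedness::Unbounded { requires_infinite_memory: false },
    ]);
    assert_eq!(b, Boundedness::Unbounded { requires_infinite_memory: false });
    // One `Final` child means the union cannot promise incremental output.
    let e = emission_type_from_children(&[EmissionType::Incremental, EmissionType::Final]);
    assert_eq!(e, EmissionType::Final);
}
```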
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index 19b1b46953..19a090ca28 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -100,12 +100,11 @@ impl UnnestExec {
input: &Arc<dyn ExecutionPlan>,
schema: SchemaRef,
) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
-
PlanProperties::new(
- eq_properties,
- input.output_partitioning().clone(),
- input.execution_mode(),
+ EquivalenceProperties::new(schema),
+ input.output_partitioning().to_owned(),
+ input.pipeline_behavior(),
+ input.boundedness(),
)
}
diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs
index edadf98cb1..5089b1e626 100644
--- a/datafusion/physical-plan/src/values.rs
+++ b/datafusion/physical-plan/src/values.rs
@@ -20,10 +20,8 @@
use std::any::Any;
use std::sync::Arc;
-use super::{
-    common, DisplayAs, ExecutionMode, PlanProperties, SendableRecordBatchStream,
- Statistics,
-};
+use super::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
+use crate::execution_plan::{Boundedness, EmissionType};
use crate::{
    memory::MemoryStream, ColumnarValue, DisplayFormatType, ExecutionPlan, Partitioning,
PhysicalExpr,
@@ -133,12 +131,11 @@ impl ValuesExec {
    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
-
PlanProperties::new(
- eq_properties,
+ EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(1),
- ExecutionMode::Bounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index c6003fe0a8..b66147bf74 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -202,9 +202,11 @@ impl BoundedWindowAggExec {
// Construct properties cache
PlanProperties::new(
- eq_properties, // Equivalence Properties
- output_partitioning, // Output Partitioning
- input.execution_mode(), // Execution Mode
+ eq_properties,
+ output_partitioning,
+            // TODO: Emission type and boundedness information can be enhanced here
+ input.pipeline_behavior(),
+ input.boundedness(),
)
}
}
diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs
index 222a8bb71a..36c4b9f18d 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -383,7 +383,7 @@ pub fn get_best_fitting_window(
} else {
return Ok(None);
};
- let is_unbounded = input.execution_mode().is_unbounded();
+ let is_unbounded = input.boundedness().is_unbounded();
if !is_unbounded && input_order_mode != InputOrderMode::Sorted {
        // Executor has bounded input and `input_order_mode` is not `InputOrderMode::Sorted`
// in this case removing the sort is not helpful, return:
diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs
index c0ac96d22e..b132c32470 100644
--- a/datafusion/physical-plan/src/windows/window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs
@@ -23,15 +23,16 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use super::utils::create_schema;
+use crate::execution_plan::EmissionType;
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use crate::windows::{
calc_requirements, get_ordered_partition_by_indices,
get_partition_by_sort_exprs,
window_equivalence_properties,
};
use crate::{
-    ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionMode,
-    ExecutionPlan, ExecutionPlanProperties, PhysicalExpr, PlanProperties,
-    RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
+    ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
+ ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream,
+ SendableRecordBatchStream, Statistics, WindowExpr,
};
use arrow::array::ArrayRef;
use arrow::compute::{concat, concat_batches};
@@ -127,16 +128,14 @@ impl WindowAggExec {
        // would be either 1 or more than 1 depending on the presence of repartitioning.
let output_partitioning = input.output_partitioning().clone();
- // Determine execution mode:
- let mode = match input.execution_mode() {
- ExecutionMode::Bounded => ExecutionMode::Bounded,
- ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => {
- ExecutionMode::PipelineBreaking
- }
- };
-
// Construct properties cache:
- PlanProperties::new(eq_properties, output_partitioning, mode)
+ PlanProperties::new(
+ eq_properties,
+ output_partitioning,
+            // TODO: Emission type and boundedness information can be enhanced here
+ EmissionType::Final,
+ input.boundedness(),
+ )
}
}
diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs
index add3863192..b1dd4d9308 100644
--- a/datafusion/physical-plan/src/work_table.rs
+++ b/datafusion/physical-plan/src/work_table.rs
@@ -24,8 +24,9 @@ use super::{
metrics::{ExecutionPlanMetricsSet, MetricsSet},
SendableRecordBatchStream, Statistics,
};
+use crate::execution_plan::{Boundedness, EmissionType};
use crate::memory::MemoryStream;
-use crate::{DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties};
+use crate::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
@@ -142,12 +143,11 @@ impl WorkTableExec {
    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
- let eq_properties = EquivalenceProperties::new(schema);
-
PlanProperties::new(
- eq_properties,
+ EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(1),
- ExecutionMode::Bounded,
+ EmissionType::Incremental,
+ Boundedness::Bounded,
)
}
}
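Across all of the files above, the migration follows one pattern: the single `ExecutionMode` argument of `PlanProperties::new` splits into an `EmissionType` plus a `Boundedness`. As a reading aid, here is a rough correspondence between the old and new representations; this mapping is an illustration only (the per-operator choices in the diff are more precise), and the enum shapes are assumed from the variants used in this patch:

```rust
// Assumed enum shapes, reconstructed from this diff for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExecutionMode { Bounded, Unbounded, PipelineBreaking }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EmissionType { Incremental, Final, Both }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Boundedness { Bounded, Unbounded { requires_infinite_memory: bool } }

/// A rough correspondence between the old single enum and the new pair.
fn from_legacy(mode: ExecutionMode) -> (EmissionType, Boundedness) {
    match mode {
        ExecutionMode::Bounded => (EmissionType::Incremental, Boundedness::Bounded),
        ExecutionMode::Unbounded => (
            EmissionType::Incremental,
            Boundedness::Unbounded { requires_infinite_memory: false },
        ),
        // The old "pipeline breaking" state folds into "emits only at the
        // end, and would need unbounded memory on an unbounded input".
        ExecutionMode::PipelineBreaking => (
            EmissionType::Final,
            Boundedness::Unbounded { requires_infinite_memory: true },
        ),
    }
}

fn main() {
    assert_eq!(
        from_legacy(ExecutionMode::Bounded),
        (EmissionType::Incremental, Boundedness::Bounded)
    );
    assert_eq!(from_legacy(ExecutionMode::PipelineBreaking).0, EmissionType::Final);
}
```

The payoff of the split is visible in `SortExec` and `WindowAggExec` above: an operator can now be `Final` yet `Bounded`, a combination the old three-state enum could not express.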
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]