This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 34dad2ccee Cache `PlanProperties`, add fast-path for `with_new_children` (#19792)
34dad2ccee is described below
commit 34dad2ccee0e241d4db47afb3a137596b3abf11d
Author: Albert Skalt <[email protected]>
AuthorDate: Tue Feb 24 23:58:06 2026 +0300
Cache `PlanProperties`, add fast-path for `with_new_children` (#19792)
- closes https://github.com/apache/datafusion/issues/19796
This patch implements a fast path for `ExecutionPlan::with_new_children`
for some plans, moving closer to a physical-plan re-use implementation and
improving planning performance. If the properties of the passed children
are the same as those of `self`'s current children, we do not recompute
`self`'s properties (which could be costly if projection mapping is
required). Instead, we just replace the children and re-use `self`'s
properties as-is.
To make two properties instances cheaply comparable, the signature of
`ExecutionPlan::properties(...)` is modified to return
`&Arc<PlanProperties>`. If the children's properties are unchanged in
`with_new_children`, we clone our properties `Arc`; a parent plan then
sees our properties as unchanged and can apply the same fast path.
- Return `&Arc<PlanProperties>` from `ExecutionPlan::properties(...)`
instead of `&PlanProperties`.
- Implement the `with_new_children` fast path, taken when the children's
properties have not changed, for all major plans.
Note: currently, `reset_plan_states` does not allow re-using a plan in
general: it is not supported for the dynamic filter and recursive query
features, since in those cases the state reset must update pointers in
the child plans.
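The fast path above can be sketched as follows. This is a minimal,
hypothetical illustration, not DataFusion's actual code: `SomeExec` and the
reduced `PlanProperties` (holding only a partition count) are illustrative
stand-ins; the real `PlanProperties` carries equivalence properties,
partitioning, emission type, and boundedness.

```rust
use std::sync::Arc;

// Hypothetical, reduced stand-in for DataFusion's `PlanProperties`.
#[derive(Debug)]
struct PlanProperties {
    partitions: usize,
}

// Hypothetical plan node used only to illustrate the fast path.
struct SomeExec {
    children: Vec<Arc<SomeExec>>,
    cache: Arc<PlanProperties>,
}

impl SomeExec {
    // Mirrors the new signature: a shared handle, not a plain reference.
    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    fn with_new_children(&self, children: Vec<Arc<SomeExec>>) -> Arc<SomeExec> {
        // Fast path: if each new child shares the same `Arc<PlanProperties>`
        // as the corresponding old child, this node's own properties cannot
        // have changed, so reuse the cached Arc instead of recomputing.
        let unchanged = self.children.len() == children.len()
            && self
                .children
                .iter()
                .zip(&children)
                .all(|(old, new)| Arc::ptr_eq(old.properties(), new.properties()));
        if unchanged {
            return Arc::new(SomeExec {
                children,
                cache: Arc::clone(&self.cache), // O(1), no recomputation
            });
        }
        // Slow path: recompute properties from the new children (stubbed
        // here; the real computation may involve projection mapping).
        let partitions = children.iter().map(|c| c.cache.partitions).sum();
        Arc::new(SomeExec {
            children,
            cache: Arc::new(PlanProperties { partitions }),
        })
    }
}
```

A parent that sees `Arc::ptr_eq(old_child.properties(), new_child.properties())`
hold can in turn clone its own cached `Arc`, so the reuse propagates up the
plan tree.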
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.../custom_data_source/custom_datasource.rs | 6 +-
.../memory_pool_execution_plan.rs | 4 +-
.../examples/proto/composed_extension_codec.rs | 4 +-
.../examples/relation_planner/table_sample.rs | 6 +-
datafusion/catalog/src/memory/table.rs | 6 +-
datafusion/core/benches/reset_plan_states.rs | 2 +
datafusion/core/src/physical_planner.rs | 14 ++--
datafusion/core/tests/custom_sources_cases/mod.rs | 9 ++-
.../provider_filter_pushdown.rs | 9 ++-
.../core/tests/custom_sources_cases/statistics.rs | 6 +-
datafusion/core/tests/fuzz_cases/once_exec.rs | 6 +-
.../physical_optimizer/enforce_distribution.rs | 6 +-
.../tests/physical_optimizer/join_selection.rs | 12 +--
.../tests/physical_optimizer/pushdown_utils.rs | 2 +-
.../core/tests/physical_optimizer/test_utils.rs | 8 +-
.../core/tests/user_defined/insert_operation.rs | 20 ++---
.../core/tests/user_defined/user_defined_plan.rs | 10 ++-
datafusion/datasource/src/sink.rs | 6 +-
datafusion/datasource/src/source.rs | 24 +++---
datafusion/ffi/src/execution_plan.rs | 20 ++---
datafusion/ffi/src/tests/async_provider.rs | 8 +-
.../src/equivalence/properties/mod.rs | 7 +-
datafusion/physical-optimizer/src/ensure_coop.rs | 6 +-
.../physical-optimizer/src/output_requirements.rs | 6 +-
datafusion/physical-plan/src/aggregates/mod.rs | 34 +++++---
datafusion/physical-plan/src/analyze.rs | 6 +-
datafusion/physical-plan/src/async_func.rs | 25 ++++--
datafusion/physical-plan/src/buffer.rs | 23 ++++--
datafusion/physical-plan/src/coalesce_batches.rs | 25 ++++--
.../physical-plan/src/coalesce_partitions.rs | 26 +++++--
datafusion/physical-plan/src/coop.rs | 24 ++++--
datafusion/physical-plan/src/display.rs | 2 +-
datafusion/physical-plan/src/empty.rs | 8 +-
datafusion/physical-plan/src/execution_plan.rs | 57 ++++++++++++--
datafusion/physical-plan/src/explain.rs | 6 +-
datafusion/physical-plan/src/filter.rs | 27 +++++--
datafusion/physical-plan/src/joins/cross_join.rs | 28 +++++--
.../physical-plan/src/joins/hash_join/exec.rs | 38 +++++----
.../physical-plan/src/joins/nested_loop_join.rs | 29 ++++++-
.../src/joins/piecewise_merge_join/exec.rs | 90 +++++++++++++++-------
.../src/joins/sort_merge_join/exec.rs | 23 +++++-
.../physical-plan/src/joins/symmetric_hash_join.rs | 22 +++++-
datafusion/physical-plan/src/limit.rs | 47 ++++++++---
datafusion/physical-plan/src/memory.rs | 19 ++---
datafusion/physical-plan/src/placeholder_row.rs | 8 +-
datafusion/physical-plan/src/projection.rs | 20 ++++-
datafusion/physical-plan/src/recursive_query.rs | 6 +-
datafusion/physical-plan/src/repartition/mod.rs | 30 ++++++--
datafusion/physical-plan/src/sorts/partial_sort.rs | 27 ++++---
datafusion/physical-plan/src/sorts/sort.rs | 53 +++++++------
.../src/sorts/sort_preserving_merge.rs | 31 +++++---
datafusion/physical-plan/src/streaming.rs | 8 +-
datafusion/physical-plan/src/test.rs | 14 ++--
datafusion/physical-plan/src/test/exec.rs | 38 ++++-----
datafusion/physical-plan/src/union.rs | 39 ++++++++--
datafusion/physical-plan/src/unnest.rs | 24 ++++--
.../src/windows/bounded_window_agg_exec.rs | 20 ++++-
.../physical-plan/src/windows/window_agg_exec.rs | 24 ++++--
datafusion/physical-plan/src/work_table.rs | 8 +-
.../library-user-guide/custom-table-providers.md | 6 +-
docs/source/library-user-guide/upgrading/53.0.0.md | 63 +++++++++++++++
61 files changed, 832 insertions(+), 353 deletions(-)
diff --git
a/datafusion-examples/examples/custom_data_source/custom_datasource.rs
b/datafusion-examples/examples/custom_data_source/custom_datasource.rs
index b276ae32cf..7abb39e1a7 100644
--- a/datafusion-examples/examples/custom_data_source/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs
@@ -192,7 +192,7 @@ impl TableProvider for CustomDataSource {
struct CustomExec {
db: CustomDataSource,
projected_schema: SchemaRef,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl CustomExec {
@@ -207,7 +207,7 @@ impl CustomExec {
Self {
db,
projected_schema,
- cache,
+ cache: Arc::new(cache),
}
}
@@ -238,7 +238,7 @@ impl ExecutionPlan for CustomExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
diff --git
a/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs
b/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs
index e51ba46a33..4c05cd2fb1 100644
---
a/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs
+++
b/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs
@@ -199,7 +199,7 @@ impl ExternalBatchBufferer {
struct BufferingExecutionPlan {
schema: SchemaRef,
input: Arc<dyn ExecutionPlan>,
- properties: PlanProperties,
+ properties: Arc<PlanProperties>,
}
impl BufferingExecutionPlan {
@@ -233,7 +233,7 @@ impl ExecutionPlan for BufferingExecutionPlan {
self.schema.clone()
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}
diff --git a/datafusion-examples/examples/proto/composed_extension_codec.rs
b/datafusion-examples/examples/proto/composed_extension_codec.rs
index f3910d461b..b4f3d4f098 100644
--- a/datafusion-examples/examples/proto/composed_extension_codec.rs
+++ b/datafusion-examples/examples/proto/composed_extension_codec.rs
@@ -106,7 +106,7 @@ impl ExecutionPlan for ParentExec {
self
}
- fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
+ fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
unreachable!()
}
@@ -182,7 +182,7 @@ impl ExecutionPlan for ChildExec {
self
}
- fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
+ fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
unreachable!()
}
diff --git a/datafusion-examples/examples/relation_planner/table_sample.rs
b/datafusion-examples/examples/relation_planner/table_sample.rs
index 657432ef31..895f2fdd4f 100644
--- a/datafusion-examples/examples/relation_planner/table_sample.rs
+++ b/datafusion-examples/examples/relation_planner/table_sample.rs
@@ -618,7 +618,7 @@ pub struct SampleExec {
upper_bound: f64,
seed: u64,
metrics: ExecutionPlanMetricsSet,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl SampleExec {
@@ -656,7 +656,7 @@ impl SampleExec {
upper_bound,
seed,
metrics: ExecutionPlanMetricsSet::new(),
- cache,
+ cache: Arc::new(cache),
})
}
@@ -686,7 +686,7 @@ impl ExecutionPlan for SampleExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
diff --git a/datafusion/catalog/src/memory/table.rs
b/datafusion/catalog/src/memory/table.rs
index 7865eb016b..484b5f805e 100644
--- a/datafusion/catalog/src/memory/table.rs
+++ b/datafusion/catalog/src/memory/table.rs
@@ -549,7 +549,7 @@ fn evaluate_filters_to_mask(
struct DmlResultExec {
rows_affected: u64,
schema: SchemaRef,
- properties: PlanProperties,
+ properties: Arc<PlanProperties>,
}
impl DmlResultExec {
@@ -570,7 +570,7 @@ impl DmlResultExec {
Self {
rows_affected,
schema,
- properties,
+ properties: Arc::new(properties),
}
}
}
@@ -604,7 +604,7 @@ impl ExecutionPlan for DmlResultExec {
Arc::clone(&self.schema)
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}
diff --git a/datafusion/core/benches/reset_plan_states.rs
b/datafusion/core/benches/reset_plan_states.rs
index f2f81f755b..5afae7f432 100644
--- a/datafusion/core/benches/reset_plan_states.rs
+++ b/datafusion/core/benches/reset_plan_states.rs
@@ -166,6 +166,8 @@ fn run_reset_states(b: &mut criterion::Bencher, plan:
&Arc<dyn ExecutionPlan>) {
/// making an independent instance of the execution plan to re-execute it,
avoiding
/// re-planning stage.
fn bench_reset_plan_states(c: &mut Criterion) {
+ env_logger::init();
+
let rt = Runtime::new().unwrap();
let ctx = SessionContext::new();
ctx.register_table(
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 4d169a58ef..828b286407 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -3711,13 +3711,15 @@ mod tests {
#[derive(Debug)]
struct NoOpExecutionPlan {
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl NoOpExecutionPlan {
fn new(schema: SchemaRef) -> Self {
let cache = Self::compute_properties(schema);
- Self { cache }
+ Self {
+ cache: Arc::new(cache),
+ }
}
/// This function creates the cache object that stores the plan
properties such as schema, equivalence properties, ordering, partitioning, etc.
@@ -3755,7 +3757,7 @@ mod tests {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -3909,7 +3911,7 @@ digraph {
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
self.0.iter().collect::<Vec<_>>()
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
unimplemented!()
}
fn execute(
@@ -3958,7 +3960,7 @@ digraph {
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
unimplemented!()
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
unimplemented!()
}
fn execute(
@@ -4079,7 +4081,7 @@ digraph {
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
unimplemented!()
}
fn execute(
diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs
b/datafusion/core/tests/custom_sources_cases/mod.rs
index ec0b9e253d..f51d0a1e36 100644
--- a/datafusion/core/tests/custom_sources_cases/mod.rs
+++ b/datafusion/core/tests/custom_sources_cases/mod.rs
@@ -79,7 +79,7 @@ struct CustomTableProvider;
#[derive(Debug, Clone)]
struct CustomExecutionPlan {
projection: Option<Vec<usize>>,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl CustomExecutionPlan {
@@ -88,7 +88,10 @@ impl CustomExecutionPlan {
let schema =
project_schema(&schema, projection.as_ref()).expect("projected
schema");
let cache = Self::compute_properties(schema);
- Self { projection, cache }
+ Self {
+ projection,
+ cache: Arc::new(cache),
+ }
}
/// This function creates the cache object that stores the plan properties
such as schema, equivalence properties, ordering, partitioning, etc.
@@ -157,7 +160,7 @@ impl ExecutionPlan for CustomExecutionPlan {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
diff --git
a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
index b54a57b033..96357d3103 100644
--- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
@@ -62,13 +62,16 @@ fn create_batch(value: i32, num_rows: usize) ->
Result<RecordBatch> {
#[derive(Debug)]
struct CustomPlan {
batches: Vec<RecordBatch>,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl CustomPlan {
fn new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Self {
let cache = Self::compute_properties(schema);
- Self { batches, cache }
+ Self {
+ batches,
+ cache: Arc::new(cache),
+ }
}
/// This function creates the cache object that stores the plan properties
such as schema, equivalence properties, ordering, partitioning, etc.
@@ -109,7 +112,7 @@ impl ExecutionPlan for CustomPlan {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs
b/datafusion/core/tests/custom_sources_cases/statistics.rs
index e81cd9f6b8..03513ec730 100644
--- a/datafusion/core/tests/custom_sources_cases/statistics.rs
+++ b/datafusion/core/tests/custom_sources_cases/statistics.rs
@@ -45,7 +45,7 @@ use async_trait::async_trait;
struct StatisticsValidation {
stats: Statistics,
schema: Arc<Schema>,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl StatisticsValidation {
@@ -59,7 +59,7 @@ impl StatisticsValidation {
Self {
stats,
schema,
- cache,
+ cache: Arc::new(cache),
}
}
@@ -158,7 +158,7 @@ impl ExecutionPlan for StatisticsValidation {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
diff --git a/datafusion/core/tests/fuzz_cases/once_exec.rs
b/datafusion/core/tests/fuzz_cases/once_exec.rs
index 49e2caaa74..69edf9be1d 100644
--- a/datafusion/core/tests/fuzz_cases/once_exec.rs
+++ b/datafusion/core/tests/fuzz_cases/once_exec.rs
@@ -32,7 +32,7 @@ use std::sync::{Arc, Mutex};
pub struct OnceExec {
/// the results to send back
stream: Mutex<Option<SendableRecordBatchStream>>,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl Debug for OnceExec {
@@ -46,7 +46,7 @@ impl OnceExec {
let cache = Self::compute_properties(stream.schema());
Self {
stream: Mutex::new(Some(stream)),
- cache,
+ cache: Arc::new(cache),
}
}
@@ -83,7 +83,7 @@ impl ExecutionPlan for OnceExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index 30edd71966..5df634c70b 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -119,7 +119,7 @@ macro_rules! assert_plan {
struct SortRequiredExec {
input: Arc<dyn ExecutionPlan>,
expr: LexOrdering,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl SortRequiredExec {
@@ -131,7 +131,7 @@ impl SortRequiredExec {
Self {
input,
expr: requirement,
- cache,
+ cache: Arc::new(cache),
}
}
@@ -173,7 +173,7 @@ impl ExecutionPlan for SortRequiredExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs
b/datafusion/core/tests/physical_optimizer/join_selection.rs
index 567af64c6a..ef0bbfc7f4 100644
--- a/datafusion/core/tests/physical_optimizer/join_selection.rs
+++ b/datafusion/core/tests/physical_optimizer/join_selection.rs
@@ -979,7 +979,7 @@ impl RecordBatchStream for UnboundedStream {
pub struct UnboundedExec {
batch_produce: Option<usize>,
batch: RecordBatch,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl UnboundedExec {
@@ -995,7 +995,7 @@ impl UnboundedExec {
Self {
batch_produce,
batch,
- cache,
+ cache: Arc::new(cache),
}
}
@@ -1052,7 +1052,7 @@ impl ExecutionPlan for UnboundedExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -1091,7 +1091,7 @@ pub enum SourceType {
pub struct StatisticsExec {
stats: Statistics,
schema: Arc<Schema>,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl StatisticsExec {
@@ -1105,7 +1105,7 @@ impl StatisticsExec {
Self {
stats,
schema: Arc::new(schema),
- cache,
+ cache: Arc::new(cache),
}
}
@@ -1153,7 +1153,7 @@ impl ExecutionPlan for StatisticsExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
diff --git a/datafusion/core/tests/physical_optimizer/pushdown_utils.rs
b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs
index 524d33ae6e..91ae6c414e 100644
--- a/datafusion/core/tests/physical_optimizer/pushdown_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs
@@ -474,7 +474,7 @@ impl ExecutionPlan for TestNode {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
self.input.properties()
}
diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs
b/datafusion/core/tests/physical_optimizer/test_utils.rs
index feac8190ff..f8c91ba272 100644
--- a/datafusion/core/tests/physical_optimizer/test_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -454,7 +454,7 @@ impl ExecutionPlan for RequirementsTestExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
self.input.properties()
}
@@ -825,7 +825,7 @@ pub fn sort_expr_named(name: &str, index: usize) ->
PhysicalSortExpr {
pub struct TestScan {
schema: SchemaRef,
output_ordering: Vec<LexOrdering>,
- plan_properties: PlanProperties,
+ plan_properties: Arc<PlanProperties>,
// Store the requested ordering for display
requested_ordering: Option<LexOrdering>,
}
@@ -859,7 +859,7 @@ impl TestScan {
Self {
schema,
output_ordering,
- plan_properties,
+ plan_properties: Arc::new(plan_properties),
requested_ordering: None,
}
}
@@ -915,7 +915,7 @@ impl ExecutionPlan for TestScan {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.plan_properties
}
diff --git a/datafusion/core/tests/user_defined/insert_operation.rs
b/datafusion/core/tests/user_defined/insert_operation.rs
index 7ad00dece1..4d2a31ca1f 100644
--- a/datafusion/core/tests/user_defined/insert_operation.rs
+++ b/datafusion/core/tests/user_defined/insert_operation.rs
@@ -122,20 +122,22 @@ impl TableProvider for TestInsertTableProvider {
#[derive(Debug)]
struct TestInsertExec {
op: InsertOp,
- plan_properties: PlanProperties,
+ plan_properties: Arc<PlanProperties>,
}
impl TestInsertExec {
fn new(op: InsertOp) -> Self {
Self {
op,
- plan_properties: PlanProperties::new(
- EquivalenceProperties::new(make_count_schema()),
- Partitioning::UnknownPartitioning(1),
- EmissionType::Incremental,
- Boundedness::Bounded,
- )
- .with_scheduling_type(SchedulingType::Cooperative),
+ plan_properties: Arc::new(
+ PlanProperties::new(
+ EquivalenceProperties::new(make_count_schema()),
+ Partitioning::UnknownPartitioning(1),
+ EmissionType::Incremental,
+ Boundedness::Bounded,
+ )
+ .with_scheduling_type(SchedulingType::Cooperative),
+ ),
}
}
}
@@ -159,7 +161,7 @@ impl ExecutionPlan for TestInsertExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.plan_properties
}
diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs
b/datafusion/core/tests/user_defined/user_defined_plan.rs
index 990b05c49d..f97923ffc5 100644
--- a/datafusion/core/tests/user_defined/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined/user_defined_plan.rs
@@ -653,13 +653,17 @@ struct TopKExec {
input: Arc<dyn ExecutionPlan>,
/// The maximum number of values
k: usize,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl TopKExec {
fn new(input: Arc<dyn ExecutionPlan>, k: usize) -> Self {
let cache = Self::compute_properties(input.schema());
- Self { input, k, cache }
+ Self {
+ input,
+ k,
+ cache: Arc::new(cache),
+ }
}
/// This function creates the cache object that stores the plan properties
such as schema, equivalence properties, ordering, partitioning, etc.
@@ -704,7 +708,7 @@ impl ExecutionPlan for TopKExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
diff --git a/datafusion/datasource/src/sink.rs
b/datafusion/datasource/src/sink.rs
index 5acc89722b..f149109dff 100644
--- a/datafusion/datasource/src/sink.rs
+++ b/datafusion/datasource/src/sink.rs
@@ -89,7 +89,7 @@ pub struct DataSinkExec {
count_schema: SchemaRef,
/// Optional required sort order for output data.
sort_order: Option<LexRequirement>,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl Debug for DataSinkExec {
@@ -117,7 +117,7 @@ impl DataSinkExec {
sink,
count_schema: make_count_schema(),
sort_order,
- cache,
+ cache: Arc::new(cache),
}
}
@@ -174,7 +174,7 @@ impl ExecutionPlan for DataSinkExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
diff --git a/datafusion/datasource/src/source.rs
b/datafusion/datasource/src/source.rs
index a4e27dac76..05028ed0f4 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -74,8 +74,8 @@ use datafusion_physical_plan::filter_pushdown::{
/// ```text
/// ┌─────────────────────┐
-----► execute path
/// │ │
┄┄┄┄┄► init path
-/// │ DataSourceExec │
-/// │ │
+/// │ DataSourceExec │
+/// │ │
/// └───────▲─────────────┘
/// ┊ │
/// ┊ │
@@ -230,7 +230,7 @@ pub struct DataSourceExec {
/// The source of the data -- for example, `FileScanConfig` or
`MemorySourceConfig`
data_source: Arc<dyn DataSource>,
/// Cached plan properties such as sort order
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl DisplayAs for DataSourceExec {
@@ -254,7 +254,7 @@ impl ExecutionPlan for DataSourceExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -324,7 +324,7 @@ impl ExecutionPlan for DataSourceExec {
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn
ExecutionPlan>> {
let data_source = self.data_source.with_fetch(limit)?;
- let cache = self.cache.clone();
+ let cache = Arc::clone(&self.cache);
Some(Arc::new(Self { data_source, cache }))
}
@@ -368,7 +368,8 @@ impl ExecutionPlan for DataSourceExec {
let mut new_node = self.clone();
new_node.data_source = data_source;
// Re-compute properties since we have new filters which will
impact equivalence info
- new_node.cache =
Self::compute_properties(&new_node.data_source);
+ new_node.cache =
+ Arc::new(Self::compute_properties(&new_node.data_source));
Ok(FilterPushdownPropagation {
filters: res.filters,
@@ -416,7 +417,10 @@ impl DataSourceExec {
// Default constructor for `DataSourceExec`, setting the `cooperative`
flag to `true`.
pub fn new(data_source: Arc<dyn DataSource>) -> Self {
let cache = Self::compute_properties(&data_source);
- Self { data_source, cache }
+ Self {
+ data_source,
+ cache: Arc::new(cache),
+ }
}
/// Return the source object
@@ -425,20 +429,20 @@ impl DataSourceExec {
}
pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) ->
Self {
- self.cache = Self::compute_properties(&data_source);
+ self.cache = Arc::new(Self::compute_properties(&data_source));
self.data_source = data_source;
self
}
/// Assign constraints
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
- self.cache = self.cache.with_constraints(constraints);
+ Arc::make_mut(&mut self.cache).set_constraints(constraints);
self
}
/// Assign output partitioning
pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
- self.cache = self.cache.with_partitioning(partitioning);
+ Arc::make_mut(&mut self.cache).partitioning = partitioning;
self
}
diff --git a/datafusion/ffi/src/execution_plan.rs
b/datafusion/ffi/src/execution_plan.rs
index 94e1d03d08..524d8b4b6b 100644
--- a/datafusion/ffi/src/execution_plan.rs
+++ b/datafusion/ffi/src/execution_plan.rs
@@ -90,7 +90,7 @@ impl FFI_ExecutionPlan {
unsafe extern "C" fn properties_fn_wrapper(
plan: &FFI_ExecutionPlan,
) -> FFI_PlanProperties {
- plan.inner().properties().into()
+ plan.inner().properties().as_ref().into()
}
unsafe extern "C" fn children_fn_wrapper(
@@ -192,7 +192,7 @@ impl Drop for FFI_ExecutionPlan {
pub struct ForeignExecutionPlan {
name: String,
plan: FFI_ExecutionPlan,
- properties: PlanProperties,
+ properties: Arc<PlanProperties>,
children: Vec<Arc<dyn ExecutionPlan>>,
}
@@ -244,7 +244,7 @@ impl TryFrom<&FFI_ExecutionPlan> for Arc<dyn ExecutionPlan>
{
let plan = ForeignExecutionPlan {
name,
plan: plan.clone(),
- properties,
+ properties: Arc::new(properties),
children,
};
@@ -262,7 +262,7 @@ impl ExecutionPlan for ForeignExecutionPlan {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}
@@ -278,7 +278,7 @@ impl ExecutionPlan for ForeignExecutionPlan {
plan: self.plan.clone(),
name: self.name.clone(),
children,
- properties: self.properties.clone(),
+ properties: Arc::clone(&self.properties),
}))
}
@@ -305,19 +305,19 @@ pub(crate) mod tests {
#[derive(Debug)]
pub struct EmptyExec {
- props: PlanProperties,
+ props: Arc<PlanProperties>,
children: Vec<Arc<dyn ExecutionPlan>>,
}
impl EmptyExec {
pub fn new(schema: arrow::datatypes::SchemaRef) -> Self {
Self {
- props: PlanProperties::new(
+ props: Arc::new(PlanProperties::new(
datafusion::physical_expr::EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(3),
EmissionType::Incremental,
Boundedness::Bounded,
- ),
+ )),
children: Vec::default(),
}
}
@@ -342,7 +342,7 @@ pub(crate) mod tests {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.props
}
@@ -355,7 +355,7 @@ pub(crate) mod tests {
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(EmptyExec {
- props: self.props.clone(),
+ props: Arc::clone(&self.props),
children,
}))
}
diff --git a/datafusion/ffi/src/tests/async_provider.rs
b/datafusion/ffi/src/tests/async_provider.rs
index 6149736c58..8370cf19e6 100644
--- a/datafusion/ffi/src/tests/async_provider.rs
+++ b/datafusion/ffi/src/tests/async_provider.rs
@@ -162,7 +162,7 @@ impl Drop for AsyncTableProvider {
#[derive(Debug)]
struct AsyncTestExecutionPlan {
- properties: datafusion_physical_plan::PlanProperties,
+ properties: Arc<datafusion_physical_plan::PlanProperties>,
batch_request: mpsc::Sender<bool>,
batch_receiver: broadcast::Receiver<Option<RecordBatch>>,
}
@@ -173,12 +173,12 @@ impl AsyncTestExecutionPlan {
batch_receiver: broadcast::Receiver<Option<RecordBatch>>,
) -> Self {
Self {
- properties: datafusion_physical_plan::PlanProperties::new(
+ properties: Arc::new(datafusion_physical_plan::PlanProperties::new(
EquivalenceProperties::new(super::create_test_schema()),
Partitioning::UnknownPartitioning(3),
datafusion_physical_plan::execution_plan::EmissionType::Incremental,
datafusion_physical_plan::execution_plan::Boundedness::Bounded,
- ),
+ )),
batch_request,
batch_receiver,
}
@@ -194,7 +194,7 @@ impl ExecutionPlan for AsyncTestExecutionPlan {
self
}
- fn properties(&self) -> &datafusion_physical_plan::PlanProperties {
+ fn properties(&self) -> &Arc<datafusion_physical_plan::PlanProperties> {
&self.properties
}
diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs
b/datafusion/physical-expr/src/equivalence/properties/mod.rs
index 996bc4b08f..a98341b107 100644
--- a/datafusion/physical-expr/src/equivalence/properties/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs
@@ -207,8 +207,13 @@ impl EquivalenceProperties {
}
/// Adds constraints to the properties.
- pub fn with_constraints(mut self, constraints: Constraints) -> Self {
+ pub fn set_constraints(&mut self, constraints: Constraints) {
self.constraints = constraints;
+ }
+
+ /// Adds constraints to the properties.
+ pub fn with_constraints(mut self, constraints: Constraints) -> Self {
+ self.set_constraints(constraints);
self
}
diff --git a/datafusion/physical-optimizer/src/ensure_coop.rs
b/datafusion/physical-optimizer/src/ensure_coop.rs
index 5d00d00bce..ef8946f9a4 100644
--- a/datafusion/physical-optimizer/src/ensure_coop.rs
+++ b/datafusion/physical-optimizer/src/ensure_coop.rs
@@ -281,7 +281,7 @@ mod tests {
input: Arc<dyn ExecutionPlan>,
scheduling_type: SchedulingType,
evaluation_type: EvaluationType,
- properties: PlanProperties,
+ properties: Arc<PlanProperties>,
}
impl DummyExec {
@@ -305,7 +305,7 @@ mod tests {
input,
scheduling_type,
evaluation_type,
- properties,
+ properties: Arc::new(properties),
}
}
}
@@ -327,7 +327,7 @@ mod tests {
fn as_any(&self) -> &dyn Any {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/physical-optimizer/src/output_requirements.rs
b/datafusion/physical-optimizer/src/output_requirements.rs
index afc0ee1a33..75721951f8 100644
--- a/datafusion/physical-optimizer/src/output_requirements.rs
+++ b/datafusion/physical-optimizer/src/output_requirements.rs
@@ -98,7 +98,7 @@ pub struct OutputRequirementExec {
input: Arc<dyn ExecutionPlan>,
order_requirement: Option<OrderingRequirements>,
dist_requirement: Distribution,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
fetch: Option<usize>,
}
@@ -114,7 +114,7 @@ impl OutputRequirementExec {
input,
order_requirement: requirements,
dist_requirement,
- cache,
+ cache: Arc::new(cache),
fetch,
}
}
@@ -200,7 +200,7 @@ impl ExecutionPlan for OutputRequirementExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs
b/datafusion/physical-plan/src/aggregates/mod.rs
index 93bc8678b0..4b3ac1955d 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -33,7 +33,7 @@ use crate::filter_pushdown::{
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::{
DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode,
- SendableRecordBatchStream, Statistics,
+ SendableRecordBatchStream, Statistics, check_if_same_properties,
};
use datafusion_common::config::ConfigOptions;
use datafusion_physical_expr::utils::collect_columns;
@@ -651,7 +651,7 @@ pub struct AggregateExec {
required_input_ordering: Option<OrderingRequirements>,
/// Describes how the input is ordered relative to the group by columns
input_order_mode: InputOrderMode,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
/// During initialization, if the plan supports dynamic filtering (see
[`AggrDynFilter`]),
/// it is set to `Some(..)` regardless of whether it can be pushed down to
a child node.
///
@@ -675,7 +675,7 @@ impl AggregateExec {
required_input_ordering: self.required_input_ordering.clone(),
metrics: ExecutionPlanMetricsSet::new(),
input_order_mode: self.input_order_mode.clone(),
- cache: self.cache.clone(),
+ cache: Arc::clone(&self.cache),
mode: self.mode,
group_by: Arc::clone(&self.group_by),
filter_expr: Arc::clone(&self.filter_expr),
@@ -695,7 +695,7 @@ impl AggregateExec {
required_input_ordering: self.required_input_ordering.clone(),
metrics: ExecutionPlanMetricsSet::new(),
input_order_mode: self.input_order_mode.clone(),
- cache: self.cache.clone(),
+ cache: Arc::clone(&self.cache),
mode: self.mode,
group_by: Arc::clone(&self.group_by),
aggr_expr: Arc::clone(&self.aggr_expr),
@@ -836,7 +836,7 @@ impl AggregateExec {
required_input_ordering,
limit_options: None,
input_order_mode,
- cache,
+ cache: Arc::new(cache),
dynamic_filter: None,
};
@@ -1194,6 +1194,17 @@ impl AggregateExec {
_ => Precision::Absent,
}
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ Self {
+ input: children.swap_remove(0),
+ metrics: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for AggregateExec {
@@ -1332,7 +1343,7 @@ impl ExecutionPlan for AggregateExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -1375,6 +1386,8 @@ impl ExecutionPlan for AggregateExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
+
let mut me = AggregateExec::try_new_with_schema(
self.mode,
Arc::clone(&self.group_by),
@@ -2407,14 +2420,17 @@ mod tests {
struct TestYieldingExec {
/// True if this exec should yield back to runtime the first time it
is polled
pub yield_first: bool,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl TestYieldingExec {
fn new(yield_first: bool) -> Self {
let schema = some_data().0;
let cache = Self::compute_properties(schema);
- Self { yield_first, cache }
+ Self {
+ yield_first,
+ cache: Arc::new(cache),
+ }
}
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
@@ -2455,7 +2471,7 @@ mod tests {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs
index 1fb8f93a38..eca31ea0e1 100644
--- a/datafusion/physical-plan/src/analyze.rs
+++ b/datafusion/physical-plan/src/analyze.rs
@@ -51,7 +51,7 @@ pub struct AnalyzeExec {
pub(crate) input: Arc<dyn ExecutionPlan>,
/// The output schema for RecordBatches of this exec node
schema: SchemaRef,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl AnalyzeExec {
@@ -70,7 +70,7 @@ impl AnalyzeExec {
metric_types,
input,
schema,
- cache,
+ cache: Arc::new(cache),
}
}
@@ -131,7 +131,7 @@ impl ExecutionPlan for AnalyzeExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
diff --git a/datafusion/physical-plan/src/async_func.rs b/datafusion/physical-plan/src/async_func.rs
index cfb5c34c67..72741f4314 100644
--- a/datafusion/physical-plan/src/async_func.rs
+++ b/datafusion/physical-plan/src/async_func.rs
@@ -20,6 +20,7 @@ use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::stream::RecordBatchStreamAdapter;
use crate::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
+ check_if_same_properties,
};
use arrow::array::RecordBatch;
use arrow_schema::{Fields, Schema, SchemaRef};
@@ -45,12 +46,12 @@ use std::task::{Context, Poll, ready};
///
/// The schema of the output of the AsyncFuncExec is:
/// Input columns followed by one column for each async expression
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct AsyncFuncExec {
/// The async expressions to evaluate
async_exprs: Vec<Arc<AsyncFuncExpr>>,
input: Arc<dyn ExecutionPlan>,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
metrics: ExecutionPlanMetricsSet,
}
@@ -84,7 +85,7 @@ impl AsyncFuncExec {
Ok(Self {
input,
async_exprs,
- cache,
+ cache: Arc::new(cache),
metrics: ExecutionPlanMetricsSet::new(),
})
}
@@ -113,6 +114,17 @@ impl AsyncFuncExec {
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ Self {
+ input: children.swap_remove(0),
+ metrics: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for AsyncFuncExec {
@@ -149,7 +161,7 @@ impl ExecutionPlan for AsyncFuncExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -159,16 +171,17 @@ impl ExecutionPlan for AsyncFuncExec {
fn with_new_children(
self: Arc<Self>,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
assert_eq_or_internal_err!(
children.len(),
1,
"AsyncFuncExec wrong number of children"
);
+ check_if_same_properties!(self, children);
Ok(Arc::new(AsyncFuncExec::try_new(
self.async_exprs.clone(),
- Arc::clone(&children[0]),
+ children.swap_remove(0),
)?))
}
diff --git a/datafusion/physical-plan/src/buffer.rs b/datafusion/physical-plan/src/buffer.rs
index 3b80f9924e..a59d062929 100644
--- a/datafusion/physical-plan/src/buffer.rs
+++ b/datafusion/physical-plan/src/buffer.rs
@@ -27,6 +27,7 @@ use crate::projection::ProjectionExec;
use crate::stream::RecordBatchStreamAdapter;
use crate::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SortOrderPushdownResult,
+ check_if_same_properties,
};
use arrow::array::RecordBatch;
use datafusion_common::config::ConfigOptions;
@@ -92,7 +93,7 @@ use tokio::sync::{OwnedSemaphorePermit, Semaphore};
#[derive(Debug, Clone)]
pub struct BufferExec {
input: Arc<dyn ExecutionPlan>,
- properties: PlanProperties,
+ properties: Arc<PlanProperties>,
capacity: usize,
metrics: ExecutionPlanMetricsSet,
}
@@ -100,14 +101,12 @@ pub struct BufferExec {
impl BufferExec {
/// Builds a new [BufferExec] with the provided capacity in bytes.
pub fn new(input: Arc<dyn ExecutionPlan>, capacity: usize) -> Self {
- let properties = input
- .properties()
- .clone()
+ let properties = PlanProperties::clone(input.properties())
.with_scheduling_type(SchedulingType::Cooperative);
Self {
input,
- properties,
+ properties: Arc::new(properties),
capacity,
metrics: ExecutionPlanMetricsSet::new(),
}
@@ -122,6 +121,17 @@ impl BufferExec {
pub fn capacity(&self) -> usize {
self.capacity
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ Self {
+ input: children.swap_remove(0),
+ metrics: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for BufferExec {
@@ -146,7 +156,7 @@ impl ExecutionPlan for BufferExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}
@@ -166,6 +176,7 @@ impl ExecutionPlan for BufferExec {
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
if children.len() != 1 {
return plan_err!("BufferExec can only have one child");
}
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs
index 1356eca783..663b0b51ea 100644
--- a/datafusion/physical-plan/src/coalesce_batches.rs
+++ b/datafusion/physical-plan/src/coalesce_batches.rs
@@ -27,6 +27,7 @@ use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
use crate::projection::ProjectionExec;
use crate::{
DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
+ check_if_same_properties,
};
use arrow::datatypes::SchemaRef;
@@ -71,7 +72,7 @@ pub struct CoalesceBatchesExec {
fetch: Option<usize>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
#[expect(deprecated)]
@@ -84,7 +85,7 @@ impl CoalesceBatchesExec {
target_batch_size,
fetch: None,
metrics: ExecutionPlanMetricsSet::new(),
- cache,
+ cache: Arc::new(cache),
}
}
@@ -115,6 +116,17 @@ impl CoalesceBatchesExec {
input.boundedness(),
)
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ Self {
+ input: children.swap_remove(0),
+ metrics: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
}
#[expect(deprecated)]
@@ -159,7 +171,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -177,10 +189,11 @@ impl ExecutionPlan for CoalesceBatchesExec {
fn with_new_children(
self: Arc<Self>,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
Ok(Arc::new(
- CoalesceBatchesExec::new(Arc::clone(&children[0]), self.target_batch_size)
+ CoalesceBatchesExec::new(children.swap_remove(0), self.target_batch_size)
.with_fetch(self.fetch),
))
}
@@ -218,7 +231,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
target_batch_size: self.target_batch_size,
fetch: limit,
metrics: self.metrics.clone(),
- cache: self.cache.clone(),
+ cache: Arc::clone(&self.cache),
}))
}
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index d1fc58837b..39906d3680 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -31,7 +31,7 @@ use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
use crate::projection::{ProjectionExec, make_with_child};
use crate::sort_pushdown::SortOrderPushdownResult;
-use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
+use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_properties};
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use datafusion_common::config::ConfigOptions;
@@ -47,7 +47,7 @@ pub struct CoalescePartitionsExec {
input: Arc<dyn ExecutionPlan>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
/// Optional number of rows to fetch. Stops producing rows after this fetch
pub(crate) fetch: Option<usize>,
}
@@ -59,7 +59,7 @@ impl CoalescePartitionsExec {
CoalescePartitionsExec {
input,
metrics: ExecutionPlanMetricsSet::new(),
- cache,
+ cache: Arc::new(cache),
fetch: None,
}
}
@@ -100,6 +100,17 @@ impl CoalescePartitionsExec {
.with_evaluation_type(drive)
.with_scheduling_type(scheduling)
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ Self {
+ input: children.swap_remove(0),
+ metrics: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for CoalescePartitionsExec {
@@ -135,7 +146,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -149,9 +160,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
fn with_new_children(
self: Arc<Self>,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- let mut plan = CoalescePartitionsExec::new(Arc::clone(&children[0]));
+ check_if_same_properties!(self, children);
+ let mut plan = CoalescePartitionsExec::new(children.swap_remove(0));
plan.fetch = self.fetch;
Ok(Arc::new(plan))
}
@@ -270,7 +282,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
input: Arc::clone(&self.input),
fetch: limit,
metrics: self.metrics.clone(),
- cache: self.cache.clone(),
+ cache: Arc::clone(&self.cache),
}))
}
diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs
index ce54a451ac..5f0040b3dd 100644
--- a/datafusion/physical-plan/src/coop.rs
+++ b/datafusion/physical-plan/src/coop.rs
@@ -87,7 +87,7 @@ use crate::filter_pushdown::{
use crate::projection::ProjectionExec;
use crate::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
- SendableRecordBatchStream, SortOrderPushdownResult,
+ SendableRecordBatchStream, SortOrderPushdownResult, check_if_same_properties,
};
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
@@ -217,16 +217,15 @@ where
#[derive(Debug, Clone)]
pub struct CooperativeExec {
input: Arc<dyn ExecutionPlan>,
- properties: PlanProperties,
+ properties: Arc<PlanProperties>,
}
impl CooperativeExec {
/// Creates a new `CooperativeExec` operator that wraps the given input execution plan.
pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
- let properties = input
- .properties()
- .clone()
- .with_scheduling_type(SchedulingType::Cooperative);
+ let properties = PlanProperties::clone(input.properties())
+ .with_scheduling_type(SchedulingType::Cooperative)
+ .into();
Self { input, properties }
}
@@ -235,6 +234,16 @@ impl CooperativeExec {
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ Self {
+ input: children.swap_remove(0),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for CooperativeExec {
@@ -260,7 +269,7 @@ impl ExecutionPlan for CooperativeExec {
self.input.schema()
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}
@@ -281,6 +290,7 @@ impl ExecutionPlan for CooperativeExec {
1,
"CooperativeExec requires exactly one child"
);
+ check_if_same_properties!(self, children);
Ok(Arc::new(CooperativeExec::new(children.swap_remove(0))))
}
diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs
index 19698cd4ea..44148f2d0e 100644
--- a/datafusion/physical-plan/src/display.rs
+++ b/datafusion/physical-plan/src/display.rs
@@ -1153,7 +1153,7 @@ mod tests {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
unimplemented!()
}
diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs
index 64808bbc25..e4d4da4e88 100644
--- a/datafusion/physical-plan/src/empty.rs
+++ b/datafusion/physical-plan/src/empty.rs
@@ -44,7 +44,7 @@ pub struct EmptyExec {
schema: SchemaRef,
/// Number of partitions
partitions: usize,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl EmptyExec {
@@ -54,7 +54,7 @@ impl EmptyExec {
EmptyExec {
schema,
partitions: 1,
- cache,
+ cache: Arc::new(cache),
}
}
@@ -63,7 +63,7 @@ impl EmptyExec {
self.partitions = partitions;
// Changing partitions may invalidate output partitioning, so update it:
let output_partitioning = Self::output_partitioning_helper(self.partitions);
- self.cache = self.cache.with_partitioning(output_partitioning);
+ Arc::make_mut(&mut self.cache).partitioning = output_partitioning;
self
}
@@ -115,7 +115,7 @@ impl ExecutionPlan for EmptyExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 2ce1e79601..adb3c3af55 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -128,7 +128,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
///
/// This information is available via methods on [`ExecutionPlanProperties`]
/// trait, which is implemented for all `ExecutionPlan`s.
- fn properties(&self) -> &PlanProperties;
+ fn properties(&self) -> &Arc<PlanProperties>;
/// Returns an error if this individual node does not conform to its invariants.
/// These invariants are typically only checked in debug mode.
@@ -1050,12 +1050,17 @@ impl PlanProperties {
self
}
- /// Overwrite equivalence properties with its new value.
- pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self {
+ /// Set equivalence properties through a mutable reference.
+ pub fn set_eq_properties(&mut self, eq_properties: EquivalenceProperties) {
// Changing equivalence properties also changes output ordering, so
// make sure to overwrite it:
self.output_ordering = eq_properties.output_ordering();
self.eq_properties = eq_properties;
+ }
+
+ /// Overwrite equivalence properties with its new value.
+ pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self {
+ self.set_eq_properties(eq_properties);
self
}
@@ -1087,9 +1092,14 @@ impl PlanProperties {
self
}
+ /// Set constraints through a mutable reference.
+ pub fn set_constraints(&mut self, constraints: Constraints) {
+ self.eq_properties.set_constraints(constraints);
+ }
+
/// Overwrite constraints with its new value.
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
- self.eq_properties = self.eq_properties.with_constraints(constraints);
+ self.set_constraints(constraints);
self
}
@@ -1412,6 +1422,41 @@ pub fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn Executi
.data()
}
+/// Check whether the `plan`'s children have the same properties as the passed `children`.
+/// If so, the plan can avoid recomputing its own properties when a replacement of its
+/// children is requested.
+/// The size of `children` must equal the size of `ExecutionPlan::children()`.
+pub fn has_same_children_properties(
+ plan: &Arc<impl ExecutionPlan>,
+ children: &[Arc<dyn ExecutionPlan>],
+) -> Result<bool> {
+ let old_children = plan.children();
+ assert_eq_or_internal_err!(
+ children.len(),
+ old_children.len(),
+ "Wrong number of children"
+ );
+ for (lhs, rhs) in old_children.iter().zip(children.iter()) {
+ if !Arc::ptr_eq(lhs.properties(), rhs.properties()) {
+ return Ok(false);
+ }
+ }
+ Ok(true)
+}
+
+/// Helper macro that avoids recomputing properties when the passed children's
+/// properties are the same as those the plan already has. Can be used to implement
+/// a fast path for [`ExecutionPlan::with_new_children`].
+#[macro_export]
+macro_rules! check_if_same_properties {
+ ($plan: expr, $children: expr) => {
+ if $crate::execution_plan::has_same_children_properties(&$plan, &$children)? {
+ let plan = $plan.with_new_children_and_same_properties($children);
+ return Ok(::std::sync::Arc::new(plan));
+ }
+ };
+}
+
/// Utility function yielding a string representation of the given [`ExecutionPlan`].
pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
let formatted = displayable(plan.as_ref()).indent(true).to_string();
@@ -1474,7 +1519,7 @@ mod tests {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
unimplemented!()
}
@@ -1537,7 +1582,7 @@ mod tests {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
unimplemented!()
}
diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs
index aa3c0afefe..bf21b04846 100644
--- a/datafusion/physical-plan/src/explain.rs
+++ b/datafusion/physical-plan/src/explain.rs
@@ -44,7 +44,7 @@ pub struct ExplainExec {
stringified_plans: Vec<StringifiedPlan>,
/// control which plans to print
verbose: bool,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl ExplainExec {
@@ -59,7 +59,7 @@ impl ExplainExec {
schema,
stringified_plans,
verbose,
- cache,
+ cache: Arc::new(cache),
}
}
@@ -112,7 +112,7 @@ impl ExecutionPlan for ExplainExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 8cadcf9ad5..ecea4e6ebe 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -27,6 +27,7 @@ use super::{
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
+use crate::check_if_same_properties;
use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
use crate::common::can_project;
use crate::execution_plan::CardinalityEffect;
@@ -84,7 +85,7 @@ pub struct FilterExec {
/// Selectivity for statistics. 0 = no rows, 100 = all rows
default_selectivity: u8,
/// Properties equivalence properties, partitioning, etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
/// The projection indices of the columns in the output schema of join
projection: Option<ProjectionRef>,
/// Target batch size for output batches
@@ -206,7 +207,7 @@ impl FilterExecBuilder {
input: self.input,
metrics: ExecutionPlanMetricsSet::new(),
default_selectivity: self.default_selectivity,
- cache,
+ cache: Arc::new(cache),
projection: self.projection,
batch_size: self.batch_size,
fetch: self.fetch,
@@ -279,7 +280,7 @@ impl FilterExec {
input: Arc::clone(&self.input),
metrics: self.metrics.clone(),
default_selectivity: self.default_selectivity,
- cache: self.cache.clone(),
+ cache: Arc::clone(&self.cache),
projection: self.projection.clone(),
batch_size,
fetch: self.fetch,
@@ -432,6 +433,17 @@ impl FilterExec {
input.boundedness(),
))
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ Self {
+ input: children.swap_remove(0),
+ metrics: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for FilterExec {
@@ -486,7 +498,7 @@ impl ExecutionPlan for FilterExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -503,6 +515,7 @@ impl ExecutionPlan for FilterExec {
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
let new_input = children.swap_remove(0);
FilterExecBuilder::from(&*self)
.with_input(new_input)
@@ -685,12 +698,12 @@ impl ExecutionPlan for FilterExec {
input: Arc::clone(&filter_input),
metrics: self.metrics.clone(),
default_selectivity: self.default_selectivity,
- cache: Self::compute_properties(
+ cache: Arc::new(Self::compute_properties(
&filter_input,
&new_predicate,
self.default_selectivity,
self.projection.as_deref(),
- )?,
+ )?),
projection: self.projection.clone(),
batch_size: self.batch_size,
fetch: self.fetch,
@@ -710,7 +723,7 @@ impl ExecutionPlan for FilterExec {
input: Arc::clone(&self.input),
metrics: self.metrics.clone(),
default_selectivity: self.default_selectivity,
- cache: self.cache.clone(),
+ cache: Arc::clone(&self.cache),
projection: self.projection.clone(),
batch_size: self.batch_size,
fetch,
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index d5b540885e..342cb7e70a 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -34,7 +34,7 @@ use crate::projection::{
use crate::{
ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
ExecutionPlanProperties, PlanProperties, RecordBatchStream,
- SendableRecordBatchStream, Statistics, handle_state,
+ SendableRecordBatchStream, Statistics, check_if_same_properties, handle_state,
};
use arrow::array::{RecordBatch, RecordBatchOptions};
@@ -94,7 +94,7 @@ pub struct CrossJoinExec {
/// Execution plan metrics
metrics: ExecutionPlanMetricsSet,
/// Properties such as schema, equivalence properties, ordering, partitioning, etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl CrossJoinExec {
@@ -125,7 +125,7 @@ impl CrossJoinExec {
schema,
left_fut: Default::default(),
metrics: ExecutionPlanMetricsSet::default(),
- cache,
+ cache: Arc::new(cache),
}
}
@@ -192,6 +192,23 @@ impl CrossJoinExec {
&self.right.schema(),
)
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ let left = children.swap_remove(0);
+ let right = children.swap_remove(0);
+
+ Self {
+ left,
+ right,
+ metrics: ExecutionPlanMetricsSet::new(),
+ left_fut: Default::default(),
+ cache: Arc::clone(&self.cache),
+ schema: Arc::clone(&self.schema),
+ }
+ }
}
/// Asynchronously collect the result of the left child
@@ -256,7 +273,7 @@ impl ExecutionPlan for CrossJoinExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -272,6 +289,7 @@ impl ExecutionPlan for CrossJoinExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
Ok(Arc::new(CrossJoinExec::new(
Arc::clone(&children[0]),
Arc::clone(&children[1]),
@@ -285,7 +303,7 @@ impl ExecutionPlan for CrossJoinExec {
schema: Arc::clone(&self.schema),
left_fut: Default::default(), // reset the build side!
metrics: ExecutionPlanMetricsSet::default(),
- cache: self.cache.clone(),
+ cache: Arc::clone(&self.cache),
};
Ok(Arc::new(new_exec))
}
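Alongside the caching change, several `with_new_children` bodies switch from `Arc::clone(&children[0])` to `children.swap_remove(0)`, taking ownership of the child instead of bumping a refcount. One subtlety, visible in `CrossJoinExec`'s fast-path constructor, is that `swap_remove(0)` moves the *last* element into slot 0, so calling it twice on a two-element vector yields left then right. A quick illustration with `Arc<str>` standing in for `Arc<dyn ExecutionPlan>`:

```rust
use std::sync::Arc;

fn main() {
    let mut children: Vec<Arc<str>> = vec![Arc::from("left"), Arc::from("right")];

    // swap_remove(0) moves the first element out in O(1) (no shift, no
    // Arc::clone); the former last element now sits at index 0.
    let left = children.swap_remove(0);
    let right = children.swap_remove(0);

    assert_eq!(&*left, "left");
    assert_eq!(&*right, "right");
    assert!(children.is_empty());
}
```

With exactly two elements the order happens to come out left-then-right; with three or more children the post-swap ordering would differ, so this idiom is only safe when drained as shown.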
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 0f2b87c945..1d54c89491 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -23,7 +23,9 @@ use std::sync::{Arc, OnceLock};
use std::{any::Any, vec};
use crate::ExecutionPlanProperties;
-use crate::execution_plan::{EmissionType, boundedness_from_children};
+use crate::execution_plan::{
+ EmissionType, boundedness_from_children, has_same_children_properties,
+};
use crate::filter_pushdown::{
ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation,
@@ -405,7 +407,7 @@ impl HashJoinExecBuilder {
column_indices,
null_equality,
null_aware,
- cache,
+ cache: Arc::new(cache),
dynamic_filter: None,
fetch,
})
@@ -657,7 +659,7 @@ pub struct HashJoinExec {
/// Flag to indicate if this is a null-aware anti join
pub null_aware: bool,
/// Cache holding plan properties like equivalences, output partitioning etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
/// Dynamic filter for pushing down to the probe side
/// Set when dynamic filter pushdown is detected in handle_child_pushdown_result.
/// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates.
@@ -1085,7 +1087,7 @@ impl ExecutionPlan for HashJoinExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -1146,6 +1148,20 @@ impl ExecutionPlan for HashJoinExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ let cache = if has_same_children_properties(&self, &children)? {
+ Arc::clone(&self.cache)
+ } else {
+ Arc::new(Self::compute_properties(
+ &children[0],
+ &children[1],
+ &self.join_schema,
+ self.join_type,
+ &self.on,
+ self.mode,
+ self.projection.as_deref(),
+ )?)
+ };
+
Ok(Arc::new(HashJoinExec {
left: Arc::clone(&children[0]),
right: Arc::clone(&children[1]),
@@ -1161,15 +1177,7 @@ impl ExecutionPlan for HashJoinExec {
column_indices: self.column_indices.clone(),
null_equality: self.null_equality,
null_aware: self.null_aware,
- cache: Self::compute_properties(
- &children[0],
- &children[1],
- &self.join_schema,
- self.join_type,
- &self.on,
- self.mode,
- self.projection.as_deref(),
- )?,
+ cache,
// Keep the dynamic filter, bounds accumulator will be reset
dynamic_filter: self.dynamic_filter.clone(),
fetch: self.fetch,
@@ -1193,7 +1201,7 @@ impl ExecutionPlan for HashJoinExec {
column_indices: self.column_indices.clone(),
null_equality: self.null_equality,
null_aware: self.null_aware,
- cache: self.cache.clone(),
+ cache: Arc::clone(&self.cache),
// Reset dynamic filter and bounds accumulator to initial state
dynamic_filter: None,
fetch: self.fetch,
@@ -1591,7 +1599,7 @@ impl ExecutionPlan for HashJoinExec {
column_indices: self.column_indices.clone(),
null_equality: self.null_equality,
null_aware: self.null_aware,
- cache: self.cache.clone(),
+ cache: Arc::clone(&self.cache),
dynamic_filter: Some(HashJoinExecDynamicFilter {
filter: dynamic_filter,
build_accumulator: OnceLock::new(),
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 33fec9e181..4fb7dabf67 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -46,6 +46,7 @@ use crate::projection::{
use crate::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
PlanProperties, RecordBatchStream, SendableRecordBatchStream,
+ check_if_same_properties,
};
use arrow::array::{
@@ -198,7 +199,7 @@ pub struct NestedLoopJoinExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Cache holding plan properties like equivalences, output partitioning etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
/// Helps to build [`NestedLoopJoinExec`].
@@ -276,7 +277,7 @@ impl NestedLoopJoinExecBuilder {
column_indices,
projection,
metrics: Default::default(),
- cache,
+ cache: Arc::new(cache),
})
}
}
@@ -462,6 +463,27 @@ impl NestedLoopJoinExec {
Ok(plan)
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ let left = children.swap_remove(0);
+ let right = children.swap_remove(0);
+
+ Self {
+ left,
+ right,
+ metrics: ExecutionPlanMetricsSet::new(),
+ build_side_data: Default::default(),
+ cache: Arc::clone(&self.cache),
+ filter: self.filter.clone(),
+ join_type: self.join_type,
+ join_schema: Arc::clone(&self.join_schema),
+ column_indices: self.column_indices.clone(),
+ projection: self.projection.clone(),
+ }
+ }
}
impl DisplayAs for NestedLoopJoinExec {
@@ -516,7 +538,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -539,6 +561,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
Ok(Arc::new(
NestedLoopJoinExecBuilder::new(
Arc::clone(&children[0]),
diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs
index d7ece845e9..abb6e34aa2 100644
--- a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs
@@ -51,7 +51,9 @@ use crate::joins::piecewise_merge_join::utils::{
};
use crate::joins::utils::asymmetric_join_output_partitioning;
use crate::metrics::MetricsSet;
-use crate::{DisplayAs, DisplayFormatType, ExecutionPlanProperties};
+use crate::{
+ DisplayAs, DisplayFormatType, ExecutionPlanProperties, check_if_same_properties,
+};
use crate::{
ExecutionPlan, PlanProperties,
joins::{
@@ -86,7 +88,7 @@ use crate::{
/// Both sides are sorted so that we can iterate from index 0 to the end on each side. This ordering ensures
/// that when we find the first matching pair of rows, we can emit the current stream row joined with all remaining
/// probe rows from the match position onward, without rescanning earlier probe rows.
-///
+///
/// For `<` and `<=` operators, both inputs are sorted in **descending** order, while for `>` and `>=` operators
/// they are sorted in **ascending** order. This choice ensures that the pointer on the buffered side can advance
/// monotonically as we stream new batches from the stream side.
@@ -129,34 +131,34 @@ use crate::{
///
/// Processing Row 1:
///
-/// Sorted Buffered Side Sorted Streamed Side
-/// ┌──────────────────┐ ┌──────────────────┐
-/// 1 │ 100 │ 1 │ 100 │
-/// ├──────────────────┤ ├──────────────────┤
-/// 2 │ 200 │ ─┐ 2 │ 200 │
-/// ├──────────────────┤ │ For row 1 on streamed side with ├──────────────────┤
-/// 3 │ 200 │ │ value 100, we emit rows 2 - 5. 3 │ 500 │
+/// Sorted Buffered Side Sorted Streamed Side
+/// ┌──────────────────┐ ┌──────────────────┐
+/// 1 │ 100 │ 1 │ 100 │
+/// ├──────────────────┤ ├──────────────────┤
+/// 2 │ 200 │ ─┐ 2 │ 200 │
+/// ├──────────────────┤ │ For row 1 on streamed side with ├──────────────────┤
+/// 3 │ 200 │ │ value 100, we emit rows 2 - 5. 3 │ 500 │
/// ├──────────────────┤ │ as matches when the operator is └──────────────────┘
/// 4 │ 300 │ │ `Operator::Lt` (<) Emitting all
/// ├──────────────────┤ │ rows after the first match (row
/// 5 │ 400 │ ─┘ 2 buffered side; 100 < 200)
-/// └──────────────────┘
+/// └──────────────────┘
///
/// Processing Row 2:
/// By sorting the streamed side we know
///
-/// Sorted Buffered Side Sorted
Streamed Side
-/// ┌──────────────────┐
┌──────────────────┐
-/// 1 │ 100 │ 1 │
100 │
-/// ├──────────────────┤
├──────────────────┤
-/// 2 │ 200 │ <- Start here when probing for the 2 │
200 │
-/// ├──────────────────┤ streamed side row 2.
├──────────────────┤
-/// 3 │ 200 │ 3 │
500 │
+/// Sorted Buffered Side Sorted
Streamed Side
+/// ┌──────────────────┐
┌──────────────────┐
+/// 1 │ 100 │ 1 │
100 │
+/// ├──────────────────┤
├──────────────────┤
+/// 2 │ 200 │ <- Start here when probing for the 2 │
200 │
+/// ├──────────────────┤ streamed side row 2.
├──────────────────┤
+/// 3 │ 200 │ 3 │
500 │
/// ├──────────────────┤
└──────────────────┘
-/// 4 │ 300 │
-/// ├──────────────────┤
+/// 4 │ 300 │
+/// ├──────────────────┤
/// 5 │ 400 │
-/// └──────────────────┘
+/// └──────────────────┘
/// ```
///
/// ## Existence Joins (Semi, Anti, Mark)
@@ -202,10 +204,10 @@ use crate::{
/// 1 │ 100 │ 1 │ 500 │
/// ├──────────────────┤ ├──────────────────┤
/// 2 │ 200 │ 2 │ 200 │
-/// ├──────────────────┤ ├──────────────────┤
+/// ├──────────────────┤ ├──────────────────┤
/// 3 │ 200 │ 3 │ 300 │
/// ├──────────────────┤ └──────────────────┘
-/// 4 │ 300 │ ─┐
+/// 4 │ 300 │ ─┐
/// ├──────────────────┤ | We emit matches for row 4 - 5
/// 5 │ 400 │ ─┘ on the buffered side.
/// └──────────────────┘
@@ -236,11 +238,11 @@ use crate::{
///
/// # Mark Join:
/// Sorts the probe side, then computes the min/max range of the probe keys and scans the buffered side only
-/// within that range.
+/// within that range.
/// Complexity: `O(|S| + scan(R[range]))`.
///
/// ## Nested Loop Join
-/// Compares every row from `S` with every row from `R`.
+/// Compares every row from `S` with every row from `R`.
/// Complexity: `O(|S| * |R|)`.
///
/// ## Nested Loop Join
@@ -273,13 +275,12 @@ pub struct PiecewiseMergeJoinExec {
left_child_plan_required_order: LexOrdering,
    /// The right sort order, descending for `<`, `<=` operations + ascending for `>`, `>=` operations
/// Unsorted for mark joins
- #[expect(dead_code)]
right_batch_required_orders: LexOrdering,
    /// This determines the sort order of all join columns used in sorting the stream and buffered execution plans.
sort_options: SortOptions,
    /// Cache holding plan properties like equivalences, output partitioning etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
/// Number of partitions to process
num_partitions: usize,
}
@@ -373,7 +374,7 @@ impl PiecewiseMergeJoinExec {
left_child_plan_required_order,
right_batch_required_orders,
sort_options,
- cache,
+ cache: Arc::new(cache),
num_partitions,
})
}
@@ -466,6 +467,31 @@ impl PiecewiseMergeJoinExec {
pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>> {
todo!()
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ let buffered = children.swap_remove(0);
+ let streamed = children.swap_remove(0);
+ Self {
+ buffered,
+ streamed,
+ on: self.on.clone(),
+ operator: self.operator,
+ join_type: self.join_type,
+ schema: Arc::clone(&self.schema),
+            left_child_plan_required_order: self.left_child_plan_required_order.clone(),
+            right_batch_required_orders: self.right_batch_required_orders.clone(),
+ sort_options: self.sort_options,
+ cache: Arc::clone(&self.cache),
+ num_partitions: self.num_partitions,
+
+ // Re-set state.
+ metrics: ExecutionPlanMetricsSet::new(),
+ buffered_fut: Default::default(),
+ }
+ }
}
impl ExecutionPlan for PiecewiseMergeJoinExec {
@@ -477,7 +503,7 @@ impl ExecutionPlan for PiecewiseMergeJoinExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -511,6 +537,7 @@ impl ExecutionPlan for PiecewiseMergeJoinExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
match &children[..] {
[left, right] => Ok(Arc::new(PiecewiseMergeJoinExec::try_new(
Arc::clone(left),
@@ -527,6 +554,13 @@ impl ExecutionPlan for PiecewiseMergeJoinExec {
}
}
+ fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(self.with_new_children_and_same_properties(vec![
+ Arc::clone(&self.buffered),
+ Arc::clone(&self.streamed),
+ ])))
+ }
+
fn execute(
&self,
partition: usize,
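The fast-path above hinges on cheap property comparison: since `properties()` now returns `&Arc<PlanProperties>`, a node can test whether its children's properties are unchanged by Arc pointer identity and, if so, share its own cached `Arc` instead of recomputing. A minimal self-contained sketch of that idea, using toy stand-ins (`PlanProperties` here has only a `partition_count` field, and `FilterNode` is a hypothetical single-child operator — neither matches DataFusion's real types):

```rust
use std::sync::Arc;

// Toy stand-in for DataFusion's PlanProperties; illustrative only.
#[derive(Clone)]
struct PlanProperties {
    partition_count: usize,
}

// Hypothetical single-child operator caching its derived properties.
struct FilterNode {
    child_properties: Arc<PlanProperties>,
    cache: Arc<PlanProperties>,
}

impl FilterNode {
    // Fast-path: if the new child's properties are the same Arc as before,
    // reuse the cached properties; otherwise recompute them.
    fn with_new_child(&self, new_child_properties: Arc<PlanProperties>) -> FilterNode {
        let cache = if Arc::ptr_eq(&new_child_properties, &self.child_properties) {
            Arc::clone(&self.cache) // nothing changed: share the old cache
        } else {
            // Stand-in for a (potentially costly) property recomputation.
            Arc::new(PlanProperties {
                partition_count: new_child_properties.partition_count,
            })
        };
        FilterNode { child_properties: new_child_properties, cache }
    }
}

fn main() {
    let props = Arc::new(PlanProperties { partition_count: 4 });
    let node = FilterNode {
        child_properties: Arc::clone(&props),
        cache: Arc::new(PlanProperties { partition_count: 4 }),
    };
    // Same child-properties Arc: the cache is shared, not recomputed.
    let same = node.with_new_child(Arc::clone(&props));
    assert!(Arc::ptr_eq(&same.cache, &node.cache));
    // Different Arc: properties are recomputed into a fresh allocation.
    let other = node.with_new_child(Arc::new(PlanProperties { partition_count: 8 }));
    assert!(!Arc::ptr_eq(&other.cache, &node.cache));
}
```

Because a child taking the fast path clones its properties `Arc` unchanged, the pointer-equality check composes upward: each parent in turn sees identical pointers and can also skip recomputation.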
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
index 160a3272fb..b34e811f91 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
@@ -39,7 +39,7 @@ use crate::projection::{
};
use crate::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
- PlanProperties, SendableRecordBatchStream, Statistics,
+    PlanProperties, SendableRecordBatchStream, Statistics, check_if_same_properties,
};
use arrow::compute::SortOptions;
@@ -127,7 +127,7 @@ pub struct SortMergeJoinExec {
/// Defines the null equality for the join.
pub null_equality: NullEquality,
    /// Cache holding plan properties like equivalences, output partitioning etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl SortMergeJoinExec {
@@ -198,7 +198,7 @@ impl SortMergeJoinExec {
right_sort_exprs,
sort_options,
null_equality,
- cache,
+ cache: Arc::new(cache),
})
}
@@ -340,6 +340,20 @@ impl SortMergeJoinExec {
        reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
}
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ let left = children.swap_remove(0);
+ let right = children.swap_remove(0);
+ Self {
+ left,
+ right,
+ metrics: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for SortMergeJoinExec {
@@ -405,7 +419,7 @@ impl ExecutionPlan for SortMergeJoinExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -440,6 +454,7 @@ impl ExecutionPlan for SortMergeJoinExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
match &children[..] {
[left, right] => Ok(Arc::new(SortMergeJoinExec::try_new(
Arc::clone(left),
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 4fdc5fc64d..29917dd2cc 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -32,6 +32,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::vec;
+use crate::check_if_same_properties;
use crate::common::SharedMemoryReservation;
use crate::execution_plan::{boundedness_from_children, emission_type_from_children};
use crate::joins::stream_join_utils::{
@@ -197,7 +198,7 @@ pub struct SymmetricHashJoinExec {
/// Partition Mode
mode: StreamJoinPartitionMode,
    /// Cache holding plan properties like equivalences, output partitioning etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl SymmetricHashJoinExec {
@@ -253,7 +254,7 @@ impl SymmetricHashJoinExec {
left_sort_exprs,
right_sort_exprs,
mode,
- cache,
+ cache: Arc::new(cache),
})
}
@@ -360,6 +361,20 @@ impl SymmetricHashJoinExec {
}
Ok(false)
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ let left = children.swap_remove(0);
+ let right = children.swap_remove(0);
+ Self {
+ left,
+ right,
+ metrics: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for SymmetricHashJoinExec {
@@ -411,7 +426,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -453,6 +468,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
Ok(Arc::new(SymmetricHashJoinExec::try_new(
Arc::clone(&children[0]),
Arc::clone(&children[1]),
diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs
index 9ce63a1c58..a78e5c067f 100644
--- a/datafusion/physical-plan/src/limit.rs
+++ b/datafusion/physical-plan/src/limit.rs
@@ -28,7 +28,10 @@ use super::{
SendableRecordBatchStream, Statistics,
};
use crate::execution_plan::{Boundedness, CardinalityEffect};
-use crate::{DisplayFormatType, Distribution, ExecutionPlan, Partitioning};
+use crate::{
+ DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ check_if_same_properties,
+};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
@@ -51,10 +54,10 @@ pub struct GlobalLimitExec {
fetch: Option<usize>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
- cache: PlanProperties,
    /// Does the limit have to preserve the order of its input, and if so what is it?
    /// Some optimizations may reorder the input if no particular sort is required
required_ordering: Option<LexOrdering>,
+ cache: Arc<PlanProperties>,
}
impl GlobalLimitExec {
@@ -66,8 +69,8 @@ impl GlobalLimitExec {
skip,
fetch,
metrics: ExecutionPlanMetricsSet::new(),
- cache,
required_ordering: None,
+ cache: Arc::new(cache),
}
}
@@ -106,6 +109,17 @@ impl GlobalLimitExec {
    pub fn set_required_ordering(&mut self, required_ordering: Option<LexOrdering>) {
self.required_ordering = required_ordering;
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ Self {
+ input: children.swap_remove(0),
+ metrics: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for GlobalLimitExec {
@@ -144,7 +158,7 @@ impl ExecutionPlan for GlobalLimitExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -166,10 +180,11 @@ impl ExecutionPlan for GlobalLimitExec {
fn with_new_children(
self: Arc<Self>,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
Ok(Arc::new(GlobalLimitExec::new(
- Arc::clone(&children[0]),
+ children.swap_remove(0),
self.skip,
self.fetch,
)))
@@ -225,7 +240,7 @@ impl ExecutionPlan for GlobalLimitExec {
}
/// LocalLimitExec applies a limit to a single partition
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct LocalLimitExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
@@ -233,10 +248,10 @@ pub struct LocalLimitExec {
fetch: usize,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
- cache: PlanProperties,
    /// If the child plan is a sort node, after the sort node is removed during
    /// physical optimization, we should add the required ordering to the limit node
required_ordering: Option<LexOrdering>,
+ cache: Arc<PlanProperties>,
}
impl LocalLimitExec {
@@ -247,8 +262,8 @@ impl LocalLimitExec {
input,
fetch,
metrics: ExecutionPlanMetricsSet::new(),
- cache,
required_ordering: None,
+ cache: Arc::new(cache),
}
}
@@ -282,6 +297,17 @@ impl LocalLimitExec {
    pub fn set_required_ordering(&mut self, required_ordering: Option<LexOrdering>) {
self.required_ordering = required_ordering;
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ Self {
+ input: children.swap_remove(0),
+ metrics: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for LocalLimitExec {
@@ -311,7 +337,7 @@ impl ExecutionPlan for LocalLimitExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -331,6 +357,7 @@ impl ExecutionPlan for LocalLimitExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
match children.len() {
1 => Ok(Arc::new(LocalLimitExec::new(
Arc::clone(&children[0]),
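With the cache now stored as `Arc<PlanProperties>`, the in-place mutators seen in the following hunks (e.g. partitioning or equivalence updates) switch to `Arc::make_mut`, Rust's standard copy-on-write helper: it clones the inner value only when the `Arc` is shared, so other holders of the old properties are never mutated underneath. A minimal sketch of that behavior, with a toy `PlanProperties` stand-in:

```rust
use std::sync::Arc;

// Toy stand-in for PlanProperties; illustrative only.
#[derive(Clone, Debug)]
struct PlanProperties {
    partition_count: usize,
}

fn main() {
    let shared = Arc::new(PlanProperties { partition_count: 1 });
    let mut cache = Arc::clone(&shared); // two owners of one allocation

    // Shared Arc: make_mut clones the inner value first, so the other
    // holder (e.g. a parent plan still referencing `shared`) is untouched.
    Arc::make_mut(&mut cache).partition_count = 8;
    assert_eq!(shared.partition_count, 1);
    assert_eq!(cache.partition_count, 8);

    // Unique Arc: make_mut mutates in place, no clone and no new allocation.
    let before = Arc::as_ptr(&cache);
    Arc::make_mut(&mut cache).partition_count = 16;
    assert_eq!(Arc::as_ptr(&cache), before);
    assert_eq!(cache.partition_count, 16);
}
```

This is what lets builder-style methods like `with_preserve_partitioning` keep mutating their cache cheaply in the common unshared case while staying safe once properties are shared via the fast-path.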
diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs
index a58abe20a2..90fd3f24cf 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -161,7 +161,7 @@ pub struct LazyMemoryExec {
/// Functions to generate batches for each partition
batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
    /// Plan properties cache storing equivalence properties, partitioning, and execution mode
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}
@@ -200,7 +200,8 @@ impl LazyMemoryExec {
EmissionType::Incremental,
boundedness,
)
- .with_scheduling_type(SchedulingType::Cooperative);
+ .with_scheduling_type(SchedulingType::Cooperative)
+ .into();
Ok(Self {
schema,
@@ -215,9 +216,9 @@ impl LazyMemoryExec {
match projection.as_ref() {
Some(columns) => {
                let projected = Arc::new(self.schema.project(columns).unwrap());
-                self.cache = self.cache.with_eq_properties(EquivalenceProperties::new(
- Arc::clone(&projected),
- ));
+ Arc::make_mut(&mut self.cache).set_eq_properties(
+ EquivalenceProperties::new(Arc::clone(&projected)),
+ );
self.schema = projected;
self.projection = projection;
self
@@ -236,12 +237,12 @@ impl LazyMemoryExec {
partition_count,
generator_count
);
- self.cache.partitioning = partitioning;
+ Arc::make_mut(&mut self.cache).partitioning = partitioning;
Ok(())
}
    pub fn add_ordering(&mut self, ordering: impl IntoIterator<Item = PhysicalSortExpr>) {
- self.cache
+ Arc::make_mut(&mut self.cache)
.eq_properties
.add_orderings(std::iter::once(ordering));
}
@@ -306,7 +307,7 @@ impl ExecutionPlan for LazyMemoryExec {
Arc::clone(&self.schema)
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -361,7 +362,7 @@ impl ExecutionPlan for LazyMemoryExec {
Ok(Arc::new(LazyMemoryExec {
schema: Arc::clone(&self.schema),
batch_generators: generators,
- cache: self.cache.clone(),
+ cache: Arc::clone(&self.cache),
metrics: ExecutionPlanMetricsSet::new(),
projection: self.projection.clone(),
}))
diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs
index c91085965b..5dbd7b3032 100644
--- a/datafusion/physical-plan/src/placeholder_row.rs
+++ b/datafusion/physical-plan/src/placeholder_row.rs
@@ -43,7 +43,7 @@ pub struct PlaceholderRowExec {
schema: SchemaRef,
/// Number of partitions
partitions: usize,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl PlaceholderRowExec {
@@ -54,7 +54,7 @@ impl PlaceholderRowExec {
PlaceholderRowExec {
schema,
partitions,
- cache,
+ cache: Arc::new(cache),
}
}
@@ -63,7 +63,7 @@ impl PlaceholderRowExec {
self.partitions = partitions;
// Update output partitioning when updating partitions:
        let output_partitioning = Self::output_partitioning_helper(self.partitions);
- self.cache = self.cache.with_partitioning(output_partitioning);
+ Arc::make_mut(&mut self.cache).partitioning = output_partitioning;
self
}
@@ -132,7 +132,7 @@ impl ExecutionPlan for PlaceholderRowExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs
index 55b4129223..db3a71fc70 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -33,7 +33,7 @@ use crate::filter_pushdown::{
FilterPushdownPropagation, FilterRemapper, PushedDownPredicate,
};
use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef};
-use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr};
-use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr};
+use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr, check_if_same_properties};
use std::any::Any;
use std::collections::HashMap;
use std::pin::Pin;
@@ -79,7 +79,7 @@ pub struct ProjectionExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
    /// Cache holding plan properties like equivalences, output partitioning etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl ProjectionExec {
@@ -160,7 +160,7 @@ impl ProjectionExec {
projector,
input,
metrics: ExecutionPlanMetricsSet::new(),
- cache,
+ cache: Arc::new(cache),
})
}
@@ -223,6 +223,17 @@ impl ProjectionExec {
}
Ok(alias_map)
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ Self {
+ input: children.swap_remove(0),
+ metrics: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for ProjectionExec {
@@ -276,7 +287,7 @@ impl ExecutionPlan for ProjectionExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -311,6 +322,7 @@ impl ExecutionPlan for ProjectionExec {
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
ProjectionExec::try_from_projector(
self.projector.clone(),
children.swap_remove(0),
diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs
index f2cba13717..995aa4822a 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -74,7 +74,7 @@ pub struct RecursiveQueryExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
    /// Cache holding plan properties like equivalences, output partitioning etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl RecursiveQueryExec {
@@ -97,7 +97,7 @@ impl RecursiveQueryExec {
is_distinct,
work_table,
metrics: ExecutionPlanMetricsSet::new(),
- cache,
+ cache: Arc::new(cache),
})
}
@@ -143,7 +143,7 @@ impl ExecutionPlan for RecursiveQueryExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 33531e8880..da4329e2cc 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -39,7 +39,10 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder;
use crate::spill::spill_manager::SpillManager;
use crate::spill::spill_pool::{self, SpillPoolWriter};
use crate::stream::RecordBatchStreamAdapter;
-use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics};
+use crate::{
+ DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics,
+ check_if_same_properties,
+};
use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
use arrow::compute::take_arrays;
@@ -763,7 +766,7 @@ pub struct RepartitionExec {
/// `SortPreservingRepartitionExec`, false means `RepartitionExec`.
preserve_order: bool,
    /// Cache holding plan properties like equivalences, output partitioning etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
#[derive(Debug, Clone)]
@@ -832,6 +835,18 @@ impl RepartitionExec {
pub fn name(&self) -> &str {
"RepartitionExec"
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ Self {
+ input: children.swap_remove(0),
+ metrics: ExecutionPlanMetricsSet::new(),
+ state: Default::default(),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for RepartitionExec {
@@ -891,7 +906,7 @@ impl ExecutionPlan for RepartitionExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -903,6 +918,7 @@ impl ExecutionPlan for RepartitionExec {
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
let mut repartition = RepartitionExec::try_new(
children.swap_remove(0),
self.partitioning().clone(),
@@ -1200,7 +1216,7 @@ impl ExecutionPlan for RepartitionExec {
_config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
use Partitioning::*;
- let mut new_properties = self.cache.clone();
+ let mut new_properties = PlanProperties::clone(&self.cache);
new_properties.partitioning = match new_properties.partitioning {
RoundRobinBatch(_) => RoundRobinBatch(target_partitions),
Hash(hash, _) => Hash(hash, target_partitions),
@@ -1211,7 +1227,7 @@ impl ExecutionPlan for RepartitionExec {
state: Arc::clone(&self.state),
metrics: self.metrics.clone(),
preserve_order: self.preserve_order,
- cache: new_properties,
+ cache: new_properties.into(),
})))
}
}
@@ -1231,7 +1247,7 @@ impl RepartitionExec {
state: Default::default(),
metrics: ExecutionPlanMetricsSet::new(),
preserve_order,
- cache,
+ cache: Arc::new(cache),
})
}
@@ -1292,7 +1308,7 @@ impl RepartitionExec {
// to maintain order
self.input.output_partitioning().partition_count() > 1;
        let eq_properties = Self::eq_properties_helper(&self.input, self.preserve_partitioning);
- self.cache = self.cache.with_eq_properties(eq_properties);
+ Arc::make_mut(&mut self.cache).set_eq_properties(eq_properties);
self
}
diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs
index 08bc73c92d..0dbb75f2ef 100644
--- a/datafusion/physical-plan/src/sorts/partial_sort.rs
+++ b/datafusion/physical-plan/src/sorts/partial_sort.rs
@@ -62,6 +62,7 @@ use crate::sorts::sort::sort_batch;
use crate::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
+ check_if_same_properties,
};
use arrow::compute::concat_batches;
@@ -93,7 +94,7 @@ pub struct PartialSortExec {
/// Fetch highest/lowest n results
fetch: Option<usize>,
    /// Cache holding plan properties like equivalences, output partitioning etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl PartialSortExec {
@@ -114,7 +115,7 @@ impl PartialSortExec {
metrics_set: ExecutionPlanMetricsSet::new(),
preserve_partitioning,
fetch: None,
- cache,
+ cache: Arc::new(cache),
}
}
@@ -132,12 +133,8 @@ impl PartialSortExec {
/// input partitions producing a single, sorted partition.
    pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
self.preserve_partitioning = preserve_partitioning;
- self.cache = self
- .cache
- .with_partitioning(Self::output_partitioning_helper(
- &self.input,
- self.preserve_partitioning,
- ));
+ Arc::make_mut(&mut self.cache).partitioning =
+            Self::output_partitioning_helper(&self.input, self.preserve_partitioning);
self
}
@@ -207,6 +204,17 @@ impl PartialSortExec {
input.boundedness(),
))
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ Self {
+ input: children.swap_remove(0),
+ metrics_set: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for PartialSortExec {
@@ -255,7 +263,7 @@ impl ExecutionPlan for PartialSortExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -283,6 +291,7 @@ impl ExecutionPlan for PartialSortExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
let new_partial_sort = PartialSortExec::new(
self.expr.clone(),
Arc::clone(&children[0]),
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 8575cdd9a6..c735963d9b 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -27,7 +27,9 @@ use std::sync::Arc;
use parking_lot::RwLock;
use crate::common::spawn_buffered;
-use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
+use crate::execution_plan::{
+ Boundedness, CardinalityEffect, EmissionType, has_same_children_properties,
+};
use crate::expressions::PhysicalSortExpr;
use crate::filter_pushdown::{
ChildFilterDescription, FilterDescription, FilterPushdownPhase,
@@ -952,7 +954,7 @@ pub struct SortExec {
    /// Normalized common sort prefix between the input and the sort expressions (only used with fetch)
common_sort_prefix: Vec<PhysicalSortExpr>,
    /// Cache holding plan properties like equivalences, output partitioning etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
/// Filter matching the state of the sort for dynamic filter pushdown.
    /// If `fetch` is `Some`, this will also be set and a TopK operator may be used.
/// If `fetch` is `None`, this will be `None`.
@@ -974,7 +976,7 @@ impl SortExec {
preserve_partitioning,
fetch: None,
common_sort_prefix: sort_prefix,
- cache,
+ cache: Arc::new(cache),
filter: None,
}
}
@@ -993,12 +995,8 @@ impl SortExec {
/// input partitions producing a single, sorted partition.
    pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
self.preserve_partitioning = preserve_partitioning;
- self.cache = self
- .cache
- .with_partitioning(Self::output_partitioning_helper(
- &self.input,
- self.preserve_partitioning,
- ));
+ Arc::make_mut(&mut self.cache).partitioning =
+            Self::output_partitioning_helper(&self.input, self.preserve_partitioning);
self
}
@@ -1022,7 +1020,7 @@ impl SortExec {
preserve_partitioning: self.preserve_partitioning,
common_sort_prefix: self.common_sort_prefix.clone(),
fetch: self.fetch,
- cache: self.cache.clone(),
+ cache: Arc::clone(&self.cache),
filter: self.filter.clone(),
}
}
@@ -1035,12 +1033,12 @@ impl SortExec {
/// operation since rows that are not going to be included
/// can be dropped.
pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
- let mut cache = self.cache.clone();
+ let mut cache = PlanProperties::clone(&self.cache);
        // If the SortExec can emit incrementally (that means the sort requirements
        // and properties of the input match), the SortExec can generate its result
// without scanning the entire input when a fetch value exists.
let is_pipeline_friendly = matches!(
- self.cache.emission_type,
+ cache.emission_type,
EmissionType::Incremental | EmissionType::Both
);
if fetch.is_some() && is_pipeline_friendly {
@@ -1052,7 +1050,7 @@ impl SortExec {
});
let mut new_sort = self.cloned();
new_sort.fetch = fetch;
- new_sort.cache = cache;
+ new_sort.cache = cache.into();
new_sort.filter = filter;
new_sort
}
@@ -1207,7 +1205,7 @@ impl ExecutionPlan for SortExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -1236,14 +1234,17 @@ impl ExecutionPlan for SortExec {
let mut new_sort = self.cloned();
        assert_eq!(children.len(), 1, "SortExec should have exactly one child");
new_sort.input = Arc::clone(&children[0]);
-        // Recompute the properties based on the new input since they may have changed
- let (cache, sort_prefix) = Self::compute_properties(
- &new_sort.input,
- new_sort.expr.clone(),
- new_sort.preserve_partitioning,
- )?;
- new_sort.cache = cache;
- new_sort.common_sort_prefix = sort_prefix;
+
+ if !has_same_children_properties(&self, &children)? {
+            // Recompute the properties based on the new input since they may have changed
+ let (cache, sort_prefix) = Self::compute_properties(
+ &new_sort.input,
+ new_sort.expr.clone(),
+ new_sort.preserve_partitioning,
+ )?;
+ new_sort.cache = Arc::new(cache);
+ new_sort.common_sort_prefix = sort_prefix;
+ }
Ok(Arc::new(new_sort))
}
@@ -1463,7 +1464,7 @@ mod tests {
pub struct SortedUnboundedExec {
schema: Schema,
batch_size: u64,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl DisplayAs for SortedUnboundedExec {
@@ -1503,7 +1504,7 @@ mod tests {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -2271,7 +2272,9 @@ mod tests {
let source = SortedUnboundedExec {
schema: schema.clone(),
batch_size: 2,
-            cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
+ cache: Arc::new(SortedUnboundedExec::compute_properties(Arc::new(
+ schema.clone(),
+ ))),
};
let mut plan = SortExec::new(
[PhysicalSortExpr::new_default(Arc::new(Column::new(
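Most of the per-operator `with_new_children_and_same_properties` helpers in this patch follow one shape: swap in the new children, reset per-execution state (metrics, futures), and share everything else, including the properties `Arc`, via struct-update syntax. A minimal sketch of the pattern with toy stand-ins (`Exec`, `Arc<str>` for a child plan, `Vec<u64>` for metrics — none of these are DataFusion's real types):

```rust
use std::sync::Arc;

// Toy operator; fields stand in for input plan, metrics, and cached properties.
#[derive(Clone)]
struct Exec {
    input: Arc<str>,    // stand-in for Arc<dyn ExecutionPlan>
    metrics: Vec<u64>,  // stand-in for ExecutionPlanMetricsSet
    cache: Arc<String>, // stand-in for Arc<PlanProperties>
}

impl Exec {
    // Struct-update syntax: replace the child, reset per-execution state,
    // and cheaply share all remaining fields (including the cached properties).
    fn with_new_children_and_same_properties(&self, mut children: Vec<Arc<str>>) -> Self {
        Self {
            input: children.swap_remove(0),
            metrics: Vec::new(), // re-set state
            ..Self::clone(self)
        }
    }
}

fn main() {
    let exec = Exec {
        input: Arc::from("old-child"),
        metrics: vec![1, 2, 3],
        cache: Arc::new("properties".to_string()),
    };
    let rebuilt = exec.with_new_children_and_same_properties(vec![Arc::from("new-child")]);
    assert_eq!(&*rebuilt.input, "new-child");
    assert!(rebuilt.metrics.is_empty()); // state was reset
    assert!(Arc::ptr_eq(&rebuilt.cache, &exec.cache)); // properties shared, not recomputed
}
```

Sharing the `cache` Arc unchanged is exactly what makes the resulting plan signal "properties unchanged" to its parent when the fast-path macro compares pointers.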
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index b313fbf4da..763b72a660 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -28,6 +28,7 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder;
use crate::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
+ check_if_same_properties,
};
use datafusion_common::{Result, assert_eq_or_internal_err, internal_err};
@@ -93,7 +94,7 @@ pub struct SortPreservingMergeExec {
/// Optional number of rows to fetch. Stops producing rows after this fetch
fetch: Option<usize>,
    /// Cache holding plan properties like equivalences, output partitioning etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
/// Use round-robin selection of tied winners of loser tree
///
/// See [`Self::with_round_robin_repartition`] for more information.
@@ -109,7 +110,7 @@ impl SortPreservingMergeExec {
expr,
metrics: ExecutionPlanMetricsSet::new(),
fetch: None,
- cache,
+ cache: Arc::new(cache),
enable_round_robin_repartition: true,
}
}
@@ -180,6 +181,17 @@ impl SortPreservingMergeExec {
.with_evaluation_type(drive)
.with_scheduling_type(scheduling)
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ Self {
+ input: children.swap_remove(0),
+ metrics: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for SortPreservingMergeExec {
@@ -225,7 +237,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -240,7 +252,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
expr: self.expr.clone(),
metrics: self.metrics.clone(),
fetch: limit,
- cache: self.cache.clone(),
+ cache: Arc::clone(&self.cache),
enable_round_robin_repartition: true,
}))
}
@@ -280,10 +292,11 @@ impl ExecutionPlan for SortPreservingMergeExec {
fn with_new_children(
self: Arc<Self>,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
Ok(Arc::new(
-            SortPreservingMergeExec::new(self.expr.clone(), Arc::clone(&children[0]))
+            SortPreservingMergeExec::new(self.expr.clone(), children.swap_remove(0))
.with_fetch(self.fetch),
))
}
@@ -1358,7 +1371,7 @@ mod tests {
#[derive(Debug, Clone)]
struct CongestedExec {
schema: Schema,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
congestion: Arc<Congestion>,
}
@@ -1394,7 +1407,7 @@ mod tests {
fn as_any(&self) -> &dyn Any {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
@@ -1487,7 +1500,7 @@ mod tests {
};
let source = CongestedExec {
schema: schema.clone(),
- cache: properties,
+ cache: Arc::new(properties),
congestion: Arc::new(Congestion::new(partition_count)),
};
let spm = SortPreservingMergeExec::new(
diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs
index c8b8d95718..1535482374 100644
--- a/datafusion/physical-plan/src/streaming.rs
+++ b/datafusion/physical-plan/src/streaming.rs
@@ -67,7 +67,7 @@ pub struct StreamingTableExec {
projected_output_ordering: Vec<LexOrdering>,
infinite: bool,
limit: Option<usize>,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
metrics: ExecutionPlanMetricsSet,
}
@@ -111,7 +111,7 @@ impl StreamingTableExec {
projected_output_ordering,
infinite,
limit,
- cache,
+ cache: Arc::new(cache),
metrics: ExecutionPlanMetricsSet::new(),
})
}
@@ -236,7 +236,7 @@ impl ExecutionPlan for StreamingTableExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -335,7 +335,7 @@ impl ExecutionPlan for StreamingTableExec {
projected_output_ordering: self.projected_output_ordering.clone(),
infinite: self.infinite,
limit,
- cache: self.cache.clone(),
+ cache: Arc::clone(&self.cache),
metrics: self.metrics.clone(),
}))
}
diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs
index a967d035bd..0e7b900eb6 100644
--- a/datafusion/physical-plan/src/test.rs
+++ b/datafusion/physical-plan/src/test.rs
@@ -75,7 +75,7 @@ pub struct TestMemoryExec {
/// The maximum number of records to read from this plan. If `None`,
/// all records after filtering are returned.
fetch: Option<usize>,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl DisplayAs for TestMemoryExec {
@@ -134,7 +134,7 @@ impl ExecutionPlan for TestMemoryExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -235,7 +235,7 @@ impl TestMemoryExec {
Ok(Self {
partitions: partitions.to_vec(),
schema,
- cache: PlanProperties::new(
+ cache: Arc::new(PlanProperties::new(
EquivalenceProperties::new_with_orderings(
Arc::clone(&projected_schema),
Vec::<LexOrdering>::new(),
@@ -243,7 +243,7 @@ impl TestMemoryExec {
Partitioning::UnknownPartitioning(partitions.len()),
EmissionType::Incremental,
Boundedness::Bounded,
- ),
+ )),
projected_schema,
projection,
sort_information: vec![],
@@ -261,7 +261,7 @@ impl TestMemoryExec {
) -> Result<Arc<TestMemoryExec>> {
let mut source = Self::try_new(partitions, schema, projection)?;
let cache = source.compute_properties();
- source.cache = cache;
+ source.cache = Arc::new(cache);
Ok(Arc::new(source))
}
@@ -269,7 +269,7 @@ impl TestMemoryExec {
pub fn update_cache(source: &Arc<TestMemoryExec>) -> TestMemoryExec {
let cache = source.compute_properties();
let mut source = (**source).clone();
- source.cache = cache;
+ source.cache = Arc::new(cache);
source
}
@@ -338,7 +338,7 @@ impl TestMemoryExec {
}
self.sort_information = sort_information;
- self.cache = self.compute_properties();
+ self.cache = Arc::new(self.compute_properties());
Ok(self)
}
diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs
index df5093226e..d628fb819f 100644
--- a/datafusion/physical-plan/src/test/exec.rs
+++ b/datafusion/physical-plan/src/test/exec.rs
@@ -125,7 +125,7 @@ pub struct MockExec {
/// if true (the default), sends data using a separate task to ensure the
/// batches are not available without this stream yielding first
use_task: bool,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl MockExec {
@@ -142,7 +142,7 @@ impl MockExec {
data,
schema,
use_task: true,
- cache,
+ cache: Arc::new(cache),
}
}
@@ -192,7 +192,7 @@ impl ExecutionPlan for MockExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -299,7 +299,7 @@ pub struct BarrierExec {
/// the stream wait for this to return Poll::Ready(None)
finish_barrier: Option<Arc<(Barrier, AtomicUsize)>>,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
log: bool,
}
@@ -314,7 +314,7 @@ impl BarrierExec {
data,
schema,
start_data_barrier: barrier,
- cache,
+ cache: Arc::new(cache),
finish_barrier: None,
log: true,
}
@@ -422,7 +422,7 @@ impl ExecutionPlan for BarrierExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -498,7 +498,7 @@ impl ExecutionPlan for BarrierExec {
/// A mock execution plan that errors on a call to execute
#[derive(Debug)]
pub struct ErrorExec {
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl Default for ErrorExec {
@@ -515,7 +515,9 @@ impl ErrorExec {
true,
)]));
let cache = Self::compute_properties(schema);
- Self { cache }
+ Self {
+ cache: Arc::new(cache),
+ }
}
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
@@ -556,7 +558,7 @@ impl ExecutionPlan for ErrorExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -586,7 +588,7 @@ impl ExecutionPlan for ErrorExec {
pub struct StatisticsExec {
stats: Statistics,
schema: Arc<Schema>,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl StatisticsExec {
pub fn new(stats: Statistics, schema: Schema) -> Self {
@@ -599,7 +601,7 @@ impl StatisticsExec {
Self {
stats,
schema: Arc::new(schema),
- cache,
+ cache: Arc::new(cache),
}
}
@@ -646,7 +648,7 @@ impl ExecutionPlan for StatisticsExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -688,7 +690,7 @@ pub struct BlockingExec {
/// Ref-counting helper to check if the plan and the produced stream are still in memory.
refs: Arc<()>,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl BlockingExec {
@@ -698,7 +700,7 @@ impl BlockingExec {
Self {
schema,
refs: Default::default(),
- cache,
+ cache: Arc::new(cache),
}
}
@@ -749,7 +751,7 @@ impl ExecutionPlan for BlockingExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -831,7 +833,7 @@ pub struct PanicExec {
/// Number of output partitions. Each partition will produce this
/// many empty output record batches prior to panicking
batches_until_panics: Vec<usize>,
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl PanicExec {
@@ -843,7 +845,7 @@ impl PanicExec {
Self {
schema,
batches_until_panics,
- cache,
+ cache: Arc::new(cache),
}
}
@@ -895,7 +897,7 @@ impl ExecutionPlan for PanicExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index 4ff6b1a59c..9fc02e730d 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -32,6 +32,7 @@ use super::{
SendableRecordBatchStream, Statistics,
metrics::{ExecutionPlanMetricsSet, MetricsSet},
};
+use crate::check_if_same_properties;
use crate::execution_plan::{
InvariantLevel, boundedness_from_children, check_default_invariants,
emission_type_from_children,
@@ -106,7 +107,7 @@ pub struct UnionExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Cache holding plan properties like equivalences, output partitioning etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl UnionExec {
@@ -124,7 +125,7 @@ impl UnionExec {
UnionExec {
inputs,
metrics: ExecutionPlanMetricsSet::new(),
- cache,
+ cache: Arc::new(cache),
}
}
@@ -153,7 +154,7 @@ impl UnionExec {
Ok(Arc::new(UnionExec {
inputs,
metrics: ExecutionPlanMetricsSet::new(),
- cache,
+ cache: Arc::new(cache),
}))
}
}
@@ -189,6 +190,17 @@ impl UnionExec {
boundedness_from_children(inputs),
))
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ Self {
+ inputs: children,
+ metrics: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for UnionExec {
@@ -216,7 +228,7 @@ impl ExecutionPlan for UnionExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -265,6 +277,7 @@ impl ExecutionPlan for UnionExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
UnionExec::try_new(children)
}
@@ -490,7 +503,7 @@ pub struct InterleaveExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Cache holding plan properties like equivalences, output partitioning etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl InterleaveExec {
@@ -504,7 +517,7 @@ impl InterleaveExec {
Ok(InterleaveExec {
inputs,
metrics: ExecutionPlanMetricsSet::new(),
- cache,
+ cache: Arc::new(cache),
})
}
@@ -526,6 +539,17 @@ impl InterleaveExec {
boundedness_from_children(inputs),
))
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ Self {
+ inputs: children,
+ metrics: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for InterleaveExec {
@@ -553,7 +577,7 @@ impl ExecutionPlan for InterleaveExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -574,6 +598,7 @@ impl ExecutionPlan for InterleaveExec {
can_interleave(children.iter()),
"Can not create InterleaveExec: new children can not be interleaved"
);
+ check_if_same_properties!(self, children);
Ok(Arc::new(InterleaveExec::try_new(children)?))
}
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index 5fef754e80..422a9dd0d3 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -28,7 +28,7 @@ use super::metrics::{
use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
use crate::{
DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
- SendableRecordBatchStream,
+ SendableRecordBatchStream, check_if_same_properties,
};
use arrow::array::{
@@ -74,7 +74,7 @@ pub struct UnnestExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Cache holding plan properties like equivalences, output partitioning etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl UnnestExec {
@@ -100,7 +100,7 @@ impl UnnestExec {
struct_column_indices,
options,
metrics: Default::default(),
- cache,
+ cache: Arc::new(cache),
})
}
@@ -193,6 +193,17 @@ impl UnnestExec {
pub fn options(&self) -> &UnnestOptions {
&self.options
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ Self {
+ input: children.swap_remove(0),
+ metrics: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for UnnestExec {
@@ -221,7 +232,7 @@ impl ExecutionPlan for UnnestExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -231,10 +242,11 @@ impl ExecutionPlan for UnnestExec {
fn with_new_children(
self: Arc<Self>,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
Ok(Arc::new(UnnestExec::new(
- Arc::clone(&children[0]),
+ children.swap_remove(0),
self.list_column_indices.clone(),
self.struct_column_indices.clone(),
Arc::clone(&self.schema),
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index 20d54303a9..a31268b9c6 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -36,7 +36,7 @@ use crate::windows::{
use crate::{
ColumnStatistics, DisplayAs, DisplayFormatType, Distribution,
ExecutionPlan,
ExecutionPlanProperties, InputOrderMode, PlanProperties, RecordBatchStream,
- SendableRecordBatchStream, Statistics, WindowExpr,
+ SendableRecordBatchStream, Statistics, WindowExpr, check_if_same_properties,
};
use arrow::compute::take_record_batch;
@@ -93,7 +93,7 @@ pub struct BoundedWindowAggExec {
// See `get_ordered_partition_by_indices` for more details.
ordered_partition_by_indices: Vec<usize>,
/// Cache holding plan properties like equivalences, output partitioning etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
/// If `can_rerepartition` is false, partition_keys is always empty.
can_repartition: bool,
}
@@ -134,7 +134,7 @@ impl BoundedWindowAggExec {
metrics: ExecutionPlanMetricsSet::new(),
input_order_mode,
ordered_partition_by_indices,
- cache,
+ cache: Arc::new(cache),
can_repartition,
})
}
@@ -248,6 +248,17 @@ impl BoundedWindowAggExec {
total_byte_size: Precision::Absent,
})
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ Self {
+ input: children.swap_remove(0),
+ metrics: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for BoundedWindowAggExec {
@@ -304,7 +315,7 @@ impl ExecutionPlan for BoundedWindowAggExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -339,6 +350,7 @@ impl ExecutionPlan for BoundedWindowAggExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
Ok(Arc::new(BoundedWindowAggExec::try_new(
self.window_expr.clone(),
Arc::clone(&children[0]),
diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs
index 0c73cf2352..0a146d51d6 100644
--- a/datafusion/physical-plan/src/windows/window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs
@@ -32,7 +32,7 @@ use crate::windows::{
use crate::{
ColumnStatistics, DisplayAs, DisplayFormatType, Distribution,
ExecutionPlan,
ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream,
- SendableRecordBatchStream, Statistics, WindowExpr,
+ SendableRecordBatchStream, Statistics, WindowExpr, check_if_same_properties,
};
use arrow::array::ArrayRef;
@@ -65,7 +65,7 @@ pub struct WindowAggExec {
// see `get_ordered_partition_by_indices` for more details.
ordered_partition_by_indices: Vec<usize>,
/// Cache holding plan properties like equivalences, output partitioning etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
/// If `can_partition` is false, partition_keys is always empty.
can_repartition: bool,
}
@@ -89,7 +89,7 @@ impl WindowAggExec {
schema,
metrics: ExecutionPlanMetricsSet::new(),
ordered_partition_by_indices,
- cache,
+ cache: Arc::new(cache),
can_repartition,
})
}
@@ -158,6 +158,17 @@ impl WindowAggExec {
.unwrap_or_else(Vec::new)
}
}
+
+ fn with_new_children_and_same_properties(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Self {
+ Self {
+ input: children.swap_remove(0),
+ metrics: ExecutionPlanMetricsSet::new(),
+ ..Self::clone(self)
+ }
+ }
}
impl DisplayAs for WindowAggExec {
@@ -206,7 +217,7 @@ impl ExecutionPlan for WindowAggExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -242,11 +253,12 @@ impl ExecutionPlan for WindowAggExec {
fn with_new_children(
self: Arc<Self>,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ check_if_same_properties!(self, children);
Ok(Arc::new(WindowAggExec::try_new(
self.window_expr.clone(),
- Arc::clone(&children[0]),
+ children.swap_remove(0),
true,
)?))
}
diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs
index 08390f87a2..4c7f77e0ff 100644
--- a/datafusion/physical-plan/src/work_table.rs
+++ b/datafusion/physical-plan/src/work_table.rs
@@ -109,7 +109,7 @@ pub struct WorkTableExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Cache holding plan properties like equivalences, output partitioning etc.
- cache: PlanProperties,
+ cache: Arc<PlanProperties>,
}
impl WorkTableExec {
@@ -129,7 +129,7 @@ impl WorkTableExec {
projection,
work_table: Arc::new(WorkTable::new(name)),
metrics: ExecutionPlanMetricsSet::new(),
- cache,
+ cache: Arc::new(cache),
})
}
@@ -181,7 +181,7 @@ impl ExecutionPlan for WorkTableExec {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
@@ -259,7 +259,7 @@ impl ExecutionPlan for WorkTableExec {
projection: self.projection.clone(),
metrics: ExecutionPlanMetricsSet::new(),
work_table,
- cache: self.cache.clone(),
+ cache: Arc::clone(&self.cache),
}))
}
}
diff --git a/docs/source/library-user-guide/custom-table-providers.md b/docs/source/library-user-guide/custom-table-providers.md
index 8e1dee9e84..50005a7527 100644
--- a/docs/source/library-user-guide/custom-table-providers.md
+++ b/docs/source/library-user-guide/custom-table-providers.md
@@ -108,7 +108,7 @@ impl ExecutionPlan for CustomExec {
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
unreachable!()
}
@@ -232,7 +232,7 @@ The `scan` method of the `TableProvider` returns a `Result<Arc<dyn ExecutionPlan
# }
#
#
-# fn properties(&self) -> &PlanProperties {
+# fn properties(&self) -> &Arc<PlanProperties> {
# unreachable!()
# }
#
@@ -424,7 +424,7 @@ This will allow you to use the custom table provider in
DataFusion. For example,
# }
#
#
-# fn properties(&self) -> &PlanProperties {
+# fn properties(&self) -> &Arc<PlanProperties> {
# unreachable!()
# }
#
diff --git a/docs/source/library-user-guide/upgrading/53.0.0.md b/docs/source/library-user-guide/upgrading/53.0.0.md
index 06c917b2ab..ad2a69c0cc 100644
--- a/docs/source/library-user-guide/upgrading/53.0.0.md
+++ b/docs/source/library-user-guide/upgrading/53.0.0.md
@@ -28,6 +28,69 @@
[#19692]: https://github.com/apache/datafusion/issues/19692
+### `ExecutionPlan::properties` now returns `&Arc<PlanProperties>`
+
+`ExecutionPlan::properties()` now returns `&Arc<PlanProperties>` instead of
+`&PlanProperties`. This makes it possible to cheaply clone properties and
+reuse them across multiple `ExecutionPlan`s. It also makes it possible to
+optimize [`ExecutionPlan::with_new_children`] to reuse properties when the
+children plans have not changed, which can significantly reduce planning time
+for complex queries.
+
+[`ExecutionPlan::with_new_children`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.with_new_children
+
+To migrate, in all `ExecutionPlan` implementations, you will likely need to
+wrap stored `PlanProperties` in an `Arc`:
+
+```diff
+- cache: PlanProperties,
++ cache: Arc<PlanProperties>,
+
+...
+
+- fn properties(&self) -> &PlanProperties {
++ fn properties(&self) -> &Arc<PlanProperties> {
+ &self.cache
+ }
+```
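The `Arc` wrapper is what makes this migration pay off: cloning an `Arc` only bumps a reference count, and the clone is pointer-identical to the original, which is exactly the property the `with_new_children` fast-path can compare. A minimal standalone sketch in plain Rust (the `PlanProperties` struct below is a simplified stand-in, not the DataFusion type):

```rust
use std::sync::Arc;

// Stand-in for PlanProperties; the real type holds schema, orderings,
// partitioning, and equivalence information that is costly to recompute.
#[derive(Debug)]
struct PlanProperties {
    partitions: usize,
}

fn main() {
    let cache = Arc::new(PlanProperties { partitions: 8 });

    // Cloning the Arc does not copy the properties; it bumps a refcount...
    let shared = Arc::clone(&cache);
    assert_eq!(Arc::strong_count(&cache), 2);

    // ...and the clone is pointer-identical, so "did the properties change?"
    // can be answered with a pointer comparison instead of a deep equality.
    assert!(Arc::ptr_eq(&cache, &shared));

    // A freshly recomputed value is a new allocation, even if equal in value.
    let recomputed = Arc::new(PlanProperties { partitions: 8 });
    assert!(!Arc::ptr_eq(&cache, &recomputed));
    println!("ok");
}
```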
+
+To improve the performance of `with_new_children` for custom `ExecutionPlan`
+implementations, you can use the new `check_if_same_properties` macro. For it
+to work, you need to implement a `with_new_children_and_same_properties`
+function with semantics identical to `with_new_children`, but operating under
+the assumption that the properties of the children plans have not changed.
+
+An example of supporting this optimization for `ProjectionExec`:
+
+```diff
+ impl ProjectionExec {
++ fn with_new_children_and_same_properties(
++ &self,
++ mut children: Vec<Arc<dyn ExecutionPlan>>,
++ ) -> Self {
++ Self {
++ input: children.swap_remove(0),
++ metrics: ExecutionPlanMetricsSet::new(),
++ ..Self::clone(self)
++ }
++ }
+ }
+
+ impl ExecutionPlan for ProjectionExec {
+ fn with_new_children(
+ self: Arc<Self>,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
++ check_if_same_properties!(self, children);
+ ProjectionExec::try_new(
+ self.projector.projection().into_iter().cloned(),
+ children.swap_remove(0),
+ )
+ .map(|p| Arc::new(p) as _)
+ }
+ }
+```
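The control flow the macro guards can be sketched end to end without any DataFusion types. This is an illustrative reconstruction, not the macro's actual expansion; `Parent`, `Child`, and `partitions` are invented stand-ins. The idea is the same as in the patch: reuse the parent's cached `Arc<PlanProperties>` when every new child's properties `Arc` is pointer-identical to the old child's, and recompute otherwise:

```rust
use std::sync::Arc;

// Simplified stand-ins for illustration; real plans carry far more state.
#[derive(Debug)]
struct PlanProperties {
    partitions: usize,
}

struct Child {
    cache: Arc<PlanProperties>,
}

struct Parent {
    children: Vec<Arc<Child>>,
    cache: Arc<PlanProperties>,
}

impl Parent {
    fn new(children: Vec<Arc<Child>>) -> Self {
        let cache = Arc::new(Self::compute_properties(&children));
        Parent { children, cache }
    }

    // The "slow path": derive the parent's properties from its children.
    fn compute_properties(children: &[Arc<Child>]) -> PlanProperties {
        PlanProperties {
            partitions: children.iter().map(|c| c.cache.partitions).sum(),
        }
    }

    // Sketch of the guarded fast-path: when every new child's properties Arc
    // is pointer-identical to the corresponding old child's, the parent's own
    // cached properties are still valid and are reused as-is.
    fn with_new_children(self, children: Vec<Arc<Child>>) -> Parent {
        let unchanged = self.children.len() == children.len()
            && self
                .children
                .iter()
                .zip(&children)
                .all(|(old, new)| Arc::ptr_eq(&old.cache, &new.cache));
        if unchanged {
            // Fast path: keep (and share) the existing properties.
            return Parent { children, cache: self.cache };
        }
        // Slow path: properties may have changed, recompute them.
        Parent::new(children)
    }
}

fn main() {
    let child = Arc::new(Child { cache: Arc::new(PlanProperties { partitions: 4 }) });
    let parent = Parent::new(vec![Arc::clone(&child)]);
    let old_cache = Arc::clone(&parent.cache);

    // Same child properties: the parent's cache Arc is reused, not rebuilt.
    let parent = parent.with_new_children(vec![Arc::clone(&child)]);
    assert!(Arc::ptr_eq(&old_cache, &parent.cache));

    // A child with freshly allocated properties forces recomputation.
    let replaced = Arc::new(Child { cache: Arc::new(PlanProperties { partitions: 4 }) });
    let parent = parent.with_new_children(vec![replaced]);
    assert!(!Arc::ptr_eq(&old_cache, &parent.cache));
    println!("ok");
}
```

In the real implementation the comparison is against the children already stored in `self`, so a child returned untouched by a rewrite (whose `with_new_children` itself took the fast path and cloned its `Arc`) propagates the reuse all the way up the plan tree.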
+
### `PlannerContext` outer query schema API now uses a stack
`PlannerContext` no longer stores a single `outer_query_schema`. It now tracks a
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]