This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 84a22eaa32 Wrap Arc to Statistics for `partition_statistics` API
(#20570)
84a22eaa32 is described below
commit 84a22eaa327ee8ad495f53e8c32e20da81ed4d86
Author: xudong.w <[email protected]>
AuthorDate: Tue Mar 10 00:34:29 2026 +0100
Wrap Arc to Statistics for `partition_statistics` API (#20570)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
- Part of https://github.com/apache/datafusion/issues/20184
## Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
## What changes are included in this PR?
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
## Are these changes tested?
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
## Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.../examples/relation_planner/table_sample.rs | 6 +-
datafusion/core/tests/custom_sources_cases/mod.rs | 8 +-
.../core/tests/custom_sources_cases/statistics.rs | 14 ++--
.../tests/physical_optimizer/join_selection.rs | 6 +-
.../physical_optimizer/partition_statistics.rs | 94 +++++++++++-----------
.../core/tests/physical_optimizer/test_utils.rs | 4 +-
datafusion/core/tests/sql/path_partition.rs | 10 ++-
datafusion/datasource/src/file_scan_config.rs | 18 +++--
datafusion/datasource/src/memory.rs | 14 ++--
datafusion/datasource/src/source.rs | 4 +-
.../physical-optimizer/src/output_requirements.rs | 2 +-
datafusion/physical-plan/src/aggregates/mod.rs | 15 ++--
datafusion/physical-plan/src/buffer.rs | 2 +-
datafusion/physical-plan/src/coalesce_batches.rs | 7 +-
.../physical-plan/src/coalesce_partitions.rs | 7 +-
datafusion/physical-plan/src/coop.rs | 2 +-
datafusion/physical-plan/src/display.rs | 9 ++-
datafusion/physical-plan/src/empty.rs | 4 +-
datafusion/physical-plan/src/execution_plan.rs | 19 +++--
datafusion/physical-plan/src/filter.rs | 15 ++--
datafusion/physical-plan/src/joins/cross_join.rs | 9 ++-
.../physical-plan/src/joins/hash_join/exec.rs | 18 +++--
.../physical-plan/src/joins/nested_loop_join.rs | 10 +--
.../src/joins/sort_merge_join/exec.rs | 13 +--
datafusion/physical-plan/src/limit.rs | 14 ++--
datafusion/physical-plan/src/placeholder_row.rs | 6 +-
datafusion/physical-plan/src/projection.rs | 13 +--
datafusion/physical-plan/src/repartition/mod.rs | 8 +-
datafusion/physical-plan/src/sorts/partial_sort.rs | 2 +-
datafusion/physical-plan/src/sorts/sort.rs | 18 ++---
.../src/sorts/sort_preserving_merge.rs | 2 +-
datafusion/physical-plan/src/test.rs | 6 +-
datafusion/physical-plan/src/test/exec.rs | 22 ++---
datafusion/physical-plan/src/union.rs | 37 ++++++---
.../src/windows/bounded_window_agg_exec.rs | 7 +-
.../physical-plan/src/windows/window_agg_exec.rs | 9 ++-
datafusion/physical-plan/src/work_table.rs | 4 +-
docs/source/library-user-guide/upgrading/54.0.0.md | 33 +++++++-
38 files changed, 283 insertions(+), 208 deletions(-)
diff --git a/datafusion-examples/examples/relation_planner/table_sample.rs
b/datafusion-examples/examples/relation_planner/table_sample.rs
index aa24f6cfbf..04e5efd970 100644
--- a/datafusion-examples/examples/relation_planner/table_sample.rs
+++ b/datafusion-examples/examples/relation_planner/table_sample.rs
@@ -727,8 +727,8 @@ impl ExecutionPlan for SampleExec {
Some(self.metrics.clone_inner())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
- let mut stats = self.input.partition_statistics(partition)?;
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
+ let mut stats =
Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
let ratio = self.upper_bound - self.lower_bound;
// Scale statistics by sampling ratio (inexact due to randomness)
@@ -741,7 +741,7 @@ impl ExecutionPlan for SampleExec {
.map(|n| (n as f64 * ratio) as usize)
.to_inexact();
- Ok(stats)
+ Ok(Arc::new(stats))
}
fn apply_expressions(
diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs
b/datafusion/core/tests/custom_sources_cases/mod.rs
index 5f20fb1c60..6919d9794b 100644
--- a/datafusion/core/tests/custom_sources_cases/mod.rs
+++ b/datafusion/core/tests/custom_sources_cases/mod.rs
@@ -184,12 +184,12 @@ impl ExecutionPlan for CustomExecutionPlan {
Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 }))
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
if partition.is_some() {
- return Ok(Statistics::new_unknown(&self.schema()));
+ return Ok(Arc::new(Statistics::new_unknown(&self.schema())));
}
let batch = TEST_CUSTOM_RECORD_BATCH!().unwrap();
- Ok(Statistics {
+ Ok(Arc::new(Statistics {
num_rows: Precision::Exact(batch.num_rows()),
total_byte_size: Precision::Absent,
column_statistics: self
@@ -208,7 +208,7 @@ impl ExecutionPlan for CustomExecutionPlan {
..Default::default()
})
.collect(),
- })
+ }))
}
fn apply_expressions(
diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs
b/datafusion/core/tests/custom_sources_cases/statistics.rs
index eb81c9f24d..561c6b3b24 100644
--- a/datafusion/core/tests/custom_sources_cases/statistics.rs
+++ b/datafusion/core/tests/custom_sources_cases/statistics.rs
@@ -182,11 +182,11 @@ impl ExecutionPlan for StatisticsValidation {
unimplemented!("This plan only serves for testing statistics")
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
if partition.is_some() {
- Ok(Statistics::new_unknown(&self.schema))
+ Ok(Arc::new(Statistics::new_unknown(&self.schema)))
} else {
- Ok(self.stats.clone())
+ Ok(Arc::new(self.stats.clone()))
}
}
@@ -255,7 +255,7 @@ async fn sql_basic() -> Result<()> {
let physical_plan = df.create_physical_plan().await.unwrap();
// the statistics should be those of the source
- assert_eq!(stats, physical_plan.partition_statistics(None)?);
+ assert_eq!(stats, *physical_plan.partition_statistics(None)?);
Ok(())
}
@@ -295,7 +295,7 @@ async fn sql_limit() -> Result<()> {
.collect(),
total_byte_size: Precision::Absent
},
- physical_plan.partition_statistics(None)?
+ *physical_plan.partition_statistics(None)?
);
let df = ctx
@@ -304,7 +304,7 @@ async fn sql_limit() -> Result<()> {
.unwrap();
let physical_plan = df.create_physical_plan().await.unwrap();
// when the limit is larger than the original number of lines, statistics
remain unchanged
- assert_eq!(stats, physical_plan.partition_statistics(None)?);
+ assert_eq!(stats, *physical_plan.partition_statistics(None)?);
Ok(())
}
@@ -324,7 +324,7 @@ async fn sql_window() -> Result<()> {
let result = physical_plan.partition_statistics(None)?;
assert_eq!(stats.num_rows, result.num_rows);
- let col_stats = result.column_statistics;
+ let col_stats = &result.column_statistics;
assert_eq!(2, col_stats.len());
assert_eq!(stats.column_statistics[1], col_stats[0]);
diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs
b/datafusion/core/tests/physical_optimizer/join_selection.rs
index 17156022da..1c94a7bd1e 100644
--- a/datafusion/core/tests/physical_optimizer/join_selection.rs
+++ b/datafusion/core/tests/physical_optimizer/join_selection.rs
@@ -1191,12 +1191,12 @@ impl ExecutionPlan for StatisticsExec {
unimplemented!("This plan only serves for testing statistics")
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
- Ok(if partition.is_some() {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
+ Ok(Arc::new(if partition.is_some() {
Statistics::new_unknown(&self.schema)
} else {
self.stats.clone()
- })
+ }))
}
fn apply_expressions(
diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index a03792fae8..e4b1f1b261 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -259,8 +259,8 @@ mod test {
);
// Check the statistics of each partition
assert_eq!(statistics.len(), 2);
- assert_eq!(statistics[0], expected_statistic_partition_1);
- assert_eq!(statistics[1], expected_statistic_partition_2);
+ assert_eq!(*statistics[0], expected_statistic_partition_1);
+ assert_eq!(*statistics[1], expected_statistic_partition_2);
// Check the statistics_by_partition with real results
let expected_stats = vec![
@@ -292,8 +292,8 @@ mod test {
create_partition_statistics(2, 8, 1, 2, None);
// Check the statistics of each partition
assert_eq!(statistics.len(), 2);
- assert_eq!(statistics[0], expected_statistic_partition_1);
- assert_eq!(statistics[1], expected_statistic_partition_2);
+ assert_eq!(*statistics[0], expected_statistic_partition_1);
+ assert_eq!(*statistics[1], expected_statistic_partition_2);
// Check the statistics_by_partition with real results
let expected_stats = vec![
@@ -326,7 +326,7 @@ mod test {
Some((DATE_2025_03_01, DATE_2025_03_04)),
);
assert_eq!(statistics.len(), 1);
- assert_eq!(statistics[0], expected_statistic_partition);
+ assert_eq!(*statistics[0], expected_statistic_partition);
// Check the statistics_by_partition with real results
let expected_stats = vec![ExpectedStatistics::NonEmpty(1, 4, 4)];
validate_statistics_with_data(sort_exec.clone(), expected_stats,
0).await?;
@@ -357,8 +357,8 @@ mod test {
.map(|idx| sort_exec.partition_statistics(Some(idx)))
.collect::<Result<Vec<_>>>()?;
assert_eq!(statistics.len(), 2);
- assert_eq!(statistics[0], expected_statistic_partition_1);
- assert_eq!(statistics[1], expected_statistic_partition_2);
+ assert_eq!(*statistics[0], expected_statistic_partition_1);
+ assert_eq!(*statistics[1], expected_statistic_partition_2);
// Check the statistics_by_partition with real results
let expected_stats = vec![
@@ -406,7 +406,7 @@ mod test {
},
],
};
- assert_eq!(full_statistics, expected_full_statistic);
+ assert_eq!(*full_statistics, expected_full_statistic);
let statistics = (0..filter.output_partitioning().partition_count())
.map(|idx| filter.partition_statistics(Some(idx)))
@@ -435,8 +435,8 @@ mod test {
},
],
};
- assert_eq!(statistics[0], expected_partition_statistic);
- assert_eq!(statistics[1], expected_partition_statistic);
+ assert_eq!(*statistics[0], expected_partition_statistic);
+ assert_eq!(*statistics[1], expected_partition_statistic);
Ok(())
}
@@ -467,13 +467,13 @@ mod test {
Some((DATE_2025_03_03, DATE_2025_03_04)),
);
// Verify first partition (from first scan)
- assert_eq!(statistics[0], expected_statistic_partition_1);
+ assert_eq!(*statistics[0], expected_statistic_partition_1);
// Verify second partition (from first scan)
- assert_eq!(statistics[1], expected_statistic_partition_2);
+ assert_eq!(*statistics[1], expected_statistic_partition_2);
// Verify third partition (from second scan - same as first partition)
- assert_eq!(statistics[2], expected_statistic_partition_1);
+ assert_eq!(*statistics[2], expected_statistic_partition_1);
// Verify fourth partition (from second scan - same as second
partition)
- assert_eq!(statistics[3], expected_statistic_partition_2);
+ assert_eq!(*statistics[3], expected_statistic_partition_2);
// Check the statistics_by_partition with real results
let expected_stats = vec![
@@ -522,8 +522,8 @@ mod test {
ColumnStatistics::new_unknown(),
],
};
- assert_eq!(stats[0], expected_stats);
- assert_eq!(stats[1], expected_stats);
+ assert_eq!(*stats[0], expected_stats);
+ assert_eq!(*stats[1], expected_stats);
// Verify the execution results
let partitions = execute_stream_partitioned(
@@ -629,8 +629,8 @@ mod test {
},
],
};
- assert_eq!(statistics[0], expected_statistic_partition_1);
- assert_eq!(statistics[1], expected_statistic_partition_2);
+ assert_eq!(*statistics[0], expected_statistic_partition_1);
+ assert_eq!(*statistics[1], expected_statistic_partition_2);
// Check the statistics_by_partition with real results
let expected_stats = vec![
@@ -674,7 +674,7 @@ mod test {
);
expected_full_statistics.num_rows = Precision::Inexact(4);
expected_full_statistics.total_byte_size = Precision::Absent;
- assert_eq!(full_statistics, expected_full_statistics);
+ assert_eq!(*full_statistics, expected_full_statistics);
// Test partition_statistics(Some(idx)) - returns partition-specific
statistics
// Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02]
@@ -703,8 +703,8 @@ mod test {
.map(|idx| nested_loop_join.partition_statistics(Some(idx)))
.collect::<Result<Vec<_>>>()?;
assert_eq!(statistics.len(), 2);
- assert_eq!(statistics[0], expected_statistic_partition_1);
- assert_eq!(statistics[1], expected_statistic_partition_2);
+ assert_eq!(*statistics[0], expected_statistic_partition_1);
+ assert_eq!(*statistics[1], expected_statistic_partition_2);
// Check the statistics_by_partition with real results
let expected_stats = vec![
@@ -733,7 +733,7 @@ mod test {
.map(|idx| coalesce_partitions.partition_statistics(Some(idx)))
.collect::<Result<Vec<_>>>()?;
assert_eq!(statistics.len(), 1);
- assert_eq!(statistics[0], expected_statistic_partition);
+ assert_eq!(*statistics[0], expected_statistic_partition);
// Check the statistics_by_partition with real results
let expected_stats = vec![ExpectedStatistics::NonEmpty(1, 4, 4)];
@@ -750,20 +750,20 @@ mod test {
.map(|idx| local_limit.partition_statistics(Some(idx)))
.collect::<Result<Vec<_>>>()?;
assert_eq!(statistics.len(), 2);
- let mut expected_0 = statistics[0].clone();
+ let mut expected_0 = Statistics::clone(&statistics[0]);
expected_0.column_statistics = expected_0
.column_statistics
.into_iter()
.map(|c| c.to_inexact())
.collect();
- let mut expected_1 = statistics[1].clone();
+ let mut expected_1 = Statistics::clone(&statistics[1]);
expected_1.column_statistics = expected_1
.column_statistics
.into_iter()
.map(|c| c.to_inexact())
.collect();
- assert_eq!(statistics[0], expected_0);
- assert_eq!(statistics[1], expected_1);
+ assert_eq!(*statistics[0], expected_0);
+ assert_eq!(*statistics[1], expected_1);
Ok(())
}
@@ -785,7 +785,7 @@ mod test {
4,
Some((DATE_2025_03_01, DATE_2025_03_02)),
);
- assert_eq!(statistics[0], expected_statistic_partition);
+ assert_eq!(*statistics[0], expected_statistic_partition);
Ok(())
}
@@ -853,7 +853,7 @@ mod test {
],
};
- assert_eq!(&p0_statistics, &expected_p0_statistics);
+ assert_eq!(*p0_statistics, expected_p0_statistics);
let expected_p1_statistics = Statistics {
num_rows: Precision::Inexact(2),
@@ -873,7 +873,7 @@ mod test {
};
let p1_statistics =
aggregate_exec_partial.partition_statistics(Some(1))?;
- assert_eq!(&p1_statistics, &expected_p1_statistics);
+ assert_eq!(*p1_statistics, expected_p1_statistics);
validate_statistics_with_data(
aggregate_exec_partial.clone(),
@@ -895,10 +895,10 @@ mod test {
)?);
let p0_statistics = agg_final.partition_statistics(Some(0))?;
- assert_eq!(&p0_statistics, &expected_p0_statistics);
+ assert_eq!(*p0_statistics, expected_p0_statistics);
let p1_statistics = agg_final.partition_statistics(Some(1))?;
- assert_eq!(&p1_statistics, &expected_p1_statistics);
+ assert_eq!(*p1_statistics, expected_p1_statistics);
validate_statistics_with_data(
agg_final.clone(),
@@ -939,8 +939,8 @@ mod test {
],
};
- assert_eq!(&empty_stat, &agg_partial.partition_statistics(Some(0))?);
- assert_eq!(&empty_stat, &agg_partial.partition_statistics(Some(1))?);
+ assert_eq!(empty_stat, *agg_partial.partition_statistics(Some(0))?);
+ assert_eq!(empty_stat, *agg_partial.partition_statistics(Some(1))?);
validate_statistics_with_data(
agg_partial.clone(),
vec![ExpectedStatistics::Empty, ExpectedStatistics::Empty],
@@ -966,8 +966,8 @@ mod test {
agg_partial.schema(),
)?);
- assert_eq!(&empty_stat, &agg_final.partition_statistics(Some(0))?);
- assert_eq!(&empty_stat, &agg_final.partition_statistics(Some(1))?);
+ assert_eq!(empty_stat, *agg_final.partition_statistics(Some(0))?);
+ assert_eq!(empty_stat, *agg_final.partition_statistics(Some(1))?);
validate_statistics_with_data(
agg_final,
@@ -1003,7 +1003,7 @@ mod test {
column_statistics: vec![ColumnStatistics::new_unknown()],
};
- assert_eq!(&expect_stat, &agg_final.partition_statistics(Some(0))?);
+ assert_eq!(expect_stat, *agg_final.partition_statistics(Some(0))?);
// Verify that the aggregate final result has exactly one partition
with one row
let mut partitions = execute_stream_partitioned(
@@ -1037,13 +1037,13 @@ mod test {
&schema,
None,
);
- assert_eq!(actual, expected);
+ assert_eq!(*actual, expected);
all_batches.push(batches);
}
let actual = plan.partition_statistics(None)?;
let expected = compute_record_batch_statistics(&all_batches, &schema,
None);
- assert_eq!(actual, expected);
+ assert_eq!(*actual, expected);
Ok(())
}
@@ -1074,7 +1074,7 @@ mod test {
// All partitions should have the same statistics
for stat in statistics.iter() {
- assert_eq!(stat, &expected_stats);
+ assert_eq!(**stat, expected_stats);
}
// Verify that the result has exactly 3 partitions
@@ -1139,7 +1139,7 @@ mod test {
)?);
let result = repartition.partition_statistics(Some(0))?;
- assert_eq!(result, Statistics::new_unknown(&scan_schema));
+ assert_eq!(*result, Statistics::new_unknown(&scan_schema));
// Verify that the result has exactly 0 partitions
let partitions = execute_stream_partitioned(
@@ -1178,8 +1178,8 @@ mod test {
ColumnStatistics::new_unknown(),
],
};
- assert_eq!(stats[0], expected_stats);
- assert_eq!(stats[1], expected_stats);
+ assert_eq!(*stats[0], expected_stats);
+ assert_eq!(*stats[1], expected_stats);
// Verify the repartition execution results
let partitions =
@@ -1286,8 +1286,8 @@ mod test {
],
};
- assert_eq!(statistics[0], expected_statistic_partition_1);
- assert_eq!(statistics[1], expected_statistic_partition_2);
+ assert_eq!(*statistics[0], expected_statistic_partition_1);
+ assert_eq!(*statistics[1], expected_statistic_partition_2);
// Verify the statistics match actual execution results
let expected_stats = vec![
@@ -1452,7 +1452,7 @@ mod test {
},
],
};
- assert_eq!(statistics[0], expected_p0_statistics);
+ assert_eq!(*statistics[0], expected_p0_statistics);
// Test Partitioned mode
let partitioned_join = Arc::new(HashJoinExec::try_new(
@@ -1526,7 +1526,7 @@ mod test {
},
],
};
- assert_eq!(statistics[0], expected_p0_statistics);
+ assert_eq!(*statistics[0], expected_p0_statistics);
// Test Auto mode - should fall back to getting all partition
statistics
let auto_join = Arc::new(HashJoinExec::try_new(
@@ -1600,7 +1600,7 @@ mod test {
},
],
};
- assert_eq!(statistics[0], expected_p0_statistics);
+ assert_eq!(*statistics[0], expected_p0_statistics);
Ok(())
}
}
diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs
b/datafusion/core/tests/physical_optimizer/test_utils.rs
index 7b000f69a6..8d9e7b68b8 100644
--- a/datafusion/core/tests/physical_optimizer/test_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -958,8 +958,8 @@ impl ExecutionPlan for TestScan {
internal_err!("TestScan is for testing optimizer only, not for
execution")
}
- fn partition_statistics(&self, _partition: Option<usize>) ->
Result<Statistics> {
- Ok(Statistics::new_unknown(&self.schema))
+ fn partition_statistics(&self, _partition: Option<usize>) ->
Result<Arc<Statistics>> {
+ Ok(Arc::new(Statistics::new_unknown(&self.schema)))
}
// This is the key method - implement sort pushdown
diff --git a/datafusion/core/tests/sql/path_partition.rs
b/datafusion/core/tests/sql/path_partition.rs
index a906184979..c23f53b8db 100644
--- a/datafusion/core/tests/sql/path_partition.rs
+++ b/datafusion/core/tests/sql/path_partition.rs
@@ -461,7 +461,10 @@ async fn parquet_statistics() -> Result<()> {
let schema = physical_plan.schema();
assert_eq!(schema.fields().len(), 4);
- let stat_cols =
physical_plan.partition_statistics(None)?.column_statistics;
+ let stat_cols = physical_plan
+ .partition_statistics(None)?
+ .column_statistics
+ .clone();
assert_eq!(stat_cols.len(), 4);
// stats for the first col are read from the parquet file
assert_eq!(stat_cols[0].null_count, Precision::Exact(3));
@@ -485,7 +488,10 @@ async fn parquet_statistics() -> Result<()> {
let schema = physical_plan.schema();
assert_eq!(schema.fields().len(), 2);
- let stat_cols =
physical_plan.partition_statistics(None)?.column_statistics;
+ let stat_cols = physical_plan
+ .partition_statistics(None)?
+ .column_statistics
+ .clone();
assert_eq!(stat_cols.len(), 2);
// stats for the first col are read from the parquet file
assert_eq!(stat_cols[0].null_count, Precision::Exact(1));
diff --git a/datafusion/datasource/src/file_scan_config.rs
b/datafusion/datasource/src/file_scan_config.rs
index 524e091381..82a986a688 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -781,7 +781,7 @@ impl DataSource for FileScanConfig {
SchedulingType::Cooperative
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
if let Some(partition) = partition {
// Get statistics for a specific partition
// Note: FileGroup statistics include partition columns (computed
from partition_values)
@@ -791,22 +791,28 @@ impl DataSource for FileScanConfig {
// Project the statistics based on the projection
let output_schema = self.projected_schema()?;
return if let Some(projection) = self.file_source.projection()
{
- projection.project_statistics(stat.clone(), &output_schema)
+ Ok(Arc::new(
+ projection.project_statistics(stat.clone(),
&output_schema)?,
+ ))
} else {
- Ok(stat.clone())
+ Ok(Arc::new(stat.clone()))
};
}
// If no statistics available for this partition, return unknown
- Ok(Statistics::new_unknown(self.projected_schema()?.as_ref()))
+ Ok(Arc::new(Statistics::new_unknown(
+ self.projected_schema()?.as_ref(),
+ )))
} else {
// Return aggregate statistics across all partitions
let statistics = self.statistics();
let projection = self.file_source.projection();
let output_schema = self.projected_schema()?;
if let Some(projection) = &projection {
- projection.project_statistics(statistics.clone(),
&output_schema)
+ Ok(Arc::new(
+ projection.project_statistics(statistics.clone(),
&output_schema)?,
+ ))
} else {
- Ok(statistics)
+ Ok(Arc::new(statistics))
}
}
}
diff --git a/datafusion/datasource/src/memory.rs
b/datafusion/datasource/src/memory.rs
index e6add2a42c..aca943ed09 100644
--- a/datafusion/datasource/src/memory.rs
+++ b/datafusion/datasource/src/memory.rs
@@ -197,26 +197,26 @@ impl DataSource for MemorySourceConfig {
SchedulingType::Cooperative
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
if let Some(partition) = partition {
// Compute statistics for a specific partition
if let Some(batches) = self.partitions.get(partition) {
- Ok(common::compute_record_batch_statistics(
+ Ok(Arc::new(common::compute_record_batch_statistics(
from_ref(batches),
&self.schema,
self.projection.clone(),
- ))
+ )))
} else {
// Invalid partition index
- Ok(Statistics::new_unknown(&self.projected_schema))
+ Ok(Arc::new(Statistics::new_unknown(&self.projected_schema)))
}
} else {
// Compute statistics across all partitions
- Ok(common::compute_record_batch_statistics(
+ Ok(Arc::new(common::compute_record_batch_statistics(
&self.partitions,
&self.schema,
self.projection.clone(),
- ))
+ )))
}
}
@@ -968,7 +968,7 @@ mod tests {
let values = MemorySourceConfig::try_new_as_values(schema, data)?;
assert_eq!(
- values.partition_statistics(None)?,
+ *values.partition_statistics(None)?,
Statistics {
num_rows: Precision::Exact(rows),
total_byte_size: Precision::Exact(8), // not important
diff --git a/datafusion/datasource/src/source.rs
b/datafusion/datasource/src/source.rs
index deea108139..bffd94af21 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -157,7 +157,7 @@ pub trait DataSource: Send + Sync + Debug {
/// Returns statistics for a specific partition, or aggregate statistics
/// across all partitions if `partition` is `None`.
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics>;
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>>;
/// Return a copy of this DataSource with a new fetch limit
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
@@ -359,7 +359,7 @@ impl ExecutionPlan for DataSourceExec {
Some(self.data_source.metrics().clone_inner())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
self.data_source.partition_statistics(partition)
}
diff --git a/datafusion/physical-optimizer/src/output_requirements.rs
b/datafusion/physical-optimizer/src/output_requirements.rs
index b2212c94bc..8b71fc9fbf 100644
--- a/datafusion/physical-optimizer/src/output_requirements.rs
+++ b/datafusion/physical-optimizer/src/output_requirements.rs
@@ -246,7 +246,7 @@ impl ExecutionPlan for OutputRequirementExec {
unreachable!();
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
self.input.partition_statistics(partition)
}
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs
b/datafusion/physical-plan/src/aggregates/mod.rs
index b9a1e49ab5..e573a7f85d 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -1447,9 +1447,9 @@ impl ExecutionPlan for AggregateExec {
Some(self.metrics.clone_inner())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
let child_statistics = self.input().partition_statistics(partition)?;
- self.statistics_inner(&child_statistics)
+ Ok(Arc::new(self.statistics_inner(&child_statistics)?))
}
fn cardinality_effect(&self) -> CardinalityEffect {
@@ -2538,16 +2538,19 @@ mod tests {
Ok(Box::pin(stream))
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(
+ &self,
+ partition: Option<usize>,
+ ) -> Result<Arc<Statistics>> {
if partition.is_some() {
- return Ok(Statistics::new_unknown(self.schema().as_ref()));
+ return
Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref())));
}
let (_, batches) = some_data();
- Ok(common::compute_record_batch_statistics(
+ Ok(Arc::new(common::compute_record_batch_statistics(
&[batches],
&self.schema(),
None,
- ))
+ )))
}
}
diff --git a/datafusion/physical-plan/src/buffer.rs
b/datafusion/physical-plan/src/buffer.rs
index 8477f39a9b..3e85fb32d2 100644
--- a/datafusion/physical-plan/src/buffer.rs
+++ b/datafusion/physical-plan/src/buffer.rs
@@ -245,7 +245,7 @@ impl ExecutionPlan for BufferExec {
Some(self.metrics.clone_inner())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
self.input.partition_statistics(partition)
}
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs
b/datafusion/physical-plan/src/coalesce_batches.rs
index d26d1f9bea..3e8bfc7f81 100644
--- a/datafusion/physical-plan/src/coalesce_batches.rs
+++ b/datafusion/physical-plan/src/coalesce_batches.rs
@@ -227,10 +227,9 @@ impl ExecutionPlan for CoalesceBatchesExec {
Some(self.metrics.clone_inner())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
- self.input
- .partition_statistics(partition)?
- .with_fetch(self.fetch, 0, 1)
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
+ let stats =
Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
+ Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?))
}
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn
ExecutionPlan>> {
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs
b/datafusion/physical-plan/src/coalesce_partitions.rs
index 3dffdd6a99..5ea3589f22 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -244,10 +244,9 @@ impl ExecutionPlan for CoalescePartitionsExec {
Some(self.metrics.clone_inner())
}
- fn partition_statistics(&self, _partition: Option<usize>) ->
Result<Statistics> {
- self.input
- .partition_statistics(None)?
- .with_fetch(self.fetch, 0, 1)
+ fn partition_statistics(&self, _partition: Option<usize>) ->
Result<Arc<Statistics>> {
+ let stats =
Arc::unwrap_or_clone(self.input.partition_statistics(None)?);
+ Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?))
}
fn supports_limit_pushdown(&self) -> bool {
diff --git a/datafusion/physical-plan/src/coop.rs
b/datafusion/physical-plan/src/coop.rs
index c7a5432994..efe6506edd 100644
--- a/datafusion/physical-plan/src/coop.rs
+++ b/datafusion/physical-plan/src/coop.rs
@@ -311,7 +311,7 @@ impl ExecutionPlan for CooperativeExec {
Ok(make_cooperative(child_stream))
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
self.input.partition_statistics(partition)
}
diff --git a/datafusion/physical-plan/src/display.rs
b/datafusion/physical-plan/src/display.rs
index 76dd6f2054..aaf83345d9 100644
--- a/datafusion/physical-plan/src/display.rs
+++ b/datafusion/physical-plan/src/display.rs
@@ -1187,14 +1187,17 @@ mod tests {
todo!()
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(
+ &self,
+ partition: Option<usize>,
+ ) -> Result<Arc<Statistics>> {
if partition.is_some() {
- return Ok(Statistics::new_unknown(self.schema().as_ref()));
+ return
Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref())));
}
match self {
Self::Panic => panic!("expected panic"),
Self::Error => Err(internal_datafusion_err!("expected error")),
- Self::Ok =>
Ok(Statistics::new_unknown(self.schema().as_ref())),
+ Self::Ok =>
Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref()))),
}
}
}
diff --git a/datafusion/physical-plan/src/empty.rs
b/datafusion/physical-plan/src/empty.rs
index db3f5161a9..078bc4b8d0 100644
--- a/datafusion/physical-plan/src/empty.rs
+++ b/datafusion/physical-plan/src/empty.rs
@@ -164,7 +164,7 @@ impl ExecutionPlan for EmptyExec {
)?))
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
if let Some(partition) = partition {
assert_or_internal_err!(
partition < self.partitions,
@@ -191,7 +191,7 @@ impl ExecutionPlan for EmptyExec {
});
}
- Ok(stats)
+ Ok(Arc::new(stats))
}
}
diff --git a/datafusion/physical-plan/src/execution_plan.rs
b/datafusion/physical-plan/src/execution_plan.rs
index 0a9348b68a..d1e0978cfe 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -553,7 +553,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// If statistics are not available, should return
[`Statistics::new_unknown`]
/// (the default), not an error.
/// If `partition` is `None`, it returns statistics for the entire plan.
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
if let Some(idx) = partition {
// Validate partition index
let partition_count =
self.properties().partitioning.partition_count();
@@ -564,7 +564,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
partition_count
);
}
- Ok(Statistics::new_unknown(&self.schema()))
+ Ok(Arc::new(Statistics::new_unknown(&self.schema())))
}
/// Returns `true` if a limit can be safely pushed down through this
@@ -1640,7 +1640,10 @@ mod tests {
unimplemented!()
}
- fn partition_statistics(&self, _partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(
+ &self,
+ _partition: Option<usize>,
+ ) -> Result<Arc<Statistics>> {
unimplemented!()
}
}
@@ -1710,7 +1713,10 @@ mod tests {
unimplemented!()
}
- fn partition_statistics(&self, _partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(
+ &self,
+ _partition: Option<usize>,
+ ) -> Result<Arc<Statistics>> {
unimplemented!()
}
}
@@ -1775,7 +1781,10 @@ mod tests {
unimplemented!()
}
- fn partition_statistics(&self, _partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(
+ &self,
+ _partition: Option<usize>,
+ ) -> Result<Arc<Statistics>> {
unimplemented!()
}
}
diff --git a/datafusion/physical-plan/src/filter.rs
b/datafusion/physical-plan/src/filter.rs
index 8370201c1c..1e6b4e3193 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -389,7 +389,7 @@ impl FilterExec {
let schema = input.schema();
let stats = Self::statistics_helper(
&schema,
- input.partition_statistics(None)?,
+ Arc::unwrap_or_clone(input.partition_statistics(None)?),
predicate,
default_selectivity,
)?;
@@ -563,15 +563,16 @@ impl ExecutionPlan for FilterExec {
/// The output statistics of a filtering operation can be estimated if the
/// predicate's selectivity value can be determined for the incoming data.
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
- let input_stats = self.input.partition_statistics(partition)?;
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
+ let input_stats =
+ Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
let stats = Self::statistics_helper(
&self.input.schema(),
input_stats,
self.predicate(),
self.default_selectivity,
)?;
- Ok(stats.project(self.projection.as_ref()))
+ Ok(Arc::new(stats.project(self.projection.as_ref())))
}
fn cardinality_effect(&self) -> CardinalityEffect {
@@ -1361,7 +1362,7 @@ mod tests {
];
let _ = exp_col_stats
.into_iter()
- .zip(statistics.column_statistics)
+ .zip(statistics.column_statistics.clone())
.map(|(expected, actual)| {
if let Some(val) = actual.min_value.get_value() {
if val.data_type().is_floating() {
@@ -1432,7 +1433,7 @@ mod tests {
)),
));
// Since filter predicate passes all entries, statistics after filter
shouldn't change.
- let expected = input.partition_statistics(None)?.column_statistics;
+ let expected =
input.partition_statistics(None)?.column_statistics.clone();
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate, input)?);
let statistics = filter.partition_statistics(None)?;
@@ -1615,7 +1616,7 @@ mod tests {
}],
};
- assert_eq!(filter_statistics, expected_filter_statistics);
+ assert_eq!(*filter_statistics, expected_filter_statistics);
Ok(())
}
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs
b/datafusion/physical-plan/src/joins/cross_join.rs
index bdad207503..a895f69dc5 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -384,12 +384,13 @@ impl ExecutionPlan for CrossJoinExec {
}
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
// Get the all partitions statistics of the left
- let left_stats = self.left.partition_statistics(None)?;
- let right_stats = self.right.partition_statistics(partition)?;
+ let left_stats =
Arc::unwrap_or_clone(self.left.partition_statistics(None)?);
+ let right_stats =
+ Arc::unwrap_or_clone(self.right.partition_statistics(partition)?);
- Ok(stats_cartesian_product(left_stats, right_stats))
+ Ok(Arc::new(stats_cartesian_product(left_stats, right_stats)))
}
/// Tries to swap the projection with its input [`CrossJoinExec`]. If it
can be done,
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs
b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index c07ae2d651..a44ddf41b0 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -1441,7 +1441,7 @@ impl ExecutionPlan for HashJoinExec {
Some(self.metrics.clone_inner())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
let stats = match (partition, self.mode) {
// For CollectLeft mode, the left side is collected into a single
partition,
// so all left partitions are available to each output partition.
@@ -1451,8 +1451,8 @@ impl ExecutionPlan for HashJoinExec {
let right_stats =
self.right.partition_statistics(Some(partition))?;
estimate_join_statistics(
- left_stats,
- right_stats,
+ (*left_stats).clone(),
+ (*right_stats).clone(),
&self.on,
&self.join_type,
&self.join_schema,
@@ -1466,8 +1466,8 @@ impl ExecutionPlan for HashJoinExec {
let right_stats =
self.right.partition_statistics(Some(partition))?;
estimate_join_statistics(
- left_stats,
- right_stats,
+ (*left_stats).clone(),
+ (*right_stats).clone(),
&self.on,
&self.join_type,
&self.join_schema,
@@ -1480,9 +1480,11 @@ impl ExecutionPlan for HashJoinExec {
// TODO stats: it is not possible in general to know the
output size of joins
// There are some special cases though, for example:
// - `A LEFT JOIN B ON A.col=B.col` with
`COUNT_DISTINCT(B.col)=COUNT(B.col)`
+ let left_stats = self.left.partition_statistics(None)?;
+ let right_stats = self.right.partition_statistics(None)?;
estimate_join_statistics(
- self.left.partition_statistics(None)?,
- self.right.partition_statistics(None)?,
+ (*left_stats).clone(),
+ (*right_stats).clone(),
&self.on,
&self.join_type,
&self.join_schema,
@@ -1492,7 +1494,7 @@ impl ExecutionPlan for HashJoinExec {
// Project statistics if there is a projection
let stats = stats.project(self.projection.as_ref());
// Apply fetch limit to statistics
- stats.with_fetch(self.fetch, 0, 1)
+ Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?))
}
/// Tries to push `projection` down through `hash_join`. If possible,
performs the
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs
b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 1ba6ee23ca..f84cb54dac 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -646,7 +646,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
Some(self.metrics.clone_inner())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
// NestedLoopJoinExec is designed for joins without equijoin keys in
the
// ON clause (e.g., `t1 JOIN t2 ON (t1.v1 + t2.v1) % 2 = 0`). Any join
// predicates are stored in `self.filter`, but
`estimate_join_statistics`
@@ -660,11 +660,11 @@ impl ExecutionPlan for NestedLoopJoinExec {
// so we always request overall stats with `None`. Right side can have
// multiple partitions, so we forward the partition parameter to get
// partition-specific statistics when requested.
- let left_stats = self.left.partition_statistics(None)?;
- let right_stats = match partition {
+ let left_stats =
Arc::unwrap_or_clone(self.left.partition_statistics(None)?);
+ let right_stats = Arc::unwrap_or_clone(match partition {
Some(partition) =>
self.right.partition_statistics(Some(partition))?,
None => self.right.partition_statistics(None)?,
- };
+ });
let stats = estimate_join_statistics(
left_stats,
@@ -674,7 +674,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
&self.join_schema,
)?;
- Ok(stats.project(self.projection.as_ref()))
+ Ok(Arc::new(stats.project(self.projection.as_ref())))
}
/// Tries to push `projection` down through `nested_loop_join`. If
possible, performs the
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
index 7a01888c28..ac077792f5 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
@@ -552,7 +552,7 @@ impl ExecutionPlan for SortMergeJoinExec {
Some(self.metrics.clone_inner())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
// SortMergeJoinExec uses symmetric hash partitioning where both left
and right
// inputs are hash-partitioned on the join keys. This means partition
`i` of the
// left input is joined with partition `i` of the right input.
@@ -564,13 +564,16 @@ impl ExecutionPlan for SortMergeJoinExec {
// TODO stats: it is not possible in general to know the output size
of joins
// There are some special cases though, for example:
// - `A LEFT JOIN B ON A.col=B.col` with
`COUNT_DISTINCT(B.col)=COUNT(B.col)`
- estimate_join_statistics(
- self.left.partition_statistics(partition)?,
- self.right.partition_statistics(partition)?,
+ let left_stats =
Arc::unwrap_or_clone(self.left.partition_statistics(partition)?);
+ let right_stats =
+ Arc::unwrap_or_clone(self.right.partition_statistics(partition)?);
+ Ok(Arc::new(estimate_join_statistics(
+ left_stats,
+ right_stats,
&self.on,
&self.join_type,
&self.schema,
- )
+ )?))
}
/// Tries to swap the projection with its input [`SortMergeJoinExec`]. If
it can be done,
diff --git a/datafusion/physical-plan/src/limit.rs
b/datafusion/physical-plan/src/limit.rs
index 63a722ef26..d135434898 100644
--- a/datafusion/physical-plan/src/limit.rs
+++ b/datafusion/physical-plan/src/limit.rs
@@ -239,10 +239,9 @@ impl ExecutionPlan for GlobalLimitExec {
Some(self.metrics.clone_inner())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
- self.input
- .partition_statistics(partition)?
- .with_fetch(self.fetch, self.skip, 1)
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
+ let stats =
Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
+ Ok(Arc::new(stats.with_fetch(self.fetch, self.skip, 1)?))
}
fn fetch(&self) -> Option<usize> {
@@ -421,10 +420,9 @@ impl ExecutionPlan for LocalLimitExec {
Some(self.metrics.clone_inner())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
- self.input
- .partition_statistics(partition)?
- .with_fetch(Some(self.fetch), 0, 1)
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
+ let stats =
Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
+ Ok(Arc::new(stats.with_fetch(Some(self.fetch), 0, 1)?))
}
fn fetch(&self) -> Option<usize> {
diff --git a/datafusion/physical-plan/src/placeholder_row.rs
b/datafusion/physical-plan/src/placeholder_row.rs
index 68fe62f168..eaa895c821 100644
--- a/datafusion/physical-plan/src/placeholder_row.rs
+++ b/datafusion/physical-plan/src/placeholder_row.rs
@@ -178,7 +178,7 @@ impl ExecutionPlan for PlaceholderRowExec {
Ok(Box::pin(cooperative(ms)))
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
let batches = self
.data()
.expect("Create single row placeholder RecordBatch should not
fail");
@@ -189,11 +189,11 @@ impl ExecutionPlan for PlaceholderRowExec {
None => vec![batches; self.partitions],
};
- Ok(common::compute_record_batch_statistics(
+ Ok(Arc::new(common::compute_record_batch_statistics(
&batches,
&self.schema,
None,
- ))
+ )))
}
}
diff --git a/datafusion/physical-plan/src/projection.rs
b/datafusion/physical-plan/src/projection.rs
index a3d940df13..a4cce0436b 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -365,12 +365,15 @@ impl ExecutionPlan for ProjectionExec {
Some(self.metrics.clone_inner())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
- let input_stats = self.input.partition_statistics(partition)?;
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
+ let input_stats =
+ Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
let output_schema = self.schema();
- self.projector
- .projection()
- .project_statistics(input_stats, &output_schema)
+ Ok(Arc::new(
+ self.projector
+ .projection()
+ .project_statistics(input_stats, &output_schema)?,
+ ))
}
fn supports_limit_pushdown(&self) -> bool {
diff --git a/datafusion/physical-plan/src/repartition/mod.rs
b/datafusion/physical-plan/src/repartition/mod.rs
index 081f10d482..21f5bd3729 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -1102,11 +1102,11 @@ impl ExecutionPlan for RepartitionExec {
Some(self.metrics.clone_inner())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
if let Some(partition) = partition {
let partition_count = self.partitioning().partition_count();
if partition_count == 0 {
- return Ok(Statistics::new_unknown(&self.schema()));
+ return Ok(Arc::new(Statistics::new_unknown(&self.schema())));
}
assert_or_internal_err!(
@@ -1116,7 +1116,7 @@ impl ExecutionPlan for RepartitionExec {
partition_count
);
- let mut stats = self.input.partition_statistics(None)?;
+ let mut stats =
Arc::unwrap_or_clone(self.input.partition_statistics(None)?);
// Distribute statistics across partitions
stats.num_rows = stats
@@ -1137,7 +1137,7 @@ impl ExecutionPlan for RepartitionExec {
.map(|_| ColumnStatistics::new_unknown())
.collect();
- Ok(stats)
+ Ok(Arc::new(stats))
} else {
self.input.partition_statistics(None)
}
diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs
b/datafusion/physical-plan/src/sorts/partial_sort.rs
index 6df3babb8e..127998601f 100644
--- a/datafusion/physical-plan/src/sorts/partial_sort.rs
+++ b/datafusion/physical-plan/src/sorts/partial_sort.rs
@@ -350,7 +350,7 @@ impl ExecutionPlan for PartialSortExec {
Some(self.metrics_set.clone_inner())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
self.input.partition_statistics(partition)
}
}
diff --git a/datafusion/physical-plan/src/sorts/sort.rs
b/datafusion/physical-plan/src/sorts/sort.rs
index fa05ad9932..d02ef48e76 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -1335,16 +1335,14 @@ impl ExecutionPlan for SortExec {
Some(self.metrics_set.clone_inner())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
- if !self.preserve_partitioning() {
- return self
- .input
- .partition_statistics(None)?
- .with_fetch(self.fetch, 0, 1);
- }
- self.input
- .partition_statistics(partition)?
- .with_fetch(self.fetch, 0, 1)
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
+ let p = if !self.preserve_partitioning() {
+ None
+ } else {
+ partition
+ };
+ let stats = Arc::unwrap_or_clone(self.input.partition_statistics(p)?);
+ Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?))
}
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn
ExecutionPlan>> {
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index 7cbc21f8cb..b1ee5b4d5e 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -398,7 +398,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
Some(self.metrics.clone_inner())
}
- fn partition_statistics(&self, _partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, _partition: Option<usize>) ->
Result<Arc<Statistics>> {
self.input.partition_statistics(None)
}
diff --git a/datafusion/physical-plan/src/test.rs
b/datafusion/physical-plan/src/test.rs
index 499c10413a..0630b8f174 100644
--- a/datafusion/physical-plan/src/test.rs
+++ b/datafusion/physical-plan/src/test.rs
@@ -186,11 +186,11 @@ impl ExecutionPlan for TestMemoryExec {
unimplemented!()
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
if partition.is_some() {
- Ok(Statistics::new_unknown(&self.schema))
+ Ok(Arc::new(Statistics::new_unknown(&self.schema)))
} else {
- self.statistics_inner()
+ Ok(Arc::new(self.statistics_inner()?))
}
}
diff --git a/datafusion/physical-plan/src/test/exec.rs
b/datafusion/physical-plan/src/test/exec.rs
index 7b31855ce8..5458fa7ab8 100644
--- a/datafusion/physical-plan/src/test/exec.rs
+++ b/datafusion/physical-plan/src/test/exec.rs
@@ -262,9 +262,9 @@ impl ExecutionPlan for MockExec {
}
// Panics if one of the batches is an error
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
if partition.is_some() {
- return Ok(Statistics::new_unknown(&self.schema));
+ return Ok(Arc::new(Statistics::new_unknown(&self.schema)));
}
let data: Result<Vec<_>> = self
.data
@@ -277,11 +277,11 @@ impl ExecutionPlan for MockExec {
let data = data?;
- Ok(common::compute_record_batch_statistics(
+ Ok(Arc::new(common::compute_record_batch_statistics(
&[data],
&self.schema,
None,
- ))
+ )))
}
}
@@ -498,15 +498,15 @@ impl ExecutionPlan for BarrierExec {
Ok(builder.build())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
if partition.is_some() {
- return Ok(Statistics::new_unknown(&self.schema));
+ return Ok(Arc::new(Statistics::new_unknown(&self.schema)));
}
- Ok(common::compute_record_batch_statistics(
+ Ok(Arc::new(common::compute_record_batch_statistics(
&self.data,
&self.schema,
None,
- ))
+ )))
}
}
@@ -700,12 +700,12 @@ impl ExecutionPlan for StatisticsExec {
unimplemented!("This plan only serves for testing statistics")
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
- Ok(if partition.is_some() {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
+ Ok(Arc::new(if partition.is_some() {
Statistics::new_unknown(&self.schema)
} else {
self.stats.clone()
- })
+ }))
}
}
diff --git a/datafusion/physical-plan/src/union.rs
b/datafusion/physical-plan/src/union.rs
index 168048295d..384a715820 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -331,7 +331,7 @@ impl ExecutionPlan for UnionExec {
Some(self.metrics.clone_inner())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
if let Some(partition_idx) = partition {
// For a specific partition, find which input it belongs to
let mut remaining_idx = partition_idx;
@@ -344,19 +344,25 @@ impl ExecutionPlan for UnionExec {
remaining_idx -= input_partition_count;
}
// If we get here, the partition index is out of bounds
- Ok(Statistics::new_unknown(&self.schema()))
+ Ok(Arc::new(Statistics::new_unknown(&self.schema())))
} else {
// Collect statistics from all inputs
let stats = self
.inputs
.iter()
- .map(|input_exec| input_exec.partition_statistics(None))
+ .map(|input_exec| {
+ input_exec
+ .partition_statistics(None)
+ .map(Arc::unwrap_or_clone)
+ })
.collect::<Result<Vec<_>>>()?;
- Ok(stats
- .into_iter()
- .reduce(stats_union)
- .unwrap_or_else(|| Statistics::new_unknown(&self.schema())))
+ Ok(Arc::new(
+ stats
+ .into_iter()
+ .reduce(stats_union)
+ .unwrap_or_else(||
Statistics::new_unknown(&self.schema())),
+ ))
}
}
@@ -670,17 +676,22 @@ impl ExecutionPlan for InterleaveExec {
Some(self.metrics.clone_inner())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
let stats = self
.inputs
.iter()
- .map(|stat| stat.partition_statistics(partition))
+ .map(|stat| {
+ stat.partition_statistics(partition)
+ .map(Arc::unwrap_or_clone)
+ })
.collect::<Result<Vec<_>>>()?;
- Ok(stats
- .into_iter()
- .reduce(stats_union)
- .unwrap_or_else(|| Statistics::new_unknown(&self.schema())))
+ Ok(Arc::new(
+ stats
+ .into_iter()
+ .reduce(stats_union)
+ .unwrap_or_else(|| Statistics::new_unknown(&self.schema())),
+ ))
}
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index f589b4d748..33570c2a21 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -395,9 +395,10 @@ impl ExecutionPlan for BoundedWindowAggExec {
Some(self.metrics.clone_inner())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
- let input_stat = self.input.partition_statistics(partition)?;
- self.statistics_helper(input_stat)
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
+ let input_stat =
+ Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
+ Ok(Arc::new(self.statistics_helper(input_stat)?))
}
fn cardinality_effect(&self) -> CardinalityEffect {
diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs
b/datafusion/physical-plan/src/windows/window_agg_exec.rs
index f71f1cbfe6..c9958c875c 100644
--- a/datafusion/physical-plan/src/windows/window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs
@@ -298,8 +298,9 @@ impl ExecutionPlan for WindowAggExec {
Some(self.metrics.clone_inner())
}
- fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
- let input_stat = self.input.partition_statistics(partition)?;
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
+ let input_stat =
+ Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
let win_cols = self.window_expr.len();
let input_cols = self.input.schema().fields().len();
// TODO stats: some windowing function will maintain invariants such
as min, max...
@@ -309,11 +310,11 @@ impl ExecutionPlan for WindowAggExec {
for _ in 0..win_cols {
column_statistics.push(ColumnStatistics::new_unknown())
}
- Ok(Statistics {
+ Ok(Arc::new(Statistics {
num_rows: input_stat.num_rows,
column_statistics,
total_byte_size: Precision::Absent,
- })
+ }))
}
fn cardinality_effect(&self) -> CardinalityEffect {
diff --git a/datafusion/physical-plan/src/work_table.rs
b/datafusion/physical-plan/src/work_table.rs
index 4e8cfeea39..c2ef6bf071 100644
--- a/datafusion/physical-plan/src/work_table.rs
+++ b/datafusion/physical-plan/src/work_table.rs
@@ -239,8 +239,8 @@ impl ExecutionPlan for WorkTableExec {
Some(self.metrics.clone_inner())
}
- fn partition_statistics(&self, _partition: Option<usize>) ->
Result<Statistics> {
- Ok(Statistics::new_unknown(&self.schema()))
+ fn partition_statistics(&self, _partition: Option<usize>) ->
Result<Arc<Statistics>> {
+ Ok(Arc::new(Statistics::new_unknown(&self.schema())))
}
/// Injects run-time state into this `WorkTableExec`.
diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md
b/docs/source/library-user-guide/upgrading/54.0.0.md
index 85cb8e9dd0..77b4fb6f71 100644
--- a/docs/source/library-user-guide/upgrading/54.0.0.md
+++ b/docs/source/library-user-guide/upgrading/54.0.0.md
@@ -77,7 +77,7 @@ fn apply_expressions(
**Node whose only expressions are in `output_ordering()` (e.g. a synthetic
test node with no owned expression fields):**
-```rust,ignore
+````rust,ignore
fn apply_expressions(
&self,
f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
@@ -90,4 +90,35 @@ fn apply_expressions(
}
Ok(tnr)
}
+
+### `ExecutionPlan::partition_statistics` now returns `Arc<Statistics>`
+
+`ExecutionPlan::partition_statistics` now returns `Result<Arc<Statistics>>`
instead of `Result<Statistics>`. This avoids cloning `Statistics` when it is
shared across multiple consumers.
+
+**Before:**
+
+```rust,ignore
+fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>
{
+ Ok(Statistics::new_unknown(&self.schema()))
+}
+````
+
+**After:**
+
+```rust,ignore
+fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
+ Ok(Arc::new(Statistics::new_unknown(&self.schema())))
+}
+```
+
+If you need an owned `Statistics` value (e.g. to mutate it), use
`Arc::unwrap_or_clone`:
+
+```rust,ignore
+// If you previously consumed the Statistics directly:
+let mut stats = plan.partition_statistics(None)?;
+stats.column_statistics[0].min_value = ...;
+
+// Now unwrap the Arc first:
+let mut stats = Arc::unwrap_or_clone(plan.partition_statistics(None)?);
+stats.column_statistics[0].min_value = ...;
```
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]