This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7fa3c1f23b Minor: move datasource statistics code into its own module (#7391)
7fa3c1f23b is described below
commit 7fa3c1f23b5e3de5ae5c4d386d542c0816020146
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Aug 24 06:26:28 2023 -0400
Minor: move datasource statistics code into its own module (#7391)
* Minor: move datasource statistics code into its own module
* fixup
---
datafusion/core/src/datasource/mod.rs | 161 +--------------------
.../core/src/datasource/{mod.rs => statistics.rs} | 36 +----
2 files changed, 8 insertions(+), 189 deletions(-)
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index 169a17d090..35f5653651 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -28,174 +28,19 @@ pub mod listing_table_factory;
pub mod memory;
pub mod physical_plan;
pub mod provider;
+mod statistics;
pub mod streaming;
pub mod view;
// backwards compatibility
pub use datafusion_execution::object_store;
-use futures::Stream;
-
pub use self::default_table_source::{
provider_as_source, source_as_provider, DefaultTableSource,
};
-use self::listing::PartitionedFile;
pub use self::memory::MemTable;
pub use self::provider::TableProvider;
pub use self::view::ViewTable;
-use crate::arrow::datatypes::{Schema, SchemaRef};
-use crate::error::Result;
pub use crate::logical_expr::TableType;
-use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
-use futures::StreamExt;
-
-/// Get all files as well as the file level summary statistics (no statistic for partition columns).
-/// If the optional `limit` is provided, includes only sufficient files.
-/// Needed to read up to `limit` number of rows.
-pub async fn get_statistics_with_limit(
- all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
- file_schema: SchemaRef,
- limit: Option<usize>,
-) -> Result<(Vec<PartitionedFile>, Statistics)> {
- let mut result_files = vec![];
-
- let mut null_counts = vec![0; file_schema.fields().len()];
- let mut has_statistics = false;
- let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);
-
- let mut is_exact = true;
-
- // The number of rows and the total byte size can be calculated as long as
- // at least one file has them. If none of the files provide them, then they
- // will be omitted from the statistics. The missing values will be counted
- // as zero.
- let mut num_rows = None;
- let mut total_byte_size = None;
-
- // fusing the stream allows us to call next safely even once it is finished
- let mut all_files = Box::pin(all_files.fuse());
- while let Some(res) = all_files.next().await {
- let (file, file_stats) = res?;
- result_files.push(file);
- is_exact &= file_stats.is_exact;
- num_rows = if let Some(num_rows) = num_rows {
- Some(num_rows + file_stats.num_rows.unwrap_or(0))
- } else {
- file_stats.num_rows
- };
- total_byte_size = if let Some(total_byte_size) = total_byte_size {
- Some(total_byte_size + file_stats.total_byte_size.unwrap_or(0))
- } else {
- file_stats.total_byte_size
- };
- if let Some(vec) = &file_stats.column_statistics {
- has_statistics = true;
- for (i, cs) in vec.iter().enumerate() {
- null_counts[i] += cs.null_count.unwrap_or(0);
-
- if let Some(max_value) = &mut max_values[i] {
- if let Some(file_max) = cs.max_value.clone() {
- match max_value.update_batch(&[file_max.to_array()]) {
- Ok(_) => {}
- Err(_) => {
- max_values[i] = None;
- }
- }
- } else {
- max_values[i] = None;
- }
- }
-
- if let Some(min_value) = &mut min_values[i] {
- if let Some(file_min) = cs.min_value.clone() {
- match min_value.update_batch(&[file_min.to_array()]) {
- Ok(_) => {}
- Err(_) => {
- min_values[i] = None;
- }
- }
- } else {
- min_values[i] = None;
- }
- }
- }
- }
-
- // If the number of rows exceeds the limit, we can stop processing
- // files. This only applies when we know the number of rows. It also
- // currently ignores tables that have no statistics regarding the
- // number of rows.
- if num_rows.unwrap_or(usize::MIN) > limit.unwrap_or(usize::MAX) {
- break;
- }
- }
- // if we still have files in the stream, it means that the limit kicked
- // in and that the statistic could have been different if we processed
- // the files in a different order.
- if all_files.next().await.is_some() {
- is_exact = false;
- }
-
- let column_stats = if has_statistics {
- Some(get_col_stats(
- &file_schema,
- null_counts,
- &mut max_values,
- &mut min_values,
- ))
- } else {
- None
- };
-
- let statistics = Statistics {
- num_rows,
- total_byte_size,
- column_statistics: column_stats,
- is_exact,
- };
-
- Ok((result_files, statistics))
-}
-
-fn create_max_min_accs(
- schema: &Schema,
-) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
- let max_values: Vec<Option<MaxAccumulator>> = schema
- .fields()
- .iter()
- .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
- .collect::<Vec<_>>();
- let min_values: Vec<Option<MinAccumulator>> = schema
- .fields()
- .iter()
- .map(|field| MinAccumulator::try_new(field.data_type()).ok())
- .collect::<Vec<_>>();
- (max_values, min_values)
-}
-
-fn get_col_stats(
- schema: &Schema,
- null_counts: Vec<usize>,
- max_values: &mut [Option<MaxAccumulator>],
- min_values: &mut [Option<MinAccumulator>],
-) -> Vec<ColumnStatistics> {
- (0..schema.fields().len())
- .map(|i| {
- let max_value = match &max_values[i] {
- Some(max_value) => max_value.evaluate().ok(),
- None => None,
- };
- let min_value = match &min_values[i] {
- Some(min_value) => min_value.evaluate().ok(),
- None => None,
- };
- ColumnStatistics {
- null_count: Some(null_counts[i]),
- max_value,
- min_value,
- distinct_count: None,
- }
- })
- .collect()
-}
+pub use statistics::get_statistics_with_limit;
+pub(crate) use statistics::{create_max_min_accs, get_col_stats};
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/statistics.rs
similarity index 89%
copy from datafusion/core/src/datasource/mod.rs
copy to datafusion/core/src/datasource/statistics.rs
index 169a17d090..1b6a03e15c 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -15,41 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-//! DataFusion data sources: [`TableProvider`] and [`ListingTable`]
-//!
-//! [`ListingTable`]: crate::datasource::listing::ListingTable
-
-pub mod avro_to_arrow;
-pub mod default_table_source;
-pub mod empty;
-pub mod file_format;
-pub mod listing;
-pub mod listing_table_factory;
-pub mod memory;
-pub mod physical_plan;
-pub mod provider;
-pub mod streaming;
-pub mod view;
-
-// backwards compatibility
-pub use datafusion_execution::object_store;
-
-use futures::Stream;
-
-pub use self::default_table_source::{
- provider_as_source, source_as_provider, DefaultTableSource,
-};
-use self::listing::PartitionedFile;
-pub use self::memory::MemTable;
-pub use self::provider::TableProvider;
-pub use self::view::ViewTable;
use crate::arrow::datatypes::{Schema, SchemaRef};
use crate::error::Result;
-pub use crate::logical_expr::TableType;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
+use futures::Stream;
use futures::StreamExt;
+use super::listing::PartitionedFile;
+
/// Get all files as well as the file level summary statistics (no statistic for partition columns).
/// If the optional `limit` is provided, includes only sufficient files.
/// Needed to read up to `limit` number of rows.
@@ -158,7 +132,7 @@ pub async fn get_statistics_with_limit(
Ok((result_files, statistics))
}
-fn create_max_min_accs(
+pub(crate) fn create_max_min_accs(
schema: &Schema,
) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
let max_values: Vec<Option<MaxAccumulator>> = schema
@@ -174,7 +148,7 @@ fn create_max_min_accs(
(max_values, min_values)
}
-fn get_col_stats(
+pub(crate) fn get_col_stats(
schema: &Schema,
null_counts: Vec<usize>,
max_values: &mut [Option<MaxAccumulator>],