This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7fa3c1f23b Minor: move datasource statistics code into its own module (#7391)
7fa3c1f23b is described below

commit 7fa3c1f23b5e3de5ae5c4d386d542c0816020146
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Aug 24 06:26:28 2023 -0400

    Minor: move datasource statistics code into its own module (#7391)
    
    * Minor: move datasource statistics code into its own module
    
    * fixup
---
 datafusion/core/src/datasource/mod.rs              | 161 +--------------------
 .../core/src/datasource/{mod.rs => statistics.rs}  |  36 +----
 2 files changed, 8 insertions(+), 189 deletions(-)
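
For reference, a minimal sketch (not part of this commit) of how caller code keeps working after the move: `get_statistics_with_limit` is still reachable at `datafusion::datasource::get_statistics_with_limit` because the new `statistics` module is re-exported from `datasource/mod.rs`. The `summarize` helper below is hypothetical; only the function's path and signature come from the diff that follows.

    use datafusion::arrow::datatypes::SchemaRef;
    // Still importable from `datasource` thanks to `pub use statistics::get_statistics_with_limit`
    use datafusion::datasource::get_statistics_with_limit;
    use datafusion::datasource::listing::PartitionedFile;
    use datafusion::error::{DataFusionError, Result};
    use datafusion::physical_plan::Statistics;
    use futures::stream;

    // Hypothetical helper: fold per-file statistics that are already in memory
    // into one summary, using the moved (and re-exported) function.
    async fn summarize(
        files: Vec<(PartitionedFile, Statistics)>,
        schema: SchemaRef,
    ) -> Result<(Vec<PartitionedFile>, Statistics)> {
        // The function takes a fallible stream of (file, statistics) pairs
        let file_stream = stream::iter(files.into_iter().map(Ok::<_, DataFusionError>));
        // No limit: aggregate statistics across all files in the stream
        get_statistics_with_limit(file_stream, schema, None).await
    }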

diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index 169a17d090..35f5653651 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -28,174 +28,19 @@ pub mod listing_table_factory;
 pub mod memory;
 pub mod physical_plan;
 pub mod provider;
+mod statistics;
 pub mod streaming;
 pub mod view;
 
 // backwards compatibility
 pub use datafusion_execution::object_store;
 
-use futures::Stream;
-
 pub use self::default_table_source::{
     provider_as_source, source_as_provider, DefaultTableSource,
 };
-use self::listing::PartitionedFile;
 pub use self::memory::MemTable;
 pub use self::provider::TableProvider;
 pub use self::view::ViewTable;
-use crate::arrow::datatypes::{Schema, SchemaRef};
-use crate::error::Result;
 pub use crate::logical_expr::TableType;
-use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
-use futures::StreamExt;
-
-/// Get all files as well as the file level summary statistics (no statistic for partition columns).
-/// If the optional `limit` is provided, includes only sufficient files.
-/// Needed to read up to `limit` number of rows.
-pub async fn get_statistics_with_limit(
-    all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
-    file_schema: SchemaRef,
-    limit: Option<usize>,
-) -> Result<(Vec<PartitionedFile>, Statistics)> {
-    let mut result_files = vec![];
-
-    let mut null_counts = vec![0; file_schema.fields().len()];
-    let mut has_statistics = false;
-    let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);
-
-    let mut is_exact = true;
-
-    // The number of rows and the total byte size can be calculated as long as
-    // at least one file has them. If none of the files provide them, then they
-    // will be omitted from the statistics. The missing values will be counted
-    // as zero.
-    let mut num_rows = None;
-    let mut total_byte_size = None;
-
-    // fusing the stream allows us to call next safely even once it is finished
-    let mut all_files = Box::pin(all_files.fuse());
-    while let Some(res) = all_files.next().await {
-        let (file, file_stats) = res?;
-        result_files.push(file);
-        is_exact &= file_stats.is_exact;
-        num_rows = if let Some(num_rows) = num_rows {
-            Some(num_rows + file_stats.num_rows.unwrap_or(0))
-        } else {
-            file_stats.num_rows
-        };
-        total_byte_size = if let Some(total_byte_size) = total_byte_size {
-            Some(total_byte_size + file_stats.total_byte_size.unwrap_or(0))
-        } else {
-            file_stats.total_byte_size
-        };
-        if let Some(vec) = &file_stats.column_statistics {
-            has_statistics = true;
-            for (i, cs) in vec.iter().enumerate() {
-                null_counts[i] += cs.null_count.unwrap_or(0);
-
-                if let Some(max_value) = &mut max_values[i] {
-                    if let Some(file_max) = cs.max_value.clone() {
-                        match max_value.update_batch(&[file_max.to_array()]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                max_values[i] = None;
-                            }
-                        }
-                    } else {
-                        max_values[i] = None;
-                    }
-                }
-
-                if let Some(min_value) = &mut min_values[i] {
-                    if let Some(file_min) = cs.min_value.clone() {
-                        match min_value.update_batch(&[file_min.to_array()]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                min_values[i] = None;
-                            }
-                        }
-                    } else {
-                        min_values[i] = None;
-                    }
-                }
-            }
-        }
-
-        // If the number of rows exceeds the limit, we can stop processing
-        // files. This only applies when we know the number of rows. It also
-        // currently ignores tables that have no statistics regarding the
-        // number of rows.
-        if num_rows.unwrap_or(usize::MIN) > limit.unwrap_or(usize::MAX) {
-            break;
-        }
-    }
-    // if we still have files in the stream, it means that the limit kicked
-    // in and that the statistic could have been different if we processed
-    // the files in a different order.
-    if all_files.next().await.is_some() {
-        is_exact = false;
-    }
-
-    let column_stats = if has_statistics {
-        Some(get_col_stats(
-            &file_schema,
-            null_counts,
-            &mut max_values,
-            &mut min_values,
-        ))
-    } else {
-        None
-    };
-
-    let statistics = Statistics {
-        num_rows,
-        total_byte_size,
-        column_statistics: column_stats,
-        is_exact,
-    };
-
-    Ok((result_files, statistics))
-}
-
-fn create_max_min_accs(
-    schema: &Schema,
-) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
-    let max_values: Vec<Option<MaxAccumulator>> = schema
-        .fields()
-        .iter()
-        .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
-        .collect::<Vec<_>>();
-    let min_values: Vec<Option<MinAccumulator>> = schema
-        .fields()
-        .iter()
-        .map(|field| MinAccumulator::try_new(field.data_type()).ok())
-        .collect::<Vec<_>>();
-    (max_values, min_values)
-}
-
-fn get_col_stats(
-    schema: &Schema,
-    null_counts: Vec<usize>,
-    max_values: &mut [Option<MaxAccumulator>],
-    min_values: &mut [Option<MinAccumulator>],
-) -> Vec<ColumnStatistics> {
-    (0..schema.fields().len())
-        .map(|i| {
-            let max_value = match &max_values[i] {
-                Some(max_value) => max_value.evaluate().ok(),
-                None => None,
-            };
-            let min_value = match &min_values[i] {
-                Some(min_value) => min_value.evaluate().ok(),
-                None => None,
-            };
-            ColumnStatistics {
-                null_count: Some(null_counts[i]),
-                max_value,
-                min_value,
-                distinct_count: None,
-            }
-        })
-        .collect()
-}
+pub use statistics::get_statistics_with_limit;
+pub(crate) use statistics::{create_max_min_accs, get_col_stats};
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/statistics.rs
similarity index 89%
copy from datafusion/core/src/datasource/mod.rs
copy to datafusion/core/src/datasource/statistics.rs
index 169a17d090..1b6a03e15c 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -15,41 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! DataFusion data sources: [`TableProvider`] and [`ListingTable`]
-//!
-//! [`ListingTable`]: crate::datasource::listing::ListingTable
-
-pub mod avro_to_arrow;
-pub mod default_table_source;
-pub mod empty;
-pub mod file_format;
-pub mod listing;
-pub mod listing_table_factory;
-pub mod memory;
-pub mod physical_plan;
-pub mod provider;
-pub mod streaming;
-pub mod view;
-
-// backwards compatibility
-pub use datafusion_execution::object_store;
-
-use futures::Stream;
-
-pub use self::default_table_source::{
-    provider_as_source, source_as_provider, DefaultTableSource,
-};
-use self::listing::PartitionedFile;
-pub use self::memory::MemTable;
-pub use self::provider::TableProvider;
-pub use self::view::ViewTable;
 use crate::arrow::datatypes::{Schema, SchemaRef};
 use crate::error::Result;
-pub use crate::logical_expr::TableType;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
 use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
+use futures::Stream;
 use futures::StreamExt;
 
+use super::listing::PartitionedFile;
+
 /// Get all files as well as the file level summary statistics (no statistic for partition columns).
 /// If the optional `limit` is provided, includes only sufficient files.
 /// Needed to read up to `limit` number of rows.
@@ -158,7 +132,7 @@ pub async fn get_statistics_with_limit(
     Ok((result_files, statistics))
 }
 
-fn create_max_min_accs(
+pub(crate) fn create_max_min_accs(
     schema: &Schema,
 ) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
     let max_values: Vec<Option<MaxAccumulator>> = schema
@@ -174,7 +148,7 @@ fn create_max_min_accs(
     (max_values, min_values)
 }
 
-fn get_col_stats(
+pub(crate) fn get_col_stats(
     schema: &Schema,
     null_counts: Vec<usize>,
     max_values: &mut [Option<MaxAccumulator>],
