kumarUjjawal commented on code in PR #21157: URL: https://github.com/apache/datafusion/pull/21157#discussion_r2999010390
########## datafusion/pruning/src/statistics.rs: ########## @@ -0,0 +1,538 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, new_null_array}; +use datafusion_common::pruning::PruningStatistics; +use datafusion_expr::Expr; +use std::collections::{HashMap, HashSet}; + +use datafusion_common::error::DataFusionError; + +/// A source of runtime statistical information for pruning. +/// +/// This trait accepts a set of [`Expr`] expressions and returns +/// statistics for those expressions that can be used for pruning. +/// +/// It is up to implementors to determine how to collect these statistics. +/// Some example use cases include: +/// 1. Matching on basic expressions like `min(column)` or `max(column)` +/// and returning statistics from file metadata. +/// 2. Sampling data at runtime to get more accurate statistics. +/// 3. Querying an external metastore for statistics. 
+/// +/// # Supported expression types +/// +/// The following expression types are meaningful for pruning: +/// +/// - **Aggregate functions**: `min(column)`, `max(column)`, +/// `count(*) FILTER (WHERE column IS NULL)`, +/// `count(*) FILTER (WHERE column IS NOT NULL)` +/// - **InList**: `column IN (v1, v2, ...)` — see [InList semantics] below. +/// +/// Implementors return `None` for any expression they cannot answer. +/// +/// # InList semantics +/// +/// For `column IN (v1, v2, ..., vN)`, the returned `BooleanArray` has one +/// entry per container with three-valued logic: +/// +/// - `true` — the column in this container ONLY contains values in +/// `{v1, ..., vN}`. Every row in the container satisfies the `IN` +/// predicate (assuming non-null values; see below). +/// - `false` — the column in this container contains NONE of the values +/// in `{v1, ..., vN}`. No row can satisfy the `IN` predicate, so the +/// container can be pruned. +/// - `null` — it is not known whether the column contains any of the +/// values. The container cannot be pruned. +/// +/// ## Null handling +/// +/// - **Null values in the column**: SQL `IN` returns `NULL` when the +/// column value is `NULL`, regardless of the list contents. Containers +/// where the column has null values should return `null` (unknown) +/// unless the implementation can determine that all non-null values +/// still satisfy or violate the predicate. +/// - **Null values in the list** (`column IN (1, NULL, 3)`): Per SQL +/// semantics, `x IN (1, NULL, 3)` returns `TRUE` if `x` is 1 or 3, +/// `NULL` if `x` is any other non-null value (because `x = NULL` is +/// unknown), and `NULL` if `x` is `NULL`. Null literals in the list +/// therefore weaken pruning — a container can no longer return `false` +/// unless it can prove the column has no values at all. 
+/// - **`NOT IN` with nulls** (`column NOT IN (1, NULL, 3)`): This can +/// never return `TRUE` for non-null column values because `x != NULL` +/// is always unknown. A container can only be pruned if it is known +/// to contain exclusively values in the list. +#[async_trait::async_trait] +pub trait StatisticsSource: Send + Sync { + /// Returns the number of containers (row groups, files, etc.) that + /// statistics are provided for. All returned arrays must have this length. + fn num_containers(&self) -> usize; + + /// Returns statistics for each expression, or `None` for expressions + /// that cannot be answered. + async fn expression_statistics( + &self, + expressions: &[Expr], + ) -> Result<Vec<Option<ArrayRef>>, DataFusionError>; +} + +/// Blanket implementation of [`StatisticsSource`] for types that implement +/// [`PruningStatistics`]. +/// +/// This allows any type that implements [`PruningStatistics`] to be used as +/// a [`StatisticsSource`] without needing to implement the trait directly. +/// +/// The implementation matches on expressions that can be directly answered +/// by the underlying [`PruningStatistics`]: +/// - `min(column)` → [`PruningStatistics::min_values`] +/// - `max(column)` → [`PruningStatistics::max_values`] +/// - `count(*) FILTER (WHERE column IS NOT NULL)` → [`PruningStatistics::row_counts`] +/// - `count(*) FILTER (WHERE column IS NULL)` → [`PruningStatistics::null_counts`] +/// - `column IN (lit1, lit2, ...)` → [`PruningStatistics::contained`] +/// +/// Any other expressions return `None`. +#[async_trait::async_trait] +impl<T: PruningStatistics + Send + Sync> StatisticsSource for T { + fn num_containers(&self) -> usize { + PruningStatistics::num_containers(self) + } + + async fn expression_statistics( + &self, + expressions: &[Expr], + ) -> Result<Vec<Option<ArrayRef>>, DataFusionError> { + Ok(expressions + .iter() + .map(|expr| resolve_expression_sync(self, expr)) + .collect()) + } +} + +/// Pre-resolved statistics cache. 
Created asynchronously via +/// [`StatisticsSource`], evaluated synchronously by +/// [`PruningPredicate::evaluate`]. +/// +/// Keyed by [`Expr`] so that a single cache can serve multiple +/// [`PruningPredicate`](crate::PruningPredicate) instances (e.g., after dynamic filter changes +/// rebuild the predicate but reuse the same resolved stats). +/// Missing entries are treated as unknown — safe for pruning +/// (the predicate will conservatively keep the container). +/// +/// [`PruningPredicate::evaluate`]: crate::PruningPredicate::evaluate +pub struct ResolvedStatistics { + num_containers: usize, + cache: HashMap<Expr, ArrayRef>, +} + +impl ResolvedStatistics { + /// Create an empty cache with no resolved statistics. + /// All lookups will return `None`, causing `evaluate()` to use + /// null arrays (conservative — no pruning). + pub fn new_empty(num_containers: usize) -> Self { + Self { + num_containers, + cache: HashMap::new(), + } + } + + /// Resolve statistics for the given expressions from an async source. + pub async fn resolve( + source: &(impl StatisticsSource + ?Sized), + expressions: &[Expr], + ) -> Result<Self, DataFusionError> { + let num_containers = source.num_containers(); + let arrays = source.expression_statistics(expressions).await?; + let cache = expressions + .iter() + .zip(arrays) + .filter_map(|(expr, arr)| arr.map(|a| (expr.clone(), a))) + .collect(); Review Comment: I think `zip` silently truncates on length mismatch; if `expression_statistics` returns a wrong-length vector, we silently lose mappings and continue with the null fallback. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
