kosiew commented on code in PR #16139: URL: https://github.com/apache/datafusion/pull/16139#discussion_r2103751109
########## datafusion/common/src/pruning.rs: ########## @@ -122,3 +126,1032 @@ pub trait PruningStatistics { values: &HashSet<ScalarValue>, ) -> Option<BooleanArray>; } + +/// Prune files based on their partition values. +/// This is used both at planning time and execution time to prune +/// files based on their partition values. +/// This feeds into [`CompositePruningStatistics`] to allow pruning +/// with filters that depend both on partition columns and data columns +/// (e.g. `WHERE partition_col = data_col`). +#[derive(Clone)] +pub struct PartitionPruningStatistics { + /// Values for each column for each container. + /// The outer vectors represent the columns while the inner + /// vectors represent the containers. + /// The order must match the order of the partition columns in + /// [`PartitionPruningStatistics::partition_schema`]. + partition_values: Vec<Vec<ScalarValue>>, + /// The number of containers. + /// Stored since the partition values are column-major and if + /// there are no columns we wouldn't know the number of containers. + num_containers: usize, + /// The schema of the partition columns. + /// This must **not** be the schema of the entire file or table: + /// it must only be the schema of the partition columns, + /// in the same order as the values in [`PartitionPruningStatistics::partition_values`]. + partition_schema: SchemaRef, +} + +impl PartitionPruningStatistics { + /// Create a new instance of [`PartitionPruningStatistics`]. + /// + /// Args: + /// * `partition_values`: A vector of vectors of [`ScalarValue`]s. + /// The outer vector represents the containers while the inner + /// vector represents the partition values for each column. 
Review Comment: The constructor accepts `partition_values` as a `Vec<Vec<ScalarValue>>`, documented as “outer vector represents the containers while the inner vector represents the partition values for each column.” In the code, however, each inner `Vec` holds the values for one container, and the constructor then transposes these into column-major storage. The phrasing “inner vector represents the partition values for each column” can be misread as “one column’s values across containers.” ########## datafusion/common/src/pruning.rs: ########## @@ -122,3 +126,1032 @@ pub trait PruningStatistics { values: &HashSet<ScalarValue>, ) -> Option<BooleanArray>; } + +/// Prune files based on their partition values. +/// This is used both at planning time and execution time to prune +/// files based on their partition values. +/// This feeds into [`CompositePruningStatistics`] to allow pruning +/// with filters that depend both on partition columns and data columns +/// (e.g. `WHERE partition_col = data_col`). +#[derive(Clone)] +pub struct PartitionPruningStatistics { + /// Values for each column for each container. + /// The outer vectors represent the columns while the inner + /// vectors represent the containers. + /// The order must match the order of the partition columns in + /// [`PartitionPruningStatistics::partition_schema`]. + partition_values: Vec<Vec<ScalarValue>>, + /// The number of containers. + /// Stored since the partition values are column-major and if + /// there are no columns we wouldn't know the number of containers. + num_containers: usize, + /// The schema of the partition columns. + /// This must **not** be the schema of the entire file or table: + /// it must only be the schema of the partition columns, + /// in the same order as the values in [`PartitionPruningStatistics::partition_values`]. + partition_schema: SchemaRef, +} + +impl PartitionPruningStatistics { + /// Create a new instance of [`PartitionPruningStatistics`]. 
+ /// + /// Args: + /// * `partition_values`: A vector of vectors of [`ScalarValue`]s. + /// The outer vector represents the containers while the inner + /// vector represents the partition values for each column. + /// Note that this is the **opposite** of the order of the + /// partition columns in `PartitionPruningStatistics::partition_schema`. + /// * `partition_schema`: The schema of the partition columns. + /// This must **not** be the schema of the entire file or table: + /// instead it must only be the schema of the partition columns, + /// in the same order as the values in `partition_values`. + pub fn new( + partition_values: Vec<Vec<ScalarValue>>, + partition_fields: Vec<FieldRef>, + ) -> Self { + let num_containers = partition_values.len(); + let partition_schema = Arc::new(Schema::new(partition_fields)); + let mut partition_valeus_by_column = Review Comment:

```suggestion
        let mut partition_values_by_column =
```

########## datafusion/common/src/pruning.rs: ########## @@ -122,3 +126,1032 @@ pub trait PruningStatistics { values: &HashSet<ScalarValue>, ) -> Option<BooleanArray>; } + +/// Prune files based on their partition values. +/// This is used both at planning time and execution time to prune +/// files based on their partition values. +/// This feeds into [`CompositePruningStatistics`] to allow pruning +/// with filters that depend both on partition columns and data columns +/// (e.g. `WHERE partition_col = data_col`). +#[derive(Clone)] +pub struct PartitionPruningStatistics { + /// Values for each column for each container. + /// The outer vectors represent the columns while the inner + /// vectors represent the containers. + /// The order must match the order of the partition columns in + /// [`PartitionPruningStatistics::partition_schema`]. + partition_values: Vec<Vec<ScalarValue>>, + /// The number of containers. + /// Stored since the partition values are column-major and if + /// there are no columns we wouldn't know the number of containers. 
+ num_containers: usize, + /// The schema of the partition columns. + /// This must **not** be the schema of the entire file or table: + /// it must only be the schema of the partition columns, + /// in the same order as the values in [`PartitionPruningStatistics::partition_values`]. + partition_schema: SchemaRef, +} + +impl PartitionPruningStatistics { + /// Create a new instance of [`PartitionPruningStatistics`]. + /// + /// Args: + /// * `partition_values`: A vector of vectors of [`ScalarValue`]s. + /// The outer vector represents the containers while the inner + /// vector represents the partition values for each column. + /// Note that this is the **opposite** of the order of the + /// partition columns in `PartitionPruningStatistics::partition_schema`. + /// * `partition_schema`: The schema of the partition columns. + /// This must **not** be the schema of the entire file or table: + /// instead it must only be the schema of the partition columns, + /// in the same order as the values in `partition_values`. 
+ pub fn new( + partition_values: Vec<Vec<ScalarValue>>, + partition_fields: Vec<FieldRef>, + ) -> Self { + let num_containers = partition_values.len(); + let partition_schema = Arc::new(Schema::new(partition_fields)); + let mut partition_valeus_by_column = + vec![vec![]; partition_schema.fields().len()]; + for partition_value in partition_values { + for (i, value) in partition_value.into_iter().enumerate() { + partition_valeus_by_column[i].push(value); + } + } + Self { + partition_values: partition_valeus_by_column, + num_containers, + partition_schema, + } + } +} + +impl PruningStatistics for PartitionPruningStatistics { + fn min_values(&self, column: &Column) -> Option<ArrayRef> { + let index = self.partition_schema.index_of(column.name()).ok()?; + let partition_values = self.partition_values.get(index)?; + match ScalarValue::iter_to_array(partition_values.iter().cloned()) { + Ok(array) => Some(array), + Err(_) => { + log::warn!( + "Failed to convert min values to array for column {}", + column.name() + ); + None + } + } + } + + fn max_values(&self, column: &Column) -> Option<ArrayRef> { + self.min_values(column) + } + + fn num_containers(&self) -> usize { + self.num_containers + } + + fn null_counts(&self, _column: &Column) -> Option<ArrayRef> { + None + } + + fn row_counts(&self, _column: &Column) -> Option<ArrayRef> { + None + } + + fn contained( + &self, + column: &Column, + values: &HashSet<ScalarValue>, + ) -> Option<BooleanArray> { + let index = self.partition_schema.index_of(column.name()).ok()?; + let partition_values = self.partition_values.get(index)?; + let mut contained = Vec::with_capacity(self.partition_values.len()); + for partition_value in partition_values { + let contained_value = if values.contains(partition_value) { + Some(true) + } else { + Some(false) + }; + contained.push(contained_value); + } + let array = BooleanArray::from(contained); Review Comment: Instead of explicit loops; would simplifying to .map(...) 
chains followed by collect() be better?

```rust
let array = BooleanArray::from(
    partition_values
        .iter()
        .map(|pv| Some(values.contains(pv)))
        .collect::<Vec<_>>()
);
```

Benefits:
- Eliminates manual push logic
- More concise: transforms each pv into a boolean directly
- Clearly shows “map input → output” intent

########## datafusion/common/src/pruning.rs: ########## @@ -122,3 +126,1032 @@ pub trait PruningStatistics { values: &HashSet<ScalarValue>, ) -> Option<BooleanArray>; } + +/// Prune files based on their partition values. +/// This is used both at planning time and execution time to prune +/// files based on their partition values. +/// This feeds into [`CompositePruningStatistics`] to allow pruning +/// with filters that depend both on partition columns and data columns +/// (e.g. `WHERE partition_col = data_col`). +#[derive(Clone)] +pub struct PartitionPruningStatistics { + /// Values for each column for each container. + /// The outer vectors represent the columns while the inner + /// vectors represent the containers. + /// The order must match the order of the partition columns in + /// [`PartitionPruningStatistics::partition_schema`]. + partition_values: Vec<Vec<ScalarValue>>, + /// The number of containers. + /// Stored since the partition values are column-major and if + /// there are no columns we wouldn't know the number of containers. + num_containers: usize, + /// The schema of the partition columns. + /// This must **not** be the schema of the entire file or table: + /// it must only be the schema of the partition columns, + /// in the same order as the values in [`PartitionPruningStatistics::partition_values`]. + partition_schema: SchemaRef, +} + +impl PartitionPruningStatistics { + /// Create a new instance of [`PartitionPruningStatistics`]. + /// + /// Args: + /// * `partition_values`: A vector of vectors of [`ScalarValue`]s. 
+ /// The outer vector represents the containers while the inner + /// vector represents the partition values for each column. + /// Note that this is the **opposite** of the order of the + /// partition columns in `PartitionPruningStatistics::partition_schema`. + /// * `partition_schema`: The schema of the partition columns. + /// This must **not** be the schema of the entire file or table: + /// instead it must only be the schema of the partition columns, + /// in the same order as the values in `partition_values`. + pub fn new( + partition_values: Vec<Vec<ScalarValue>>, + partition_fields: Vec<FieldRef>, + ) -> Self { + let num_containers = partition_values.len(); + let partition_schema = Arc::new(Schema::new(partition_fields)); + let mut partition_valeus_by_column = + vec![vec![]; partition_schema.fields().len()]; + for partition_value in partition_values { + for (i, value) in partition_value.into_iter().enumerate() { + partition_valeus_by_column[i].push(value); + } + } + Self { + partition_values: partition_valeus_by_column, + num_containers, + partition_schema, + } + } +} + +impl PruningStatistics for PartitionPruningStatistics { + fn min_values(&self, column: &Column) -> Option<ArrayRef> { + let index = self.partition_schema.index_of(column.name()).ok()?; + let partition_values = self.partition_values.get(index)?; + match ScalarValue::iter_to_array(partition_values.iter().cloned()) { + Ok(array) => Some(array), + Err(_) => { + log::warn!( + "Failed to convert min values to array for column {}", + column.name() + ); + None + } + } + } + + fn max_values(&self, column: &Column) -> Option<ArrayRef> { + self.min_values(column) + } + + fn num_containers(&self) -> usize { + self.num_containers + } + + fn null_counts(&self, _column: &Column) -> Option<ArrayRef> { + None + } + + fn row_counts(&self, _column: &Column) -> Option<ArrayRef> { + None + } + + fn contained( + &self, + column: &Column, + values: &HashSet<ScalarValue>, + ) -> Option<BooleanArray> { + let 
index = self.partition_schema.index_of(column.name()).ok()?; + let partition_values = self.partition_values.get(index)?; + let mut contained = Vec::with_capacity(self.partition_values.len()); + for partition_value in partition_values { + let contained_value = if values.contains(partition_value) { + Some(true) + } else { + Some(false) + }; + contained.push(contained_value); + } + let array = BooleanArray::from(contained); + Some(array) + } +} + +/// Prune a set of containers represented by their statistics. +/// Each [`Statistics`] represents a container (e.g. a file or a partition of files). +#[derive(Clone)] +pub struct PrunableStatistics { + /// Statistics for each container. + /// These are taken as a reference since they may be rather large / expensive to clone + /// and we often won't return all of them as ArrayRefs (we only return the columns the predicate requests). + statistics: Vec<Arc<Statistics>>, + /// The schema of the file these statistics are for. + schema: SchemaRef, +} + +impl PrunableStatistics { + /// Create a new instance of [`PrunableStatistics`]. + /// Each [`Statistics`] represents a container (e.g. a file or a partition of files). + /// The `schema` is the schema of the data in the containers and should apply to all files. 
+ pub fn new(statistics: Vec<Arc<Statistics>>, schema: SchemaRef) -> Self { + Self { statistics, schema } + } +} + +impl PruningStatistics for PrunableStatistics { + fn min_values(&self, column: &Column) -> Option<ArrayRef> { + let index = self.schema.index_of(column.name()).ok()?; + if self.statistics.iter().any(|s| { + s.column_statistics + .get(index) + .is_some_and(|stat| stat.min_value.is_exact().unwrap_or(false)) + }) { + match ScalarValue::iter_to_array(self.statistics.iter().map(|s| { + s.column_statistics + .get(index) + .and_then(|stat| { + if let Precision::Exact(min) = &stat.min_value { + Some(min.clone()) + } else { + None + } + }) + .unwrap_or(ScalarValue::Null) + })) { + Ok(array) => Some(array), + Err(_) => { + log::warn!( + "Failed to convert min values to array for column {}", + column.name() + ); + None + } + } + } else { + None + } + } Review Comment: Both `PrunableStatistics::min_values` and `max_values` walk the same steps: 1. Find the column index in the schema. 2. Check whether any `Statistics` entry has an “exact” value for that column. 3. Iterate over all `Statistics`, pulling out the exact values or substituting `ScalarValue::Null`. 4. Call `ScalarValue::iter_to_array(...)` and log or return `None` on error. By lifting steps (2)–(4) into a helper, we: - **Eliminate duplicate code** in each method - **Centralize error handling** and logging - **Make future changes** (e.g. using a different logging framework) in one place
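
For illustration, the shared helper proposed for `min_values` and `max_values` could look roughly like the sketch below. It is not the PR's code. `Precision`, `ColumnStats`, `Stats`, `Bound`, and `exact_values` are simplified, hypothetical stand-ins so that the example compiles without DataFusion; values are plain `i64`, and `None` plays the role of `ScalarValue::Null`. Step (4), the `ScalarValue::iter_to_array(...)` call and its logging, is not modeled here and would sit at the end of the real helper.

```rust
// Simplified stand-ins (assumptions) for DataFusion's `Precision`,
// per-column statistics, and `Statistics`; the real types are richer.
#[derive(Clone, Debug, PartialEq)]
pub enum Precision<T> {
    Exact(T),
    Inexact(T),
    Absent,
}

pub struct ColumnStats {
    pub min_value: Precision<i64>,
    pub max_value: Precision<i64>,
}

pub struct Stats {
    pub column_statistics: Vec<ColumnStats>,
}

/// Hypothetical selector telling the helper which bound to read.
#[derive(Clone, Copy)]
pub enum Bound {
    Min,
    Max,
}

/// Hypothetical helper covering steps (2) and (3) from the comment above.
/// Returns one entry per container, with `None` standing in for
/// `ScalarValue::Null`, or `None` overall if no container has an exact value.
pub fn exact_values(
    stats: &[Stats],
    index: usize,
    bound: Bound,
) -> Option<Vec<Option<i64>>> {
    // Extract the exact value of the chosen bound for one container, if any.
    let exact = |s: &Stats| -> Option<i64> {
        let col = s.column_statistics.get(index)?;
        let value = match bound {
            Bound::Min => &col.min_value,
            Bound::Max => &col.max_value,
        };
        match value {
            Precision::Exact(v) => Some(*v),
            _ => None,
        }
    };
    // Step 2: bail out unless at least one container has an exact value.
    if !stats.iter().any(|s| exact(s).is_some()) {
        return None;
    }
    // Step 3: one entry per container, `None` where the value is not exact.
    Some(stats.iter().map(exact).collect())
}
```

With a helper of this shape, `min_values` and `max_values` would each reduce to the schema lookup from step (1) followed by a single call, for example `exact_values(&stats, index, Bound::Min)`.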