alamb commented on code in PR #16014: URL: https://github.com/apache/datafusion/pull/16014#discussion_r2084838154
########## datafusion/datasource/src/file_stream.rs: ########## @@ -367,7 +368,7 @@ impl Default for OnError { pub trait FileOpener: Unpin + Send + Sync { /// Asynchronously open the specified file and return a stream /// of [`RecordBatch`] - fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>; + fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture>; Review Comment: Can you please update the documentation for open() to mention that `file` has plan time per-file information (such as statistics) and leave a doc link back? ########## datafusion/physical-optimizer/src/pruning.rs: ########## @@ -995,6 +996,184 @@ fn build_statistics_record_batch<S: PruningStatistics>( }) } +/// Prune a set of containers represented by their statistics. +/// Each [`Statistics`] represents a container (e.g. a file or a partition of files). +pub struct PrunableStatistics { + statistics: Vec<Arc<Statistics>>, + schema: SchemaRef, +} + +impl PrunableStatistics { + /// Create a new instance of [`PrunableStatistics`]. + /// Each [`Statistics`] represents a container (e.g. a file or a partition of files). + /// The `schema` is the schema of the data in the containers and should apply to all files. + pub fn new(statistics: Vec<Arc<Statistics>>, schema: SchemaRef) -> Self { + Self { statistics, schema } + } +} + +impl PruningStatistics for PrunableStatistics { + /// Return the minimum values for the named column, if known. + /// + /// If the minimum value for a particular container is not known, the + /// returned array should have `null` in that row. If the minimum value is + /// not known for any row, return `None`. 
+ /// + /// Note: the returned array must contain [`Self::num_containers`] rows + fn min_values(&self, column: &Column) -> Option<ArrayRef> { + let index = self.schema.index_of(column.name()).ok()?; + let mut values = Vec::with_capacity(self.statistics.len()); + for stats in &self.statistics { + let stat = stats.column_statistics.get(index)?; + match &stat.min_value { + Precision::Exact(min) => { + values.push(min.clone()); + } + _ => values.push(ScalarValue::Null), + } + } + match ScalarValue::iter_to_array(values) { + Ok(array) => Some(array), + Err(_) => { + log::warn!( + "Failed to convert min values to array for column {}", + column.name() + ); + None + } + } + } + + /// Return the maximum values for the named column, if known. + /// + /// See [`Self::min_values`] for when to return `None` and null values. + /// + /// Note: the returned array must contain [`Self::num_containers`] rows + fn max_values(&self, column: &Column) -> Option<ArrayRef> { + let index = self.schema.index_of(column.name()).ok()?; + let mut values = Vec::with_capacity(self.statistics.len()); + for stats in &self.statistics { + let stat = stats.column_statistics.get(index)?; + match &stat.max_value { + Precision::Exact(max) => { + values.push(max.clone()); + } + _ => values.push(ScalarValue::Null), + } + } + match ScalarValue::iter_to_array(values) { + Ok(array) => Some(array), + Err(_) => { + log::warn!( + "Failed to convert max values to array for column {}", + column.name() + ); + None + } + } + } + + /// Return the number of containers (e.g. Row Groups) being pruned with + /// these statistics. + /// + /// This value corresponds to the size of the [`ArrayRef`] returned by + /// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`], + /// and [`Self::row_counts`]. + fn num_containers(&self) -> usize { + 1 Review Comment: this should be self.statistics.len(), right? 
########## datafusion/physical-optimizer/src/pruning.rs: ########## @@ -995,6 +996,184 @@ fn build_statistics_record_batch<S: PruningStatistics>( }) } +/// Prune a set of containers represented by their statistics. Review Comment: This is a nice structure -- I think it makes lots of sense and is 100% Specifically, I thought there was already code that pruned individual files based on statistics but I could not find any in ListingTable (we have something like this in `influxdb_iox`). My opinion is if we are going to add this code into the DataFusion codebase we should 1. Ensure that it helps as many users as possible 2. Make sure it is executed as much as possible (to ensure test coverage) Thus, what do you think about using the PrunableStatistics to prune the FileGroup in ListingTable here: https://github.com/apache/datafusion/blob/55ba4cadce5ea99de4361929226f1c99cfc94450/datafusion/core/src/datasource/listing/table.rs#L1117-L1116 ? Pruning on statistics during plan time would potentially be redundant with also trying to prune again during opening, but it would reduce the files earlier in the plan ########## datafusion/physical-optimizer/src/pruning.rs: ########## @@ -995,6 +996,184 @@ fn build_statistics_record_batch<S: PruningStatistics>( }) } +/// Prune a set of containers represented by their statistics. +/// Each [`Statistics`] represents a container (e.g. a file or a partition of files). +pub struct PrunableStatistics { + statistics: Vec<Arc<Statistics>>, + schema: SchemaRef, +} + +impl PrunableStatistics { + /// Create a new instance of [`PrunableStatistics`]. + /// Each [`Statistics`] represents a container (e.g. a file or a partition of files). + /// The `schema` is the schema of the data in the containers and should apply to all files. 
+ pub fn new(statistics: Vec<Arc<Statistics>>, schema: SchemaRef) -> Self { + Self { statistics, schema } + } +} + +impl PruningStatistics for PrunableStatistics { + /// Return the minimum values for the named column, if known. + /// + /// If the minimum value for a particular container is not known, the + /// returned array should have `null` in that row. If the minimum value is + /// not known for any row, return `None`. + /// + /// Note: the returned array must contain [`Self::num_containers`] rows + fn min_values(&self, column: &Column) -> Option<ArrayRef> { + let index = self.schema.index_of(column.name()).ok()?; + let mut values = Vec::with_capacity(self.statistics.len()); + for stats in &self.statistics { + let stat = stats.column_statistics.get(index)?; + match &stat.min_value { + Precision::Exact(min) => { + values.push(min.clone()); + } + _ => values.push(ScalarValue::Null), + } + } + match ScalarValue::iter_to_array(values) { + Ok(array) => Some(array), + Err(_) => { + log::warn!( + "Failed to convert min values to array for column {}", + column.name() + ); + None + } + } + } + + /// Return the maximum values for the named column, if known. + /// + /// See [`Self::min_values`] for when to return `None` and null values. + /// + /// Note: the returned array must contain [`Self::num_containers`] rows + fn max_values(&self, column: &Column) -> Option<ArrayRef> { + let index = self.schema.index_of(column.name()).ok()?; + let mut values = Vec::with_capacity(self.statistics.len()); + for stats in &self.statistics { + let stat = stats.column_statistics.get(index)?; + match &stat.max_value { + Precision::Exact(max) => { + values.push(max.clone()); + } + _ => values.push(ScalarValue::Null), + } + } + match ScalarValue::iter_to_array(values) { + Ok(array) => Some(array), + Err(_) => { + log::warn!( + "Failed to convert max values to array for column {}", + column.name() + ); + None + } + } + } + + /// Return the number of containers (e.g. 
Row Groups) being pruned with + /// these statistics. + /// + /// This value corresponds to the size of the [`ArrayRef`] returned by + /// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`], + /// and [`Self::row_counts`]. + fn num_containers(&self) -> usize { + 1 + } + + /// Return the number of null values for the named column as an + /// [`UInt64Array`] + /// + /// See [`Self::min_values`] for when to return `None` and null values. + /// + /// Note: the returned array must contain [`Self::num_containers`] rows + /// + /// [`UInt64Array`]: arrow::array::UInt64Array + fn null_counts(&self, column: &Column) -> Option<ArrayRef> { + let index = self.schema.index_of(column.name()).ok()?; + let mut values = Vec::with_capacity(self.statistics.len()); + let mut has_null_count = false; + for stats in &self.statistics { + let stat = stats.column_statistics.get(index)?; + match &stat.null_count { + Precision::Exact(null_count) => match u64::try_from(*null_count) { + Ok(null_count) => { + has_null_count = true; + values.push(Some(null_count)); + } + Err(_) => { + values.push(None); + } + }, + _ => values.push(None), + } + } + if has_null_count { + Some(Arc::new(UInt64Array::from(values))) + } else { + None + } + } + + /// Return the number of rows for the named column in each container + /// as an [`UInt64Array`]. + /// + /// See [`Self::min_values`] for when to return `None` and null values. 
+ /// + /// Note: the returned array must contain [`Self::num_containers`] rows + /// + /// [`UInt64Array`]: arrow::array::UInt64Array + fn row_counts(&self, _column: &Column) -> Option<ArrayRef> { + let mut values = Vec::with_capacity(self.statistics.len()); + let mut has_row_count = false; + for stats in &self.statistics { + match &stats.num_rows { + Precision::Exact(row_count) => match u64::try_from(*row_count) { + Ok(row_count) => { + has_row_count = true; + values.push(Some(row_count)); + } + Err(_) => { + values.push(None); + } + }, + _ => values.push(None), + } + } + if has_row_count { + Some(Arc::new(UInt64Array::from(values))) + } else { + None + } + } + + /// Returns [`BooleanArray`] where each row represents information known Review Comment: this comment can probably be trimmed with a link back to the original trait source -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org