alamb commented on code in PR #16014:
URL: https://github.com/apache/datafusion/pull/16014#discussion_r2084838154


##########
datafusion/datasource/src/file_stream.rs:
##########
@@ -367,7 +368,7 @@ impl Default for OnError {
 pub trait FileOpener: Unpin + Send + Sync {
     /// Asynchronously open the specified file and return a stream
     /// of [`RecordBatch`]
-    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>;
+    fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> 
Result<FileOpenFuture>;

Review Comment:
   Can you please update the documetnation for open() to mention that `file` 
has plan time per-file information (such as statistics) and leave a doc link 
back?



##########
datafusion/physical-optimizer/src/pruning.rs:
##########
@@ -995,6 +996,184 @@ fn build_statistics_record_batch<S: PruningStatistics>(
     })
 }
 
+/// Prune a set of containers represented by their statistics.
+/// Each [`Statistics`] represents a container (e.g. a file or a partition of 
files).
+pub struct PrunableStatistics {
+    statistics: Vec<Arc<Statistics>>,
+    schema: SchemaRef,
+}
+
+impl PrunableStatistics {
+    /// Create a new instance of [`PrunableStatistics`].
+    /// Each [`Statistics`] represents a container (e.g. a file or a partition 
of files).
+    /// The `schema` is the schema of the data in the containers and should 
apply to all files.
+    pub fn new(statistics: Vec<Arc<Statistics>>, schema: SchemaRef) -> Self {
+        Self { statistics, schema }
+    }
+}
+
+impl PruningStatistics for PrunableStatistics {
+    /// Return the minimum values for the named column, if known.
+    ///
+    /// If the minimum value for a particular container is not known, the
+    /// returned array should have `null` in that row. If the minimum value is
+    /// not known for any row, return `None`.
+    ///
+    /// Note: the returned array must contain [`Self::num_containers`] rows
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.schema.index_of(column.name()).ok()?;
+        let mut values = Vec::with_capacity(self.statistics.len());
+        for stats in &self.statistics {
+            let stat = stats.column_statistics.get(index)?;
+            match &stat.min_value {
+                Precision::Exact(min) => {
+                    values.push(min.clone());
+                }
+                _ => values.push(ScalarValue::Null),
+            }
+        }
+        match ScalarValue::iter_to_array(values) {
+            Ok(array) => Some(array),
+            Err(_) => {
+                log::warn!(
+                    "Failed to convert min values to array for column {}",
+                    column.name()
+                );
+                None
+            }
+        }
+    }
+
+    /// Return the maximum values for the named column, if known.
+    ///
+    /// See [`Self::min_values`] for when to return `None` and null values.
+    ///
+    /// Note: the returned array must contain [`Self::num_containers`] rows
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.schema.index_of(column.name()).ok()?;
+        let mut values = Vec::with_capacity(self.statistics.len());
+        for stats in &self.statistics {
+            let stat = stats.column_statistics.get(index)?;
+            match &stat.max_value {
+                Precision::Exact(max) => {
+                    values.push(max.clone());
+                }
+                _ => values.push(ScalarValue::Null),
+            }
+        }
+        match ScalarValue::iter_to_array(values) {
+            Ok(array) => Some(array),
+            Err(_) => {
+                log::warn!(
+                    "Failed to convert max values to array for column {}",
+                    column.name()
+                );
+                None
+            }
+        }
+    }
+
+    /// Return the number of containers (e.g. Row Groups) being pruned with
+    /// these statistics.
+    ///
+    /// This value corresponds to the size of the [`ArrayRef`] returned by
+    /// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`],
+    /// and [`Self::row_counts`].
+    fn num_containers(&self) -> usize {
+        1

Review Comment:
   this should be self.statistics.len(), right?



##########
datafusion/physical-optimizer/src/pruning.rs:
##########
@@ -995,6 +996,184 @@ fn build_statistics_record_batch<S: PruningStatistics>(
     })
 }
 
+/// Prune a set of containers represented by their statistics.

Review Comment:
   This is a nice structure -- I think it makes lots of sense and is 100%
   
   Specifically, I thought there was already code that pruned individual files 
based on statistics but I cound not find any in LIstingTable (we have something 
like this in `influxdb_iox`).
   
   My opinion is if we are going to this code  it into the DataFusion codebase 
we should 
   1. Ensure that it helps a as many users as possble
   2. Make sure it is executed as much as possible (to ensure test coverage)
   
   Thus, what do you think about using the PrunableStatistics to prune the 
FileGroup in ListingTable here:  
   
   
https://github.com/apache/datafusion/blob/55ba4cadce5ea99de4361929226f1c99cfc94450/datafusion/core/src/datasource/listing/table.rs#L1117-L1116
   
   ?
   
   Pruning on statistics during plan time would potentially be redundant with 
also trying to prune again during opening, but it would reduce the files 
earlier int he plan 



##########
datafusion/physical-optimizer/src/pruning.rs:
##########
@@ -995,6 +996,184 @@ fn build_statistics_record_batch<S: PruningStatistics>(
     })
 }
 
+/// Prune a set of containers represented by their statistics.
+/// Each [`Statistics`] represents a container (e.g. a file or a partition of 
files).
+pub struct PrunableStatistics {
+    statistics: Vec<Arc<Statistics>>,
+    schema: SchemaRef,
+}
+
+impl PrunableStatistics {
+    /// Create a new instance of [`PrunableStatistics`].
+    /// Each [`Statistics`] represents a container (e.g. a file or a partition 
of files).
+    /// The `schema` is the schema of the data in the containers and should 
apply to all files.
+    pub fn new(statistics: Vec<Arc<Statistics>>, schema: SchemaRef) -> Self {
+        Self { statistics, schema }
+    }
+}
+
+impl PruningStatistics for PrunableStatistics {
+    /// Return the minimum values for the named column, if known.
+    ///
+    /// If the minimum value for a particular container is not known, the
+    /// returned array should have `null` in that row. If the minimum value is
+    /// not known for any row, return `None`.
+    ///
+    /// Note: the returned array must contain [`Self::num_containers`] rows
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.schema.index_of(column.name()).ok()?;
+        let mut values = Vec::with_capacity(self.statistics.len());
+        for stats in &self.statistics {
+            let stat = stats.column_statistics.get(index)?;
+            match &stat.min_value {
+                Precision::Exact(min) => {
+                    values.push(min.clone());
+                }
+                _ => values.push(ScalarValue::Null),
+            }
+        }
+        match ScalarValue::iter_to_array(values) {
+            Ok(array) => Some(array),
+            Err(_) => {
+                log::warn!(
+                    "Failed to convert min values to array for column {}",
+                    column.name()
+                );
+                None
+            }
+        }
+    }
+
+    /// Return the maximum values for the named column, if known.
+    ///
+    /// See [`Self::min_values`] for when to return `None` and null values.
+    ///
+    /// Note: the returned array must contain [`Self::num_containers`] rows
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.schema.index_of(column.name()).ok()?;
+        let mut values = Vec::with_capacity(self.statistics.len());
+        for stats in &self.statistics {
+            let stat = stats.column_statistics.get(index)?;
+            match &stat.max_value {
+                Precision::Exact(max) => {
+                    values.push(max.clone());
+                }
+                _ => values.push(ScalarValue::Null),
+            }
+        }
+        match ScalarValue::iter_to_array(values) {
+            Ok(array) => Some(array),
+            Err(_) => {
+                log::warn!(
+                    "Failed to convert max values to array for column {}",
+                    column.name()
+                );
+                None
+            }
+        }
+    }
+
+    /// Return the number of containers (e.g. Row Groups) being pruned with
+    /// these statistics.
+    ///
+    /// This value corresponds to the size of the [`ArrayRef`] returned by
+    /// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`],
+    /// and [`Self::row_counts`].
+    fn num_containers(&self) -> usize {
+        1
+    }
+
+    /// Return the number of null values for the named column as an
+    /// [`UInt64Array`]
+    ///
+    /// See [`Self::min_values`] for when to return `None` and null values.
+    ///
+    /// Note: the returned array must contain [`Self::num_containers`] rows
+    ///
+    /// [`UInt64Array`]: arrow::array::UInt64Array
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.schema.index_of(column.name()).ok()?;
+        let mut values = Vec::with_capacity(self.statistics.len());
+        let mut has_null_count = false;
+        for stats in &self.statistics {
+            let stat = stats.column_statistics.get(index)?;
+            match &stat.null_count {
+                Precision::Exact(null_count) => match 
u64::try_from(*null_count) {
+                    Ok(null_count) => {
+                        has_null_count = true;
+                        values.push(Some(null_count));
+                    }
+                    Err(_) => {
+                        values.push(None);
+                    }
+                },
+                _ => values.push(None),
+            }
+        }
+        if has_null_count {
+            Some(Arc::new(UInt64Array::from(values)))
+        } else {
+            None
+        }
+    }
+
+    /// Return the number of rows for the named column in each container
+    /// as an [`UInt64Array`].
+    ///
+    /// See [`Self::min_values`] for when to return `None` and null values.
+    ///
+    /// Note: the returned array must contain [`Self::num_containers`] rows
+    ///
+    /// [`UInt64Array`]: arrow::array::UInt64Array
+    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        let mut values = Vec::with_capacity(self.statistics.len());
+        let mut has_row_count = false;
+        for stats in &self.statistics {
+            match &stats.num_rows {
+                Precision::Exact(row_count) => match u64::try_from(*row_count) 
{
+                    Ok(row_count) => {
+                        has_row_count = true;
+                        values.push(Some(row_count));
+                    }
+                    Err(_) => {
+                        values.push(None);
+                    }
+                },
+                _ => values.push(None),
+            }
+        }
+        if has_row_count {
+            Some(Arc::new(UInt64Array::from(values)))
+        } else {
+            None
+        }
+    }
+
+    /// Returns [`BooleanArray`] where each row represents information known

Review Comment:
   this comment cna probably be trimmed with a link back to the original trait 
source



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to