alamb commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r637077408



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -85,94 +115,132 @@ impl PruningPredicateBuilder {
         })
     }
 
-    /// Generate a predicate function used to filter based on
-    /// statistics
+    /// For each set of statistics, evaluates the pruning predicate
+    /// and returns a `bool` with the following meaning for
+    /// all rows whose values match the statistics:
     ///
-    /// This function takes a slice of statistics as parameter, so
-    /// that DataFusion's physical expressions can be executed once
-    /// against a single RecordBatch, containing statistics arrays, on
-    /// which the physical predicate expression is executed to
-    /// generate a row group filter array.
+    /// `true`: There MAY be rows that match the predicate
     ///
-    /// The generated filter function is then used in the returned
-    /// closure to filter row groups. NOTE this is parquet specific at the 
moment
-    pub fn build_pruning_predicate(
-        &self,
-        row_group_metadata: &[RowGroupMetaData],
-    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+    /// `false`: There are no rows that could match the predicate
+    ///
+    /// Note this function takes a slice of statistics as a parameter
+    /// to amortize the cost of the evaluation of the predicate
+    /// against a single record batch.
+    pub fn prune<S: PruningStatistics>(&self, statistics: &[S]) -> 
Result<Vec<bool>> {

Review comment:
       Here is the heart of the change -- rather than building up `Arrays` 
directly from ParquetMetadata, this PR now builds the arrays up from 
`ScalarValue`s provided by the `PruningStatistics` trait.
   
   I also tried to improve the comments to make it easier to follow what is 
going on

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -383,210 +455,218 @@ enum StatisticsType {
     Max,
 }
 
-fn build_statistics_array(
-    statistics: &[Option<&ParquetStatistics>],
-    statistics_type: StatisticsType,
-    data_type: &DataType,
-) -> ArrayRef {
-    let statistics_count = statistics.len();
-    let first_group_stats = statistics.iter().find(|s| s.is_some());
-    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
-        // found first row group with statistics defined
-        statistics
-    } else {
-        // no row group has statistics defined
-        return new_null_array(data_type, statistics_count);
-    };
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use super::*;
+    use crate::logical_plan::{col, lit};
+    use crate::{assert_batches_eq, 
physical_optimizer::pruning::StatisticsType};
+    use arrow::datatypes::{DataType, TimeUnit};
+
+    #[derive(Debug, Default)]
+    struct MinMax {
+        min: Option<ScalarValue>,
+        max: Option<ScalarValue>,
+    }
 
-    let (data_size, arrow_type) = match first_group_stats {
-        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), 
DataType::Int32),
-        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), 
DataType::Int64),
-        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), 
DataType::Float32),
-        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), 
DataType::Float64),
-        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
-            (0, DataType::Utf8)
+    impl MinMax {
+        fn new(min: Option<ScalarValue>, max: Option<ScalarValue>) -> Self {
+            Self { min, max }
         }
-        _ => {
-            // type of statistics not supported
-            return new_null_array(data_type, statistics_count);
+    }
+
+    #[derive(Debug, Default)]
+    struct TestStatistics {
+        // key: column name
+        stats: HashMap<String, MinMax>,
+    }
+
+    impl TestStatistics {
+        fn new() -> Self {
+            Self::default()
         }
-    };
 
-    let statistics = statistics.iter().map(|s| {
-        s.filter(|s| s.has_min_max_set())
-            .map(|s| match statistics_type {
-                StatisticsType::Min => s.min_bytes(),
-                StatisticsType::Max => s.max_bytes(),
-            })
-    });
-
-    if arrow_type == DataType::Utf8 {
-        let data_size = statistics
-            .clone()
-            .map(|x| x.map(|b| b.len()).unwrap_or(0))
-            .sum();
-        let mut builder =
-            arrow::array::StringBuilder::with_capacity(statistics_count, 
data_size);
-        let string_statistics =
-            statistics.map(|x| x.and_then(|bytes| 
std::str::from_utf8(bytes).ok()));
-        for maybe_string in string_statistics {
-            match maybe_string {
-                Some(string_value) => 
builder.append_value(string_value).unwrap(),
-                None => builder.append_null().unwrap(),
-            };
+        fn with(mut self, name: impl Into<String>, min_max: MinMax) -> Self {
+            self.stats.insert(name.into(), min_max);
+            self
         }
-        return Arc::new(builder.finish());
     }
 
-    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
-    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
-    let mut null_count = 0;
-    for s in statistics {
-        if let Some(stat_data) = s {
-            bitmap_builder.append(true);
-            data_buffer.extend_from_slice(stat_data);
-        } else {
-            bitmap_builder.append(false);
-            data_buffer.resize(data_buffer.len() + data_size, 0);
-            null_count += 1;
+    impl PruningStatistics for TestStatistics {
+        fn min_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.min.clone())
+                .unwrap_or(None)
         }
-    }
 
-    let mut builder = ArrayData::builder(arrow_type)
-        .len(statistics_count)
-        .add_buffer(data_buffer.into());
-    if null_count > 0 {
-        builder = builder.null_bit_buffer(bitmap_builder.finish());
-    }
-    let array_data = builder.build();
-    let statistics_array = make_array(array_data);
-    if statistics_array.data_type() == data_type {
-        return statistics_array;
+        fn max_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.max.clone())
+                .unwrap_or(None)
+        }
     }
-    // cast statistics array to required data type
-    arrow::compute::cast(&statistics_array, data_type)
-        .unwrap_or_else(|_| new_null_array(data_type, statistics_count))
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::physical_optimizer::pruning::StatisticsType;
-    use arrow::{
-        array::{Int32Array, StringArray},
-        datatypes::DataType,
-    };
-    use parquet::file::statistics::Statistics as ParquetStatistics;
 
     #[test]
-    fn build_statistics_array_int32() {
-        // build row group metadata array
-        let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false);
-        let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false);
-        let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false);
-        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, 
&DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        assert_eq!(int32_vec, vec![None, Some(2), Some(3)]);
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Max, 
&DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        // here the first max value is None and not the Some(10) value which 
was actually set
-        // because the min value is None
-        assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
+    fn test_build_statistics_record_batch() {

Review comment:
       This shows how creating the statistics record batch works

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -748,4 +821,40 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn prune_api() {

Review comment:
       This shows end-to-end how to use the prune API (what I want to be able 
to do in IOx)

##########
File path: datafusion/src/scalar.rs
##########
@@ -283,6 +283,155 @@ impl ScalarValue {
         self.to_array_of_size(1)
     }
 
+    /// Converts an iterator of references to [`ScalarValue`] into an [`ArrayRef`]

Review comment:
       I couldn't find any other way to take a bunch of `ScalarValues` and turn 
them back into an Array. Perhaps I missed something




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to