alamb commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r637954100



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -383,210 +455,218 @@ enum StatisticsType {
     Max,
 }
 
-fn build_statistics_array(
-    statistics: &[Option<&ParquetStatistics>],
-    statistics_type: StatisticsType,
-    data_type: &DataType,
-) -> ArrayRef {
-    let statistics_count = statistics.len();
-    let first_group_stats = statistics.iter().find(|s| s.is_some());
-    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
-        // found first row group with statistics defined
-        statistics
-    } else {
-        // no row group has statistics defined
-        return new_null_array(data_type, statistics_count);
-    };
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use super::*;
+    use crate::logical_plan::{col, lit};
+    use crate::{assert_batches_eq, 
physical_optimizer::pruning::StatisticsType};
+    use arrow::datatypes::{DataType, TimeUnit};
+
+    #[derive(Debug, Default)]
+    struct MinMax {
+        min: Option<ScalarValue>,
+        max: Option<ScalarValue>,
+    }
 
-    let (data_size, arrow_type) = match first_group_stats {
-        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), 
DataType::Int32),
-        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), 
DataType::Int64),
-        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), 
DataType::Float32),
-        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), 
DataType::Float64),
-        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
-            (0, DataType::Utf8)
+    impl MinMax {
+        fn new(min: Option<ScalarValue>, max: Option<ScalarValue>) -> Self {
+            Self { min, max }
         }
-        _ => {
-            // type of statistics not supported
-            return new_null_array(data_type, statistics_count);
+    }
+
+    #[derive(Debug, Default)]
+    struct TestStatistics {
+        // key: column name
+        stats: HashMap<String, MinMax>,
+    }
+
+    impl TestStatistics {
+        fn new() -> Self {
+            Self::default()
         }
-    };
 
-    let statistics = statistics.iter().map(|s| {
-        s.filter(|s| s.has_min_max_set())
-            .map(|s| match statistics_type {
-                StatisticsType::Min => s.min_bytes(),
-                StatisticsType::Max => s.max_bytes(),
-            })
-    });
-
-    if arrow_type == DataType::Utf8 {
-        let data_size = statistics
-            .clone()
-            .map(|x| x.map(|b| b.len()).unwrap_or(0))
-            .sum();
-        let mut builder =
-            arrow::array::StringBuilder::with_capacity(statistics_count, 
data_size);
-        let string_statistics =
-            statistics.map(|x| x.and_then(|bytes| 
std::str::from_utf8(bytes).ok()));
-        for maybe_string in string_statistics {
-            match maybe_string {
-                Some(string_value) => 
builder.append_value(string_value).unwrap(),
-                None => builder.append_null().unwrap(),
-            };
+        fn with(mut self, name: impl Into<String>, min_max: MinMax) -> Self {
+            self.stats.insert(name.into(), min_max);
+            self
         }
-        return Arc::new(builder.finish());
     }
 
-    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
-    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
-    let mut null_count = 0;
-    for s in statistics {
-        if let Some(stat_data) = s {
-            bitmap_builder.append(true);
-            data_buffer.extend_from_slice(stat_data);
-        } else {
-            bitmap_builder.append(false);
-            data_buffer.resize(data_buffer.len() + data_size, 0);
-            null_count += 1;
+    impl PruningStatistics for TestStatistics {
+        fn min_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.min.clone())
+                .unwrap_or(None)
         }
-    }
 
-    let mut builder = ArrayData::builder(arrow_type)
-        .len(statistics_count)
-        .add_buffer(data_buffer.into());
-    if null_count > 0 {
-        builder = builder.null_bit_buffer(bitmap_builder.finish());
-    }
-    let array_data = builder.build();
-    let statistics_array = make_array(array_data);
-    if statistics_array.data_type() == data_type {
-        return statistics_array;
+        fn max_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.max.clone())
+                .unwrap_or(None)
+        }
     }
-    // cast statistics array to required data type
-    arrow::compute::cast(&statistics_array, data_type)
-        .unwrap_or_else(|_| new_null_array(data_type, statistics_count))
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::physical_optimizer::pruning::StatisticsType;
-    use arrow::{
-        array::{Int32Array, StringArray},
-        datatypes::DataType,
-    };
-    use parquet::file::statistics::Statistics as ParquetStatistics;
 
     #[test]
-    fn build_statistics_array_int32() {
-        // build row group metadata array
-        let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false);
-        let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false);
-        let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false);
-        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, 
&DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        assert_eq!(int32_vec, vec![None, Some(2), Some(3)]);
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Max, 
&DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        // here the first max value is None and not the Some(10) value which 
was actually set
-        // because the min value is None
-        assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
+    fn test_build_statistics_record_batch() {
+        // Request a record batch of s1_min, s2_max, s3_max, s3_min
+        let stat_column_req = vec![
+            // min of original column s1, named s1_min
+            (
+                "s1".to_string(),
+                StatisticsType::Min,
+                Field::new("s1_min", DataType::Int32, true),
+            ),
+            // max of original column s2, named s2_max
+            (
+                "s2".to_string(),
+                StatisticsType::Max,
+                Field::new("s2_min", DataType::Int32, true),
+            ),
+            // max of original column s3, named s3_max
+            (
+                "s3".to_string(),
+                StatisticsType::Max,
+                Field::new("s3_max", DataType::Utf8, true),
+            ),
+            // min of original column s3, named s3_min
+            (
+                "s3".to_string(),
+                StatisticsType::Min,
+                Field::new("s3_min", DataType::Utf8, true),
+            ),
+        ];
+
+        // s1: [None, 10]
+        // s2: [2, 20]
+        // s3: ["a", "q"]
+        let stats1 = TestStatistics::new()
+            .with("s1", MinMax::new(None, Some(10i32.into())))
+            .with("s2", MinMax::new(Some(2i32.into()), Some(20i32.into())))
+            .with("s3", MinMax::new(Some("a".into()), Some("q".into())));
+
+        // s1: [None, None]
+        // s2: [None, None]
+        // s3: [None, None]
+        let stats2 = TestStatistics::new()
+            .with("s1", MinMax::new(None, None))
+            .with("s2", MinMax::new(None, None))
+            .with("s3", MinMax::new(None, None));
+
+        // s1: [9, None]
+        // s2: None
+        // s3: [None, "r"]
+        let stats3 = TestStatistics::new()
+            .with("s1", MinMax::new(Some(9i32.into()), None))
+            .with("s3", MinMax::new(None, Some("r".into())));
+
+        // This one returns a statistics value, but the value itself is NULL
+        // s1: [Some(None), None]
+        let stats4 = TestStatistics::new()
+            .with("s1", MinMax::new(Some(ScalarValue::Int32(None)), None));
+
+        let statistics = [stats1, stats2, stats3, stats4];
+        let batch = build_statistics_record_batch(&statistics, 
&stat_column_req).unwrap();
+        let expected = vec![
+            "+--------+--------+--------+--------+",
+            "| s1_min | s2_min | s3_max | s3_min |",

Review comment:
       I'll update it 👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to