This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new afc299a48 Add parquet predicate pushdown metrics (#3989)
afc299a48 is described below

commit afc299a48b6c2438643d0c84c408ba104424dbd4
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Oct 30 07:32:13 2022 -0400

    Add parquet predicate pushdown metrics (#3989)
    
    * Log error building row filters
    
    Inspired by @liukun4515 at 
https://github.com/apache/arrow-datafusion/pull/3380/files#r970198755
    
    * Add parquet predicate pushdown metrics
    
    * more efficient bit counting
---
 .../core/src/physical_plan/file_format/parquet.rs  | 186 +++++++++++++++++----
 .../src/physical_plan/file_format/row_filter.rs    |  68 +++++++-
 datafusion/core/src/physical_plan/metrics/mod.rs   |  13 +-
 3 files changed, 227 insertions(+), 40 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs 
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 0dda94322..f9ec72ab0 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -237,6 +237,10 @@ pub struct ParquetFileMetrics {
     pub row_groups_pruned: metrics::Count,
     /// Total number of bytes scanned
     pub bytes_scanned: metrics::Count,
+    /// Total rows filtered out by predicates pushed into parquet scan
+    pub pushdown_rows_filtered: metrics::Count,
+    /// Total time spent evaluating pushdown filters
+    pub pushdown_eval_time: metrics::Time,
 }
 
 impl ParquetFileMetrics {
@@ -258,10 +262,20 @@ impl ParquetFileMetrics {
             .with_new_label("filename", filename.to_string())
             .counter("bytes_scanned", partition);
 
+        let pushdown_rows_filtered = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("pushdown_rows_filtered", partition);
+
+        let pushdown_eval_time = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .subset_time("pushdown_eval_time", partition);
+
         Self {
             predicate_evaluation_errors,
             row_groups_pruned,
             bytes_scanned,
+            pushdown_rows_filtered,
+            pushdown_eval_time,
         }
     }
 }
@@ -410,7 +424,7 @@ impl FileOpener for ParquetOpener {
     ) -> Result<FileOpenFuture> {
         let file_range = file_meta.range.clone();
 
-        let metrics = ParquetFileMetrics::new(
+        let file_metrics = ParquetFileMetrics::new(
             self.partition_index,
             file_meta.location().as_ref(),
             &self.metrics,
@@ -450,21 +464,38 @@ impl FileOpener for ParquetOpener {
                 .then(|| pruning_predicate.as_ref().map(|p| p.logical_expr()))
                 .flatten()
             {
-                if let Ok(Some(filter)) = build_row_filter(
+                let row_filter = build_row_filter(
                     predicate.clone(),
                     builder.schema().as_ref(),
                     table_schema.as_ref(),
                     builder.metadata(),
                     reorder_predicates,
-                ) {
-                    builder = builder.with_row_filter(filter);
-                }
+                    &file_metrics.pushdown_rows_filtered,
+                    &file_metrics.pushdown_eval_time,
+                );
+
+                match row_filter {
+                    Ok(Some(filter)) => {
+                        builder = builder.with_row_filter(filter);
+                    }
+                    Ok(None) => {}
+                    Err(e) => {
+                        debug!(
+                            "Ignoring error building row filter for '{:?}': 
{}",
+                            predicate, e
+                        );
+                    }
+                };
             };
 
             let file_metadata = builder.metadata();
             let groups = file_metadata.row_groups();
-            let row_groups =
-                prune_row_groups(groups, file_range, 
pruning_predicate.clone(), &metrics);
+            let row_groups = prune_row_groups(
+                groups,
+                file_range,
+                pruning_predicate.clone(),
+                &file_metrics,
+            );
 
             if enable_page_index && 
check_page_index_push_down_valid(&pruning_predicate) {
                 let file_offset_indexes = file_metadata.offset_indexes();
@@ -480,7 +511,7 @@ impl FileOpener for ParquetOpener {
                                 pruning_predicate.clone(),
                                 file_offset_indexes.get(*r),
                                 file_page_indexes.get(*r),
-                                &metrics,
+                                &file_metrics,
                             )
                             .map_err(|e| {
                                 ArrowError::ParquetError(format!(
@@ -564,7 +595,7 @@ impl DefaultParquetFileReaderFactory {
 struct ParquetFileReader {
     store: Arc<dyn ObjectStore>,
     meta: ObjectMeta,
-    metrics: ParquetFileMetrics,
+    file_metrics: ParquetFileMetrics,
     metadata_size_hint: Option<usize>,
 }
 
@@ -573,7 +604,7 @@ impl AsyncFileReader for ParquetFileReader {
         &mut self,
         range: Range<usize>,
     ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-        self.metrics.bytes_scanned.add(range.end - range.start);
+        self.file_metrics.bytes_scanned.add(range.end - range.start);
 
         self.store
             .get_range(&self.meta.location, range)
@@ -591,7 +622,7 @@ impl AsyncFileReader for ParquetFileReader {
         Self: Send,
     {
         let total = ranges.iter().map(|r| r.end - r.start).sum();
-        self.metrics.bytes_scanned.add(total);
+        self.file_metrics.bytes_scanned.add(total);
 
         async move {
             self.store
@@ -636,7 +667,7 @@ impl ParquetFileReaderFactory for 
DefaultParquetFileReaderFactory {
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Box<dyn AsyncFileReader + Send>> {
-        let parquet_file_metrics = ParquetFileMetrics::new(
+        let file_metrics = ParquetFileMetrics::new(
             partition_index,
             file_meta.location().as_ref(),
             metrics,
@@ -646,7 +677,7 @@ impl ParquetFileReaderFactory for 
DefaultParquetFileReaderFactory {
             meta: file_meta.object_meta,
             store: Arc::clone(&self.store),
             metadata_size_hint,
-            metrics: parquet_file_metrics,
+            file_metrics,
         }))
     }
 }
@@ -1167,6 +1198,7 @@ mod tests {
     use crate::datasource::listing::{FileRange, PartitionedFile};
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::execution::options::CsvReadOptions;
+    use crate::physical_plan::metrics::MetricValue;
     use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
     use crate::test::object_store::local_unpartitioned_file;
     use crate::{
@@ -1199,8 +1231,15 @@ mod tests {
     use std::io::Write;
     use tempfile::TempDir;
 
-    /// writes each RecordBatch as an individual parquet file and then
-    /// reads it back in to the named location.
+    struct RoundTripResult {
+        /// Data that was read back from ParquetFiles
+        batches: Result<Vec<RecordBatch>>,
+        /// The physical plan that was created (that has statistics, etc)
+        parquet_exec: Arc<ParquetExec>,
+    }
+
+    /// writes each RecordBatch as an individual parquet file and re-reads
+    /// the data back. Returns the data as [RecordBatch]es
     async fn round_trip_to_parquet(
         batches: Vec<RecordBatch>,
         projection: Option<Vec<usize>>,
@@ -1208,14 +1247,30 @@ mod tests {
         predicate: Option<Expr>,
         pushdown_predicate: bool,
     ) -> Result<Vec<RecordBatch>> {
+        round_trip(batches, projection, schema, predicate, pushdown_predicate)
+            .await
+            .batches
+    }
+
+    /// Writes each RecordBatch as an individual parquet file and then
+    /// reads them back. Returns the parquet exec as well as the data
+    /// as [RecordBatch]es
+    async fn round_trip(
+        batches: Vec<RecordBatch>,
+        projection: Option<Vec<usize>>,
+        schema: Option<SchemaRef>,
+        predicate: Option<Expr>,
+        pushdown_predicate: bool,
+    ) -> RoundTripResult {
         let file_schema = match schema {
             Some(schema) => schema,
-            None => Arc::new(Schema::try_merge(
-                batches.iter().map(|b| b.schema().as_ref().clone()),
-            )?),
+            None => Arc::new(
+                Schema::try_merge(batches.iter().map(|b| 
b.schema().as_ref().clone()))
+                    .unwrap(),
+            ),
         };
 
-        let (meta, _files) = store_parquet(batches).await?;
+        let (meta, _files) = store_parquet(batches).await.unwrap();
         let file_groups = meta.into_iter().map(Into::into).collect();
 
         // prepare the scan
@@ -1242,7 +1297,11 @@ mod tests {
 
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
-        collect(Arc::new(parquet_exec), task_ctx).await
+        let parquet_exec = Arc::new(parquet_exec);
+        RoundTripResult {
+            batches: collect(parquet_exec.clone(), task_ctx).await,
+            parquet_exec,
+        }
     }
 
     // Add a new column with the specified field name to the RecordBatch
@@ -1453,10 +1512,7 @@ mod tests {
         let filter = col("c2").eq(lit(2_i64));
 
         // read/write them files:
-        let read =
-            round_trip_to_parquet(vec![batch1, batch2], None, None, 
Some(filter), true)
-                .await
-                .unwrap();
+        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), 
true).await;
         let expected = vec![
             "+----+----+----+",
             "| c1 | c3 | c2 |",
@@ -1464,7 +1520,10 @@ mod tests {
             "|    | 20 | 2  |",
             "+----+----+----+",
         ];
-        assert_batches_sorted_eq!(expected, &read);
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        // Note there were 6 rows in total (across three batches)
+        assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
     }
 
     #[tokio::test]
@@ -1587,7 +1646,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn evolved_schema_disjoint_schema_filter_with_pushdown() {
+    async fn evolved_schema_disjoint_schema_with_filter_pushdown() {
         let c1: ArrayRef =
             Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
 
@@ -1602,10 +1661,7 @@ mod tests {
         let filter = col("c2").eq(lit(1_i64));
 
         // read/write them files:
-        let read =
-            round_trip_to_parquet(vec![batch1, batch2], None, None, 
Some(filter), true)
-                .await
-                .unwrap();
+        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), 
true).await;
 
         let expected = vec![
             "+----+----+",
@@ -1614,7 +1670,10 @@ mod tests {
             "|    | 1  |",
             "+----+----+",
         ];
-        assert_batches_sorted_eq!(expected, &read);
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        // Note there were 6 rows in total (across three batches)
+        assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
     }
 
     #[tokio::test]
@@ -1895,6 +1954,71 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn parquet_exec_metrics() {
+        let c1: ArrayRef = Arc::new(StringArray::from(vec![
+            Some("Foo"),
+            None,
+            Some("bar"),
+            Some("bar"),
+            Some("bar"),
+            Some("bar"),
+            Some("zzz"),
+        ]));
+
+        // batch1: c1(string)
+        let batch1 = create_batch(vec![("c1", c1.clone())]);
+
+        // on
+        let filter = col("c1").not_eq(lit("bar"));
+
+        // read/write them files:
+        let rt = round_trip(vec![batch1], None, None, Some(filter), 
true).await;
+
+        let metrics = rt.parquet_exec.metrics().unwrap();
+
+        // assert the batches and some metrics
+        let expected = vec![
+            "+-----+", "| c1  |", "+-----+", "| Foo |", "| zzz |", "+-----+",
+        ];
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+
+        // pushdown predicates have eliminated all 4 bar rows and the
+        // null row for 5 rows total
+        assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
+        assert!(
+            get_value(&metrics, "pushdown_eval_time") > 0,
+            "no eval time in metrics: {:#?}",
+            metrics
+        );
+    }
+
+    /// Returns the sum of all the metrics with the specified name
+    /// in the given set.
+    ///
+    /// Count: returns value
+    /// Time: returns elapsed nanoseconds
+    ///
+    /// Panics if no such metric.
+    fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
+        let sum = metrics.sum(|m| match m.value() {
+            MetricValue::Count { name, .. } if name == metric_name => true,
+            MetricValue::Time { name, .. } if name == metric_name => true,
+            _ => false,
+        });
+
+        match sum {
+            Some(MetricValue::Count { count, .. }) => count.value(),
+            Some(MetricValue::Time { time, .. }) => time.value(),
+            _ => {
+                panic!(
+                    "Expected metric not found. Looking for '{}' in\n\n{:#?}",
+                    metric_name, metrics
+                );
+            }
+        }
+    }
+
     fn parquet_file_metrics() -> ParquetFileMetrics {
         let metrics = Arc::new(ExecutionPlanMetricsSet::new());
         ParquetFileMetrics::new(0, "file.parquet", &metrics)
diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs 
b/datafusion/core/src/physical_plan/file_format/row_filter.rs
index dd9c8fb65..49ec6b5ca 100644
--- a/datafusion/core/src/physical_plan/file_format/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs
@@ -31,6 +31,8 @@ use parquet::arrow::ProjectionMask;
 use parquet::file::metadata::ParquetMetaData;
 use std::sync::Arc;
 
+use crate::physical_plan::metrics;
+
 /// This module contains utilities for enabling the pushdown of DataFusion 
filter predicates (which
 /// can be any DataFusion `Expr` that evaluates to a `BooleanArray`) to the 
parquet decoder level in `arrow-rs`.
 /// DataFusion will use a `ParquetRecordBatchStream` to read data from parquet 
into arrow `RecordBatch`es.
@@ -66,6 +68,10 @@ use std::sync::Arc;
 pub(crate) struct DatafusionArrowPredicate {
     physical_expr: Arc<dyn PhysicalExpr>,
     projection: ProjectionMask,
+    /// how many rows were filtered out by this predicate
+    rows_filtered: metrics::Count,
+    /// how long was spent evaluating this predicate
+    time: metrics::Time,
 }
 
 impl DatafusionArrowPredicate {
@@ -73,6 +79,8 @@ impl DatafusionArrowPredicate {
         candidate: FilterCandidate,
         schema: &Schema,
         metadata: &ParquetMetaData,
+        rows_filtered: metrics::Count,
+        time: metrics::Time,
     ) -> Result<Self> {
         let props = ExecutionProps::default();
 
@@ -88,6 +96,8 @@ impl DatafusionArrowPredicate {
                 metadata.file_metadata().schema_descr(),
                 candidate.projection,
             ),
+            rows_filtered,
+            time,
         })
     }
 }
@@ -98,6 +108,8 @@ impl ArrowPredicate for DatafusionArrowPredicate {
     }
 
     fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+        // scoped timer updates on drop
+        let mut timer = self.time.timer();
         match self
             .physical_expr
             .evaluate(&batch)
@@ -105,7 +117,11 @@ impl ArrowPredicate for DatafusionArrowPredicate {
         {
             Ok(array) => {
                 if let Some(mask) = 
array.as_any().downcast_ref::<BooleanArray>() {
-                    Ok(BooleanArray::from(mask.data().clone()))
+                    let bool_arr = BooleanArray::from(mask.data().clone());
+                    let num_filtered = bool_arr.len() - true_count(&bool_arr);
+                    self.rows_filtered.add(num_filtered);
+                    timer.stop();
+                    Ok(bool_arr)
                 } else {
                     Err(ArrowError::ComputeError(
                         "Unexpected result of predicate evaluation, expected 
BooleanArray".to_owned(),
@@ -120,6 +136,27 @@ impl ArrowPredicate for DatafusionArrowPredicate {
     }
 }
 
+/// Return the number of non-null true values in an array
+// TODO remove when https://github.com/apache/arrow-rs/issues/2963 is released
+fn true_count(arr: &BooleanArray) -> usize {
+    match arr.data().null_buffer() {
+        Some(nulls) => {
+            let null_chunks = nulls.bit_chunks(arr.offset(), arr.len());
+            let value_chunks = arr.values().bit_chunks(arr.offset(), 
arr.len());
+            null_chunks
+                .iter()
+                .zip(value_chunks.iter())
+                .chain(std::iter::once((
+                    null_chunks.remainder_bits(),
+                    value_chunks.remainder_bits(),
+                )))
+                .map(|(a, b)| (a & b).count_ones() as usize)
+                .sum()
+        }
+        None => arr.values().count_set_bits_offset(arr.offset(), arr.len()),
+    }
+}
+
 /// A candidate expression for creating a `RowFilter` contains the
 /// expression as well as data to estimate the cost of evaluating
 /// the resulting expression.
@@ -252,6 +289,8 @@ pub fn build_row_filter(
     table_schema: &Schema,
     metadata: &ParquetMetaData,
     reorder_predicates: bool,
+    rows_filtered: &metrics::Count,
+    time: &metrics::Time,
 ) -> Result<Option<RowFilter>> {
     let predicates = split_conjunction_owned(expr);
 
@@ -280,15 +319,25 @@ pub fn build_row_filter(
         let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];
 
         for candidate in indexed_candidates {
-            let filter =
-                DatafusionArrowPredicate::try_new(candidate, file_schema, 
metadata)?;
+            let filter = DatafusionArrowPredicate::try_new(
+                candidate,
+                file_schema,
+                metadata,
+                rows_filtered.clone(),
+                time.clone(),
+            )?;
 
             filters.push(Box::new(filter));
         }
 
         for candidate in other_candidates {
-            let filter =
-                DatafusionArrowPredicate::try_new(candidate, file_schema, 
metadata)?;
+            let filter = DatafusionArrowPredicate::try_new(
+                candidate,
+                file_schema,
+                metadata,
+                rows_filtered.clone(),
+                time.clone(),
+            )?;
 
             filters.push(Box::new(filter));
         }
@@ -297,8 +346,13 @@ pub fn build_row_filter(
     } else {
         let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];
         for candidate in candidates {
-            let filter =
-                DatafusionArrowPredicate::try_new(candidate, file_schema, 
metadata)?;
+            let filter = DatafusionArrowPredicate::try_new(
+                candidate,
+                file_schema,
+                metadata,
+                rows_filtered.clone(),
+                time.clone(),
+            )?;
 
             filters.push(Box::new(filter));
         }
diff --git a/datafusion/core/src/physical_plan/metrics/mod.rs 
b/datafusion/core/src/physical_plan/metrics/mod.rs
index dbbb8af4f..7d6d56c70 100644
--- a/datafusion/core/src/physical_plan/metrics/mod.rs
+++ b/datafusion/core/src/physical_plan/metrics/mod.rs
@@ -166,8 +166,7 @@ impl Metric {
     }
 }
 
-/// A snapshot of the metrics for a particular operator (`dyn
-/// ExecutionPlan`).
+/// A snapshot of the metrics for a particular [`ExecutionPlan`].
 #[derive(Default, Debug, Clone)]
 pub struct MetricsSet {
     metrics: Vec<Arc<Metric>>,
@@ -379,6 +378,16 @@ impl Label {
         let value = value.into();
         Self { name, value }
     }
+
+    /// Return the name of this label
+    pub fn name(&self) -> &str {
+        self.name.as_ref()
+    }
+
+    /// Return the value of this label
+    pub fn value(&self) -> &str {
+        self.value.as_ref()
+    }
 }
 
 impl Display for Label {

Reply via email to