This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new afc299a48 Add parquet predicate pushdown metrics (#3989)
afc299a48 is described below
commit afc299a48b6c2438643d0c84c408ba104424dbd4
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Oct 30 07:32:13 2022 -0400
Add parquet predicate pushdown metrics (#3989)
* Log error building row filters
Inspired by @liukun4515 at
https://github.com/apache/arrow-datafusion/pull/3380/files#r970198755
* Add parquet predicate pushdown metrics
* more efficient bit counting
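For orientation: the patch below threads two new per-file metrics (pushdown_rows_filtered, a Count, and pushdown_eval_time, a Time) from ParquetFileMetrics into the parquet row-filter path. A minimal sketch of how a consumer might total such a metric by name, mirroring the get_value test helper added in this patch (metric_total is a hypothetical name, not part of the commit):

    use datafusion::physical_plan::metrics::MetricsSet;

    /// Sum a named metric across all files/partitions in an operator's
    /// MetricsSet (Count -> value, Time -> elapsed nanoseconds).
    fn metric_total(metrics: &MetricsSet, name: &str) -> usize {
        metrics
            .sum(|m| m.value().name() == name) // match metrics by name
            .map(|v| v.as_usize())             // collapse to a plain number
            .unwrap_or(0)
    }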
---
.../core/src/physical_plan/file_format/parquet.rs | 186 +++++++++++++++++----
.../src/physical_plan/file_format/row_filter.rs | 68 +++++++-
datafusion/core/src/physical_plan/metrics/mod.rs | 13 +-
3 files changed, 227 insertions(+), 40 deletions(-)
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 0dda94322..f9ec72ab0 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -237,6 +237,10 @@ pub struct ParquetFileMetrics {
pub row_groups_pruned: metrics::Count,
/// Total number of bytes scanned
pub bytes_scanned: metrics::Count,
+ /// Total rows filtered out by predicates pushed into parquet scan
+ pub pushdown_rows_filtered: metrics::Count,
+ /// Total time spent evaluating pushdown filters
+ pub pushdown_eval_time: metrics::Time,
}
impl ParquetFileMetrics {
@@ -258,10 +262,20 @@ impl ParquetFileMetrics {
.with_new_label("filename", filename.to_string())
.counter("bytes_scanned", partition);
+ let pushdown_rows_filtered = MetricBuilder::new(metrics)
+ .with_new_label("filename", filename.to_string())
+ .counter("pushdown_rows_filtered", partition);
+
+ let pushdown_eval_time = MetricBuilder::new(metrics)
+ .with_new_label("filename", filename.to_string())
+ .subset_time("pushdown_eval_time", partition);
+
Self {
predicate_evaluation_errors,
row_groups_pruned,
bytes_scanned,
+ pushdown_rows_filtered,
+ pushdown_eval_time,
}
}
}
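The constructor above follows DataFusion's MetricBuilder pattern: each metric is registered on the operator-wide ExecutionPlanMetricsSet and labeled with the file it describes. A minimal standalone sketch of that pattern (register_file_counter is a hypothetical helper, not part of the commit):

    use datafusion::physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};

    /// Register a per-file counter on the operator-wide metrics set.
    fn register_file_counter(
        metrics: &ExecutionPlanMetricsSet,
        filename: &str,
        partition: usize,
    ) -> Count {
        MetricBuilder::new(metrics)
            .with_new_label("filename", filename.to_string())
            .counter("pushdown_rows_filtered", partition)
    }

The returned Count is backed by an Arc'd atomic, so clones are cheap to hand out to per-predicate state, which is what build_row_filter does further below.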
@@ -410,7 +424,7 @@ impl FileOpener for ParquetOpener {
) -> Result<FileOpenFuture> {
let file_range = file_meta.range.clone();
- let metrics = ParquetFileMetrics::new(
+ let file_metrics = ParquetFileMetrics::new(
self.partition_index,
file_meta.location().as_ref(),
&self.metrics,
@@ -450,21 +464,38 @@ impl FileOpener for ParquetOpener {
.then(|| pruning_predicate.as_ref().map(|p| p.logical_expr()))
.flatten()
{
- if let Ok(Some(filter)) = build_row_filter(
+ let row_filter = build_row_filter(
predicate.clone(),
builder.schema().as_ref(),
table_schema.as_ref(),
builder.metadata(),
reorder_predicates,
- ) {
- builder = builder.with_row_filter(filter);
- }
+ &file_metrics.pushdown_rows_filtered,
+ &file_metrics.pushdown_eval_time,
+ );
+
+ match row_filter {
+ Ok(Some(filter)) => {
+ builder = builder.with_row_filter(filter);
+ }
+ Ok(None) => {}
+ Err(e) => {
+ debug!(
+ "Ignoring error building row filter for '{:?}': {}",
+ predicate, e
+ );
+ }
+ };
};
let file_metadata = builder.metadata();
let groups = file_metadata.row_groups();
- let row_groups =
- prune_row_groups(groups, file_range, pruning_predicate.clone(), &metrics);
+ let row_groups = prune_row_groups(
+ groups,
+ file_range,
+ pruning_predicate.clone(),
+ &file_metrics,
+ );
if enable_page_index && check_page_index_push_down_valid(&pruning_predicate) {
let file_offset_indexes = file_metadata.offset_indexes();
@@ -480,7 +511,7 @@ impl FileOpener for ParquetOpener {
pruning_predicate.clone(),
file_offset_indexes.get(*r),
file_page_indexes.get(*r),
- &metrics,
+ &file_metrics,
)
.map_err(|e| {
ArrowError::ParquetError(format!(
@@ -564,7 +595,7 @@ impl DefaultParquetFileReaderFactory {
struct ParquetFileReader {
store: Arc<dyn ObjectStore>,
meta: ObjectMeta,
- metrics: ParquetFileMetrics,
+ file_metrics: ParquetFileMetrics,
metadata_size_hint: Option<usize>,
}
@@ -573,7 +604,7 @@ impl AsyncFileReader for ParquetFileReader {
&mut self,
range: Range<usize>,
) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
- self.metrics.bytes_scanned.add(range.end - range.start);
+ self.file_metrics.bytes_scanned.add(range.end - range.start);
self.store
.get_range(&self.meta.location, range)
@@ -591,7 +622,7 @@ impl AsyncFileReader for ParquetFileReader {
Self: Send,
{
let total = ranges.iter().map(|r| r.end - r.start).sum();
- self.metrics.bytes_scanned.add(total);
+ self.file_metrics.bytes_scanned.add(total);
async move {
self.store
@@ -636,7 +667,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
metadata_size_hint: Option<usize>,
metrics: &ExecutionPlanMetricsSet,
) -> Result<Box<dyn AsyncFileReader + Send>> {
- let parquet_file_metrics = ParquetFileMetrics::new(
+ let file_metrics = ParquetFileMetrics::new(
partition_index,
file_meta.location().as_ref(),
metrics,
@@ -646,7 +677,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
meta: file_meta.object_meta,
store: Arc::clone(&self.store),
metadata_size_hint,
- metrics: parquet_file_metrics,
+ file_metrics,
}))
}
}
@@ -1167,6 +1198,7 @@ mod tests {
use crate::datasource::listing::{FileRange, PartitionedFile};
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::options::CsvReadOptions;
+ use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use crate::test::object_store::local_unpartitioned_file;
use crate::{
@@ -1199,8 +1231,15 @@ mod tests {
use std::io::Write;
use tempfile::TempDir;
- /// writes each RecordBatch as an individual parquet file and then
- /// reads it back in to the named location.
+ struct RoundTripResult {
+ /// Data that was read back from ParquetFiles
+ batches: Result<Vec<RecordBatch>>,
+ /// The physical plan that was created (that has statistics, etc)
+ parquet_exec: Arc<ParquetExec>,
+ }
+
+ /// writes each RecordBatch as an individual parquet file and re-reads
+ /// the data back. Returns the data as [RecordBatch]es
async fn round_trip_to_parquet(
batches: Vec<RecordBatch>,
projection: Option<Vec<usize>>,
@@ -1208,14 +1247,30 @@ mod tests {
predicate: Option<Expr>,
pushdown_predicate: bool,
) -> Result<Vec<RecordBatch>> {
+ round_trip(batches, projection, schema, predicate, pushdown_predicate)
+ .await
+ .batches
+ }
+
+ /// Writes each RecordBatch as an individual parquet file and then
+ /// reads them back. Returns the parquet exec as well as the data
+ /// as [RecordBatch]es
+ async fn round_trip(
+ batches: Vec<RecordBatch>,
+ projection: Option<Vec<usize>>,
+ schema: Option<SchemaRef>,
+ predicate: Option<Expr>,
+ pushdown_predicate: bool,
+ ) -> RoundTripResult {
let file_schema = match schema {
Some(schema) => schema,
- None => Arc::new(Schema::try_merge(
- batches.iter().map(|b| b.schema().as_ref().clone()),
- )?),
+ None => Arc::new(
+ Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone()))
+ .unwrap(),
+ ),
};
- let (meta, _files) = store_parquet(batches).await?;
+ let (meta, _files) = store_parquet(batches).await.unwrap();
let file_groups = meta.into_iter().map(Into::into).collect();
// prepare the scan
@@ -1242,7 +1297,11 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
- collect(Arc::new(parquet_exec), task_ctx).await
+ let parquet_exec = Arc::new(parquet_exec);
+ RoundTripResult {
+ batches: collect(parquet_exec.clone(), task_ctx).await,
+ parquet_exec,
+ }
}
// Add a new column with the specified field name to the RecordBatch
@@ -1453,10 +1512,7 @@ mod tests {
let filter = col("c2").eq(lit(2_i64));
// read/write them files:
- let read =
- round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
- .await
- .unwrap();
+ let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
let expected = vec![
"+----+----+----+",
"| c1 | c3 | c2 |",
@@ -1464,7 +1520,10 @@ mod tests {
"| | 20 | 2 |",
"+----+----+----+",
];
- assert_batches_sorted_eq!(expected, &read);
+ assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+ let metrics = rt.parquet_exec.metrics().unwrap();
+ // Note there were 6 rows in total (across two batches)
+ assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
}
#[tokio::test]
@@ -1587,7 +1646,7 @@ mod tests {
}
#[tokio::test]
- async fn evolved_schema_disjoint_schema_filter_with_pushdown() {
+ async fn evolved_schema_disjoint_schema_with_filter_pushdown() {
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
@@ -1602,10 +1661,7 @@ mod tests {
let filter = col("c2").eq(lit(1_i64));
// read/write them files:
- let read =
- round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
- .await
- .unwrap();
+ let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
let expected = vec![
"+----+----+",
@@ -1614,7 +1670,10 @@ mod tests {
"| | 1 |",
"+----+----+",
];
- assert_batches_sorted_eq!(expected, &read);
+ assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+ let metrics = rt.parquet_exec.metrics().unwrap();
+ // Note there were 6 rows in total (across two batches)
+ assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
}
#[tokio::test]
@@ -1895,6 +1954,71 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn parquet_exec_metrics() {
+ let c1: ArrayRef = Arc::new(StringArray::from(vec![
+ Some("Foo"),
+ None,
+ Some("bar"),
+ Some("bar"),
+ Some("bar"),
+ Some("bar"),
+ Some("zzz"),
+ ]));
+
+ // batch1: c1(string)
+ let batch1 = create_batch(vec![("c1", c1.clone())]);
+
+ // on
+ let filter = col("c1").not_eq(lit("bar"));
+
+ // read/write them files:
+ let rt = round_trip(vec![batch1], None, None, Some(filter), true).await;
+
+ let metrics = rt.parquet_exec.metrics().unwrap();
+
+ // assert the batches and some metrics
+ let expected = vec![
+ "+-----+", "| c1 |", "+-----+", "| Foo |", "| zzz |", "+-----+",
+ ];
+ assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+
+ // pushdown predicates have eliminated all 4 bar rows and the
+ // null row for 5 rows total
+ assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
+ assert!(
+ get_value(&metrics, "pushdown_eval_time") > 0,
+ "no eval time in metrics: {:#?}",
+ metrics
+ );
+ }
+
+ /// returns the sum of all the metrics with the specified name
+ /// in the given metrics set.
+ ///
+ /// Count: returns value
+ /// Time: returns elapsed nanoseconds
+ ///
+ /// Panics if no such metric.
+ fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
+ let sum = metrics.sum(|m| match m.value() {
+ MetricValue::Count { name, .. } if name == metric_name => true,
+ MetricValue::Time { name, .. } if name == metric_name => true,
+ _ => false,
+ });
+
+ match sum {
+ Some(MetricValue::Count { count, .. }) => count.value(),
+ Some(MetricValue::Time { time, .. }) => time.value(),
+ _ => {
+ panic!(
+ "Expected metric not found. Looking for '{}' in\n\n{:#?}",
+ metric_name, metrics
+ );
+ }
+ }
+ }
+
fn parquet_file_metrics() -> ParquetFileMetrics {
let metrics = Arc::new(ExecutionPlanMetricsSet::new());
ParquetFileMetrics::new(0, "file.parquet", &metrics)
diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/row_filter.rs
index dd9c8fb65..49ec6b5ca 100644
--- a/datafusion/core/src/physical_plan/file_format/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs
@@ -31,6 +31,8 @@ use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;
use std::sync::Arc;
+use crate::physical_plan::metrics;
+
/// This module contains utilities for enabling the pushdown of DataFusion filter predicates (which
/// can be any DataFusion `Expr` that evaluates to a `BooleanArray`) to the parquet decoder level in `arrow-rs`.
/// DataFusion will use a `ParquetRecordBatchStream` to read data from parquet into arrow `RecordBatch`es.
@@ -66,6 +68,10 @@ use std::sync::Arc;
pub(crate) struct DatafusionArrowPredicate {
physical_expr: Arc<dyn PhysicalExpr>,
projection: ProjectionMask,
+ /// how many rows were filtered out by this predicate
+ rows_filtered: metrics::Count,
+ /// how long was spent evaluating this predicate
+ time: metrics::Time,
}
impl DatafusionArrowPredicate {
@@ -73,6 +79,8 @@ impl DatafusionArrowPredicate {
candidate: FilterCandidate,
schema: &Schema,
metadata: &ParquetMetaData,
+ rows_filtered: metrics::Count,
+ time: metrics::Time,
) -> Result<Self> {
let props = ExecutionProps::default();
@@ -88,6 +96,8 @@ impl DatafusionArrowPredicate {
metadata.file_metadata().schema_descr(),
candidate.projection,
),
+ rows_filtered,
+ time,
})
}
}
@@ -98,6 +108,8 @@ impl ArrowPredicate for DatafusionArrowPredicate {
}
fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+ // scoped timer updates on drop
+ let mut timer = self.time.timer();
match self
.physical_expr
.evaluate(&batch)
@@ -105,7 +117,11 @@ impl ArrowPredicate for DatafusionArrowPredicate {
{
Ok(array) => {
if let Some(mask) = array.as_any().downcast_ref::<BooleanArray>() {
- Ok(BooleanArray::from(mask.data().clone()))
+ let bool_arr = BooleanArray::from(mask.data().clone());
+ let num_filtered = bool_arr.len() - true_count(&bool_arr);
+ self.rows_filtered.add(num_filtered);
+ timer.stop();
+ Ok(bool_arr)
} else {
Err(ArrowError::ComputeError(
"Unexpected result of predicate evaluation, expected
BooleanArray".to_owned(),
@@ -120,6 +136,27 @@ impl ArrowPredicate for DatafusionArrowPredicate {
}
}
+/// Return the number of non-null true values in an array
+// TODO remove when https://github.com/apache/arrow-rs/issues/2963 is released
+fn true_count(arr: &BooleanArray) -> usize {
+ match arr.data().null_buffer() {
+ Some(nulls) => {
+ let null_chunks = nulls.bit_chunks(arr.offset(), arr.len());
+ let value_chunks = arr.values().bit_chunks(arr.offset(), arr.len());
+ null_chunks
+ .iter()
+ .zip(value_chunks.iter())
+ .chain(std::iter::once((
+ null_chunks.remainder_bits(),
+ value_chunks.remainder_bits(),
+ )))
+ .map(|(a, b)| (a & b).count_ones() as usize)
+ .sum()
+ }
+ None => arr.values().count_set_bits_offset(arr.offset(), arr.len()),
+ }
+}
+
/// A candidate expression for creating a `RowFilter` contains the
/// expression as well as data to estimate the cost of evaluating
/// the resulting expression.
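The true_count helper above ANDs each chunk of the null buffer with the values buffer so that null slots never count as true, then sums the set bits (including the remainder bits past the last full 64-bit chunk). A hypothetical unit test, not part of this commit, illustrating the intended semantics:

    use arrow::array::BooleanArray;

    #[test]
    fn true_count_ignores_nulls_and_falses() {
        // true, null, false, true -> 2 non-null true values
        let arr = BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]);
        assert_eq!(true_count(&arr), 2);
    }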
@@ -252,6 +289,8 @@ pub fn build_row_filter(
table_schema: &Schema,
metadata: &ParquetMetaData,
reorder_predicates: bool,
+ rows_filtered: &metrics::Count,
+ time: &metrics::Time,
) -> Result<Option<RowFilter>> {
let predicates = split_conjunction_owned(expr);
@@ -280,15 +319,25 @@ pub fn build_row_filter(
let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];
for candidate in indexed_candidates {
- let filter =
- DatafusionArrowPredicate::try_new(candidate, file_schema, metadata)?;
+ let filter = DatafusionArrowPredicate::try_new(
+ candidate,
+ file_schema,
+ metadata,
+ rows_filtered.clone(),
+ time.clone(),
+ )?;
filters.push(Box::new(filter));
}
for candidate in other_candidates {
- let filter =
- DatafusionArrowPredicate::try_new(candidate, file_schema, metadata)?;
+ let filter = DatafusionArrowPredicate::try_new(
+ candidate,
+ file_schema,
+ metadata,
+ rows_filtered.clone(),
+ time.clone(),
+ )?;
filters.push(Box::new(filter));
}
@@ -297,8 +346,13 @@ pub fn build_row_filter(
} else {
let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];
for candidate in candidates {
- let filter =
- DatafusionArrowPredicate::try_new(candidate, file_schema, metadata)?;
+ let filter = DatafusionArrowPredicate::try_new(
+ candidate,
+ file_schema,
+ metadata,
+ rows_filtered.clone(),
+ time.clone(),
+ )?;
filters.push(Box::new(filter));
}
diff --git a/datafusion/core/src/physical_plan/metrics/mod.rs b/datafusion/core/src/physical_plan/metrics/mod.rs
index dbbb8af4f..7d6d56c70 100644
--- a/datafusion/core/src/physical_plan/metrics/mod.rs
+++ b/datafusion/core/src/physical_plan/metrics/mod.rs
@@ -166,8 +166,7 @@ impl Metric {
}
}
-/// A snapshot of the metrics for a particular operator (`dyn
-/// ExecutionPlan`).
+/// A snapshot of the metrics for a particular [`ExecutionPlan`].
#[derive(Default, Debug, Clone)]
pub struct MetricsSet {
metrics: Vec<Arc<Metric>>,
@@ -379,6 +378,16 @@ impl Label {
let value = value.into();
Self { name, value }
}
+
+ /// Return the name of this label
+ pub fn name(&self) -> &str {
+ self.name.as_ref()
+ }
+
+ /// Return the value of this label
+ pub fn value(&self) -> &str {
+ self.value.as_ref()
+ }
}
impl Display for Label {
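The new Label accessors let metric consumers read labels back, for example to group pushdown_rows_filtered by filename. A minimal sketch, assuming Label is publicly reachable from the metrics module (label_roundtrip is a hypothetical function, not part of the commit):

    use datafusion::physical_plan::metrics::Label;

    fn label_roundtrip() {
        let label = Label::new("filename", "file.parquet");
        assert_eq!(label.name(), "filename");
        assert_eq!(label.value(), "file.parquet");
    }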