This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 011bcf490 Implement parquet page-level skipping with column index,
using min/ma… (#3780)
011bcf490 is described below
commit 011bcf4901718a8a467352275f4074647d2d8ba2
Author: Yang Jiang <[email protected]>
AuthorDate: Sat Oct 15 18:35:44 2022 +0800
Implement parquet page-level skipping with column index, using min/ma…
(#3780)
* Implement parquet page-level skipping with column index, using min/max
stats
Signed-off-by: yangjiang <[email protected]>
* add test
Signed-off-by: yangjiang <[email protected]>
* support some types
Signed-off-by: yangjiang <[email protected]>
* fix typo
Signed-off-by: yangjiang <[email protected]>
* add comments
Signed-off-by: yangjiang <[email protected]>
Signed-off-by: yangjiang <[email protected]>
---
datafusion/core/src/physical_optimizer/pruning.rs | 14 +
.../core/src/physical_plan/file_format/parquet.rs | 329 ++++++++++++++++++++-
2 files changed, 340 insertions(+), 3 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs
b/datafusion/core/src/physical_optimizer/pruning.rs
index 8abd5f7c4..18ea532ea 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -223,6 +223,20 @@ impl PruningPredicate {
pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
&self.predicate_expr
}
+
+ /// Returns all needed column indexes to evaluate this pruning predicate
+ pub(crate) fn need_input_columns_ids(&self) -> HashSet<usize> {
+ let mut set = HashSet::new();
+ self.required_columns.columns.iter().for_each(|x| {
+ match self.schema().column_with_name(x.0.name.as_str()) {
+ None => {}
+ Some(y) => {
+ set.insert(y.0);
+ }
+ }
+ });
+ set
+ }
}
/// Handles creating references to the min/max statistics
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 5f72c7acc..a5b146dff 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -44,6 +44,7 @@ use crate::{
},
scalar::ScalarValue,
};
+use arrow::array::{BooleanArray, Float32Array, Float64Array, Int32Array,
Int64Array};
use arrow::datatypes::DataType;
use arrow::{
array::ArrayRef,
@@ -57,16 +58,18 @@ use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use log::debug;
use object_store::{ObjectMeta, ObjectStore};
-use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection,
RowSelector};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder,
ProjectionMask};
use parquet::basic::{ConvertedType, LogicalType};
use parquet::errors::ParquetError;
+use parquet::file::page_index::index::Index;
use parquet::file::{
metadata::{ParquetMetaData, RowGroupMetaData},
properties::WriterProperties,
statistics::Statistics as ParquetStatistics,
};
+use parquet::format::PageLocation;
use parquet::schema::types::ColumnDescriptor;
#[derive(Debug, Clone, Default)]
@@ -435,9 +438,44 @@ impl FileOpener for ParquetOpener {
}
};
- let groups = builder.metadata().row_groups();
+ let file_metadata = builder.metadata();
+ let groups = file_metadata.row_groups();
let row_groups =
- prune_row_groups(groups, file_range, pruning_predicate,
&metrics);
+ prune_row_groups(groups, file_range,
pruning_predicate.clone(), &metrics);
+
+ if enable_page_index &&
check_page_index_push_down_valid(&pruning_predicate) {
+ let file_offset_indexes = file_metadata.offset_indexes();
+ let file_page_indexes = file_metadata.page_indexes();
+ if let (Some(file_offset_indexes), Some(file_page_indexes)) =
+ (file_offset_indexes, file_page_indexes)
+ {
+ let mut selectors = Vec::with_capacity(row_groups.len());
+ for r in &row_groups {
+ selectors.extend(
+ prune_pages_in_one_row_group(
+ &groups[*r],
+ pruning_predicate.clone(),
+ file_offset_indexes.get(*r),
+ file_page_indexes.get(*r),
+ &metrics,
+ )
+ .map_err(|e| {
+ ArrowError::ParquetError(format!(
+ "Fail in prune_pages_in_one_row_group: {}",
+ e
+ ))
+ }),
+ );
+ }
+ debug!(
+ "Use filter and page index create RowSelection {:?} ",
+ &selectors
+ );
+ builder = builder.with_row_selection(RowSelection::from(
+ selectors.into_iter().flatten().collect::<Vec<_>>(),
+ ));
+ }
+ }
let stream = builder
.with_projection(mask)
@@ -460,6 +498,20 @@ impl FileOpener for ParquetOpener {
}
}
+// Check PruningPredicates just work on one column.
+fn check_page_index_push_down_valid(predicate: &Option<PruningPredicate>) ->
bool {
+ if let Some(predicate) = predicate {
+ // for now we only support pushDown on one col, because each col may
have different page numbers, it's hard to get
+ // `num_containers` from `PruningStatistics`.
+ let cols = predicate.need_input_columns_ids();
+ //Todo more specific rules
+ if cols.len() == 1 {
+ return true;
+ }
+ }
+ false
+}
+
/// Factory of parquet file readers.
///
/// Provides means to implement custom data access interface.
@@ -617,6 +669,16 @@ struct RowGroupPruningStatistics<'a> {
parquet_schema: &'a Schema,
}
+/// Wraps page_index statistics in a way
+/// that implements [`PruningStatistics`]
+struct PagesPruningStatistics<'a> {
+ //row_group_metadata: &'a RowGroupMetaData,
+ page_indexes: &'a Vec<Index>,
+ offset_indexes: &'a Vec<Vec<PageLocation>>,
+ parquet_schema: &'a Schema,
+ col_id: usize,
+}
+
// TODO: consolidate code with arrow-rs
// Convert the bytes array to i128.
// The endian of the input bytes array must be big-endian.
@@ -749,6 +811,61 @@ macro_rules! get_null_count_values {
}};
}
+// Extract the min or max value calling `func` from page index
+macro_rules! get_min_max_values_for_page_index {
+ ($self:expr, $column:expr, $func:ident) => {{
+ if let Some((col_id_index, _field)) =
+ $self.parquet_schema.column_with_name(&$column.name)
+ {
+ if let Some(page_index) = $self.page_indexes.get(col_id_index) {
+ match page_index {
+ Index::NONE => None,
+ Index::INT32(index) => {
+ let vec = &index.indexes;
+ Some(Arc::new(Int32Array::from_iter(
+ vec.iter().map(|x| x.$func().cloned()),
+ )))
+ }
+ Index::INT64(index) => {
+ let vec = &index.indexes;
+ Some(Arc::new(Int64Array::from_iter(
+ vec.iter().map(|x| x.$func().cloned()),
+ )))
+ }
+ Index::FLOAT(index) => {
+ let vec = &index.indexes;
+ Some(Arc::new(Float32Array::from_iter(
+ vec.iter().map(|x| x.$func().cloned()),
+ )))
+ }
+ Index::DOUBLE(index) => {
+ let vec = &index.indexes;
+ Some(Arc::new(Float64Array::from_iter(
+ vec.iter().map(|x| x.$func().cloned()),
+ )))
+ }
+ Index::BOOLEAN(index) => {
+ let vec = &index.indexes;
+ Some(Arc::new(BooleanArray::from_iter(
+ vec.iter().map(|x| x.$func().cloned()),
+ )))
+ }
+ Index::INT96(_)
+ | Index::BYTE_ARRAY(_)
+ | Index::FIXED_LEN_BYTE_ARRAY(_) => {
+ //Todo support these types
+ None
+ }
+ }
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+ }};
+}
+
// Convert parquet column schema to arrow data type, and just consider the
// decimal data type.
fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) ->
Option<DataType> {
@@ -785,6 +902,57 @@ impl<'a> PruningStatistics for
RowGroupPruningStatistics<'a> {
}
}
+impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
+ fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+ get_min_max_values_for_page_index!(self, column, min)
+ }
+
+ fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+ get_min_max_values_for_page_index!(self, column, max)
+ }
+
+ fn num_containers(&self) -> usize {
+ self.offset_indexes.get(self.col_id).unwrap().len()
+ }
+
+ fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+ if let Some((col_id_index, _field)) =
+ self.parquet_schema.column_with_name(&column.name)
+ {
+ if let Some(page_index) = self.page_indexes.get(col_id_index) {
+ match page_index {
+ Index::NONE => None,
+ Index::BOOLEAN(index) =>
Some(Arc::new(Int64Array::from_iter(
+ index.indexes.iter().map(|x| x.null_count),
+ ))),
+ Index::INT32(index) => Some(Arc::new(Int64Array::from_iter(
+ index.indexes.iter().map(|x| x.null_count),
+ ))),
+ Index::INT64(index) => Some(Arc::new(Int64Array::from_iter(
+ index.indexes.iter().map(|x| x.null_count),
+ ))),
+ Index::FLOAT(index) => Some(Arc::new(Int64Array::from_iter(
+ index.indexes.iter().map(|x| x.null_count),
+ ))),
+ Index::DOUBLE(index) =>
Some(Arc::new(Int64Array::from_iter(
+ index.indexes.iter().map(|x| x.null_count),
+ ))),
+ Index::INT96(_)
+ | Index::BYTE_ARRAY(_)
+ | Index::FIXED_LEN_BYTE_ARRAY(_) => {
+ // Todo support these types
+ None
+ }
+ }
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+ }
+}
+
fn prune_row_groups(
groups: &[RowGroupMetaData],
range: Option<FileRange>,
@@ -828,6 +996,97 @@ fn prune_row_groups(
filtered
}
+fn prune_pages_in_one_row_group(
+ group: &RowGroupMetaData,
+ predicate: Option<PruningPredicate>,
+ offset_indexes: Option<&Vec<Vec<PageLocation>>>,
+ page_indexes: Option<&Vec<Index>>,
+ metrics: &ParquetFileMetrics,
+) -> Result<Vec<RowSelector>> {
+ let num_rows = group.num_rows() as usize;
+ if let (Some(predicate), Some(offset_indexes), Some(page_indexes)) =
+ (&predicate, offset_indexes, page_indexes)
+ {
+ let pruning_stats = PagesPruningStatistics {
+ page_indexes,
+ offset_indexes,
+ parquet_schema: predicate.schema().as_ref(),
+ // for now we assume we only support one col.
+ col_id: *predicate
+ .need_input_columns_ids()
+ .iter()
+ .take(1)
+ .next()
+ .unwrap(),
+ };
+
+ match predicate.prune(&pruning_stats) {
+ Ok(values) => {
+ let mut vec = Vec::with_capacity(values.len());
+ if let Some(cols_offset_indexes) =
+ offset_indexes.get(pruning_stats.col_id)
+ {
+ let row_vec =
+ create_row_count_in_each_page(cols_offset_indexes,
num_rows);
+ assert_eq!(row_vec.len(), values.len());
+ let mut sum_row = *row_vec.first().unwrap();
+ let mut selected = *values.first().unwrap();
+
+ for (i, &f) in values.iter().skip(1).enumerate() {
+ if f == selected {
+ sum_row += *row_vec.get(i).unwrap();
+ } else {
+ let selector = if selected {
+ RowSelector::select(sum_row)
+ } else {
+ RowSelector::skip(sum_row)
+ };
+ vec.push(selector);
+ sum_row = *row_vec.get(i).unwrap();
+ selected = f;
+ }
+ }
+
+ let selector = if selected {
+ RowSelector::select(sum_row)
+ } else {
+ RowSelector::skip(sum_row)
+ };
+ vec.push(selector);
+ return Ok(vec);
+ } else {
+ debug!("Error evaluating page index predicate values
missing page index col_id is{}", pruning_stats.col_id);
+ metrics.predicate_evaluation_errors.add(1);
+ }
+ }
+ // stats filter array could not be built
+ // return a closure which will not filter out any row groups
+ Err(e) => {
+ debug!("Error evaluating page index predicate values {}", e);
+ metrics.predicate_evaluation_errors.add(1);
+ }
+ }
+ }
+ Err(DataFusionError::ParquetError(ParquetError::General(
+ "Got some error in prune_pages_in_one_row_group, plz try open the
debuglog mode"
+ .to_string(),
+ )))
+}
+
+fn create_row_count_in_each_page(
+ location: &Vec<PageLocation>,
+ num_rows: usize,
+) -> Vec<usize> {
+ let mut vec = Vec::with_capacity(location.len());
+ location.windows(2).for_each(|x| {
+ let start = x[0].first_row_index as usize;
+ let end = x[1].first_row_index as usize;
+ vec.push(end - start);
+ });
+ vec.push(num_rows - location.last().unwrap().first_row_index as usize);
+ vec
+}
+
/// Executes a query and writes the results to a partitioned Parquet file.
pub async fn plan_to_parquet(
state: &SessionState,
@@ -2176,4 +2435,68 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn parquet_exec_with_page_index_filter() -> Result<()> {
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+
+ let object_store_url = ObjectStoreUrl::local_filesystem();
+ let store = session_ctx
+ .runtime_env()
+ .object_store(&object_store_url)
+ .unwrap();
+
+ let testdata = crate::test_util::parquet_test_data();
+ let filename = format!("{}/alltypes_tiny_pages.parquet", testdata);
+
+ let meta = local_unpartitioned_file(filename);
+
+ let schema = ParquetFormat::default()
+ .infer_schema(&store, &[meta.clone()])
+ .await
+ .unwrap();
+
+ let partitioned_file = PartitionedFile {
+ object_meta: meta,
+ partition_values: vec![],
+ range: None,
+ extensions: None,
+ };
+
+ // create filter month == 1;
+ let filter = col("month").eq(lit(1_i32));
+ let parquet_exec = ParquetExec::new(
+ FileScanConfig {
+ object_store_url,
+ file_groups: vec![vec![partitioned_file]],
+ file_schema: schema,
+ statistics: Statistics::default(),
+ // file has 10 cols so index 12 should be month
+ projection: None,
+ limit: None,
+ table_partition_cols: vec![],
+ },
+ Some(filter),
+ None,
+ );
+
+ let parquet_exec_page_index = parquet_exec
+ .clone()
+
.with_scan_options(ParquetScanOptions::default().with_page_index(true));
+
+ let mut results = parquet_exec_page_index.execute(0, task_ctx)?;
+
+ let batch = results.next().await.unwrap()?;
+
+ // from the page index should create below RowSelection
+ // vec.push(RowSelector::select(312));
+ // vec.push(RowSelector::skip(3330));
+ // vec.push(RowSelector::select(333));
+ // vec.push(RowSelector::skip(3330));
+ // total 645 row
+
+ assert_eq!(batch.num_rows(), 645);
+ Ok(())
+ }
}