This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 011bcf490 Implement parquet page-level skipping with column index, 
using min/ma… (#3780)
011bcf490 is described below

commit 011bcf4901718a8a467352275f4074647d2d8ba2
Author: Yang Jiang <[email protected]>
AuthorDate: Sat Oct 15 18:35:44 2022 +0800

    Implement parquet page-level skipping with column index, using min/ma… 
(#3780)
    
    * Implement parquet page-level skipping with column index, using min/max 
stats
    
    Signed-off-by: yangjiang <[email protected]>
    
    * add test
    
    Signed-off-by: yangjiang <[email protected]>
    
    * support some types
    
    Signed-off-by: yangjiang <[email protected]>
    
    * fix typo
    
    Signed-off-by: yangjiang <[email protected]>
    
    * add comments
    
    Signed-off-by: yangjiang <[email protected]>
    
    Signed-off-by: yangjiang <[email protected]>
---
 datafusion/core/src/physical_optimizer/pruning.rs  |  14 +
 .../core/src/physical_plan/file_format/parquet.rs  | 329 ++++++++++++++++++++-
 2 files changed, 340 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/pruning.rs 
b/datafusion/core/src/physical_optimizer/pruning.rs
index 8abd5f7c4..18ea532ea 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -223,6 +223,20 @@ impl PruningPredicate {
     pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
         &self.predicate_expr
     }
+
+    /// Returns all need column indexes to evaluate this pruning predicate
+    pub(crate) fn need_input_columns_ids(&self) -> HashSet<usize> {
+        let mut set = HashSet::new();
+        self.required_columns.columns.iter().for_each(|x| {
+            match self.schema().column_with_name(x.0.name.as_str()) {
+                None => {}
+                Some(y) => {
+                    set.insert(y.0);
+                }
+            }
+        });
+        set
+    }
 }
 
 /// Handles creating references to the min/max statistics
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs 
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 5f72c7acc..a5b146dff 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -44,6 +44,7 @@ use crate::{
     },
     scalar::ScalarValue,
 };
+use arrow::array::{BooleanArray, Float32Array, Float64Array, Int32Array, 
Int64Array};
 use arrow::datatypes::DataType;
 use arrow::{
     array::ArrayRef,
@@ -57,16 +58,18 @@ use futures::future::BoxFuture;
 use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use log::debug;
 use object_store::{ObjectMeta, ObjectStore};
-use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, 
RowSelector};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, 
ProjectionMask};
 use parquet::basic::{ConvertedType, LogicalType};
 use parquet::errors::ParquetError;
+use parquet::file::page_index::index::Index;
 use parquet::file::{
     metadata::{ParquetMetaData, RowGroupMetaData},
     properties::WriterProperties,
     statistics::Statistics as ParquetStatistics,
 };
+use parquet::format::PageLocation;
 use parquet::schema::types::ColumnDescriptor;
 
 #[derive(Debug, Clone, Default)]
@@ -435,9 +438,44 @@ impl FileOpener for ParquetOpener {
                 }
             };
 
-            let groups = builder.metadata().row_groups();
+            let file_metadata = builder.metadata();
+            let groups = file_metadata.row_groups();
             let row_groups =
-                prune_row_groups(groups, file_range, pruning_predicate, 
&metrics);
+                prune_row_groups(groups, file_range, 
pruning_predicate.clone(), &metrics);
+
+            if enable_page_index && 
check_page_index_push_down_valid(&pruning_predicate) {
+                let file_offset_indexes = file_metadata.offset_indexes();
+                let file_page_indexes = file_metadata.page_indexes();
+                if let (Some(file_offset_indexes), Some(file_page_indexes)) =
+                    (file_offset_indexes, file_page_indexes)
+                {
+                    let mut selectors = Vec::with_capacity(row_groups.len());
+                    for r in &row_groups {
+                        selectors.extend(
+                            prune_pages_in_one_row_group(
+                                &groups[*r],
+                                pruning_predicate.clone(),
+                                file_offset_indexes.get(*r),
+                                file_page_indexes.get(*r),
+                                &metrics,
+                            )
+                            .map_err(|e| {
+                                ArrowError::ParquetError(format!(
+                                    "Fail in prune_pages_in_one_row_group: {}",
+                                    e
+                                ))
+                            }),
+                        );
+                    }
+                    debug!(
+                        "Use filter and page index create RowSelection {:?} ",
+                        &selectors
+                    );
+                    builder = builder.with_row_selection(RowSelection::from(
+                        selectors.into_iter().flatten().collect::<Vec<_>>(),
+                    ));
+                }
+            }
 
             let stream = builder
                 .with_projection(mask)
@@ -460,6 +498,20 @@ impl FileOpener for ParquetOpener {
     }
 }
 
+// Check PruningPredicates just work on one column.
+fn check_page_index_push_down_valid(predicate: &Option<PruningPredicate>) -> 
bool {
+    if let Some(predicate) = predicate {
+        // for now we only support pushDown on one col, because each col may 
have different page numbers, its hard to get
+        // `num_containers` from `PruningStatistics`.
+        let cols = predicate.need_input_columns_ids();
+        //Todo more specific rules
+        if cols.len() == 1 {
+            return true;
+        }
+    }
+    false
+}
+
 /// Factory of parquet file readers.
 ///
 /// Provides means to implement custom data access interface.
@@ -617,6 +669,16 @@ struct RowGroupPruningStatistics<'a> {
     parquet_schema: &'a Schema,
 }
 
+/// Wraps page_index statistics in a way
+/// that implements [`PruningStatistics`]
+struct PagesPruningStatistics<'a> {
+    //row_group_metadata: &'a RowGroupMetaData,
+    page_indexes: &'a Vec<Index>,
+    offset_indexes: &'a Vec<Vec<PageLocation>>,
+    parquet_schema: &'a Schema,
+    col_id: usize,
+}
+
 // TODO: consolidate code with arrow-rs
 // Convert the bytes array to i128.
 // The endian of the input bytes array must be big-endian.
@@ -749,6 +811,61 @@ macro_rules! get_null_count_values {
     }};
 }
 
+// Extract the min or max value calling `func` from page idex
+macro_rules! get_min_max_values_for_page_index {
+    ($self:expr, $column:expr, $func:ident) => {{
+        if let Some((col_id_index, _field)) =
+            $self.parquet_schema.column_with_name(&$column.name)
+        {
+            if let Some(page_index) = $self.page_indexes.get(col_id_index) {
+                match page_index {
+                    Index::NONE => None,
+                    Index::INT32(index) => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(Int32Array::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                    Index::INT64(index) => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(Int64Array::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                    Index::FLOAT(index) => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(Float32Array::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                    Index::DOUBLE(index) => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(Float64Array::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                    Index::BOOLEAN(index) => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(BooleanArray::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                    Index::INT96(_)
+                    | Index::BYTE_ARRAY(_)
+                    | Index::FIXED_LEN_BYTE_ARRAY(_) => {
+                        //Todo support these type
+                        None
+                    }
+                }
+            } else {
+                None
+            }
+        } else {
+            None
+        }
+    }};
+}
+
 // Convert parquet column schema to arrow data type, and just consider the
 // decimal data type.
 fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> 
Option<DataType> {
@@ -785,6 +902,57 @@ impl<'a> PruningStatistics for 
RowGroupPruningStatistics<'a> {
     }
 }
 
+impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        get_min_max_values_for_page_index!(self, column, min)
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        get_min_max_values_for_page_index!(self, column, max)
+    }
+
+    fn num_containers(&self) -> usize {
+        self.offset_indexes.get(self.col_id).unwrap().len()
+    }
+
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        if let Some((col_id_index, _field)) =
+            self.parquet_schema.column_with_name(&column.name)
+        {
+            if let Some(page_index) = self.page_indexes.get(col_id_index) {
+                match page_index {
+                    Index::NONE => None,
+                    Index::BOOLEAN(index) => 
Some(Arc::new(Int64Array::from_iter(
+                        index.indexes.iter().map(|x| x.null_count),
+                    ))),
+                    Index::INT32(index) => Some(Arc::new(Int64Array::from_iter(
+                        index.indexes.iter().map(|x| x.null_count),
+                    ))),
+                    Index::INT64(index) => Some(Arc::new(Int64Array::from_iter(
+                        index.indexes.iter().map(|x| x.null_count),
+                    ))),
+                    Index::FLOAT(index) => Some(Arc::new(Int64Array::from_iter(
+                        index.indexes.iter().map(|x| x.null_count),
+                    ))),
+                    Index::DOUBLE(index) => 
Some(Arc::new(Int64Array::from_iter(
+                        index.indexes.iter().map(|x| x.null_count),
+                    ))),
+                    Index::INT96(_)
+                    | Index::BYTE_ARRAY(_)
+                    | Index::FIXED_LEN_BYTE_ARRAY(_) => {
+                        // Todo support these types
+                        None
+                    }
+                }
+            } else {
+                None
+            }
+        } else {
+            None
+        }
+    }
+}
+
 fn prune_row_groups(
     groups: &[RowGroupMetaData],
     range: Option<FileRange>,
@@ -828,6 +996,97 @@ fn prune_row_groups(
     filtered
 }
 
+fn prune_pages_in_one_row_group(
+    group: &RowGroupMetaData,
+    predicate: Option<PruningPredicate>,
+    offset_indexes: Option<&Vec<Vec<PageLocation>>>,
+    page_indexes: Option<&Vec<Index>>,
+    metrics: &ParquetFileMetrics,
+) -> Result<Vec<RowSelector>> {
+    let num_rows = group.num_rows() as usize;
+    if let (Some(predicate), Some(offset_indexes), Some(page_indexes)) =
+        (&predicate, offset_indexes, page_indexes)
+    {
+        let pruning_stats = PagesPruningStatistics {
+            page_indexes,
+            offset_indexes,
+            parquet_schema: predicate.schema().as_ref(),
+            // now we assume only support one col.
+            col_id: *predicate
+                .need_input_columns_ids()
+                .iter()
+                .take(1)
+                .next()
+                .unwrap(),
+        };
+
+        match predicate.prune(&pruning_stats) {
+            Ok(values) => {
+                let mut vec = Vec::with_capacity(values.len());
+                if let Some(cols_offset_indexes) =
+                    offset_indexes.get(pruning_stats.col_id)
+                {
+                    let row_vec =
+                        create_row_count_in_each_page(cols_offset_indexes, 
num_rows);
+                    assert_eq!(row_vec.len(), values.len());
+                    let mut sum_row = *row_vec.first().unwrap();
+                    let mut selected = *values.first().unwrap();
+
+                    for (i, &f) in values.iter().skip(1).enumerate() {
+                        if f == selected {
+                            sum_row += *row_vec.get(i).unwrap();
+                        } else {
+                            let selector = if selected {
+                                RowSelector::select(sum_row)
+                            } else {
+                                RowSelector::skip(sum_row)
+                            };
+                            vec.push(selector);
+                            sum_row = *row_vec.get(i).unwrap();
+                            selected = f;
+                        }
+                    }
+
+                    let selector = if selected {
+                        RowSelector::select(sum_row)
+                    } else {
+                        RowSelector::skip(sum_row)
+                    };
+                    vec.push(selector);
+                    return Ok(vec);
+                } else {
+                    debug!("Error evaluating page index predicate values 
missing page index col_id is{}", pruning_stats.col_id);
+                    metrics.predicate_evaluation_errors.add(1);
+                }
+            }
+            // stats filter array could not be built
+            // return a closure which will not filter out any row groups
+            Err(e) => {
+                debug!("Error evaluating page index predicate values {}", e);
+                metrics.predicate_evaluation_errors.add(1);
+            }
+        }
+    }
+    Err(DataFusionError::ParquetError(ParquetError::General(
+        "Got some error in prune_pages_in_one_row_group, plz try open the 
debuglog mode"
+            .to_string(),
+    )))
+}
+
+fn create_row_count_in_each_page(
+    location: &Vec<PageLocation>,
+    num_rows: usize,
+) -> Vec<usize> {
+    let mut vec = Vec::with_capacity(location.len());
+    location.windows(2).for_each(|x| {
+        let start = x[0].first_row_index as usize;
+        let end = x[1].first_row_index as usize;
+        vec.push(end - start);
+    });
+    vec.push(num_rows - location.last().unwrap().first_row_index as usize);
+    vec
+}
+
 /// Executes a query and writes the results to a partitioned Parquet file.
 pub async fn plan_to_parquet(
     state: &SessionState,
@@ -2176,4 +2435,68 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn parquet_exec_with_page_index_filter() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+
+        let object_store_url = ObjectStoreUrl::local_filesystem();
+        let store = session_ctx
+            .runtime_env()
+            .object_store(&object_store_url)
+            .unwrap();
+
+        let testdata = crate::test_util::parquet_test_data();
+        let filename = format!("{}/alltypes_tiny_pages.parquet", testdata);
+
+        let meta = local_unpartitioned_file(filename);
+
+        let schema = ParquetFormat::default()
+            .infer_schema(&store, &[meta.clone()])
+            .await
+            .unwrap();
+
+        let partitioned_file = PartitionedFile {
+            object_meta: meta,
+            partition_values: vec![],
+            range: None,
+            extensions: None,
+        };
+
+        // create filter month == 1;
+        let filter = col("month").eq(lit(1_i32));
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                object_store_url,
+                file_groups: vec![vec![partitioned_file]],
+                file_schema: schema,
+                statistics: Statistics::default(),
+                // file has 10 cols so index 12 should be month
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            Some(filter),
+            None,
+        );
+
+        let parquet_exec_page_index = parquet_exec
+            .clone()
+            
.with_scan_options(ParquetScanOptions::default().with_page_index(true));
+
+        let mut results = parquet_exec_page_index.execute(0, task_ctx)?;
+
+        let batch = results.next().await.unwrap()?;
+
+        //  from the page index should create below RowSelection
+        //  vec.push(RowSelector::select(312));
+        //  vec.push(RowSelector::skip(3330));
+        //  vec.push(RowSelector::select(333));
+        //  vec.push(RowSelector::skip(3330));
+        // total 645 row
+
+        assert_eq!(batch.num_rows(), 645);
+        Ok(())
+    }
 }

Reply via email to