alamb commented on code in PR #3967:
URL: https://github.com/apache/arrow-datafusion/pull/3967#discussion_r1010681234


##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -497,37 +500,67 @@ impl FileOpener for ParquetOpener {
                 &file_metrics,
             );
 
-            if enable_page_index && 
check_page_index_push_down_valid(&pruning_predicate) {
+            let page_index_predicates = 
extract_page_index_push_down_predicates(
+                &pruning_predicate,
+                builder.schema().clone(),
+            )?;
+
+            if enable_page_index && !page_index_predicates.is_empty() {
                 let file_offset_indexes = file_metadata.offset_indexes();
                 let file_page_indexes = file_metadata.page_indexes();
                 if let (Some(file_offset_indexes), Some(file_page_indexes)) =
                     (file_offset_indexes, file_page_indexes)
                 {
-                    let mut selectors = Vec::with_capacity(row_groups.len());
-                    for r in &row_groups {
-                        selectors.extend(
-                            prune_pages_in_one_row_group(
-                                &groups[*r],
-                                pruning_predicate.clone(),
-                                file_offset_indexes.get(*r),
-                                file_page_indexes.get(*r),
-                                &file_metrics,
-                            )
-                            .map_err(|e| {
-                                ArrowError::ParquetError(format!(
-                                    "Fail in prune_pages_in_one_row_group: {}",
-                                    e
-                                ))
-                            }),
+                    let mut row_selections =
+                        VecDeque::with_capacity(page_index_predicates.len());
+                    for predicate in page_index_predicates {
+                        // `extract_page_index_push_down_predicates` only 
return predicate with one col.
+                        let col_id =
+                            
*predicate.need_input_columns_ids().iter().next().unwrap();
+                        let mut selectors = 
Vec::with_capacity(row_groups.len());
+                        for r in &row_groups {
+                            let rg_offset_indexes = 
file_offset_indexes.get(*r);
+                            let rg_page_indexes = file_page_indexes.get(*r);
+                            if let (Some(rg_page_indexes), 
Some(rg_offset_indexes)) =
+                                (rg_page_indexes, rg_offset_indexes)
+                            {
+                                selectors.extend(
+                                    prune_pages_in_one_row_group(
+                                        &groups[*r],
+                                        &predicate,
+                                        rg_offset_indexes.get(col_id),
+                                        rg_page_indexes.get(col_id),
+                                        &file_metrics,
+                                    )
+                                    .map_err(|e| {
+                                        ArrowError::ParquetError(format!(
+                                            "Fail in 
prune_pages_in_one_row_group: {}",
+                                            e
+                                        ))
+                                    }),
+                                );
+                            } else {
+                                // fallback select all rows

Review Comment:
   👍 



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -552,18 +585,111 @@ impl FileOpener for ParquetOpener {
     }
 }
 
-// Check PruningPredicates just work on one column.
-fn check_page_index_push_down_valid(predicate: &Option<PruningPredicate>) -> 
bool {
-    if let Some(predicate) = predicate {
-        // for now we only support pushDown on one col, because each col may 
have different page numbers, its hard to get
-        // `num_containers` from `PruningStatistics`.
-        let cols = predicate.need_input_columns_ids();
-        //Todo more specific rules
-        if cols.len() == 1 {
-            return true;
+// combine two `RowSelection` return the intersection
+// For example:
+// self:     NNYYYYNNY
+// other:    NYNNNNNNY
+//
+// returned: NNNNNNNNY
+// set `need_combine` true will combine result: Select(2) + Select(1) + 
Skip(2) -> Select(3) + Skip(2)
+pub(crate) fn intersect_row_selection(
+    left: Vec<RowSelector>,
+    right: Vec<RowSelector>,
+    need_combine: bool,
+) -> Vec<RowSelector> {
+    let mut res = vec![];
+    let mut l_iter = left.into_iter().peekable();
+    let mut r_iter = right.into_iter().peekable();
+
+    while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) {
+        if a.row_count == 0 {
+            l_iter.next().unwrap();
+            continue;
+        }
+        if b.row_count == 0 {
+            r_iter.next().unwrap();
+            continue;
+        }
+        match (a.skip, b.skip) {
+            (false, false) => {
+                if a.row_count < b.row_count {
+                    res.push(RowSelector::select(a.row_count));
+                    b.row_count -= a.row_count;
+                    l_iter.next().unwrap();
+                } else {
+                    res.push(RowSelector::select(b.row_count));
+                    a.row_count -= b.row_count;
+                    r_iter.next().unwrap();
+                }
+            }
+            _ => {
+                if a.row_count < b.row_count {
+                    res.push(RowSelector::skip(a.row_count));
+                    b.row_count -= a.row_count;
+                    l_iter.next().unwrap();
+                } else {
+                    res.push(RowSelector::skip(b.row_count));
+                    a.row_count -= b.row_count;
+                    r_iter.next().unwrap();
+                }
+            }
         }
     }
-    false
+    if l_iter.peek().is_some() {
+        res.extend(l_iter);
+    }
+    if r_iter.peek().is_some() {
+        res.extend(r_iter);
+    }
+    // combine the adjacent same operators and last zero row count

Review Comment:
   ```suggestion
       // combine the adjacent same operators and last zero row count
       // TODO: remove when https://github.com/apache/arrow-rs/pull/2994 is 
released
   ```



##########
datafusion/core/tests/parquet_page_index_pruning.rs:
##########
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::config::ConfigOptions;
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::file_format::FileFormat;
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::SessionContext;
+use datafusion_common::Statistics;
+use datafusion_expr::{col, lit, Expr};
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use tokio_stream::StreamExt;
+
+async fn get_parquet_exec(filter: Expr, session_ctx: SessionContext) -> 
ParquetExec {
+    let object_store_url = ObjectStoreUrl::local_filesystem();
+    let store = session_ctx
+        .runtime_env()
+        .object_store(&object_store_url)
+        .unwrap();
+
+    let testdata = datafusion::test_util::parquet_test_data();
+    let filename = format!("{}/alltypes_tiny_pages.parquet", testdata);
+
+    let location = Path::from_filesystem_path(filename.as_str()).unwrap();
+    let metadata = std::fs::metadata(filename).expect("Local file metadata");
+    let meta = ObjectMeta {
+        location,
+        last_modified: 
metadata.modified().map(chrono::DateTime::from).unwrap(),
+        size: metadata.len() as usize,
+    };
+
+    let schema = ParquetFormat::default()
+        .infer_schema(&store, &[meta.clone()])
+        .await
+        .unwrap();
+
+    let partitioned_file = PartitionedFile {
+        object_meta: meta,
+        partition_values: vec![],
+        range: None,
+        extensions: None,
+    };
+
+    let parquet_exec = ParquetExec::new(
+        FileScanConfig {
+            object_store_url,
+            file_groups: vec![vec![partitioned_file]],
+            file_schema: schema,
+            statistics: Statistics::default(),
+            // file has 10 cols so index 12 should be month
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            config_options: ConfigOptions::new().into_shareable(),
+        },
+        Some(filter),
+        None,
+    );
+    parquet_exec.with_enable_page_index(true)
+}
+
+#[tokio::test]
+async fn page_index_filter_one_col() {
+    let session_ctx = SessionContext::new();
+    let task_ctx = session_ctx.task_ctx();
+
+    // 1.create filter month == 1;
+    let filter = col("month").eq(lit(1_i32));
+
+    let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+    let batch = results.next().await.unwrap().unwrap();
+
+    // `month = 1` from the page index should create below RowSelection
+    //  vec.push(RowSelector::select(312));
+    //  vec.push(RowSelector::skip(3330));
+    //  vec.push(RowSelector::select(333));
+    //  vec.push(RowSelector::skip(3330));
+    // total 645 row
+    assert_eq!(batch.num_rows(), 645);
+
+    // 2. create filter month == 1 or month == 2;
+    let filter = col("month").eq(lit(1_i32)).or(col("month").eq(lit(2_i32)));
+
+    let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+    let batch = results.next().await.unwrap().unwrap();
+
+    // `month = 12` from the page index should create below RowSelection
+    //  vec.push(RowSelector::skip(894));
+    //  vec.push(RowSelector::select(339));
+    //  vec.push(RowSelector::skip(3330));
+    //  vec.push(RowSelector::select(312));
+    //  vec.push(RowSelector::skip(2430));
+    //  combine with before filter total 1275 row
+    assert_eq!(batch.num_rows(), 1275);
+
+    // 3. create filter month == 1 and month == 12;
+    let filter = col("month")
+        .eq(lit(1_i32))
+        .and(col("month").eq(lit(12_i32)));
+
+    let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+    let batch = results.next().await;
+
+    assert!(batch.is_none());
+
+    // 4.create filter 0 < month < 2 ;
+    let filter = col("month").gt(lit(0_i32)).and(col("month").lt(lit(2_i32)));
+
+    let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+    let batch = results.next().await.unwrap().unwrap();
+
+    // should same with `month = 1`
+    assert_eq!(batch.num_rows(), 645);
+}
+
+#[tokio::test]
+async fn page_index_filter_multi_col() {
+    let session_ctx = SessionContext::new();
+    let task_ctx = session_ctx.task_ctx();
+
+    // create filter month == 1 and year = 2009;
+    let filter = col("month").eq(lit(1_i32)).and(col("year").eq(lit(2009)));
+
+    let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+    let batch = results.next().await.unwrap().unwrap();
+
+    //  `year = 2009` from the page index should create below RowSelection
+    //  vec.push(RowSelector::select(3663));
+    //  vec.push(RowSelector::skip(3642));
+    //  combine with `month = 1` total 333 row
+    assert_eq!(batch.num_rows(), 333);
+
+    // create filter (year = 2009 or id = 1) and month = 1;
+    // this should only use `month = 1` to evaluate the page index.
+    let filter = col("month")
+        .eq(lit(1_i32))
+        .and(col("year").eq(lit(2009)).or(col("id").eq(lit(1))));
+
+    let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+    let batch = results.next().await.unwrap().unwrap();
+    assert_eq!(batch.num_rows(), 645);
+
+    // create filter (year = 2009 or id = 1)
+    // this filter use two columns will not push down
+    let filter = col("year").eq(lit(2009)).or(col("id").eq(lit(1)));
+
+    let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+    let batch = results.next().await.unwrap().unwrap();
+    assert_eq!(batch.num_rows(), 7300);
+
+    // create filter (year = 2009 and id = 1) or (year = 2010)
+    // this filter use two columns will not push down
+    // todo but after use CNF rewrite it could rewrite to (year = 2009 or  
year = 2010) and (id = 1 or year = 2010)

Review Comment:
   Not sure about the CNF rewrite comment here



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -552,18 +585,111 @@ impl FileOpener for ParquetOpener {
     }
 }
 
-// Check PruningPredicates just work on one column.
-fn check_page_index_push_down_valid(predicate: &Option<PruningPredicate>) -> 
bool {
-    if let Some(predicate) = predicate {
-        // for now we only support pushDown on one col, because each col may 
have different page numbers, its hard to get
-        // `num_containers` from `PruningStatistics`.
-        let cols = predicate.need_input_columns_ids();
-        //Todo more specific rules
-        if cols.len() == 1 {
-            return true;
+// combine two `RowSelection` return the intersection
+// For example:
+// self:     NNYYYYNNY
+// other:    NYNNNNNNY
+//
+// returned: NNNNNNNNY
+// set `need_combine` true will combine result: Select(2) + Select(1) + 
Skip(2) -> Select(3) + Skip(2)
+pub(crate) fn intersect_row_selection(
+    left: Vec<RowSelector>,
+    right: Vec<RowSelector>,
+    need_combine: bool,
+) -> Vec<RowSelector> {
+    let mut res = vec![];
+    let mut l_iter = left.into_iter().peekable();
+    let mut r_iter = right.into_iter().peekable();
+
+    while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) {
+        if a.row_count == 0 {
+            l_iter.next().unwrap();
+            continue;
+        }
+        if b.row_count == 0 {
+            r_iter.next().unwrap();
+            continue;
+        }
+        match (a.skip, b.skip) {
+            (false, false) => {

Review Comment:
   ```suggestion
               // Keep both ranges
               (false, false) => {
   ```



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -497,37 +500,67 @@ impl FileOpener for ParquetOpener {
                 &file_metrics,
             );
 
-            if enable_page_index && 
check_page_index_push_down_valid(&pruning_predicate) {
+            let page_index_predicates = 
extract_page_index_push_down_predicates(
+                &pruning_predicate,
+                builder.schema().clone(),
+            )?;
+
+            if enable_page_index && !page_index_predicates.is_empty() {
                 let file_offset_indexes = file_metadata.offset_indexes();
                 let file_page_indexes = file_metadata.page_indexes();
                 if let (Some(file_offset_indexes), Some(file_page_indexes)) =
                     (file_offset_indexes, file_page_indexes)
                 {
-                    let mut selectors = Vec::with_capacity(row_groups.len());
-                    for r in &row_groups {
-                        selectors.extend(
-                            prune_pages_in_one_row_group(
-                                &groups[*r],
-                                pruning_predicate.clone(),
-                                file_offset_indexes.get(*r),
-                                file_page_indexes.get(*r),
-                                &file_metrics,
-                            )
-                            .map_err(|e| {
-                                ArrowError::ParquetError(format!(
-                                    "Fail in prune_pages_in_one_row_group: {}",
-                                    e
-                                ))
-                            }),
+                    let mut row_selections =
+                        VecDeque::with_capacity(page_index_predicates.len());
+                    for predicate in page_index_predicates {
+                        // `extract_page_index_push_down_predicates` only 
return predicate with one col.
+                        let col_id =
+                            
*predicate.need_input_columns_ids().iter().next().unwrap();
+                        let mut selectors = 
Vec::with_capacity(row_groups.len());
+                        for r in &row_groups {
+                            let rg_offset_indexes = 
file_offset_indexes.get(*r);
+                            let rg_page_indexes = file_page_indexes.get(*r);
+                            if let (Some(rg_page_indexes), 
Some(rg_offset_indexes)) =
+                                (rg_page_indexes, rg_offset_indexes)
+                            {
+                                selectors.extend(
+                                    prune_pages_in_one_row_group(
+                                        &groups[*r],
+                                        &predicate,
+                                        rg_offset_indexes.get(col_id),
+                                        rg_page_indexes.get(col_id),
+                                        &file_metrics,
+                                    )
+                                    .map_err(|e| {
+                                        ArrowError::ParquetError(format!(
+                                            "Fail in 
prune_pages_in_one_row_group: {}",
+                                            e
+                                        ))
+                                    }),
+                                );
+                            } else {
+                                // fallback select all rows
+                                let all_selected = vec![RowSelector::select(
+                                    groups[*r].num_rows() as usize,
+                                )];
+                                selectors.push(all_selected);
+                            }
+                        }
+                        debug!(
+                            "Use filter and page index create RowSelection 
{:?} from predicate:{:?}",
+                            &selectors, predicate
+                        );
+                        row_selections.push_back(
+                            
selectors.into_iter().flatten().collect::<Vec<_>>(),
                         );
                     }
-                    debug!(
-                        "Use filter and page index create RowSelection {:?} ",
-                        &selectors
-                    );
-                    builder = builder.with_row_selection(RowSelection::from(
-                        selectors.into_iter().flatten().collect::<Vec<_>>(),
-                    ));
+                    let acc = row_selections.pop_front().unwrap_or_default();
+                    let final_selection = row_selections
+                        .into_iter()
+                        .fold(acc, |acc, x| intersect_row_selection(acc, x, 
true));
+                    builder =
+                        
builder.with_row_selection(RowSelection::from(final_selection));

Review Comment:
   You can write this like the following (though I am not sure it is really any 
more readable)
   
   ```suggestion
                       let final_selection = row_selections
                           .into_iter()
                           .reduce(|acc, x| intersect_row_selection(acc, x, 
true))
                           .unwrap();
                       builder =
                           builder.with_row_selection(final_selection.into());
   ```



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -552,18 +585,111 @@ impl FileOpener for ParquetOpener {
     }
 }
 
-// Check PruningPredicates just work on one column.
-fn check_page_index_push_down_valid(predicate: &Option<PruningPredicate>) -> 
bool {
-    if let Some(predicate) = predicate {
-        // for now we only support pushDown on one col, because each col may 
have different page numbers, its hard to get
-        // `num_containers` from `PruningStatistics`.
-        let cols = predicate.need_input_columns_ids();
-        //Todo more specific rules
-        if cols.len() == 1 {
-            return true;
+// combine two `RowSelection` return the intersection
+// For example:
+// self:     NNYYYYNNY
+// other:    NYNNNNNNY
+//
+// returned: NNNNNNNNY
+// set `need_combine` true will combine result: Select(2) + Select(1) + 
Skip(2) -> Select(3) + Skip(2)
+pub(crate) fn intersect_row_selection(
+    left: Vec<RowSelector>,
+    right: Vec<RowSelector>,
+    need_combine: bool,
+) -> Vec<RowSelector> {
+    let mut res = vec![];
+    let mut l_iter = left.into_iter().peekable();
+    let mut r_iter = right.into_iter().peekable();
+
+    while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) {
+        if a.row_count == 0 {
+            l_iter.next().unwrap();
+            continue;
+        }
+        if b.row_count == 0 {
+            r_iter.next().unwrap();
+            continue;
+        }
+        match (a.skip, b.skip) {
+            (false, false) => {
+                if a.row_count < b.row_count {
+                    res.push(RowSelector::select(a.row_count));
+                    b.row_count -= a.row_count;
+                    l_iter.next().unwrap();
+                } else {
+                    res.push(RowSelector::select(b.row_count));
+                    a.row_count -= b.row_count;
+                    r_iter.next().unwrap();
+                }
+            }
+            _ => {
+                if a.row_count < b.row_count {

Review Comment:
   Shouldn't this take into account which one of `a` or `b` was skipped (rather 
than just which one was smaller)? I tried messing around to write a test that 
failed, but was not able to.



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -552,18 +585,111 @@ impl FileOpener for ParquetOpener {
     }
 }
 
-// Check PruningPredicates just work on one column.
-fn check_page_index_push_down_valid(predicate: &Option<PruningPredicate>) -> 
bool {
-    if let Some(predicate) = predicate {
-        // for now we only support pushDown on one col, because each col may 
have different page numbers, its hard to get
-        // `num_containers` from `PruningStatistics`.
-        let cols = predicate.need_input_columns_ids();
-        //Todo more specific rules
-        if cols.len() == 1 {
-            return true;
+// combine two `RowSelection` return the intersection
+// For example:
+// self:     NNYYYYNNY
+// other:    NYNNNNNNY
+//
+// returned: NNNNNNNNY
+// set `need_combine` true will combine result: Select(2) + Select(1) + 
Skip(2) -> Select(3) + Skip(2)
+pub(crate) fn intersect_row_selection(
+    left: Vec<RowSelector>,
+    right: Vec<RowSelector>,
+    need_combine: bool,

Review Comment:
   Why is there a `need_combine` parameter? The code always seems to need to 
combine the `RowSelector`s, so there appears to be no need for this 
parameter.



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -497,37 +500,67 @@ impl FileOpener for ParquetOpener {
                 &file_metrics,
             );
 
-            if enable_page_index && 
check_page_index_push_down_valid(&pruning_predicate) {
+            let page_index_predicates = 
extract_page_index_push_down_predicates(
+                &pruning_predicate,
+                builder.schema().clone(),
+            )?;
+
+            if enable_page_index && !page_index_predicates.is_empty() {
                 let file_offset_indexes = file_metadata.offset_indexes();
                 let file_page_indexes = file_metadata.page_indexes();
                 if let (Some(file_offset_indexes), Some(file_page_indexes)) =
                     (file_offset_indexes, file_page_indexes)
                 {
-                    let mut selectors = Vec::with_capacity(row_groups.len());
-                    for r in &row_groups {
-                        selectors.extend(
-                            prune_pages_in_one_row_group(
-                                &groups[*r],
-                                pruning_predicate.clone(),
-                                file_offset_indexes.get(*r),
-                                file_page_indexes.get(*r),
-                                &file_metrics,
-                            )
-                            .map_err(|e| {
-                                ArrowError::ParquetError(format!(
-                                    "Fail in prune_pages_in_one_row_group: {}",
-                                    e
-                                ))
-                            }),
+                    let mut row_selections =
+                        VecDeque::with_capacity(page_index_predicates.len());
+                    for predicate in page_index_predicates {
+                        // `extract_page_index_push_down_predicates` only 
return predicate with one col.
+                        let col_id =
+                            
*predicate.need_input_columns_ids().iter().next().unwrap();
+                        let mut selectors = 
Vec::with_capacity(row_groups.len());
+                        for r in &row_groups {
+                            let rg_offset_indexes = 
file_offset_indexes.get(*r);
+                            let rg_page_indexes = file_page_indexes.get(*r);
+                            if let (Some(rg_page_indexes), 
Some(rg_offset_indexes)) =
+                                (rg_page_indexes, rg_offset_indexes)
+                            {
+                                selectors.extend(
+                                    prune_pages_in_one_row_group(
+                                        &groups[*r],
+                                        &predicate,
+                                        rg_offset_indexes.get(col_id),
+                                        rg_page_indexes.get(col_id),
+                                        &file_metrics,
+                                    )
+                                    .map_err(|e| {
+                                        ArrowError::ParquetError(format!(
+                                            "Fail in 
prune_pages_in_one_row_group: {}",
+                                            e
+                                        ))
+                                    }),
+                                );
+                            } else {
+                                // fallback select all rows
+                                let all_selected = vec![RowSelector::select(
+                                    groups[*r].num_rows() as usize,
+                                )];
+                                selectors.push(all_selected);
+                            }
+                        }
+                        debug!(
+                            "Use filter and page index create RowSelection 
{:?} from predicate:{:?}",
+                            &selectors, predicate
+                        );
+                        row_selections.push_back(
+                            
selectors.into_iter().flatten().collect::<Vec<_>>(),
                         );
                     }
-                    debug!(
-                        "Use filter and page index create RowSelection {:?} ",
-                        &selectors
-                    );
-                    builder = builder.with_row_selection(RowSelection::from(
-                        selectors.into_iter().flatten().collect::<Vec<_>>(),
-                    ));
+                    let acc = row_selections.pop_front().unwrap_or_default();

Review Comment:
   I am somewhat biased, but I think it would help to explain in comments what 
is happening with page filtering -- perhaps we could factor this code into a 
function somehow, and then add the picture from 
https://github.com/apache/arrow-datafusion/pull/3780#discussion_r994625892 as 
part of the doc comments?



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -552,18 +585,111 @@ impl FileOpener for ParquetOpener {
     }
 }
 
-// Check PruningPredicates just work on one column.
-fn check_page_index_push_down_valid(predicate: &Option<PruningPredicate>) -> 
bool {
-    if let Some(predicate) = predicate {
-        // for now we only support pushDown on one col, because each col may 
have different page numbers, its hard to get
-        // `num_containers` from `PruningStatistics`.
-        let cols = predicate.need_input_columns_ids();
-        //Todo more specific rules
-        if cols.len() == 1 {
-            return true;
+// combine two `RowSelection` return the intersection
+// For example:
+// self:     NNYYYYNNY
+// other:    NYNNNNNNY
+//
+// returned: NNNNNNNNY
+// set `need_combine` true will combine result: Select(2) + Select(1) + 
Skip(2) -> Select(3) + Skip(2)
+pub(crate) fn intersect_row_selection(
+    left: Vec<RowSelector>,
+    right: Vec<RowSelector>,
+    need_combine: bool,
+) -> Vec<RowSelector> {
+    let mut res = vec![];
+    let mut l_iter = left.into_iter().peekable();
+    let mut r_iter = right.into_iter().peekable();
+
+    while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) {
+        if a.row_count == 0 {
+            l_iter.next().unwrap();
+            continue;
+        }
+        if b.row_count == 0 {
+            r_iter.next().unwrap();
+            continue;
+        }
+        match (a.skip, b.skip) {
+            (false, false) => {
+                if a.row_count < b.row_count {
+                    res.push(RowSelector::select(a.row_count));
+                    b.row_count -= a.row_count;
+                    l_iter.next().unwrap();
+                } else {
+                    res.push(RowSelector::select(b.row_count));
+                    a.row_count -= b.row_count;
+                    r_iter.next().unwrap();
+                }
+            }
+            _ => {
+                if a.row_count < b.row_count {
+                    res.push(RowSelector::skip(a.row_count));
+                    b.row_count -= a.row_count;
+                    l_iter.next().unwrap();
+                } else {
+                    res.push(RowSelector::skip(b.row_count));
+                    a.row_count -= b.row_count;
+                    r_iter.next().unwrap();
+                }
+            }
         }
     }
-    false
+    if l_iter.peek().is_some() {
+        res.extend(l_iter);
+    }
+    if r_iter.peek().is_some() {
+        res.extend(r_iter);
+    }
+    // combine the adjacent same operators and last zero row count
+    if need_combine {
+        let mut pre = res[0];
+        let mut after_combine = vec![];
+        for selector in res.iter_mut().skip(1) {
+            if selector.skip == pre.skip {
+                pre.row_count += selector.row_count;
+            } else {
+                after_combine.push(pre);
+                pre = *selector;
+            }
+        }
+        if pre.row_count != 0 {
+            after_combine.push(pre);
+        }
+        after_combine
+    } else {
+        res
+    }
+}
+
+// Extract one col pruningPredicate from input predicate for evaluating page 
Index.

Review Comment:
   ```suggestion
   // Extract single col pruningPredicate from input predicate for evaluating 
page Index.
   ```



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -552,18 +585,111 @@ impl FileOpener for ParquetOpener {
     }
 }
 
-// Check PruningPredicates just work on one column.
-fn check_page_index_push_down_valid(predicate: &Option<PruningPredicate>) -> 
bool {
-    if let Some(predicate) = predicate {
-        // for now we only support pushDown on one col, because each col may 
have different page numbers, its hard to get
-        // `num_containers` from `PruningStatistics`.
-        let cols = predicate.need_input_columns_ids();
-        //Todo more specific rules
-        if cols.len() == 1 {
-            return true;
+// combine two `RowSelection` return the intersection
+// For example:
+// self:     NNYYYYNNY
+// other:    NYNNNNNNY
+//
+// returned: NNNNNNNNY
+// set `need_combine` true will combine result: Select(2) + Select(1) + 
Skip(2) -> Select(3) + Skip(2)
+pub(crate) fn intersect_row_selection(
+    left: Vec<RowSelector>,
+    right: Vec<RowSelector>,
+    need_combine: bool,
+) -> Vec<RowSelector> {
+    let mut res = vec![];
+    let mut l_iter = left.into_iter().peekable();
+    let mut r_iter = right.into_iter().peekable();
+
+    while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) {
+        if a.row_count == 0 {
+            l_iter.next().unwrap();
+            continue;
+        }
+        if b.row_count == 0 {
+            r_iter.next().unwrap();
+            continue;
+        }
+        match (a.skip, b.skip) {
+            (false, false) => {
+                if a.row_count < b.row_count {
+                    res.push(RowSelector::select(a.row_count));
+                    b.row_count -= a.row_count;
+                    l_iter.next().unwrap();
+                } else {
+                    res.push(RowSelector::select(b.row_count));
+                    a.row_count -= b.row_count;
+                    r_iter.next().unwrap();
+                }
+            }
+            _ => {

Review Comment:
   ```suggestion
              // skip at least one
               _ => {
   ```



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -497,37 +500,67 @@ impl FileOpener for ParquetOpener {
                 &file_metrics,
             );
 
-            if enable_page_index && 
check_page_index_push_down_valid(&pruning_predicate) {
+            let page_index_predicates = 
extract_page_index_push_down_predicates(

Review Comment:
   It would be nice to avoid trying to create page index predicates if 
`enable_page_index == false` (the feature is not enabled), but I don't think 
that is critical.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to