alamb commented on code in PR #5268:
URL: https://github.com/apache/arrow-datafusion/pull/5268#discussion_r1105014942


##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -550,50 +550,63 @@ pub(crate) mod test_util {
     use parquet::file::properties::WriterProperties;
     use tempfile::NamedTempFile;
 
+    /// How many rows per page should be written
+    const ROWS_PER_PAGE: usize = 2;
+
     /// Writes `batches` to a temporary parquet file
     ///
-    /// If multi_page is set to `true`, all batches are written into
-    /// one temporary parquet file and the parquet file is written
+    /// If multi_page is set to `true`, the parquet file(s) are written
     /// with 2 rows per data page (used to test page filtering and
     /// boundaries).
     pub async fn store_parquet(
         batches: Vec<RecordBatch>,
         multi_page: bool,
     ) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
-        if multi_page {
-            // All batches write in to one file, each batch must have same 
schema.
-            let mut output = NamedTempFile::new().expect("creating temp file");
-            let mut builder = WriterProperties::builder();
-            builder = builder.set_data_page_row_count_limit(2);
-            let proper = builder.build();
-            let mut writer =
-                ArrowWriter::try_new(&mut output, batches[0].schema(), 
Some(proper))
-                    .expect("creating writer");
-            for b in batches {
-                writer.write(&b).expect("Writing batch");
-            }
-            writer.close().unwrap();
-            Ok((vec![local_unpartitioned_file(&output)], vec![output]))
-        } else {
-            // Each batch writes to their own file
-            let files: Vec<_> = batches
-                .into_iter()
-                .map(|batch| {
-                    let mut output = NamedTempFile::new().expect("creating 
temp file");
+        // Each batch writes to their own file

Review Comment:
   This change is so I can actually test evolved schemas with page indexes (aka 
write multiple files with different schemas)
   



##########
datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs:
##########
@@ -155,30 +159,25 @@ impl PagePruningPredicate {
 
         let mut row_selections = 
Vec::with_capacity(page_index_predicates.len());
         for predicate in page_index_predicates {
-            // `extract_page_index_push_down_predicates` only return predicate 
with one col.
-            //  when building `PruningPredicate`, some single column filter 
like `abs(i) = 1`
-            //  will be rewrite to `lit(true)`, so may have an empty 
required_columns.
-            let col_id =
-                if let Some(&col_id) = 
predicate.need_input_columns_ids().iter().next() {
-                    col_id
-                } else {
-                    continue;
-                };
+            // find column index by looking in the row group metadata.
+            let col_idx = find_column_index(predicate, &groups[0]);

Review Comment:
   this calls the new per-file column index logic. I considered some more major 
rearranging of this code (like to have it do the column index lookup in the 
pruning stats) but I felt this way was easiest to review and was likely to be 
the most performant as well



##########
datafusion/core/src/physical_optimizer/pruning.rs:
##########
@@ -233,25 +233,18 @@ impl PruningPredicate {
             .unwrap_or_default()
     }
 
-    /// Returns all need column indexes to evaluate this pruning predicate
-    pub(crate) fn need_input_columns_ids(&self) -> HashSet<usize> {
-        let mut set = HashSet::new();
-        self.required_columns.columns.iter().for_each(|x| {
-            match self.schema().column_with_name(x.0.name.as_str()) {
-                None => {}
-                Some(y) => {
-                    set.insert(y.0);
-                }
-            }
-        });
-        set
+    pub(crate) fn required_columns(&self) -> &RequiredStatColumns {

Review Comment:
   This is the core change -- `need_input_columns_ids` returns indexes in terms 
of the overall table schema. If an individual parquet file does not have all 
the columns or has the columns in a different order, these indexes are not 
correct



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to