XiangpengHao commented on code in PR #7850: URL: https://github.com/apache/arrow-rs/pull/7850#discussion_r2261283189
########## parquet/src/arrow/async_reader/mod.rs: ########## @@ -656,18 +691,69 @@ where } // fetch the pages needed for decoding row_group - .fetch(&mut self.input, &projection, plan_builder.selection()) + // Final projection fetch shouldn't expand selection for cache; pass None + .fetch( + &mut self.input, + &projection, + plan_builder.selection(), + batch_size, + None, + ) .await?; let plan = plan_builder.build(); - let array_reader = ArrayReaderBuilder::new(&row_group) + let cache_options = cache_options_builder.consumer(); + let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics) + .with_cache_options(Some(&cache_options)) .build_array_reader(self.fields.as_deref(), &projection)?; let reader = ParquetRecordBatchReader::new(array_reader, plan); Ok((self, Some(reader))) } + + /// Compute which columns are used in filters and the final (output) projection + fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> { + let filters = self.filter.as_ref()?; + let mut cache_projection = filters.predicates.first()?.projection().clone(); + for predicate in filters.predicates.iter() { + cache_projection.union(predicate.projection()); + } + cache_projection.intersect(projection); + self.exclude_nested_columns_from_cache(&cache_projection) + } + + /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns) + fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> { Review Comment: New change 1: exclude nested columns from the cache. Previous behavior: panic. Caching nested columns is not impossible, but it is very hard to support, so we don't support it yet. With this change, the reader falls back to the old implementation (i.e., decoding those columns twice), but at least it no longer panics.
########## parquet/src/arrow/async_reader/mod.rs: ########## @@ -924,7 +1014,15 @@ impl InMemoryRowGroup<'_> { _ => (), } - ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations)); + // Expand selection to batch boundaries only for cached columns + let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false); Review Comment: Change 2: only expand the selection to batch boundaries for cached columns, not for other columns. This should reduce the amount of IO. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org