XiangpengHao commented on code in PR #7850:
URL: https://github.com/apache/arrow-rs/pull/7850#discussion_r2261283189


##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -656,18 +691,69 @@ where
         }
         // fetch the pages needed for decoding
         row_group
-            .fetch(&mut self.input, &projection, plan_builder.selection())
+            // Final projection fetch shouldn't expand selection for cache; pass None
+            .fetch(
+                &mut self.input,
+                &projection,
+                plan_builder.selection(),
+                batch_size,
+                None,
+            )
             .await?;
 
         let plan = plan_builder.build();
 
-        let array_reader = ArrayReaderBuilder::new(&row_group)
+        let cache_options = cache_options_builder.consumer();
+        let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
+            .with_cache_options(Some(&cache_options))
             .build_array_reader(self.fields.as_deref(), &projection)?;
 
         let reader = ParquetRecordBatchReader::new(array_reader, plan);
 
         Ok((self, Some(reader)))
     }
+
+    /// Compute which columns are used in filters and the final (output) projection
+    fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> {
+        let filters = self.filter.as_ref()?;
+        let mut cache_projection = filters.predicates.first()?.projection().clone();
+        for predicate in filters.predicates.iter() {
+            cache_projection.union(predicate.projection());
+        }
+        cache_projection.intersect(projection);
+        self.exclude_nested_columns_from_cache(&cache_projection)
+    }
+
+    /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
+    fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {

Review Comment:
   New change 1: exclude nested columns from the cache.
   
   Previous behavior: panic.
   
   It's not impossible, but it is very hard to support caching nested columns, and we don't support it yet.
   
   With this change, nested columns fall back to the old implementation, i.e., they are decoded twice, but at least the reader will not panic.
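   
   A minimal sketch of what excluding nested columns could look like, assuming only the existing `SchemaDescriptor` and `ProjectionMask` APIs. The hunk above shows just the signature of `exclude_nested_columns_from_cache`, so this is an illustration rather than the PR's implementation; `exclude_nested_leaves` and the root-counting map are made-up names:
   
   ```rust
   use std::collections::HashMap;
   
   use parquet::arrow::ProjectionMask;
   use parquet::schema::types::SchemaDescriptor;
   
   /// Illustrative only: keep just the selected leaves whose root column owns a
   /// single parquet leaf (flat columns); nested roots are excluded from caching.
   fn exclude_nested_leaves(
       schema: &SchemaDescriptor,
       mask: &ProjectionMask,
   ) -> Option<ProjectionMask> {
       // Count how many parquet leaves each root column owns.
       let mut leaves_per_root: HashMap<usize, usize> = HashMap::new();
       for leaf_idx in 0..schema.num_columns() {
           *leaves_per_root
               .entry(schema.get_column_root_idx(leaf_idx))
               .or_insert(0) += 1;
       }
   
       // Keep only selected leaves whose root maps to exactly one leaf.
       let flat_leaves: Vec<usize> = (0..schema.num_columns())
           .filter(|&leaf_idx| {
               mask.leaf_included(leaf_idx)
                   && leaves_per_root[&schema.get_column_root_idx(leaf_idx)] == 1
           })
           .collect();
   
       // Nothing cacheable left: signal the caller to skip the cache and fall
       // back to decoding the filter columns twice.
       if flat_leaves.is_empty() {
           return None;
       }
       Some(ProjectionMask::leaves(schema, flat_leaves))
   }
   ```
   
   Returning `None` here is what triggers the fallback path instead of a panic.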



##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -924,7 +1014,15 @@ impl InMemoryRowGroup<'_> {
                         _ => (),
                     }
 
-                    ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
+                    // Expand selection to batch boundaries only for cached columns
+                    let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false);

Review Comment:
   Change 2: only expand the selection for cached columns, not for the other columns. This should improve IO efficiency.
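   
   A hedged sketch of that per-column decision; `selection_for_leaf` and `expanded_selection` are illustrative names rather than identifiers from this PR. The point is simply that only leaves included in the cache mask use the batch-aligned selection:
   
   ```rust
   use parquet::arrow::arrow_reader::RowSelection;
   use parquet::arrow::ProjectionMask;
   
   /// Illustrative only: pick the selection a column's page ranges are computed
   /// from. Cached columns use the selection widened to batch boundaries; every
   /// other column keeps the tighter original selection.
   fn selection_for_leaf<'a>(
       idx: usize,
       selection: &'a RowSelection,
       expanded_selection: &'a RowSelection, // assumed: widened to batch boundaries
       cache_mask: Option<&ProjectionMask>,
   ) -> &'a RowSelection {
       let use_expanded = cache_mask
           .map(|m| m.leaf_included(idx))
           .unwrap_or(false);
       if use_expanded {
           expanded_selection
       } else {
           selection
       }
   }
   ```
   
   The chosen selection is then fed to `scan_ranges(&offset_index[idx].page_locations)` as in the hunk, so only cached columns pay for the wider byte ranges.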



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
