Dandandan commented on code in PR #20481:
URL: https://github.com/apache/datafusion/pull/20481#discussion_r2905823354


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -180,7 +202,352 @@ impl PreparedAccessPlan {
     }
 }
 
+impl ParquetOpener {
+    fn build_row_group_access_filter(
+        file_name: &str,
+        extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
+        row_group_count: usize,
+        row_group_metadata: &[RowGroupMetaData],
+        file_range: Option<&FileRange>,
+        stats_pruning: Option<RowGroupStatisticsPruningContext<'_>>,
+    ) -> Result<RowGroupAccessPlanFilter> {
+        let mut row_groups = RowGroupAccessPlanFilter::new(create_initial_plan(
+            file_name,
+            extensions,
+            row_group_count,
+        )?);
+
+        if let Some(range) = file_range {
+            row_groups.prune_by_range(row_group_metadata, range);
+        }
+
+        if let Some(stats_pruning) = stats_pruning {
+            row_groups.prune_by_statistics(
+                stats_pruning.physical_file_schema.as_ref(),
+                stats_pruning.parquet_schema,
+                row_group_metadata,
+                stats_pruning.predicate,
+                stats_pruning.file_metrics,
+            );
+        }
+
+        Ok(row_groups)
+    }
+}
+
 impl FileOpener for ParquetOpener {
+    fn is_leaf_morsel(&self, file: &PartitionedFile) -> bool {
+        file.extensions
+            .as_ref()
+            .map(|e| e.is::<ParquetMorsel>())
+            .unwrap_or(false)
+    }
+
+    fn morselize(
+        &self,
+        partitioned_file: PartitionedFile,
+    ) -> BoxFuture<'static, Result<Vec<PartitionedFile>>> {
+        if partitioned_file
+            .extensions
+            .as_ref()
+            .map(|e| e.is::<ParquetMorsel>())
+            .unwrap_or(false)
+        {
+            return Box::pin(ready(Ok(vec![partitioned_file])));
+        }
+
+        let file_metrics = ParquetFileMetrics::new(
+            self.partition_index,
+            partitioned_file.object_meta.location.as_ref(),
+            &self.metrics,
+        );
+        let file_name = partitioned_file.object_meta.location.to_string();
+        let file_range = partitioned_file.range.clone();
+        let extensions = partitioned_file.extensions.clone();
+
+        let metadata_size_hint = partitioned_file
+            .metadata_size_hint
+            .or(self.metadata_size_hint);
+
+        let mut async_file_reader: Box<dyn AsyncFileReader> =
+            match self.parquet_file_reader_factory.create_reader(
+                self.partition_index,
+                partitioned_file.clone(),
+                metadata_size_hint,
+                &self.metrics,
+            ) {
+                Ok(reader) => reader,
+                Err(e) => return Box::pin(ready(Err(e))),
+            };
+
+        let options =
+            ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip);
+        #[cfg(feature = "parquet_encryption")]
+        let encryption_context = self.get_encryption_context();
+
+        let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory);
+        let table_schema = self.table_schema.clone();
+        let predicate = self.predicate.clone();
+        let metrics = self.metrics.clone();
+        let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
+        let enable_bloom_filter = self.enable_bloom_filter;
+        let enable_page_index = self.enable_page_index;
+        let limit = self.limit;
+        let preserve_order = self.preserve_order;
+        let parquet_file_reader_factory = Arc::clone(&self.parquet_file_reader_factory);
+        let partition_index = self.partition_index;
+
+        Box::pin(async move {
+            #[cfg(feature = "parquet_encryption")]
+            let options = if let Some(fd_val) = encryption_context
+                .get_file_decryption_properties(&partitioned_file.object_meta.location)
+                .await?
+            {
+                options.with_file_decryption_properties(Arc::clone(&fd_val))
+            } else {
+                options
+            };
+
+            let predicate_creation_errors = MetricBuilder::new(&metrics)
+                .global_counter("num_predicate_creation_errors");
+
+            // Step: try to prune the file using file-level statistics before loading
+            // parquet metadata. This avoids the I/O cost of reading metadata when
+            // file-level stats (available from the catalog) indicate no rows can match.
+            if let Some(pred) = predicate.as_ref() {
+                let logical_file_schema = Arc::clone(table_schema.file_schema());
+                if let Some(mut file_pruner) = FilePruner::try_new(
+                    Arc::clone(pred),
+                    &logical_file_schema,
+                    &partitioned_file,
+                    predicate_creation_errors.clone(),
+                ) && file_pruner.should_prune()?
+                {
+                    file_metrics.files_ranges_pruned_statistics.add_pruned(1);
+                    return Ok(vec![]);
+                }
+            }
+
+            let _metadata_timer = file_metrics.metadata_load_time.timer();
+            let mut reader_metadata =
+                ArrowReaderMetadata::load_async(&mut async_file_reader, options.clone())
+                    .await?;
+            let num_row_groups = reader_metadata.metadata().num_row_groups();
+
+            // Adapt the physical schema to the file schema for pruning
+            let physical_file_schema = Arc::clone(reader_metadata.schema());
+            let logical_file_schema = table_schema.file_schema();
+            let rewriter = expr_adapter_factory.create(
+                Arc::clone(logical_file_schema),
+                Arc::clone(&physical_file_schema),
+            )?;
+            let simplifier = PhysicalExprSimplifier::new(&physical_file_schema);
+
+            // Replace partition column references with their literal values before rewriting.
+            // This mirrors what `open()` does. Without this, expressions like `val != part`
+            // (where `part` is a partition column) would cause `rewriter.rewrite` to fail
+            // since the partition column is not in the logical file schema.
+            let literal_columns: HashMap<String, ScalarValue> = table_schema
+                .table_partition_cols()
+                .iter()
+                .zip(partitioned_file.partition_values.iter())
+                .map(|(field, value)| (field.name().clone(), value.clone()))
+                .collect();
+
+            let adapted_predicate = predicate
+                .as_ref()
+                .map(|p| {
+                    let p = if !literal_columns.is_empty() {
+                        replace_columns_with_literals(Arc::clone(p), &literal_columns)?
+                    } else {
+                        Arc::clone(p)
+                    };
+                    simplifier.simplify(rewriter.rewrite(p)?)
+                })
+                .transpose()?;
+
+            let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
+                adapted_predicate.as_ref(),
+                &physical_file_schema,
+                &predicate_creation_errors,
+            );
+
+            let mut row_groups = Self::build_row_group_access_filter(
+                &file_name,
+                extensions,
+                num_row_groups,
+                reader_metadata.metadata().row_groups(),
+                file_range.as_ref(),
+                pruning_predicate
+                    .as_deref()
+                    .filter(|_| enable_row_group_stats_pruning)
+                    .map(|predicate| RowGroupStatisticsPruningContext {
+                        physical_file_schema: &physical_file_schema,
+                        parquet_schema: reader_metadata.parquet_schema(),
+                        predicate,
+                        file_metrics: &file_metrics,
+                    }),
+            )?;
+
+            // Prune by limit if limit is set and order is not sensitive
+            if let (Some(limit), false) = (limit, preserve_order) {
+                row_groups.prune_by_limit(
+                    limit,
+                    reader_metadata.metadata().row_groups(),
+                    &file_metrics,
+                );
+            }
+
+            // Bloom filter pruning: done once per file here in morselize(), so that
+            // open() does not repeat it for each morsel (which would cause inflated metrics
+            // and unnecessary work).
+            //
+            // Note: the bloom filter builder takes ownership of `async_file_reader`.
+            // Page index loading happens afterward using a fresh reader so that we only
+            // pay for the page index I/O on the row groups that survive bloom filter pruning.
+            if let Some(predicate) = pruning_predicate.as_deref() {
+                if enable_bloom_filter && !row_groups.is_empty() {
+                    // Clone reader_metadata so it remains available for page
+                    // index loading after this builder is dropped.
+                    let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
+                        async_file_reader,
+                        reader_metadata.clone(),
+                    );
+                    row_groups
+                        .prune_by_bloom_filters(
+                            &physical_file_schema,
+                            &mut builder,
+                            predicate,
+                            &file_metrics,
+                        )
+                        .await;
+                } else {
+                    file_metrics
+                        .row_groups_pruned_bloom_filter
+                        .add_matched(row_groups.remaining_row_group_count());
+                }
+            }
+
+            // Load page index after bloom filter pruning so we skip it entirely if no
+            // row groups remain. Bloom filter building consumed `async_file_reader`, so
+            // we create a fresh reader here — reader creation is cheap (no I/O yet).
+            if should_enable_page_index(enable_page_index, &page_pruning_predicate)
+                && !row_groups.is_empty()
+            {
+                let mut fresh_reader: Box<dyn AsyncFileReader> =

Review Comment:
   Hmm, but this is in `morselize`, which only works at the file level.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to