Copilot commented on code in PR #20481:
URL: https://github.com/apache/datafusion/pull/20481#discussion_r2869159876


##########
datafusion/datasource/src/source.rs:
##########
@@ -300,7 +331,56 @@ impl ExecutionPlan for DataSourceExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        let stream = self.data_source.open(partition, Arc::clone(&context))?;
+        let shared_morsel_queue = if let Some(config) =
+            self.data_source.as_any().downcast_ref::<FileScanConfig>()
+        {
+            if config.morsel_driven {
+                let mut state = self.morsel_state.lock().unwrap();
+
+                // Start a new cycle once all expected partition streams for 
the
+                // previous cycle have been opened.
+                //
+                // Limitation: this heuristic assumes every execution opens all
+                // `file_groups.len()` partitions. If a caller opens only a 
subset
+                // (e.g. partition 0 of 2 and then abandons the rest), the 
state
+                // remains stuck and the next execution reuses the stale queue.
+                // In normal DataFusion query execution all partitions are 
opened,
+                // so this is acceptable in practice.

Review Comment:
   The queue lifecycle relies on `streams_opened` reaching `expected_streams`, 
but if a caller executes only a subset of partitions (which is a supported 
pattern in tests and can happen via public `ExecutionPlan::execute` usage), the 
shared queue can be left in a drained/stale state and then incorrectly reused 
on subsequent executions. This breaks re-executability (a second execution may 
produce no rows). Consider tracking active stream lifetimes (increment on open, 
decrement on stream drop) and resetting/creating a new queue when the last 
stream for a cycle is dropped, rather than relying on “all partitions were 
opened” heuristics.



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -180,7 +195,293 @@ impl PreparedAccessPlan {
     }
 }
 
+impl ParquetOpener {
+    fn build_row_group_access_filter(
+        file_name: &str,
+        extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
+        row_group_count: usize,
+        row_group_metadata: &[RowGroupMetaData],
+        file_range: Option<&FileRange>,
+        stats_pruning: Option<RowGroupStatisticsPruningContext<'_>>,
+    ) -> Result<RowGroupAccessPlanFilter> {
+        let mut row_groups = RowGroupAccessPlanFilter::new(create_initial_plan(
+            file_name,
+            extensions,
+            row_group_count,
+        )?);
+
+        if let Some(range) = file_range {
+            row_groups.prune_by_range(row_group_metadata, range);
+        }
+
+        if let Some(stats_pruning) = stats_pruning {
+            row_groups.prune_by_statistics(
+                stats_pruning.physical_file_schema.as_ref(),
+                stats_pruning.parquet_schema,
+                row_group_metadata,
+                stats_pruning.predicate,
+                stats_pruning.file_metrics,
+            );
+        }
+
+        Ok(row_groups)
+    }
+}
+
 impl FileOpener for ParquetOpener {
+    fn is_leaf_morsel(&self, file: &PartitionedFile) -> bool {
+        file.extensions
+            .as_ref()
+            .map(|e| e.is::<ParquetMorsel>())
+            .unwrap_or(false)
+    }
+
+    fn morselize(
+        &self,
+        partitioned_file: PartitionedFile,
+    ) -> BoxFuture<'static, Result<Vec<PartitionedFile>>> {
+        if partitioned_file
+            .extensions
+            .as_ref()
+            .map(|e| e.is::<ParquetMorsel>())
+            .unwrap_or(false)
+        {
+            return Box::pin(ready(Ok(vec![partitioned_file])));
+        }
+
+        let file_metrics = ParquetFileMetrics::new(
+            self.partition_index,
+            partitioned_file.object_meta.location.as_ref(),
+            &self.metrics,
+        );
+        let file_name = partitioned_file.object_meta.location.to_string();
+        let file_range = partitioned_file.range.clone();
+        let extensions = partitioned_file.extensions.clone();
+
+        let metadata_size_hint = partitioned_file
+            .metadata_size_hint
+            .or(self.metadata_size_hint);
+
+        let mut async_file_reader: Box<dyn AsyncFileReader> =
+            match self.parquet_file_reader_factory.create_reader(
+                self.partition_index,
+                partitioned_file.clone(),
+                metadata_size_hint,
+                &self.metrics,
+            ) {
+                Ok(reader) => reader,
+                Err(e) => return Box::pin(ready(Err(e))),
+            };
+
+        let options = ArrowReaderOptions::new().with_page_index(false);
+        #[cfg(feature = "parquet_encryption")]
+        let encryption_context = self.get_encryption_context();
+
+        let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory);
+        let table_schema = self.table_schema.clone();
+        let predicate = self.predicate.clone();
+        let metrics = self.metrics.clone();
+        let enable_row_group_stats_pruning = 
self.enable_row_group_stats_pruning;
+        let enable_bloom_filter = self.enable_bloom_filter;
+        let enable_page_index = self.enable_page_index;
+        let limit = self.limit;
+        let preserve_order = self.preserve_order;
+        let parquet_file_reader_factory = 
Arc::clone(&self.parquet_file_reader_factory);
+        let partition_index = self.partition_index;
+
+        Box::pin(async move {
+            #[cfg(feature = "parquet_encryption")]
+            let options = if let Some(fd_val) = encryption_context
+                
.get_file_decryption_properties(&partitioned_file.object_meta.location)
+                .await?
+            {
+                options.with_file_decryption_properties(Arc::clone(&fd_val))
+            } else {
+                options
+            };
+
+            let predicate_creation_errors = MetricBuilder::new(&metrics)
+                .global_counter("num_predicate_creation_errors");
+
+            // Step: try to prune the file using file-level statistics before 
loading
+            // parquet metadata. This avoids the I/O cost of reading metadata 
when
+            // file-level stats (available from the catalog) indicate no rows 
can match.
+            if let Some(pred) = predicate.as_ref() {
+                let logical_file_schema = 
Arc::clone(table_schema.file_schema());
+                if let Some(mut file_pruner) = FilePruner::try_new(
+                    Arc::clone(pred),
+                    &logical_file_schema,
+                    &partitioned_file,
+                    predicate_creation_errors.clone(),
+                ) && file_pruner.should_prune()?
+                {
+                    file_metrics.files_ranges_pruned_statistics.add_pruned(1);
+                    return Ok(vec![]);
+                }
+            }
+
+            let _metadata_timer = file_metrics.metadata_load_time.timer();
+            let mut reader_metadata =
+                ArrowReaderMetadata::load_async(&mut async_file_reader, 
options.clone())
+                    .await?;
+            let num_row_groups = reader_metadata.metadata().num_row_groups();
+
+            // Adapt the physical schema to the file schema for pruning
+            let physical_file_schema = Arc::clone(reader_metadata.schema());
+            let logical_file_schema = table_schema.file_schema();
+            let rewriter = expr_adapter_factory.create(
+                Arc::clone(logical_file_schema),
+                Arc::clone(&physical_file_schema),
+            )?;
+            let simplifier = 
PhysicalExprSimplifier::new(&physical_file_schema);
+
+            // Replace partition column references with their literal values 
before rewriting.
+            // This mirrors what `open()` does. Without this, expressions like 
`val != part`
+            // (where `part` is a partition column) would cause 
`rewriter.rewrite` to fail
+            // since the partition column is not in the logical file schema.
+            let literal_columns: HashMap<String, ScalarValue> = table_schema
+                .table_partition_cols()
+                .iter()
+                .zip(partitioned_file.partition_values.iter())
+                .map(|(field, value)| (field.name().clone(), value.clone()))
+                .collect();
+
+            let adapted_predicate = predicate
+                .as_ref()
+                .map(|p| {
+                    let p = if !literal_columns.is_empty() {
+                        replace_columns_with_literals(Arc::clone(p), 
&literal_columns)?
+                    } else {
+                        Arc::clone(p)
+                    };
+                    simplifier.simplify(rewriter.rewrite(p)?)
+                })
+                .transpose()?;
+
+            let (pruning_predicate, page_pruning_predicate) = 
build_pruning_predicates(
+                adapted_predicate.as_ref(),
+                &physical_file_schema,
+                &predicate_creation_errors,
+            );
+
+            let mut row_groups = Self::build_row_group_access_filter(
+                &file_name,
+                extensions,
+                num_row_groups,
+                reader_metadata.metadata().row_groups(),
+                file_range.as_ref(),
+                pruning_predicate
+                    .as_deref()
+                    .filter(|_| enable_row_group_stats_pruning)
+                    .map(|predicate| RowGroupStatisticsPruningContext {
+                        physical_file_schema: &physical_file_schema,
+                        parquet_schema: reader_metadata.parquet_schema(),
+                        predicate,
+                        file_metrics: &file_metrics,
+                    }),
+            )?;
+
+            // Prune by limit if limit is set and order is not sensitive
+            if let (Some(limit), false) = (limit, preserve_order) {
+                row_groups.prune_by_limit(
+                    limit,
+                    reader_metadata.metadata().row_groups(),
+                    &file_metrics,
+                );
+            }
+
+            // Bloom filter pruning: done once per file here in morselize(), 
so that
+            // open() does not repeat it for each morsel (which would cause 
inflated metrics
+            // and unnecessary work).
+            //
+            // Note: the bloom filter builder takes ownership of 
`async_file_reader`.
+            // Page index loading happens afterward using a fresh reader so 
that we only
+            // pay for the page index I/O on the row groups that survive bloom 
filter pruning.
+            if let Some(predicate) = pruning_predicate.as_deref() {
+                if enable_bloom_filter && !row_groups.is_empty() {
+                    // Clone reader_metadata so it remains available for page
+                    // index loading after this builder is dropped.
+                    let mut builder = 
ParquetRecordBatchStreamBuilder::new_with_metadata(
+                        async_file_reader,
+                        reader_metadata.clone(),
+                    );
+                    row_groups
+                        .prune_by_bloom_filters(
+                            &physical_file_schema,
+                            &mut builder,
+                            predicate,
+                            &file_metrics,
+                        )
+                        .await;
+                } else {
+                    file_metrics
+                        .row_groups_pruned_bloom_filter
+                        .add_matched(row_groups.remaining_row_group_count());
+                }
+            }
+
+            // Load page index after bloom filter pruning so we skip it 
entirely if no
+            // row groups remain. Bloom filter building consumed 
`async_file_reader`, so
+            // we create a fresh reader here — reader creation is cheap (no 
I/O yet).
+            if should_enable_page_index(enable_page_index, 
&page_pruning_predicate)
+                && !row_groups.is_empty()
+            {
+                let mut fresh_reader: Box<dyn AsyncFileReader> =
+                    parquet_file_reader_factory.create_reader(
+                        partition_index,
+                        partitioned_file.clone(),
+                        metadata_size_hint,
+                        &metrics,
+                    )?;
+                reader_metadata = load_page_index(
+                    reader_metadata,
+                    &mut fresh_reader,
+                    options.with_page_index_policy(PageIndexPolicy::Optional),
+                )
+                .await?;
+            }
+
+            // Extract metadata after potentially loading the page index, so 
the cached
+            // metadata in each morsel includes the page index if it was 
loaded.
+            let metadata = Arc::clone(reader_metadata.metadata());
+
+            let mut access_plan = row_groups.build();
+
+            // Page pruning: done once per file here in morselize(), so that 
open()
+            // does not repeat it for each morsel.
+            if enable_page_index
+                && !access_plan.is_empty()
+                && let Some(p) = page_pruning_predicate
+            {
+                access_plan = p.prune_plan_with_page_index(
+                    access_plan,
+                    &physical_file_schema,
+                    metadata.file_metadata().schema_descr(),
+                    metadata.as_ref(),
+                    &file_metrics,
+                );
+            }
+
+            let mut morsels = Vec::with_capacity(access_plan.len());
+            for i in 0..num_row_groups {
+                if !access_plan.should_scan(i) {
+                    continue;
+                }
+                let mut morsel_access_plan = 
ParquetAccessPlan::new_none(num_row_groups);
+                // Transfer the page-pruned access (Scan or Selection) for 
this row group
+                morsel_access_plan.set(i, access_plan.inner()[i].clone());
+                let morsel = ParquetMorsel {
+                    metadata: Arc::clone(&metadata),
+                    access_plan: morsel_access_plan,
+                };

Review Comment:
   `morselize` creates one `ParquetAccessPlan` per row group using 
`ParquetAccessPlan::new_none(num_row_groups)`, which allocates a 
`Vec<RowGroupAccess>` of length `num_row_groups` for every morsel. For files 
with many row groups, this is quadratic memory/time overhead and could be 
significant. Consider changing `ParquetMorsel` to store just the row-group 
index + its `RowGroupAccess` (and/or share a single plan via `Arc`) so each 
morsel is O(1) additional memory.



##########
datafusion/datasource/src/file_stream.rs:
##########
@@ -102,7 +130,16 @@ impl FileStream {
     ///
     /// Since file opening is mostly IO (and may involve a
     /// bunch of sequential IO), it can be parallelized with decoding.
+    ///
+    /// In morsel-driven mode this prefetches the next already-morselized item
+    /// from the shared queue (leaf morsels only — items that still need
+    /// async morselization are left in the queue for the normal Idle →
+    /// Morselizing path).

Review Comment:
   The doc comment says morsel-driven mode “prefetches the next 
already-morselized item from the shared queue”, but the implementation 
immediately returns `None` when `self.morsel_driven` is true (no prefetch). 
Please update the comment to match the actual behavior, or implement the 
described prefetching behavior if intended.
   ```suggestion
       /// In non–morsel-driven mode this method starts opening the next file
       /// while the current file is still being decoded, effectively 
prefetching
       /// the next file to overlap IO and compute.
       ///
       /// In morsel-driven mode no such prefetching is performed; this method
       /// returns `None` and the shared work queue controls when new morsels 
are
       /// produced.
   ```



##########
datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs:
##########
@@ -227,8 +227,65 @@ impl RunQueryResult {
         format!("{}", pretty_format_batches(&self.result).unwrap())
     }
 
+    /// Extract ORDER BY column names from the query.
+    /// The query format is always:
+    ///   `SELECT * FROM test_table ORDER BY <col> <dir> <nulls>, ... LIMIT 
<n>`
+    fn sort_columns(&self) -> Vec<String> {
+        let order_by_start = self.query.find("ORDER BY").unwrap() + "ORDER 
BY".len();
+        let limit_start = self.query.rfind(" LIMIT").unwrap();
+        self.query[order_by_start..limit_start]
+            .trim()
+            .split(',')
+            .map(|part| part.split_whitespace().next().unwrap().to_string())
+            .collect()
+    }
+
+    /// Project `batches` to only include the named columns.
+    fn project_columns(batches: &[RecordBatch], cols: &[String]) -> 
Vec<RecordBatch> {
+        batches
+            .iter()
+            .map(|b| {
+                let schema = b.schema();
+                let indices: Vec<usize> = cols
+                    .iter()
+                    .filter_map(|c| schema.index_of(c).ok())

Review Comment:
   `project_columns` silently ignores missing ORDER BY columns via 
`filter_map(|c| schema.index_of(c).ok())`. If a regression caused an ORDER BY 
column to be missing from the output schema, this test could still pass because 
it would compare fewer (or zero) key columns. Consider failing fast (e.g., 
require all ORDER BY columns to be present) so the fuzz test can’t mask 
schema-related bugs.
   ```suggestion
                       .map(|c| {
                           schema.index_of(c).unwrap_or_else(|e| {
                               panic!(
                                   "Expected ORDER BY column '{c}' to be 
present in schema {:?}: {e}",
                                   schema
                               )
                           })
                       })
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to