Copilot commented on code in PR #20481:
URL: https://github.com/apache/datafusion/pull/20481#discussion_r2869159876
##########
datafusion/datasource/src/source.rs:
##########
@@ -300,7 +331,56 @@ impl ExecutionPlan for DataSourceExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- let stream = self.data_source.open(partition, Arc::clone(&context))?;
+ let shared_morsel_queue = if let Some(config) =
+ self.data_source.as_any().downcast_ref::<FileScanConfig>()
+ {
+ if config.morsel_driven {
+ let mut state = self.morsel_state.lock().unwrap();
+
+ // Start a new cycle once all expected partition streams for
the
+ // previous cycle have been opened.
+ //
+ // Limitation: this heuristic assumes every execution opens all
+ // `file_groups.len()` partitions. If a caller opens only a
subset
+ // (e.g. partition 0 of 2 and then abandons the rest), the
state
+ // remains stuck and the next execution reuses the stale queue.
+ // In normal DataFusion query execution all partitions are
opened,
+ // so this is acceptable in practice.
Review Comment:
The queue lifecycle relies on `streams_opened` reaching `expected_streams`,
but if a caller executes only a subset of partitions (which is a supported
pattern in tests and can happen via public `ExecutionPlan::execute` usage), the
shared queue can be left in a drained/stale state and then incorrectly reused
on subsequent executions. This breaks re-executability (a second execution may
produce no rows). Consider tracking active stream lifetimes (increment on open,
decrement on stream drop) and resetting/creating a new queue when the last
stream for a cycle is dropped, rather than relying on “all partitions were
opened” heuristics.
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -180,7 +195,293 @@ impl PreparedAccessPlan {
}
}
+impl ParquetOpener {
+ fn build_row_group_access_filter(
+ file_name: &str,
+ extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
+ row_group_count: usize,
+ row_group_metadata: &[RowGroupMetaData],
+ file_range: Option<&FileRange>,
+ stats_pruning: Option<RowGroupStatisticsPruningContext<'_>>,
+ ) -> Result<RowGroupAccessPlanFilter> {
+ let mut row_groups = RowGroupAccessPlanFilter::new(create_initial_plan(
+ file_name,
+ extensions,
+ row_group_count,
+ )?);
+
+ if let Some(range) = file_range {
+ row_groups.prune_by_range(row_group_metadata, range);
+ }
+
+ if let Some(stats_pruning) = stats_pruning {
+ row_groups.prune_by_statistics(
+ stats_pruning.physical_file_schema.as_ref(),
+ stats_pruning.parquet_schema,
+ row_group_metadata,
+ stats_pruning.predicate,
+ stats_pruning.file_metrics,
+ );
+ }
+
+ Ok(row_groups)
+ }
+}
+
impl FileOpener for ParquetOpener {
+ fn is_leaf_morsel(&self, file: &PartitionedFile) -> bool {
+ file.extensions
+ .as_ref()
+ .map(|e| e.is::<ParquetMorsel>())
+ .unwrap_or(false)
+ }
+
+ fn morselize(
+ &self,
+ partitioned_file: PartitionedFile,
+ ) -> BoxFuture<'static, Result<Vec<PartitionedFile>>> {
+ if partitioned_file
+ .extensions
+ .as_ref()
+ .map(|e| e.is::<ParquetMorsel>())
+ .unwrap_or(false)
+ {
+ return Box::pin(ready(Ok(vec![partitioned_file])));
+ }
+
+ let file_metrics = ParquetFileMetrics::new(
+ self.partition_index,
+ partitioned_file.object_meta.location.as_ref(),
+ &self.metrics,
+ );
+ let file_name = partitioned_file.object_meta.location.to_string();
+ let file_range = partitioned_file.range.clone();
+ let extensions = partitioned_file.extensions.clone();
+
+ let metadata_size_hint = partitioned_file
+ .metadata_size_hint
+ .or(self.metadata_size_hint);
+
+ let mut async_file_reader: Box<dyn AsyncFileReader> =
+ match self.parquet_file_reader_factory.create_reader(
+ self.partition_index,
+ partitioned_file.clone(),
+ metadata_size_hint,
+ &self.metrics,
+ ) {
+ Ok(reader) => reader,
+ Err(e) => return Box::pin(ready(Err(e))),
+ };
+
+ let options = ArrowReaderOptions::new().with_page_index(false);
+ #[cfg(feature = "parquet_encryption")]
+ let encryption_context = self.get_encryption_context();
+
+ let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory);
+ let table_schema = self.table_schema.clone();
+ let predicate = self.predicate.clone();
+ let metrics = self.metrics.clone();
+ let enable_row_group_stats_pruning =
self.enable_row_group_stats_pruning;
+ let enable_bloom_filter = self.enable_bloom_filter;
+ let enable_page_index = self.enable_page_index;
+ let limit = self.limit;
+ let preserve_order = self.preserve_order;
+ let parquet_file_reader_factory =
Arc::clone(&self.parquet_file_reader_factory);
+ let partition_index = self.partition_index;
+
+ Box::pin(async move {
+ #[cfg(feature = "parquet_encryption")]
+ let options = if let Some(fd_val) = encryption_context
+
.get_file_decryption_properties(&partitioned_file.object_meta.location)
+ .await?
+ {
+ options.with_file_decryption_properties(Arc::clone(&fd_val))
+ } else {
+ options
+ };
+
+ let predicate_creation_errors = MetricBuilder::new(&metrics)
+ .global_counter("num_predicate_creation_errors");
+
+ // Step: try to prune the file using file-level statistics before
loading
+ // parquet metadata. This avoids the I/O cost of reading metadata
when
+ // file-level stats (available from the catalog) indicate no rows
can match.
+ if let Some(pred) = predicate.as_ref() {
+ let logical_file_schema =
Arc::clone(table_schema.file_schema());
+ if let Some(mut file_pruner) = FilePruner::try_new(
+ Arc::clone(pred),
+ &logical_file_schema,
+ &partitioned_file,
+ predicate_creation_errors.clone(),
+ ) && file_pruner.should_prune()?
+ {
+ file_metrics.files_ranges_pruned_statistics.add_pruned(1);
+ return Ok(vec![]);
+ }
+ }
+
+ let _metadata_timer = file_metrics.metadata_load_time.timer();
+ let mut reader_metadata =
+ ArrowReaderMetadata::load_async(&mut async_file_reader,
options.clone())
+ .await?;
+ let num_row_groups = reader_metadata.metadata().num_row_groups();
+
+ // Adapt the physical schema to the file schema for pruning
+ let physical_file_schema = Arc::clone(reader_metadata.schema());
+ let logical_file_schema = table_schema.file_schema();
+ let rewriter = expr_adapter_factory.create(
+ Arc::clone(logical_file_schema),
+ Arc::clone(&physical_file_schema),
+ )?;
+ let simplifier =
PhysicalExprSimplifier::new(&physical_file_schema);
+
+ // Replace partition column references with their literal values
before rewriting.
+ // This mirrors what `open()` does. Without this, expressions like
`val != part`
+ // (where `part` is a partition column) would cause
`rewriter.rewrite` to fail
+ // since the partition column is not in the logical file schema.
+ let literal_columns: HashMap<String, ScalarValue> = table_schema
+ .table_partition_cols()
+ .iter()
+ .zip(partitioned_file.partition_values.iter())
+ .map(|(field, value)| (field.name().clone(), value.clone()))
+ .collect();
+
+ let adapted_predicate = predicate
+ .as_ref()
+ .map(|p| {
+ let p = if !literal_columns.is_empty() {
+ replace_columns_with_literals(Arc::clone(p),
&literal_columns)?
+ } else {
+ Arc::clone(p)
+ };
+ simplifier.simplify(rewriter.rewrite(p)?)
+ })
+ .transpose()?;
+
+ let (pruning_predicate, page_pruning_predicate) =
build_pruning_predicates(
+ adapted_predicate.as_ref(),
+ &physical_file_schema,
+ &predicate_creation_errors,
+ );
+
+ let mut row_groups = Self::build_row_group_access_filter(
+ &file_name,
+ extensions,
+ num_row_groups,
+ reader_metadata.metadata().row_groups(),
+ file_range.as_ref(),
+ pruning_predicate
+ .as_deref()
+ .filter(|_| enable_row_group_stats_pruning)
+ .map(|predicate| RowGroupStatisticsPruningContext {
+ physical_file_schema: &physical_file_schema,
+ parquet_schema: reader_metadata.parquet_schema(),
+ predicate,
+ file_metrics: &file_metrics,
+ }),
+ )?;
+
+ // Prune by limit if limit is set and order is not sensitive
+ if let (Some(limit), false) = (limit, preserve_order) {
+ row_groups.prune_by_limit(
+ limit,
+ reader_metadata.metadata().row_groups(),
+ &file_metrics,
+ );
+ }
+
+ // Bloom filter pruning: done once per file here in morselize(),
so that
+ // open() does not repeat it for each morsel (which would cause
inflated metrics
+ // and unnecessary work).
+ //
+ // Note: the bloom filter builder takes ownership of
`async_file_reader`.
+ // Page index loading happens afterward using a fresh reader so
that we only
+ // pay for the page index I/O on the row groups that survive bloom
filter pruning.
+ if let Some(predicate) = pruning_predicate.as_deref() {
+ if enable_bloom_filter && !row_groups.is_empty() {
+ // Clone reader_metadata so it remains available for page
+ // index loading after this builder is dropped.
+ let mut builder =
ParquetRecordBatchStreamBuilder::new_with_metadata(
+ async_file_reader,
+ reader_metadata.clone(),
+ );
+ row_groups
+ .prune_by_bloom_filters(
+ &physical_file_schema,
+ &mut builder,
+ predicate,
+ &file_metrics,
+ )
+ .await;
+ } else {
+ file_metrics
+ .row_groups_pruned_bloom_filter
+ .add_matched(row_groups.remaining_row_group_count());
+ }
+ }
+
+ // Load page index after bloom filter pruning so we skip it
entirely if no
+ // row groups remain. Bloom filter building consumed
`async_file_reader`, so
+ // we create a fresh reader here — reader creation is cheap (no
I/O yet).
+ if should_enable_page_index(enable_page_index,
&page_pruning_predicate)
+ && !row_groups.is_empty()
+ {
+ let mut fresh_reader: Box<dyn AsyncFileReader> =
+ parquet_file_reader_factory.create_reader(
+ partition_index,
+ partitioned_file.clone(),
+ metadata_size_hint,
+ &metrics,
+ )?;
+ reader_metadata = load_page_index(
+ reader_metadata,
+ &mut fresh_reader,
+ options.with_page_index_policy(PageIndexPolicy::Optional),
+ )
+ .await?;
+ }
+
+ // Extract metadata after potentially loading the page index, so
the cached
+ // metadata in each morsel includes the page index if it was
loaded.
+ let metadata = Arc::clone(reader_metadata.metadata());
+
+ let mut access_plan = row_groups.build();
+
+ // Page pruning: done once per file here in morselize(), so that
open()
+ // does not repeat it for each morsel.
+ if enable_page_index
+ && !access_plan.is_empty()
+ && let Some(p) = page_pruning_predicate
+ {
+ access_plan = p.prune_plan_with_page_index(
+ access_plan,
+ &physical_file_schema,
+ metadata.file_metadata().schema_descr(),
+ metadata.as_ref(),
+ &file_metrics,
+ );
+ }
+
+ let mut morsels = Vec::with_capacity(access_plan.len());
+ for i in 0..num_row_groups {
+ if !access_plan.should_scan(i) {
+ continue;
+ }
+ let mut morsel_access_plan =
ParquetAccessPlan::new_none(num_row_groups);
+ // Transfer the page-pruned access (Scan or Selection) for
this row group
+ morsel_access_plan.set(i, access_plan.inner()[i].clone());
+ let morsel = ParquetMorsel {
+ metadata: Arc::clone(&metadata),
+ access_plan: morsel_access_plan,
+ };
Review Comment:
`morselize` creates one `ParquetAccessPlan` per row group using
`ParquetAccessPlan::new_none(num_row_groups)`, which allocates a
`Vec<RowGroupAccess>` of length `num_row_groups` for every morsel. For files
with many row groups, this is quadratic memory/time overhead and could be
significant. Consider changing `ParquetMorsel` to store just the row-group
index + its `RowGroupAccess` (and/or share a single plan via `Arc`) so each
morsel is O(1) additional memory.
##########
datafusion/datasource/src/file_stream.rs:
##########
@@ -102,7 +130,16 @@ impl FileStream {
///
/// Since file opening is mostly IO (and may involve a
/// bunch of sequential IO), it can be parallelized with decoding.
+ ///
+ /// In morsel-driven mode this prefetches the next already-morselized item
+ /// from the shared queue (leaf morsels only — items that still need
+ /// async morselization are left in the queue for the normal Idle →
+ /// Morselizing path).
Review Comment:
The doc comment says morsel-driven mode “prefetches the next
already-morselized item from the shared queue”, but the implementation
immediately returns `None` when `self.morsel_driven` is true (no prefetch).
Please update the comment to match the actual behavior, or implement the
described prefetching behavior if intended.
    ```suggestion
    /// In non–morsel-driven mode this method starts opening the next file
    /// while the current file is still being decoded, effectively
    /// prefetching the next file to overlap IO and compute.
    ///
    /// In morsel-driven mode no such prefetching is performed; this method
    /// returns `None` and the shared work queue controls when new morsels
    /// are produced.
    ```
##########
datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs:
##########
@@ -227,8 +227,65 @@ impl RunQueryResult {
format!("{}", pretty_format_batches(&self.result).unwrap())
}
+ /// Extract ORDER BY column names from the query.
+ /// The query format is always:
+ /// `SELECT * FROM test_table ORDER BY <col> <dir> <nulls>, ... LIMIT
<n>`
+ fn sort_columns(&self) -> Vec<String> {
+ let order_by_start = self.query.find("ORDER BY").unwrap() + "ORDER
BY".len();
+ let limit_start = self.query.rfind(" LIMIT").unwrap();
+ self.query[order_by_start..limit_start]
+ .trim()
+ .split(',')
+ .map(|part| part.split_whitespace().next().unwrap().to_string())
+ .collect()
+ }
+
+ /// Project `batches` to only include the named columns.
+ fn project_columns(batches: &[RecordBatch], cols: &[String]) ->
Vec<RecordBatch> {
+ batches
+ .iter()
+ .map(|b| {
+ let schema = b.schema();
+ let indices: Vec<usize> = cols
+ .iter()
+ .filter_map(|c| schema.index_of(c).ok())
Review Comment:
`project_columns` silently ignores missing ORDER BY columns via
`filter_map(|c| schema.index_of(c).ok())`. If a regression caused an ORDER BY
column to be missing from the output schema, this test could still pass because
it would compare fewer (or zero) key columns. Consider failing fast (e.g.,
require all ORDER BY columns to be present) so the fuzz test can’t mask
schema-related bugs.
    ```suggestion
                    .map(|c| {
                        schema.index_of(c).unwrap_or_else(|e| {
                            panic!(
                                "Expected ORDER BY column '{c}' to be present in schema {:?}: {e}",
                                schema
                            )
                        })
                    })
    ```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]