stuhood commented on code in PR #22657:
URL: https://github.com/apache/datafusion/pull/22657#discussion_r3353430105
##########
datafusion/datasource/src/file_scan_config/mod.rs:
##########
@@ -702,54 +774,60 @@ impl DataSource for FileScanConfig {
/// Returns the output partitioning for this file scan.
///
- /// When `partitioned_by_file_group` is true, this returns
`Partitioning::Hash` on
- /// the Hive partition columns, allowing the optimizer to skip hash
repartitioning
- /// for aggregates and joins on those columns.
+ /// When output partitioning is declared, this returns it after remapping
+ /// through the scan projection. Otherwise, when
`partitioned_by_file_group`
+ /// is true, this returns `Partitioning::Hash` on the Hive partition
+ /// columns, allowing the optimizer to skip repartitioning for compatible
+ /// aggregates and joins.
///
/// Tradeoffs
- /// - Benefit: Eliminates `RepartitionExec` and `SortExec` for queries with
- /// `GROUP BY` or `ORDER BY` on partition columns.
- /// - Cost: Files are grouped by partition values rather than split by byte
- /// ranges, which may reduce I/O parallelism when partition sizes are
uneven.
- /// For simple aggregations without `ORDER BY`, this cost may outweigh
the benefit.
+ /// - Benefit: Eliminates `RepartitionExec` for compatible queries.
+ /// - Cost: File groups must remain intact, so byte-range file splitting
+ /// and sibling work stealing are disabled.
///
/// Follow-up Work
- /// - Idea: Could allow byte-range splitting within partition-aware groups,
+ /// - Idea: Could allow byte-range splitting within each output partition,
/// preserving I/O parallelism while maintaining partition semantics.
fn output_partitioning(&self) -> Partitioning {
- if self.partitioned_by_file_group {
- let partition_cols = self.table_partition_cols();
- if !partition_cols.is_empty() {
- let projected_schema = match self.projected_schema() {
- Ok(schema) => schema,
- Err(_) => {
- debug!(
- "Could not get projected schema, falling back to
UnknownPartitioning."
- );
- return
Partitioning::UnknownPartitioning(self.file_groups.len());
- }
- };
-
- // Build Column expressions for partition columns based on
their
- // position in the projected schema
- let mut exprs: Vec<Arc<dyn PhysicalExpr>> = Vec::new();
- for partition_col in partition_cols {
- if let Some((idx, _)) = projected_schema
- .fields()
- .iter()
- .enumerate()
- .find(|(_, f)| f.name() == partition_col.name())
- {
- exprs.push(Arc::new(Column::new(partition_col.name(),
idx)));
- }
- }
+ let Some(output_partitioning) =
self.output_partitioning.clone().or_else(|| {
+ self.partitioned_by_file_group.then(|| {
+ hash_partitioning_from_partition_fields(
+ self.file_source.table_schema().table_schema(),
+ self.table_partition_cols(),
+ self.file_groups.len(),
+ )
+ })?
+ }) else {
+ return Partitioning::UnknownPartitioning(self.file_groups.len());
+ };
+ if output_partitioning.partition_count() != self.file_groups.len() {
+ debug!(
+ "Declared output partitioning has {} partitions, but file scan
has {} file groups. Falling back to UnknownPartitioning.",
+ output_partitioning.partition_count(),
+ self.file_groups.len()
+ );
+ return Partitioning::UnknownPartitioning(self.file_groups.len());
+ }
Review Comment:
In which case would this happen? The case where partition filters has
eliminated some files...? It feels like it should be a louder error.
##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -690,12 +715,45 @@ impl ListingTable {
/// Get the list of files for a scan as well as the file level statistics.
/// The list is grouped to let the execution plan know how the files should
/// be distributed to different threads / executors.
+ ///
+ /// If [`ListingOptions::output_partitioning`] is set, the returned file
+ /// groups preserve that declared partition count, including empty trailing
+ /// groups when needed, rather than using
+ /// [`ListingOptions::target_partitions`].
Review Comment:
This feels like it should be documented on the `target_partitions` and
`output_partitioning` setters/options rather than here? e.g. that only one may
be set.
##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -505,12 +506,31 @@ impl TableProvider for ListingTable {
// at the same time. This is because the limit should be applied after
the filters are applied.
let statistic_file_limit = if filters.is_empty() { limit } else { None
};
+ let declared_output_partitioning = if partition_filters.is_empty() {
+ self.options.output_partitioning.clone()
+ } else {
+ // Partition pruning can remove files before grouping. Without a
+ // stable file-to-declared-partition mapping, regrouping the
+ // remaining files could shift them into the wrong partition index.
+ None
Review Comment:
Should/can the partition filters be calculated _after_ having assigned files
to partitions? You'd essentially bucket the files into partitions during
`with_output_partitioning`, I think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]