stuhood commented on code in PR #22657:
URL: https://github.com/apache/datafusion/pull/22657#discussion_r3353430105
##########
datafusion/datasource/src/file_scan_config/mod.rs:
##########
@@ -702,54 +774,60 @@ impl DataSource for FileScanConfig {
/// Returns the output partitioning for this file scan.
///
- /// When `partitioned_by_file_group` is true, this returns
`Partitioning::Hash` on
- /// the Hive partition columns, allowing the optimizer to skip hash
repartitioning
- /// for aggregates and joins on those columns.
+ /// When output partitioning is declared, this returns it after remapping
+ /// through the scan projection. Otherwise, when
`partitioned_by_file_group`
+ /// is true, this returns `Partitioning::Hash` on the Hive partition
+ /// columns, allowing the optimizer to skip repartitioning for compatible
+ /// aggregates and joins.
///
/// Tradeoffs
- /// - Benefit: Eliminates `RepartitionExec` and `SortExec` for queries with
- /// `GROUP BY` or `ORDER BY` on partition columns.
- /// - Cost: Files are grouped by partition values rather than split by byte
- /// ranges, which may reduce I/O parallelism when partition sizes are
uneven.
- /// For simple aggregations without `ORDER BY`, this cost may outweigh
the benefit.
+ /// - Benefit: Eliminates `RepartitionExec` for compatible queries.
+ /// - Cost: File groups must remain intact, so byte-range file splitting
+ /// and sibling work stealing are disabled.
///
/// Follow-up Work
- /// - Idea: Could allow byte-range splitting within partition-aware groups,
+ /// - Idea: Could allow byte-range splitting within each output partition,
/// preserving I/O parallelism while maintaining partition semantics.
fn output_partitioning(&self) -> Partitioning {
- if self.partitioned_by_file_group {
- let partition_cols = self.table_partition_cols();
- if !partition_cols.is_empty() {
- let projected_schema = match self.projected_schema() {
- Ok(schema) => schema,
- Err(_) => {
- debug!(
- "Could not get projected schema, falling back to
UnknownPartitioning."
- );
- return
Partitioning::UnknownPartitioning(self.file_groups.len());
- }
- };
-
- // Build Column expressions for partition columns based on
their
- // position in the projected schema
- let mut exprs: Vec<Arc<dyn PhysicalExpr>> = Vec::new();
- for partition_col in partition_cols {
- if let Some((idx, _)) = projected_schema
- .fields()
- .iter()
- .enumerate()
- .find(|(_, f)| f.name() == partition_col.name())
- {
- exprs.push(Arc::new(Column::new(partition_col.name(),
idx)));
- }
- }
+ let Some(output_partitioning) =
self.output_partitioning.clone().or_else(|| {
+ self.partitioned_by_file_group.then(|| {
+ hash_partitioning_from_partition_fields(
+ self.file_source.table_schema().table_schema(),
+ self.table_partition_cols(),
+ self.file_groups.len(),
+ )
+ })?
+ }) else {
+ return Partitioning::UnknownPartitioning(self.file_groups.len());
+ };
+ if output_partitioning.partition_count() != self.file_groups.len() {
+ debug!(
+ "Declared output partitioning has {} partitions, but file scan
has {} file groups. Falling back to UnknownPartitioning.",
+ output_partitioning.partition_count(),
+ self.file_groups.len()
+ );
+ return Partitioning::UnknownPartitioning(self.file_groups.len());
+ }
Review Comment:
In which case would this happen? Is it when partition filters have
eliminated some files? It feels like a partition-count mismatch should be a
louder error than a `debug!` log followed by a silent fallback to
`UnknownPartitioning`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]