gabotechs commented on code in PR #22657:
URL: https://github.com/apache/datafusion/pull/22657#discussion_r3350656081


##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -690,12 +715,45 @@ impl ListingTable {
     /// Get the list of files for a scan as well as the file level statistics.
     /// The list is grouped to let the execution plan know how the files should
     /// be distributed to different threads / executors.
+    ///
+    /// If [`ListingOptions::output_partitioning`] is set, the returned file
+    /// groups preserve that declared partition count, including empty trailing
+    /// groups when needed, rather than using
+    /// [`ListingOptions::target_partitions`].
     pub async fn list_files_for_scan<'a>(
         &'a self,
         ctx: &'a dyn Session,
         filters: &'a [Expr],
         limit: Option<usize>,
     ) -> datafusion_common::Result<ListFilesResult> {
+        let declared_output_partitioning = self.options.output_partitioning.as_ref();
+        let target_partitions = declared_output_partitioning
+            .map(Partitioning::partition_count)
+            .unwrap_or(self.options.target_partitions);
+        self.list_files_for_scan_with_target(
+            ctx,
+            filters,
+            limit,
+            target_partitions,
+            declared_output_partitioning.is_some(),
+        )
+        .await
+    }
+
+    async fn list_files_for_scan_with_target<'a>(

Review Comment:
   I don't love this pattern of taking an existing method and adding a subtle
   variant with a `_with_x` suffix. It does not scale well: once more
   configuration params are needed, people are forced to stack `_with_x_and_y`
   variants on top of each other.
   
   A slightly better, ok-ish pattern is to add a `_with_options()` variant that
   takes an options object. That way, adding further config params requires
   neither new function variants nor breaking changes to existing ones.
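   
   As a rough sketch of that options-object shape: every name below
   (`ListFilesOptions`, `Table`, `list_files_with_options`) is a hypothetical
   stand-in rather than a DataFusion API, and the chunking logic is only a
   placeholder for the real grouping code. Marking the struct
   `#[non_exhaustive]` is one way to let fields be added later without
   breaking downstream callers.
   
   ```rust
   /// Hypothetical options object; not part of DataFusion.
   #[non_exhaustive]
   #[derive(Debug, Clone, Default, PartialEq)]
   pub struct ListFilesOptions {
       /// Overrides the table's configured partition count when set.
       pub target_partitions: Option<usize>,
       /// Pads the result with empty groups up to the requested count.
       pub pad_with_empty_groups: bool,
   }
   
   impl ListFilesOptions {
       pub fn with_target_partitions(mut self, n: usize) -> Self {
           self.target_partitions = Some(n);
           self
       }
   
       pub fn with_pad_with_empty_groups(mut self, pad: bool) -> Self {
           self.pad_with_empty_groups = pad;
           self
       }
   }
   
   /// Simplified stand-in for a listing table.
   pub struct Table {
       pub default_target_partitions: usize,
       pub files: Vec<String>,
   }
   
   impl Table {
       /// The existing entry point keeps its signature and delegates.
       pub fn list_files(&self) -> Vec<Vec<String>> {
           self.list_files_with_options(&ListFilesOptions::default())
       }
   
       /// New behaviour is configured through the options object only.
       pub fn list_files_with_options(&self, opts: &ListFilesOptions) -> Vec<Vec<String>> {
           let n = opts
               .target_partitions
               .unwrap_or(self.default_target_partitions)
               .max(1);
           // Ceiling division, clamped so `chunks` never receives zero.
           let chunk_size = ((self.files.len() + n - 1) / n).max(1);
           let mut groups: Vec<Vec<String>> =
               self.files.chunks(chunk_size).map(|c| c.to_vec()).collect();
           if opts.pad_with_empty_groups && groups.len() < n {
               groups.resize_with(n, Vec::new);
           }
           groups
       }
   }
   ```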



##########
datafusion/catalog-listing/src/options.rs:
##########
@@ -42,8 +43,10 @@ pub struct ListingOptions {
     /// This can add a lot of overhead as it will usually require files
     /// to be opened and at least partially parsed.
     pub collect_stat: bool,
-    /// Group files to avoid that the number of partitions exceeds
-    /// this limit
+    /// Group files to avoid that the number of partitions exceeds this limit.
+    ///
+    /// If [`Self::output_partitioning`] is set, its partition count is used
+    /// instead, even when it exceeds this value.
     pub target_partitions: usize,

Review Comment:
   🤔 I can imagine this leaving room for some unexpected behavior.
   
   If `target_partitions` does not match the partition count of
   `output_partitioning`, wouldn't it be better to error out?



##########
datafusion/catalog-listing/src/options.rs:
##########
@@ -61,6 +64,17 @@ pub struct ListingOptions {
     ///       multiple equivalent orderings, the outer `Vec` will have a
     ///       single element.
     pub file_sort_order: Vec<Vec<SortExpr>>,
+    /// Optional declared output partitioning for this table.
+    ///
+    /// Expressions are specified against the full table schema. When set,
+    /// [`ListingTable`](crate::ListingTable) creates one scan file group per
+    /// declared output partition instead of using [`Self::target_partitions`].
+    /// Empty file groups are added when needed to preserve that count.
+    ///
+    /// Files are sorted by path before grouping. DataFusion does not validate
+    /// that rows match the declaration, so callers must ensure file group `i`
+    /// contains only rows for declared output partition `i`.
+    pub output_partitioning: Option<Partitioning>,

Review Comment:
   We might be mixing logical plan concepts with physical plan concepts here.
   
   `datafusion_physical_expr::Partitioning` is the physical representation of
   partitioning, while `datafusion_expr::Partitioning` is the equivalent used at
   the logical layer. I think we should be using `datafusion_expr::Partitioning`
   here instead.
   
   Note how, for example, the expressions in `file_sort_order` are `SortExpr`
   and not `PhysicalSortExpr`.



##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -505,12 +506,31 @@ impl TableProvider for ListingTable {
         // at the same time. This is because the limit should be applied after the filters are applied.
         let statistic_file_limit = if filters.is_empty() { limit } else { None };
 
+        let declared_output_partitioning = if partition_filters.is_empty() {
+            self.options.output_partitioning.clone()
+        } else {
+            // Partition pruning can remove files before grouping. Without a
+            // stable file-to-declared-partition mapping, regrouping the
+            // remaining files could shift them into the wrong partition index.
+            None
+        };
+        let target_partitions = declared_output_partitioning
+            .as_ref()
+            .map(Partitioning::partition_count)
+            .unwrap_or(self.options.target_partitions);
+

Review Comment:
   This is the place where I imagine we should check the invariant 
`target_partitions == partitioning.partition_count()`.
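   
   A minimal sketch of such a check, under loud assumptions: the `Partitioning`
   enum below is a two-variant stand-in for the real type, the helper name
   `check_partition_invariant` is made up, and the error is a plain `String`
   where real code would presumably return a `datafusion_common::Result`.
   
   ```rust
   /// Simplified stand-in for a partitioning declaration.
   #[derive(Debug, Clone, PartialEq)]
   pub enum Partitioning {
       RoundRobinBatch(usize),
       UnknownPartitioning(usize),
   }
   
   impl Partitioning {
       pub fn partition_count(&self) -> usize {
           match self {
               Partitioning::RoundRobinBatch(n) | Partitioning::UnknownPartitioning(n) => *n,
           }
       }
   }
   
   /// Hypothetical helper: returns the partition count to use, or an error
   /// when the declared partitioning contradicts `target_partitions`.
   pub fn check_partition_invariant(
       target_partitions: usize,
       declared: Option<&Partitioning>,
   ) -> Result<usize, String> {
       match declared {
           Some(p) if p.partition_count() != target_partitions => Err(format!(
               "target_partitions ({}) does not match declared output partition count ({})",
               target_partitions,
               p.partition_count()
           )),
           // Either nothing is declared or the two values agree.
           _ => Ok(target_partitions),
       }
   }
   ```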



##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -561,24 +583,27 @@ impl TableProvider for ListingTable {
         };
 
         let file_source = self.create_file_source();
+        let mut scan_config_builder =
+            FileScanConfigBuilder::new(object_store_url, file_source)
+                .with_file_groups(partitioned_file_lists)
+                .with_constraints(self.constraints.clone())
+                .with_statistics(statistics)
+                .with_projection_indices(projection)?
+                .with_limit(limit)
+                .with_output_ordering(output_ordering)
+                .with_output_partitioning(declared_output_partitioning)
+                .with_expr_adapter(self.expr_adapter_factory.clone());
+        if partitioned_by_file_group {
+            scan_config_builder =
+                scan_config_builder.with_partitioned_by_file_group(true);
+        }
+        let scan_config = scan_config_builder.build();

Review Comment:
   The diff is a bit weird here. Can't we just do this?
   
   ```rust
           let plan = self
               .options
               .format
               .create_physical_plan(
                   state,
                   FileScanConfigBuilder::new(object_store_url, file_source)
                       .with_file_groups(partitioned_file_lists)
                       .with_constraints(self.constraints.clone())
                       .with_statistics(statistics)
                       .with_projection_indices(projection)?
                       .with_limit(limit)
                       .with_output_ordering(output_ordering)
   +                   .with_output_partitioning(declared_output_partitioning)
                       .with_expr_adapter(self.expr_adapter_factory.clone())
                       .with_partitioned_by_file_group(partitioned_by_file_group)
                       .build(),
               )
               .await?;
   ```



##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -690,12 +715,45 @@ impl ListingTable {
     /// Get the list of files for a scan as well as the file level statistics.
     /// The list is grouped to let the execution plan know how the files should
     /// be distributed to different threads / executors.
+    ///
+    /// If [`ListingOptions::output_partitioning`] is set, the returned file
+    /// groups preserve that declared partition count, including empty trailing
+    /// groups when needed, rather than using
+    /// [`ListingOptions::target_partitions`].
     pub async fn list_files_for_scan<'a>(
         &'a self,
         ctx: &'a dyn Session,
         filters: &'a [Expr],
         limit: Option<usize>,
     ) -> datafusion_common::Result<ListFilesResult> {
+        let declared_output_partitioning = self.options.output_partitioning.as_ref();
+        let target_partitions = declared_output_partitioning
+            .map(Partitioning::partition_count)
+            .unwrap_or(self.options.target_partitions);
+        self.list_files_for_scan_with_target(
+            ctx,
+            filters,
+            limit,
+            target_partitions,
+            declared_output_partitioning.is_some(),
+        )
+        .await
+    }
+
+    async fn list_files_for_scan_with_target<'a>(
+        &'a self,
+        ctx: &'a dyn Session,
+        filters: &'a [Expr],
+        limit: Option<usize>,
+        target_partitions: usize,

Review Comment:
   It's pretty weird from an API standpoint that you can pass
   `target_partitions` when there is already a field that accounts for it,
   `self.options.target_partitions`.
   
   It's also a bit weird that this accepts `preserve_partition_count: bool`,
   but both call sites pass `declared_output_partitioning.is_some()`.
   
   Can't these two parameters be inferred from information already present in
   `self`? Is there any chance we can just remove
   `list_files_for_scan_with_target`?
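   
   One possible shape for that inference, sketched with simplified stand-ins:
   `ListingOptions` and `Partitioning` here are cut-down copies, and
   `GroupingPlan` / `grouping_plan` are hypothetical names. Note the sketch
   ignores the case in `scan` where the declaration is dropped because
   partition filters are present; that would still need separate handling.
   
   ```rust
   /// Simplified stand-in for a partitioning declaration.
   #[derive(Debug, Clone, PartialEq)]
   pub enum Partitioning {
       RoundRobinBatch(usize),
   }
   
   impl Partitioning {
       pub fn partition_count(&self) -> usize {
           match self {
               Partitioning::RoundRobinBatch(n) => *n,
           }
       }
   }
   
   /// Cut-down copy of the options struct, keeping only the two relevant fields.
   pub struct ListingOptions {
       pub target_partitions: usize,
       pub output_partitioning: Option<Partitioning>,
   }
   
   /// Hypothetical result type bundling what the private helper takes as arguments.
   #[derive(Debug, PartialEq)]
   pub struct GroupingPlan {
       pub group_count: usize,
       pub keep_declared_count: bool,
   }
   
   impl ListingOptions {
       /// Derives both values from the options instead of passing them in.
       pub fn grouping_plan(&self) -> GroupingPlan {
           match &self.output_partitioning {
               Some(p) => GroupingPlan {
                   group_count: p.partition_count(),
                   keep_declared_count: true,
               },
               None => GroupingPlan {
                   group_count: self.target_partitions,
                   keep_declared_count: false,
               },
           }
       }
   }
   ```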



##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -747,27 +805,26 @@ impl ListingTable {
         // hash repartitioning for aggregates and joins on partition columns.
         let threshold = ctx.config_options().optimizer.preserve_file_partitions;
 
-        let (file_groups, grouped_by_partition) = if threshold > 0
-            && !self.options.table_partition_cols.is_empty()
-        {
-            let grouped =
-                file_group.group_by_partition_values(self.options.target_partitions);
+        let (mut file_groups, grouped_by_partition) = if preserve_partition_count {
+            (file_group.split_files(target_partitions), false)
+        } else if threshold > 0 && !self.options.table_partition_cols.is_empty() {
+            let grouped = file_group.group_by_partition_values(target_partitions);
             if grouped.len() >= threshold {
                 (grouped, true)
             } else {
                 let all_files: Vec<_> =
                     grouped.into_iter().flat_map(|g| g.into_inner()).collect();
                 (
-                    FileGroup::new(all_files).split_files(self.options.target_partitions),
+                    FileGroup::new(all_files).split_files(target_partitions),
                     false,
                 )
             }
         } else {
-            (
-                file_group.split_files(self.options.target_partitions),
-                false,
-            )
+            (file_group.split_files(target_partitions), false)
         };
+        if preserve_partition_count && !file_groups.is_empty() {
+            file_groups.resize_with(target_partitions, || FileGroup::new(vec![]));
+        }
 

Review Comment:
   It's a bit weird that when `preserve_partition_count` is `true` we are still
   willing to increase the partition count by resizing `file_groups`.
   
   Maybe it's just a matter of finding a better name for the
   `preserve_partition_count` variable?
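   
   To illustrate why the name reads oddly: `Vec::resize_with` grows the vector
   when the requested length is larger (and would truncate it when smaller), so
   the flag effectively pads the result up to the declared count. A small
   sketch, with `Vec<String>` standing in for `FileGroup` and a hypothetical
   helper name `pad_to_declared_count`:
   
   ```rust
   /// Pads `groups` with empty groups until it has `declared` entries.
   /// Mirrors the diff's guard by leaving an empty listing untouched, and
   /// additionally never truncates (an assumption of this sketch).
   pub fn pad_to_declared_count(
       mut groups: Vec<Vec<String>>,
       declared: usize,
   ) -> Vec<Vec<String>> {
       if !groups.is_empty() && groups.len() < declared {
           groups.resize_with(declared, Vec::new);
       }
       groups
   }
   ```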



##########
datafusion/datasource/src/file_scan_config/mod.rs:
##########
@@ -206,6 +206,13 @@ pub struct FileScanConfig {
     /// If the number of file partitions > target_partitions, the file partitions will be grouped
     /// in a round-robin fashion such that number of file partitions = target_partitions.
     pub partitioned_by_file_group: bool,
+    /// Optional declared output partitioning of this file scan.
+    ///
+    /// Expressions are in terms of the full table schema, before scan
+    /// projection or filtering. If the partition count does not match the
+    /// number of file groups, [`DataSource::output_partitioning`] falls back to
+    /// [`Partitioning::UnknownPartitioning`].
+    pub output_partitioning: Option<Partitioning>,

Review Comment:
   Do you see an opportunity for removing `partitioned_by_file_group` now?



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -1286,6 +1288,80 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_list_files_uses_declared_output_partitioning_count() -> Result<()> {
+        let files = ["bucket/key-prefix/file0", "bucket/key-prefix/file1"];
+
+        let ctx = SessionContext::new();
+        register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());
+
+        let opt = ListingOptions::new(Arc::new(JsonFormat::default()))
+            .with_file_extension_opt(Some(""))
+            .with_target_partitions(1)
+            .with_output_partitioning(Some(Partitioning::RoundRobinBatch(4)));
+

Review Comment:
   This does not look very clean from a public API standpoint: users are
   allowed to pass contradictory information.
   
   One idea that comes to mind is to deprecate `.with_target_partitions()` and
   give it an implementation that does something like:
   
   ```rust
   pub fn with_target_partitions(mut self, n: usize) -> Self {
       self.output_partitioning = Some(Partitioning::RoundRobinBatch(n));
       self
   }
   ```



##########
datafusion/datasource/src/file_scan_config/mod.rs:
##########
@@ -206,6 +206,13 @@ pub struct FileScanConfig {
     /// If the number of file partitions > target_partitions, the file partitions will be grouped
     /// in a round-robin fashion such that number of file partitions = target_partitions.
     pub partitioned_by_file_group: bool,
+    /// Optional declared output partitioning of this file scan.
+    ///
+    /// Expressions are in terms of the full table schema, before scan
+    /// projection or filtering. If the partition count does not match the
+    /// number of file groups, [`DataSource::output_partitioning`] falls back to
+    /// [`Partitioning::UnknownPartitioning`].
+    pub output_partitioning: Option<Partitioning>,

Review Comment:
   For example, removing the `partitioned_by_file_group` field, and doing 
something like this? 
   
   ```rust
       pub fn with_partitioned_by_file_group(
           mut self,
           partitioned_by_file_group: bool,
       ) -> Self {
           self.output_partitioning = partitioned_by_file_group
               .then(|| {
                   hash_partitioning_from_partition_fields(
                       self.file_source.table_schema().table_schema(),
                       self.file_source.table_schema().table_partition_cols(),
                       self.file_groups.len(),
                   )
               })
               .flatten();
           self
       }
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

