alamb commented on code in PR #22657:
URL: https://github.com/apache/datafusion/pull/22657#discussion_r3454788965


##########
datafusion/catalog-listing/src/options.rs:
##########
@@ -53,6 +53,20 @@ pub struct ListingOptions {
     ///       multiple equivalent orderings, the outer `Vec` will have a
     ///       single element.
     pub file_sort_order: Vec<Vec<SortExpr>>,
+    /// Declared output partitioning for scans from this table.
+    ///
+    /// Expressions are logical expressions over the full table schema. When 
set,
+    /// [`ListingTable`](crate::ListingTable) creates one file group per
+    /// declared output partition, preserving empty groups. When unset, file
+    /// grouping uses the scan-time
+    /// 
[`SessionConfig::target_partitions`](datafusion_execution::config::SessionConfig::target_partitions).
+    /// Declarations are limited to partitioning that can be represented by
+    /// assigning whole files to file groups.
+    ///
+    /// Files are assigned to groups in path order. DataFusion does not 
validate

Review Comment:
   I think an example or two would help make this documentation clearer. 
   
   e.g. an example partitioning and 3 files -- how are the files assigned to 
partititions



##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -761,27 +942,46 @@ impl ListingTable {
         // hash repartitioning for aggregates and joins on partition columns.
         let threshold = 
ctx.config_options().optimizer.preserve_file_partitions;
 
-        let (file_groups, grouped_by_partition) =
-            if threshold > 0 && !self.options.table_partition_cols.is_empty() {
-                let grouped = file_group
-                    
.group_by_partition_values(ctx.config().target_partitions());
-                if grouped.len() >= threshold {
-                    (grouped, true)
-                } else {
-                    let all_files: Vec<_> =
-                        grouped.into_iter().flat_map(|g| 
g.into_inner()).collect();
-                    (
-                        FileGroup::new(all_files)
-                            .split_files(ctx.config().target_partitions()),
-                        false,
-                    )
-                }
+        let (mut file_groups, grouped_by_partition) = if 
has_declared_partitioning {

Review Comment:
   is there some way to refactor this function so it doesn't have a bunch of 
`if has_declared_partitioning` checks? it make it hard to understand what is 
going on. Maybe we can have two versions of the function or something like that?



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -1289,6 +1294,246 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_list_files_uses_declared_output_partitioning_count() -> 
Result<()> {
+        let files = ["bucket/key-prefix/file0", "bucket/key-prefix/file1"];
+
+        let ctx = SessionContext::new_with_config(
+            SessionConfig::new().with_target_partitions(1),
+        );
+        register_test_store(&ctx, &files.iter().map(|f| (*f, 
10)).collect::<Vec<_>>());
+
+        let opt = ListingOptions::new(Arc::new(JsonFormat::default()))
+            .with_file_extension_opt(Some(""))
+            .with_output_partitioning(Some(LogicalPartitioning::Range(
+                LogicalRangePartitioning::try_new(
+                    vec![col("a").sort(true, true)],
+                    vec![
+                        SplitPoint::new(vec![ScalarValue::Int32(Some(10))]),
+                        SplitPoint::new(vec![ScalarValue::Int32(Some(20))]),
+                        SplitPoint::new(vec![ScalarValue::Int32(Some(30))]),

Review Comment:
   We can probably make this more concise via
   ```suggestion
                           SplitPoint::new(vec![ScalarValue::from(10i32)]),
                           SplitPoint::new(vec![ScalarValue::from(20i32))]),
                           SplitPoint::new(vec![ScalarValue::from(30i32))]),
   ```



##########
datafusion/catalog-listing/src/options.rs:
##########
@@ -61,6 +64,17 @@ pub struct ListingOptions {
     ///       multiple equivalent orderings, the outer `Vec` will have a
     ///       single element.
     pub file_sort_order: Vec<Vec<SortExpr>>,
+    /// Optional declared output partitioning for this table.
+    ///
+    /// Expressions are specified against the full table schema. When set,
+    /// [`ListingTable`](crate::ListingTable) creates one scan file group per
+    /// declared output partition instead of using [`Self::target_partitions`].
+    /// Empty file groups are added when needed to preserve that count.
+    ///
+    /// Files are sorted by path before grouping. DataFusion does not validate
+    /// that rows match the declaration, so callers must ensure file group `i`
+    /// contains only rows for declared output partition `i`.
+    pub output_partitioning: Option<Partitioning>,

Review Comment:
   I am confused -- LIstingTableOptions gets passed when creating a listing 
table -- is the idea that this is needed for SQL integration (as the lower 
level FileScanConfig` isn't accesable via SQL)?



##########
datafusion/datasource/src/file_scan_config/mod.rs:
##########
@@ -700,56 +773,55 @@ impl DataSource for FileScanConfig {
         Ok(source.map(|s| Arc::new(s) as _))
     }
 
-    /// Returns the output partitioning for this file scan.
+    /// Returns declared or derived output partitioning for this file scan.
     ///
-    /// When `partitioned_by_file_group` is true, this returns 
`Partitioning::Hash` on
-    /// the Hive partition columns, allowing the optimizer to skip hash 
repartitioning
-    /// for aggregates and joins on those columns.
+    /// Declared partitioning is projected through the scan projection. If it
+    /// cannot be projected, or its partition count differs from `file_groups`,
+    /// this returns `UnknownPartitioning`.
     ///
-    /// Tradeoffs

Review Comment:
   these tradeoffs still seem relevant -- why remove it?



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -1289,6 +1294,246 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_list_files_uses_declared_output_partitioning_count() -> 
Result<()> {

Review Comment:
   there seems to be a lot of repetition in these tests which makes it hard to 
see what they are testing compared to the setup boiler plate -- can we use a 
fixture to reduce the repetition?



##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -448,6 +457,116 @@ fn derive_common_ordering_from_files(file_groups: 
&[FileGroup]) -> Option<LexOrd
     }
 }
 
+fn create_physical_output_partitioning(

Review Comment:
   these functions seem like they are generic -- could they be made as a method 
on `LogicalPartitioning` to make them more discoverable?
   
   Or perhaps datafusion/physical-expr/src/physical_expr.rs, something like
   ```rust
     pub fn create_physical_partitioning(
         partitioning: &datafusion_expr::Partitioning,
         input_dfschema: &DFSchema,
         execution_props: &ExecutionProps,
     ) -> Result<Partitioning>
   ``` 
   
   As they are similar "logical to physical" conversion functions



##########
datafusion/datasource/src/file_scan_config/mod.rs:
##########
@@ -206,6 +206,13 @@ pub struct FileScanConfig {
     /// If the number of file partitions > target_partitions, the file 
partitions will be grouped
     /// in a round-robin fashion such that number of file partitions = 
target_partitions.
     pub partitioned_by_file_group: bool,
+    /// Optional declared output partitioning of this file scan.
+    ///
+    /// Expressions are in terms of the full table schema, before scan
+    /// projection or filtering. If the partition count does not match the
+    /// number of file groups, [`DataSource::output_partitioning`] falls back 
to
+    /// [`Partitioning::UnknownPartitioning`].
+    pub output_partitioning: Option<Partitioning>,

Review Comment:
   Can we please file a ticket to track removing this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to