gabotechs commented on code in PR #19124:
URL: https://github.com/apache/datafusion/pull/19124#discussion_r2605761960


##########
datafusion/common/src/config.rs:
##########
@@ -965,6 +965,19 @@ config_namespace! {
         /// record tables provided to the MemTable on creation.
         pub repartition_file_scans: bool, default = true
 
+        /// Minimum number of distinct partition values required to group files by their
+        /// Hive partition column values (enabling Hash partitioning declaration).
+        ///
+        /// How the option is used:
+        ///     - preserve_file_partitions=0: Disable it.
+        ///     - preserve_file_partitions=1: Always enable it.
+        ///     - preserve_file_partitions=N, actual file partitions=M: Only enable when M >= N.
+        ///     This threshold preserves I/O parallelism when file partitioning is below it.
+        ///
+        /// Note: This may reduce parallelism at the I/O level if the number of distinct

Review Comment:
   And probably not only at the I/O level. If a leaf DataSourceExec decides to use just 2 partitions, for example, every step above it up to the next repartition will also run with 2 partitions, and those operators will likely be purely computational.
   
   That's fine though, but explicitly stating that would give the user more accurate information.
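   To illustrate (a hypothetical plan shape with made-up column names, not actual `EXPLAIN` output): with only 2 distinct partition values, the computational operators below the next required repartition also run with 2 partitions:
   ```text
   AggregateExec: mode=FinalPartitioned, gby=[other_col]        -- 16 partitions again
     RepartitionExec: Hash([other_col], 16)                     -- parallelism restored here
       AggregateExec: mode=Partial, gby=[other_col]             -- 2 partitions
         FilterExec                                             -- 2 partitions
           DataSourceExec: 2 file groups, Hash([part_col], 2)
   ```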



##########
datafusion/core/benches/preserve_file_partitioning.rs:
##########
@@ -0,0 +1,837 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmark for `preserve_file_partitions` optimization.
+//!
+//! When enabled, this optimization declares Hive-partitioned tables as
+//! `Hash([partition_col])` partitioned, allowing the query optimizer to
+//! skip unnecessary repartitioning and sorting operations.
+//!
+//! When This Optimization Helps
+//! - Window functions: PARTITION BY on partition column eliminates RepartitionExec and SortExec
+//! - Aggregates with ORDER BY: GROUP BY partition column and ORDER BY eliminates post aggregate sort
+//!
+//! When This Optimization Does NOT Help
+//! - GROUP BY non-partition columns: Required Hash distribution doesn't match declared partitioning
+//! - I/O Intensive Queries: Limits the parallilization at I/O level, benefits may not outweigh.

Review Comment:
   It does not necessarily limit the parallelization at the I/O level, right? It only limits it when "num of distinct keys" < "CPUs in the machine".
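   A tiny sketch of that relationship (`effective_scan_partitions` is a hypothetical helper name, just mirroring the grouping behavior in this PR, where groups are merged down to `max_target_partitions`):
   ```rust
   /// Hypothetical helper: with this optimization the scan produces
   /// min(distinct partition values, target partitions) output partitions,
   /// so I/O parallelism only drops when the number of distinct keys is
   /// smaller than the target (typically the number of CPUs).
   fn effective_scan_partitions(distinct_keys: usize, target_partitions: usize) -> usize {
       distinct_keys.min(target_partitions)
   }
   
   fn main() {
       // 32 distinct keys, 16 CPUs: still 16 scan partitions, no I/O parallelism lost.
       assert_eq!(effective_scan_partitions(32, 16), 16);
       // 4 distinct keys, 16 CPUs: only 4 scan partitions, reduced I/O parallelism.
       assert_eq!(effective_scan_partitions(4, 16), 4);
   }
   ```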



##########
datafusion/datasource/src/file_groups.rs:
##########
@@ -464,6 +466,65 @@ impl FileGroup {
 
         chunks
     }
+
+    /// Groups files by their partition values, ensuring all files with same
+    /// partition values are in the same group.
+    ///
+    /// Note: May return fewer groups than `max_target_partitions` when the
+    /// number of unique partition values is less than the target.
+    pub fn group_by_partition_values(
+        self,
+        max_target_partitions: usize,
+    ) -> Vec<FileGroup> {
+        if self.is_empty() || max_target_partitions == 0 {
+            return vec![];
+        }
+
+        let mut partition_groups: HashMap<
+            Vec<datafusion_common::ScalarValue>,
+            Vec<PartitionedFile>,
+        > = HashMap::new();
+
+        for file in self.files {
+            partition_groups
+                .entry(file.partition_values.clone())
+                .or_default()
+                .push(file);
+        }
+
+        let num_unique_partitions = partition_groups.len();
+
+        // Sort for deterministic bucket assignment across query executions.

Review Comment:
   I think you should be able to use a `BTreeMap` for `partition_groups` if you want a deterministic order without manually sorting.
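   A minimal, generic sketch of the idea (using `String` keys for illustration; applying it to `partition_groups` would additionally require the key type `Vec<ScalarValue>` to implement `Ord`):
   ```rust
   use std::collections::BTreeMap;
   
   fn main() {
       // BTreeMap keeps entries ordered by key, so iteration order is
       // deterministic without a separate sort step.
       let mut partition_groups: BTreeMap<Vec<String>, Vec<&str>> = BTreeMap::new();
       for (key, file) in [
           (vec!["b".to_string()], "file_b.parquet"),
           (vec!["a".to_string()], "file_a.parquet"),
           (vec!["a".to_string()], "file_a2.parquet"),
       ] {
           partition_groups.entry(key).or_default().push(file);
       }
   
       // Already sorted by partition key: ["a"] before ["b"].
       let keys: Vec<_> = partition_groups.keys().cloned().collect();
       assert_eq!(keys, vec![vec!["a".to_string()], vec!["b".to_string()]]);
   }
   ```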



##########
datafusion/core/benches/preserve_file_partitioning.rs:
##########
@@ -0,0 +1,837 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmark for `preserve_file_partitions` optimization.
+//!
+//! When enabled, this optimization declares Hive-partitioned tables as
+//! `Hash([partition_col])` partitioned, allowing the query optimizer to
+//! skip unnecessary repartitioning and sorting operations.
+//!
+//! When This Optimization Helps
+//! - Window functions: PARTITION BY on partition column eliminates RepartitionExec and SortExec
+//! - Aggregates with ORDER BY: GROUP BY partition column and ORDER BY eliminates post aggregate sort
+//!
+//! When This Optimization Does NOT Help
+//! - GROUP BY non-partition columns: Required Hash distribution doesn't match declared partitioning
+//! - I/O Intensive Queries: Limits the parallilization at I/O level, benefits may not outweigh.

Review Comment:
   ```suggestion
   //! - I/O Intensive Queries: Limits the parallelization at I/O level, benefits may not outweigh.
   ```



##########
datafusion/datasource/src/file_groups.rs:
##########
@@ -1160,4 +1222,51 @@ mod test {
         assert_partitioned_files(repartitioned.clone(), repartitioned_preserving_sort);
         repartitioned
     }
+
+    #[test]
+    fn test_group_by_partition_values() {
+        // Edge cases: empty and zero target
+        assert!(FileGroup::default().group_by_partition_values(4).is_empty());
+        assert!(FileGroup::new(vec![pfile("a", 100)])
+            .group_by_partition_values(0)
+            .is_empty());
+
+        // Helper to create file with partition value
+        let pfile_with_pv = |path: &str, pv: &str| {
+            let mut f = pfile(path, 10);
+            f.partition_values = vec![ScalarValue::from(pv)];
+            f
+        };
+
+        // Case 1: fewer partitions than target

Review Comment:
   Rather than having multiple test cases in the same test function, it would be nice to have them in separate test functions, so they can run independently, giving a better test report and a more debugger-friendly experience.
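   For example, the edge cases could be their own test functions, roughly like this (a sketch reusing the quoted assertions and the existing `pfile` helper from this test module):
   ```rust
   #[test]
   fn test_group_by_partition_values_empty_group() {
       // An empty FileGroup yields no output groups.
       assert!(FileGroup::default().group_by_partition_values(4).is_empty());
   }
   
   #[test]
   fn test_group_by_partition_values_zero_target() {
       // A target of zero partitions yields no output groups.
       assert!(FileGroup::new(vec![pfile("a", 100)])
           .group_by_partition_values(0)
           .is_empty());
   }
   ```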



##########
datafusion/datasource/src/file_groups.rs:
##########
@@ -464,6 +466,65 @@ impl FileGroup {
 
         chunks
     }
+
+    /// Groups files by their partition values, ensuring all files with same
+    /// partition values are in the same group.
+    ///
+    /// Note: May return fewer groups than `max_target_partitions` when the
+    /// number of unique partition values is less than the target.
+    pub fn group_by_partition_values(
+        self,
+        max_target_partitions: usize,
+    ) -> Vec<FileGroup> {
+        if self.is_empty() || max_target_partitions == 0 {
+            return vec![];
+        }
+
+        let mut partition_groups: HashMap<
+            Vec<datafusion_common::ScalarValue>,
+            Vec<PartitionedFile>,
+        > = HashMap::new();
+
+        for file in self.files {
+            partition_groups
+                .entry(file.partition_values.clone())
+                .or_default()
+                .push(file);
+        }
+
+        let num_unique_partitions = partition_groups.len();
+
+        // Sort for deterministic bucket assignment across query executions.
+        let mut sorted_partitions: Vec<_> = partition_groups.into_iter().collect();
+        let sort_options =
+            vec![
+                SortOptions::default();
+                sorted_partitions.first().map(|(k, _)| k.len()).unwrap_or(0)
+            ];
+        sorted_partitions.sort_by(|a, b| {
+            compare_rows(&a.0, &b.0, &sort_options).unwrap_or(Ordering::Equal)
+        });
+
+        if num_unique_partitions <= max_target_partitions {
+            sorted_partitions
+                .into_iter()
+                .map(|(_, files)| FileGroup::new(files))
+                .collect()
+        } else {
+            // Merge into max_target_partitions buckets using round-robin.
+            // This maintains grouping by partition value as we are merging groups which already
+            // contain all values for a partition key.
+            let mut target_groups: Vec<Vec<PartitionedFile>> =
+                vec![vec![]; max_target_partitions];

Review Comment:
   nit: the type annotation should be redundant
   ```suggestion
               let mut target_groups = vec![vec![]; max_target_partitions];
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

