gene-bordegaray commented on code in PR #19124:
URL: https://github.com/apache/datafusion/pull/19124#discussion_r2603136115


##########
datafusion/datasource/src/file_groups.rs:
##########
@@ -464,6 +464,57 @@ impl FileGroup {
 
         chunks
     }
+
+    /// Groups files by their partition values, ensuring all files with same
+    /// partition values are in the same group.
+    ///
+    /// Note: May return fewer groups than `max_target_partitions` when the
+    /// number of unique partition values is less than the target.
+    pub fn group_by_partition_values(
+        self,
+        max_target_partitions: usize,
+    ) -> Vec<FileGroup> {
+        if self.is_empty() || max_target_partitions == 0 {
+            return vec![];
+        }
+
+        let mut partition_groups: HashMap<
+            Vec<datafusion_common::ScalarValue>,
+            Vec<PartitionedFile>,
+        > = HashMap::new();
+
+        for file in self.files {
+            partition_groups
+                .entry(file.partition_values.clone())
+                .or_default()
+                .push(file);
+        }
+
+        let num_unique_partitions = partition_groups.len();
+
+        // Sort for deterministic bucket assignment across query executions.
+        let mut sorted_partitions: Vec<_> = 
partition_groups.into_iter().collect();
+        sorted_partitions
+            .sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
+
+        if num_unique_partitions <= max_target_partitions {
+            sorted_partitions
+                .into_iter()
+                .map(|(_, files)| FileGroup::new(files))
+                .collect()
+        } else {
+            // Merge into max_target_partitions buckets using round-robin.
+            let mut target_groups: Vec<Vec<PartitionedFile>> =
+                vec![vec![]; max_target_partitions];
+
+            for (idx, (_, files)) in sorted_partitions.into_iter().enumerate() 
{
+                let bucket = idx % max_target_partitions;
+                target_groups[bucket].extend(files);
+            }
+
+            target_groups.into_iter().map(FileGroup::new).collect()

Review Comment:
   No, it still holds: we are only round-robin distributing groups that are 
already partitioned, so every file with a given set of partition values stays 
in the same group. We are just merging whole partition groups into larger ones.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to