NGA-TRAN commented on code in PR #19124:
URL: https://github.com/apache/datafusion/pull/19124#discussion_r2603155154


##########
datafusion/datasource/src/file_groups.rs:
##########
@@ -464,6 +464,57 @@ impl FileGroup {
 
         chunks
     }
+
+    /// Groups files by their partition values, ensuring all files with same
+    /// partition values are in the same group.
+    ///
+    /// Note: May return fewer groups than `max_target_partitions` when the
+    /// number of unique partition values is less than the target.
+    pub fn group_by_partition_values(
+        self,
+        max_target_partitions: usize,
+    ) -> Vec<FileGroup> {
+        if self.is_empty() || max_target_partitions == 0 {
+            return vec![];
+        }
+
+        let mut partition_groups: HashMap<
+            Vec<datafusion_common::ScalarValue>,
+            Vec<PartitionedFile>,
+        > = HashMap::new();
+
+        for file in self.files {
+            partition_groups
+                .entry(file.partition_values.clone())
+                .or_default()
+                .push(file);
+        }
+
+        let num_unique_partitions = partition_groups.len();
+
+        // Sort for deterministic bucket assignment across query executions.
+        let mut sorted_partitions: Vec<_> = 
partition_groups.into_iter().collect();
+        sorted_partitions
+            .sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
+
+        if num_unique_partitions <= max_target_partitions {
+            sorted_partitions
+                .into_iter()
+                .map(|(_, files)| FileGroup::new(files))
+                .collect()
+        } else {
+            // Merge into max_target_partitions buckets using round-robin.
+            let mut target_groups: Vec<Vec<PartitionedFile>> =
+                vec![vec![]; max_target_partitions];
+
+            for (idx, (_, files)) in sorted_partitions.into_iter().enumerate() 
{
+                let bucket = idx % max_target_partitions;
+                target_groups[bucket].extend(files);
+            }
+
+            target_groups.into_iter().map(FileGroup::new).collect()

Review Comment:
   Yes, it works in this case but will it be work all the time? Maybe the 
answer also yes but I think we should say that: N partitions is now M (M < N) 
and thus one partition now will include files/data of many partitions.
   
   By the way, does round robin means data of 2 or more partitions will go into 
the same partition or data of a partition may be split between different 
partitions?
   
   I agree it will work but we want to document it clearly in case others have 
customized partitions and may not work with this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to