This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 9152dc2c2 chore: Remove num partitions from repartitioner (#1498)
9152dc2c2 is described below

commit 9152dc2c23276023708237d7aa857c1da1b2807b
Author: Emily Matheys <[email protected]>
AuthorDate: Tue Mar 11 21:16:28 2025 +0200

    chore: Remove num partitions from repartitioner (#1498)
    
    * chore: Remove num partitions from repartitioner
    
    treat all single partition schemes the same way
    remove duplicate messaging by using assert_eq instead of assert
    
    * rebased and resolved conflicts
    
    ---------
    
    Co-authored-by: Emily Matheys <[email protected]>
---
 .../core/src/execution/shuffle/shuffle_writer.rs   | 46 +++++++++-------------
 1 file changed, 18 insertions(+), 28 deletions(-)

diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs
index f1c2b4d78..9f6c7e406 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -290,7 +290,6 @@ struct ShuffleRepartitioner {
     buffered_partitions: Vec<PartitionBuffer>,
     /// Partitioning scheme to use
     partitioning: Partitioning,
-    num_output_partitions: usize,
     runtime: Arc<RuntimeEnv>,
     metrics: ShuffleRepartitionerMetrics,
     /// Hashes for each row in the current batch
@@ -315,8 +314,6 @@ impl ShuffleRepartitioner {
         codec: CompressionCodec,
         enable_fast_encoding: bool,
     ) -> Result<Self> {
-        let num_output_partitions = partitioning.partition_count();
-
         let mut hashes_buf = Vec::with_capacity(batch_size);
         let mut partition_ids = Vec::with_capacity(batch_size);
 
@@ -336,7 +333,7 @@ impl ShuffleRepartitioner {
             output_data_file,
             output_index_file,
             schema: Arc::clone(&schema),
-            buffered_partitions: (0..num_output_partitions)
+            buffered_partitions: (0..partitioning.partition_count())
                 .map(|_| {
                     PartitionBuffer::try_new(
                         Arc::clone(&schema),
@@ -348,7 +345,6 @@ impl ShuffleRepartitioner {
                 })
                 .collect::<Result<Vec<_>>>()?,
             partitioning,
-            num_output_partitions,
             runtime,
             metrics,
             hashes_buf,
@@ -401,9 +397,21 @@ impl ShuffleRepartitioner {
         // number of rows those are written to output data file.
         self.metrics.baseline.record_output(input.num_rows());
 
-        let num_output_partitions = self.num_output_partitions;
         match &self.partitioning {
-            Partitioning::Hash(exprs, _) => {
+            any if any.partition_count() == 1 => {
+                let buffered_partitions = &mut self.buffered_partitions;
+
+                assert_eq!(buffered_partitions.len(), 1, "Expected 1 partition");
+
+                // TODO the single partition case could be optimized to avoid appending all
+                // rows from the batch into builders and then recreating the batch
+                // https://github.com/apache/datafusion-comet/issues/1453
+                let indices = (0..input.num_rows()).collect::<Vec<usize>>();
+
+                self.append_rows_to_partition(input.columns(), &indices, 0)
+                    .await?;
+            }
+            Partitioning::Hash(exprs, num_output_partitions) => {
                 let (partition_starts, shuffled_partition_ids): (Vec<usize>, Vec<usize>) = {
                     let mut timer = self.metrics.repart_time.timer();
 
@@ -423,11 +431,11 @@ impl ShuffleRepartitioner {
                         .iter()
                         .enumerate()
                         .for_each(|(idx, hash)| {
-                            partition_ids[idx] = pmod(*hash, num_output_partitions) as u64
+                            partition_ids[idx] = pmod(*hash, *num_output_partitions) as u64
                         });
 
                     // count each partition size
-                    let mut partition_counters = vec![0usize; num_output_partitions];
+                    let mut partition_counters = vec![0usize; *num_output_partitions];
                     partition_ids
                         .iter()
                         .for_each(|partition_id| partition_counters[*partition_id as usize] += 1);
@@ -478,24 +486,6 @@ impl ShuffleRepartitioner {
                     .await?;
                 }
             }
-            Partitioning::UnknownPartitioning(n) if *n == 1 => {
-                let buffered_partitions = &mut self.buffered_partitions;
-
-                assert_eq!(
-                    buffered_partitions.len(),
-                    1,
-                    "Expected 1 partition but got {}",
-                    buffered_partitions.len()
-                );
-
-                // TODO the single partition case could be optimized to avoid appending all
-                // rows from the batch into builders and then recreating the batch
-                // https://github.com/apache/datafusion-comet/issues/1453
-                let indices = (0..input.num_rows()).collect::<Vec<usize>>();
-
-                self.append_rows_to_partition(input.columns(), &indices, 0)
-                    .await?;
-            }
             other => {
                 // this should be unreachable as long as the validation logic
                 // in the constructor is kept up-to-date
@@ -511,8 +501,8 @@ impl ShuffleRepartitioner {
     /// Writes buffered shuffled record batches into Arrow IPC bytes.
     async fn shuffle_write(&mut self) -> Result<SendableRecordBatchStream> {
         let mut elapsed_compute = self.metrics.baseline.elapsed_compute().timer();
-        let num_output_partitions = self.num_output_partitions;
         let buffered_partitions = &mut self.buffered_partitions;
+        let num_output_partitions = buffered_partitions.len();
         let mut output_batches: Vec<Vec<u8>> = vec![vec![]; num_output_partitions];
         let mut offsets = vec![0; num_output_partitions + 1];
         for i in 0..num_output_partitions {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to