Kontinuation commented on code in PR #1511:
URL: https://github.com/apache/datafusion-comet/pull/1511#discussion_r2017870588


##########
native/core/src/execution/shuffle/shuffle_writer.rs:
##########
@@ -422,27 +432,29 @@ impl ShuffleRepartitioner {
                         .collect::<Result<Vec<_>>>()?;
 
                     // use identical seed as spark hash partition
-                    let hashes_buf = &mut self.hashes_buf[..arrays[0].len()];
+                    let hashes_buf = &mut 
scratch.hashes_buf[..arrays[0].len()];
                     hashes_buf.fill(42_u32);
 
                     // Hash arrays and compute buckets based on number of 
partitions
-                    let partition_ids = &mut 
self.partition_ids[..arrays[0].len()];
+                    let partition_ids = &mut 
scratch.partition_ids[..arrays[0].len()];
                     create_murmur3_hashes(&arrays, hashes_buf)?
                         .iter()
                         .enumerate()
                         .for_each(|(idx, hash)| {
-                            partition_ids[idx] = pmod(*hash, 
*num_output_partitions) as u64
+                            partition_ids[idx] = pmod(*hash, 
*num_output_partitions) as u32;
                         });
 
                     // count each partition size
-                    let mut partition_counters = vec![0usize; 
*num_output_partitions];
+                    let partition_counters = &mut scratch.partition_starts;
+                    partition_counters.resize(num_output_partitions + 1, 0);
+                    partition_counters.fill(0);
                     partition_ids
                         .iter()
                         .for_each(|partition_id| 
partition_counters[*partition_id as usize] += 1);
 
                     // accumulate partition counters into partition ends
                     // e.g. partition counter: [1, 3, 2, 1] => [1, 4, 6, 7]

Review Comment:
   Yes, this is an inclusive prefix sum. If we use `scan` and also avoid 
allocating a new vector, we have to write the following code, which is not much 
more readable than the current approach.
   
   ```rust
   partition_ends.iter_mut().scan(0, |accum, v| {
       *accum += *v;
       Some(*accum)
   }).enumerate().for_each(|(i, sum)| {
       partition_ends[i] = sum;
   });
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to