Dandandan commented on a change in pull request #9548:
URL: https://github.com/apache/arrow/pull/9548#discussion_r581663717
##########
File path: rust/datafusion/src/physical_plan/repartition.rs
##########
@@ -120,23 +120,71 @@ impl ExecutionPlan for RepartitionExec {
let (sender, receiver) =
unbounded::<Option<ArrowResult<RecordBatch>>>();
channels.push((sender, receiver));
}
+ let random = ahash::RandomState::new();
+
// launch one async task per *input* partition
for i in 0..num_input_partitions {
+ let random_state = random.clone();
let input = self.input.clone();
let mut channels = channels.clone();
let partitioning = self.partitioning.clone();
let join_handle: JoinHandle<Result<()>> = tokio::spawn(async
move {
let mut stream = input.execute(i).await?;
let mut counter = 0;
while let Some(result) = stream.next().await {
- match partitioning {
+ match &partitioning {
Partitioning::RoundRobinBatch(_) => {
let output_partition = counter %
num_output_partitions;
let tx = &mut channels[output_partition].0;
tx.send(Some(result)).map_err(|e| {
DataFusionError::Execution(e.to_string())
})?;
}
+ Partitioning::Hash(exprs, _) => {
+ let input_batch = result?;
+ let arrays = exprs
+ .iter()
+ .map(|expr| {
+ Ok(expr
+ .evaluate(&input_batch)?
+
.into_array(input_batch.num_rows()))
+ })
+ .collect::<Result<Vec<_>>>()?;
+ // Hash arrays and compute buckets based on
number of partitions
+ let hashes = create_hashes(&arrays,
&random_state)?;
+ let mut indices = vec![vec![];
num_output_partitions];
+ for (index, hash) in hashes.iter().enumerate()
{
+ indices
+ [(*hash % num_output_partitions as
u64) as usize]
+ .push(index as u64)
+ }
+ for num_output_partition in
0..num_output_partitions {
+ let indices =
Review comment:
In this case I couldn't do that before, because I was indexing into the vec of
indices for each partition.
This is now solved by calling `into_iter()` on the vec.
(This still does one extra conversion from `Vec<u64>` to `PrimitiveArray`;
that could be a future optimization.)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]