alamb commented on a change in pull request #9548:
URL: https://github.com/apache/arrow/pull/9548#discussion_r581451845
##########
File path: rust/datafusion/src/physical_plan/repartition.rs
##########
@@ -305,6 +347,33 @@ mod tests {
Ok(())
}
+ #[tokio::test(flavor = "multi_thread")]
+ async fn many_to_many_hash_partition() -> Result<()> {
+ // define input partitions
+ let schema = test_schema();
+ let partition = create_vec_batches(&schema, 50);
+ let partitions = vec![partition.clone(), partition.clone(),
partition.clone()];
+
+ let output_partitions = repartition(
+ &schema,
+ partitions,
+ Partitioning::Hash(
+ vec![Arc::new(crate::physical_plan::expressions::Column::new(
+ &"c0",
+ ))],
+ 8,
+ ),
+ )
+ .await?;
+
+ let total_rows: usize = output_partitions.iter().map(|x|
x.len()).sum();
+
+ assert_eq!(8, output_partitions.len());
Review comment:
Would it make sense here also to assert on the distribution of rows
(e.g. ensure that each batch has ~ 50*3 rows)?
##########
File path: rust/datafusion/src/physical_plan/repartition.rs
##########
@@ -120,23 +120,71 @@ impl ExecutionPlan for RepartitionExec {
let (sender, receiver) =
unbounded::<Option<ArrowResult<RecordBatch>>>();
channels.push((sender, receiver));
}
+ let random = ahash::RandomState::new();
+
// launch one async task per *input* partition
for i in 0..num_input_partitions {
+ let random_state = random.clone();
let input = self.input.clone();
let mut channels = channels.clone();
let partitioning = self.partitioning.clone();
let join_handle: JoinHandle<Result<()>> = tokio::spawn(async
move {
let mut stream = input.execute(i).await?;
let mut counter = 0;
while let Some(result) = stream.next().await {
- match partitioning {
+ match &partitioning {
Partitioning::RoundRobinBatch(_) => {
let output_partition = counter %
num_output_partitions;
let tx = &mut channels[output_partition].0;
tx.send(Some(result)).map_err(|e| {
DataFusionError::Execution(e.to_string())
})?;
}
+ Partitioning::Hash(exprs, _) => {
+ let input_batch = result?;
+ let arrays = exprs
+ .iter()
+ .map(|expr| {
+ Ok(expr
+ .evaluate(&input_batch)?
+
.into_array(input_batch.num_rows()))
+ })
+ .collect::<Result<Vec<_>>>()?;
+ // Hash arrays and compute buckets based on
number of partitions
+ let hashes = create_hashes(&arrays,
&random_state)?;
+ let mut indices = vec![vec![];
num_output_partitions];
+ for (index, hash) in hashes.iter().enumerate()
{
+ indices
+ [(*hash % num_output_partitions as
u64) as usize]
+ .push(index as u64)
+ }
+ for num_output_partition in
0..num_output_partitions {
+ let indices =
Review comment:
What is this `clone` needed for? Couldn't we just consume `indices`? I
am probably missing something.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]