crepererum commented on code in PR #10009:
URL:
https://github.com/apache/arrow-datafusion/pull/10009#discussion_r1557395084
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -77,6 +79,90 @@ struct RepartitionExecState {
abort_helper: Arc<Vec<SpawnedTask<()>>>,
}
+impl RepartitionExecState {
+ fn new(
+ input: Arc<dyn ExecutionPlan>,
+ partitioning: Partitioning,
+ metrics: ExecutionPlanMetricsSet,
+ preserve_order: bool,
+ name: String,
+ context: Arc<TaskContext>,
+ ) -> Self {
+ let num_input_partitions =
input.output_partitioning().partition_count();
+ let num_output_partitions = partitioning.partition_count();
+
+ let (txs, rxs) = if preserve_order {
+ let (txs, rxs) =
+ partition_aware_channels(num_input_partitions,
num_output_partitions);
+ // Take transpose of senders and receivers. `state.channels` keeps track of entries per output partition
+ let txs = transpose(txs);
+ let rxs = transpose(rxs);
+ (txs, rxs)
+ } else {
+ // create one channel per *output* partition
+ // note we use a custom channel that ensures there is always data for each receiver
+ // but limits the amount of buffering if required.
+ let (txs, rxs) = channels(num_output_partitions);
+ // Clone sender for each input partitions
+ let txs = txs
+ .into_iter()
+ .map(|item| vec![item; num_input_partitions])
+ .collect::<Vec<_>>();
+ let rxs = rxs.into_iter().map(|item|
vec![item]).collect::<Vec<_>>();
+ (txs, rxs)
+ };
+
+ let mut channels = HashMap::with_capacity(txs.len());
+ for (partition, (tx, rx)) in txs.into_iter().zip(rxs).enumerate() {
+ let reservation = Arc::new(Mutex::new(
+ MemoryConsumer::new(format!("{}[{partition}]", name))
+ .register(context.memory_pool()),
+ ));
+ channels.insert(partition, (tx, rx, reservation));
+ }
+
+ // launch one async task per *input* partition
+ let mut spawned_tasks = Vec::with_capacity(num_input_partitions);
+ for i in 0..num_input_partitions {
+ let txs: HashMap<_, _> = channels
+ .iter()
+ .map(|(partition, (tx, _rx, reservation))| {
+ (*partition, (tx[i].clone(), Arc::clone(reservation)))
+ })
+ .collect();
+
+ // TODO: metric input-output mapping is broken
Review Comment:
This was already broken before this PR: the first parameter was set to
`partition`, i.e. whichever partition happened to initialize the state first.
That's clearly wrong. For now I initialize it to `0`, and I will fix the
tracking properly in a follow-up PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org