alamb commented on a change in pull request #7951:
URL: https://github.com/apache/arrow/pull/7951#discussion_r470759208
##########
File path: rust/datafusion/src/execution/physical_plan/merge.rs
##########
@@ -64,39 +76,70 @@ struct MergePartition {
schema: SchemaRef,
/// Input partitions
partitions: Vec<Arc<dyn Partition>>,
+ /// Maximum number of concurrent threads
+ concurrency: usize,
+}
+
+fn collect_from_thread(
+ thread: JoinHandle<Result<Vec<RecordBatch>>>,
+ combined_results: &mut Vec<Arc<RecordBatch>>,
+) -> Result<()> {
+ match thread.join() {
+ Ok(join) => {
+ join?
+ .iter()
+ .for_each(|batch|
combined_results.push(Arc::new(batch.clone())));
+ Ok(())
+ }
+ Err(e) => Err(ExecutionError::General(format!(
+ "Error collecting batches from thread: {:?}",
+ e
+ ))),
+ }
}
impl Partition for MergePartition {
fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send +
Sync>>> {
- let threads: Vec<JoinHandle<Result<Vec<RecordBatch>>>> = self
- .partitions
- .iter()
- .map(|p| {
- let p = p.clone();
- thread::spawn(move || {
- let it = p.execute()?;
- common::collect(it)
- })
- })
- .collect();
-
- // combine the results from each thread
- let mut combined_results: Vec<Arc<RecordBatch>> = vec![];
- for thread in threads {
- match thread.join() {
- Ok(join) => {
- join?
- .iter()
- .for_each(|batch|
combined_results.push(Arc::new(batch.clone())));
+ match self.partitions.len() {
+ 0 => Err(ExecutionError::General(
+ "MergeExec requires at least one input partition".to_owned(),
+ )),
+ 1 => {
+ // bypass any threading if there is a single partition
+ self.partitions[0].execute()
+ }
+ _ => {
+ let partitions_per_thread =
+ (self.partitions.len() / self.concurrency).max(1);
+ let chunks = self.partitions.chunks(partitions_per_thread);
Review comment:
Yeah, again, this approach will work well as long as the work per
partition (and thus per chunk) is relatively even and well balanced. If we have
imbalanced partitions, then some threads may sit idle while there is still work
to do in chunks being processed by other threads.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]