This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e90b3ac5cf Minor: remove unnecessary row_count calculation in `CrossJoinExec`
and `NestedLoopJoinExec` (#11632)
e90b3ac5cf is described below
commit e90b3ac5cf89ec5b1a94506ac69e85bd9b7d319e
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Jul 24 10:20:18 2024 -0400
Minor: remove unnecessary row_count calculation in `CrossJoinExec` and
`NestedLoopJoinExec` (#11632)
* Minor: remove row_count calculation
* Minor: remove row_count calculation
---
datafusion/physical-plan/src/joins/cross_join.rs | 31 +++++++++-------------
.../physical-plan/src/joins/nested_loop_join.rs | 14 +++++-----
2 files changed, 19 insertions(+), 26 deletions(-)
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs
b/datafusion/physical-plan/src/joins/cross_join.rs
index b1482a9699..2840d3f62b 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -154,24 +154,19 @@ async fn load_left_input(
let stream = merge.execute(0, context)?;
// Load all batches and count the rows
- let (batches, _num_rows, _, reservation) = stream
- .try_fold(
- (Vec::new(), 0usize, metrics, reservation),
- |mut acc, batch| async {
- let batch_size = batch.get_array_memory_size();
- // Reserve memory for incoming batch
- acc.3.try_grow(batch_size)?;
- // Update metrics
- acc.2.build_mem_used.add(batch_size);
- acc.2.build_input_batches.add(1);
- acc.2.build_input_rows.add(batch.num_rows());
- // Update rowcount
- acc.1 += batch.num_rows();
- // Push batch to output
- acc.0.push(batch);
- Ok(acc)
- },
- )
+ let (batches, _metrics, reservation) = stream
+ .try_fold((Vec::new(), metrics, reservation), |mut acc, batch| async {
+ let batch_size = batch.get_array_memory_size();
+ // Reserve memory for incoming batch
+ acc.2.try_grow(batch_size)?;
+ // Update metrics
+ acc.1.build_mem_used.add(batch_size);
+ acc.1.build_input_batches.add(1);
+ acc.1.build_input_rows.add(batch.num_rows());
+ // Push batch to output
+ acc.0.push(batch);
+ Ok(acc)
+ })
.await?;
let merged_batch = concat_batches(&left_schema, &batches)?;
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs
b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index eac135bfd0..9f1465c2d7 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -364,19 +364,17 @@ async fn collect_left_input(
let stream = merge.execute(0, context)?;
// Load all batches and count the rows
- let (batches, _num_rows, metrics, mut reservation) = stream
+ let (batches, metrics, mut reservation) = stream
.try_fold(
- (Vec::new(), 0usize, join_metrics, reservation),
+ (Vec::new(), join_metrics, reservation),
|mut acc, batch| async {
let batch_size = batch.get_array_memory_size();
// Reserve memory for incoming batch
- acc.3.try_grow(batch_size)?;
+ acc.2.try_grow(batch_size)?;
// Update metrics
- acc.2.build_mem_used.add(batch_size);
- acc.2.build_input_batches.add(1);
- acc.2.build_input_rows.add(batch.num_rows());
- // Update rowcount
- acc.1 += batch.num_rows();
+ acc.1.build_mem_used.add(batch_size);
+ acc.1.build_input_batches.add(1);
+ acc.1.build_input_rows.add(batch.num_rows());
// Push batch to output
acc.0.push(batch);
Ok(acc)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]