This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 698155ab31 Refactor SortMergeJoinMetrics to reuse BaselineMetrics (#16675)
698155ab31 is described below
commit 698155ab3146a67cf2e4a20ddcb340113f8f71f4
Author: Alan Tang <[email protected]>
AuthorDate: Mon Jul 7 19:29:04 2025 +0800
Refactor SortMergeJoinMetrics to reuse BaselineMetrics (#16675)
* Refactor SortMergeJoinMetrics to reuse BaselineMetrics
Signed-off-by: Alan Tang <[email protected]>
* use record_poll method to update output_rows
Signed-off-by: Alan Tang <[email protected]>
* chore: Replace record_poll with record_output
Signed-off-by: Alan Tang <[email protected]>
---------
Signed-off-by: Alan Tang <[email protected]>
---
datafusion/physical-plan/src/joins/sort_merge_join.rs | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs
b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index a8c209a492..9a68322834 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -41,7 +41,8 @@ use crate::joins::utils::{
JoinOnRef,
};
use crate::metrics::{
- Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, SpillMetrics,
+ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
+ SpillMetrics,
};
use crate::projection::{
join_allows_pushdown, join_table_borders, new_join_children,
@@ -609,8 +610,8 @@ struct SortMergeJoinMetrics {
input_rows: Count,
/// Number of batches produced by this operator
output_batches: Count,
- /// Number of rows produced by this operator
- output_rows: Count,
+ /// Execution metrics
+ baseline_metrics: BaselineMetrics,
/// Peak memory used for buffered data.
/// Calculated as sum of peak memory values across partitions
peak_mem_used: metrics::Gauge,
@@ -627,16 +628,17 @@ impl SortMergeJoinMetrics {
let input_rows = MetricBuilder::new(metrics).counter("input_rows",
partition);
let output_batches =
MetricBuilder::new(metrics).counter("output_batches", partition);
- let output_rows = MetricBuilder::new(metrics).output_rows(partition);
let peak_mem_used = MetricBuilder::new(metrics).gauge("peak_mem_used",
partition);
let spill_metrics = SpillMetrics::new(metrics, partition);
+ let baseline_metrics = BaselineMetrics::new(metrics, partition);
+
Self {
join_time,
input_batches,
input_rows,
output_batches,
- output_rows,
+ baseline_metrics,
peak_mem_used,
spill_metrics,
}
@@ -2032,7 +2034,9 @@ impl SortMergeJoinStream {
let record_batch =
concat_batches(&self.schema,
&self.staging_output_record_batches.batches)?;
self.join_metrics.output_batches.add(1);
- self.join_metrics.output_rows.add(record_batch.num_rows());
+ self.join_metrics
+ .baseline_metrics
+ .record_output(record_batch.num_rows());
// If join filter exists, `self.output_size` is not accurate as we
don't know the exact
// number of rows in the output record batch. If streamed row joined
with buffered rows,
// once join filter is applied, the number of output rows may be more
than 1.
@@ -2059,6 +2063,7 @@ impl SortMergeJoinStream {
{
self.staging_output_record_batches.batches.clear();
}
+
Ok(record_batch)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]