This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 698155ab31 Refactor SortMergeJoinMetrics to reuse BaselineMetrics 
(#16675)
698155ab31 is described below

commit 698155ab3146a67cf2e4a20ddcb340113f8f71f4
Author: Alan Tang <[email protected]>
AuthorDate: Mon Jul 7 19:29:04 2025 +0800

    Refactor SortMergeJoinMetrics to reuse BaselineMetrics (#16675)
    
    * Refactor SortMergeJoinMetrics to reuse BaselineMetrics
    
    Signed-off-by: Alan Tang <[email protected]>
    
    * use record_poll method to update output_rows
    
    Signed-off-by: Alan Tang <[email protected]>
    
    * chore: Replace record_poll with record_output
    
    Signed-off-by: Alan Tang <[email protected]>
    
    ---------
    
    Signed-off-by: Alan Tang <[email protected]>
---
 datafusion/physical-plan/src/joins/sort_merge_join.rs | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs 
b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index a8c209a492..9a68322834 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -41,7 +41,8 @@ use crate::joins::utils::{
     JoinOnRef,
 };
 use crate::metrics::{
-    Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, SpillMetrics,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
+    SpillMetrics,
 };
 use crate::projection::{
     join_allows_pushdown, join_table_borders, new_join_children,
@@ -609,8 +610,8 @@ struct SortMergeJoinMetrics {
     input_rows: Count,
     /// Number of batches produced by this operator
     output_batches: Count,
-    /// Number of rows produced by this operator
-    output_rows: Count,
+    /// Execution metrics
+    baseline_metrics: BaselineMetrics,
     /// Peak memory used for buffered data.
     /// Calculated as sum of peak memory values across partitions
     peak_mem_used: metrics::Gauge,
@@ -627,16 +628,17 @@ impl SortMergeJoinMetrics {
         let input_rows = MetricBuilder::new(metrics).counter("input_rows", 
partition);
         let output_batches =
             MetricBuilder::new(metrics).counter("output_batches", partition);
-        let output_rows = MetricBuilder::new(metrics).output_rows(partition);
         let peak_mem_used = MetricBuilder::new(metrics).gauge("peak_mem_used", 
partition);
         let spill_metrics = SpillMetrics::new(metrics, partition);
 
+        let baseline_metrics = BaselineMetrics::new(metrics, partition);
+
         Self {
             join_time,
             input_batches,
             input_rows,
             output_batches,
-            output_rows,
+            baseline_metrics,
             peak_mem_used,
             spill_metrics,
         }
@@ -2032,7 +2034,9 @@ impl SortMergeJoinStream {
         let record_batch =
             concat_batches(&self.schema, 
&self.staging_output_record_batches.batches)?;
         self.join_metrics.output_batches.add(1);
-        self.join_metrics.output_rows.add(record_batch.num_rows());
+        self.join_metrics
+            .baseline_metrics
+            .record_output(record_batch.num_rows());
         // If join filter exists, `self.output_size` is not accurate as we 
don't know the exact
         // number of rows in the output record batch. If streamed row joined 
with buffered rows,
         // once join filter is applied, the number of output rows may be more 
than 1.
@@ -2059,6 +2063,7 @@ impl SortMergeJoinStream {
         {
             self.staging_output_record_batches.batches.clear();
         }
+
         Ok(record_batch)
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to