This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new b64c13df chore: Include first ScanExec batch in metrics (#1105)
b64c13df is described below

commit b64c13df9580b7342082167623e0aa03435c2de5
Author: Andy Grove <[email protected]>
AuthorDate: Tue Nov 19 17:41:31 2024 -0700

    chore: Include first ScanExec batch in metrics (#1105)
    
    * include first batch in ScanExec metrics
    
    * record row count metric
    
    * fix regression
---
 native/core/src/execution/operators/scan.rs | 50 +++++++++++++++++++++++------
 1 file changed, 40 insertions(+), 10 deletions(-)

diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs
index 7d75f7f1..b2546f83 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -65,6 +65,8 @@ pub struct ScanExec {
     pub input_source_description: String,
     /// The data types of columns of the input batch. Converted from Spark schema.
     pub data_types: Vec<DataType>,
+    /// Schema of first batch
+    pub schema: SchemaRef,
     /// The input batch of input data. Used to determine the schema of the input data.
     /// It is also used in unit test to mock the input data from JVM.
     pub batch: Arc<Mutex<Option<InputBatch>>>,
@@ -72,6 +74,7 @@ pub struct ScanExec {
     cache: PlanProperties,
     /// Metrics collector
     metrics: ExecutionPlanMetricsSet,
+    baseline_metrics: BaselineMetrics,
 }
 
 impl ScanExec {
@@ -81,6 +84,9 @@ impl ScanExec {
         input_source_description: &str,
         data_types: Vec<DataType>,
     ) -> Result<Self, CometError> {
+        let metrics_set = ExecutionPlanMetricsSet::default();
+        let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
+
         // Scan's schema is determined by the input batch, so we need to set it before execution.
         // Note that we determine if arrays are dictionary-encoded based on the
         // first batch. The array may be dictionary-encoded in some batches and not others, and
@@ -88,7 +94,12 @@ impl ScanExec {
         // may end up either unpacking dictionary arrays or dictionary-encoding arrays.
         // Dictionary-encoded primitive arrays are always unpacked.
         let first_batch = if let Some(input_source) = input_source.as_ref() {
-            ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?
+            let mut timer = baseline_metrics.elapsed_compute().timer();
+            let batch =
+                ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?;
+            timer.stop();
+            baseline_metrics.record_output(batch.num_rows());
+            batch
         } else {
             InputBatch::EOF
         };
@@ -96,7 +107,7 @@ impl ScanExec {
         let schema = scan_schema(&first_batch, &data_types);
 
         let cache = PlanProperties::new(
-            EquivalenceProperties::new(schema),
+            EquivalenceProperties::new(Arc::clone(&schema)),
             // The partitioning is not important because we are not using DataFusion's
             // query planner or optimizer
             Partitioning::UnknownPartitioning(1),
@@ -110,7 +121,9 @@ impl ScanExec {
             data_types,
             batch: Arc::new(Mutex::new(Some(first_batch))),
             cache,
-            metrics: ExecutionPlanMetricsSet::default(),
+            metrics: metrics_set,
+            baseline_metrics,
+            schema,
         })
     }
 
@@ -276,11 +289,15 @@ impl ExecutionPlan for ScanExec {
     }
 
     fn schema(&self) -> SchemaRef {
-        // `unwrap` is safe because `schema` is only called during converting
-        // Spark plan to DataFusion plan. At the moment, `batch` is not EOF.
-        let binding = self.batch.try_lock().unwrap();
-        let input_batch = binding.as_ref().unwrap();
-        scan_schema(input_batch, &self.data_types)
+        if self.exec_context_id == TEST_EXEC_CONTEXT_ID {
+            // `unwrap` is safe because `schema` is only called during converting
+            // Spark plan to DataFusion plan. At the moment, `batch` is not EOF.
+            let binding = self.batch.try_lock().unwrap();
+            let input_batch = binding.as_ref().unwrap();
+            scan_schema(input_batch, &self.data_types)
+        } else {
+            Arc::clone(&self.schema)
+        }
     }
 
     fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
@@ -303,6 +320,7 @@ impl ExecutionPlan for ScanExec {
             self.clone(),
             self.schema(),
             partition,
+            self.baseline_metrics.clone(),
         )))
     }
 
@@ -352,8 +370,12 @@ struct ScanStream<'a> {
 }
 
 impl<'a> ScanStream<'a> {
-    pub fn new(scan: ScanExec, schema: SchemaRef, partition: usize) -> Self {
-        let baseline_metrics = BaselineMetrics::new(&scan.metrics, partition);
+    pub fn new(
+        scan: ScanExec,
+        schema: SchemaRef,
+        partition: usize,
+        baseline_metrics: BaselineMetrics,
+    ) -> Self {
         let cast_time = MetricBuilder::new(&scan.metrics).subset_time("cast_time", partition);
         Self {
             scan,
@@ -465,4 +487,12 @@ impl InputBatch {
 
         InputBatch::Batch(columns, num_rows)
     }
+
+    /// Get the number of rows in this batch
+    fn num_rows(&self) -> usize {
+        match self {
+            Self::EOF => 0,
+            Self::Batch(_, num_rows) => *num_rows,
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to