This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 16f89346f Add baseline_metrics for FileStream to record metrics like elapsed time, record output, etc (#63) (#2965)
16f89346f is described below

commit 16f89346fc4bdc9e99b6c96cb54e8a2424fda091
Author: Yang Jiang <[email protected]>
AuthorDate: Thu Jul 28 01:52:33 2022 +0800

    Add baseline_metrics for FileStream to record metrics like elapsed time, record output, etc (#63) (#2965)
    
    Co-authored-by: yangzhong <[email protected]>
---
 datafusion/core/src/physical_plan/file_format/avro.rs    | 16 ++++++++++++++--
 datafusion/core/src/physical_plan/file_format/csv.rs     | 12 +++++++++++-
 .../core/src/physical_plan/file_format/file_stream.rs    | 16 ++++++++++++++--
 datafusion/core/src/physical_plan/file_format/json.rs    | 12 +++++++++++-
 datafusion/core/src/physical_plan/file_format/parquet.rs | 10 ++++++++--
 5 files changed, 58 insertions(+), 8 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index dc7a72691..f480b9d54 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -24,6 +24,9 @@ use crate::physical_plan::{
 use arrow::datatypes::SchemaRef;
 
 use crate::execution::context::TaskContext;
+#[cfg(feature = "avro")]
+use crate::physical_plan::metrics::BaselineMetrics;
+use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
 use std::any::Any;
 use std::sync::Arc;
 
@@ -31,10 +34,13 @@ use super::FileScanConfig;
 
 /// Execution plan for scanning Avro data source
 #[derive(Debug, Clone)]
+#[allow(dead_code)]
 pub struct AvroExec {
     base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl AvroExec {
@@ -46,6 +52,7 @@ impl AvroExec {
             base_config,
             projected_schema,
             projected_statistics,
+            metrics: ExecutionPlanMetricsSet::new(),
         }
     }
     /// Ref to the base configs
@@ -104,7 +111,6 @@ impl ExecutionPlan for AvroExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         use super::file_stream::FileStream;
-
         let config = Arc::new(private::AvroConfig {
             schema: Arc::clone(&self.base_config.file_schema),
             batch_size: context.session_config().batch_size(),
@@ -112,7 +118,13 @@ impl ExecutionPlan for AvroExec {
         });
         let opener = private::AvroOpener { config };
 
-        let stream = FileStream::new(&self.base_config, partition, context, opener)?;
+        let stream = FileStream::new(
+            &self.base_config,
+            partition,
+            context,
+            opener,
+            BaselineMetrics::new(&self.metrics, partition),
+        )?;
         Ok(Box::pin(stream))
     }
 
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 9ae634c0c..975c3ae5f 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -29,6 +29,7 @@ use crate::physical_plan::file_format::delimited_stream::newline_delimited_strea
 use crate::physical_plan::file_format::file_stream::{
     FileStream, FormatReader, ReaderFuture,
 };
+use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
 use arrow::csv;
 use arrow::datatypes::SchemaRef;
 use bytes::Buf;
@@ -50,6 +51,8 @@ pub struct CsvExec {
     projected_schema: SchemaRef,
     has_header: bool,
     delimiter: u8,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl CsvExec {
@@ -63,6 +66,7 @@ impl CsvExec {
             projected_statistics,
             has_header,
             delimiter,
+            metrics: ExecutionPlanMetricsSet::new(),
         }
     }
 
@@ -130,7 +134,13 @@ impl ExecutionPlan for CsvExec {
         });
 
         let opener = CsvOpener { config };
-        let stream = FileStream::new(&self.base_config, partition, context, opener)?;
+        let stream = FileStream::new(
+            &self.base_config,
+            partition,
+            context,
+            opener,
+            BaselineMetrics::new(&self.metrics, partition),
+        )?;
         Ok(Box::pin(stream) as SendableRecordBatchStream)
     }
 
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index ca57028ab..c3ac064e2 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -39,6 +39,7 @@ use crate::datasource::listing::{FileRange, PartitionedFile};
 use crate::error::Result;
 use crate::execution::context::TaskContext;
 use crate::physical_plan::file_format::{FileScanConfig, PartitionColumnProjector};
+use crate::physical_plan::metrics::BaselineMetrics;
 use crate::physical_plan::RecordBatchStream;
 
 /// A fallible future that resolves to a stream of [`RecordBatch`]
@@ -74,6 +75,8 @@ pub struct FileStream<F: FormatReader> {
     object_store: Arc<dyn ObjectStore>,
     /// The stream state
     state: FileStreamState,
+    /// runtime metrics recording
+    baseline_metrics: BaselineMetrics,
 }
 
 enum FileStreamState {
@@ -107,6 +110,7 @@ impl<F: FormatReader> FileStream<F> {
         partition: usize,
         context: Arc<TaskContext>,
         file_reader: F,
+        baseline_metrics: BaselineMetrics,
     ) -> Result<Self> {
         let (projected_schema, _) = config.project();
         let pc_projector = PartitionColumnProjector::new(
@@ -128,6 +132,7 @@ impl<F: FormatReader> FileStream<F> {
             pc_projector,
             object_store,
             state: FileStreamState::Idle,
+            baseline_metrics,
         })
     }
 
@@ -214,7 +219,11 @@ impl<F: FormatReader> Stream for FileStream<F> {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        self.poll_inner(cx)
+        let cloned_time = self.baseline_metrics.elapsed_compute().clone();
+        let timer = cloned_time.timer();
+        let result = self.poll_inner(cx);
+        timer.done();
+        self.baseline_metrics.record_poll(result)
     }
 }
 
@@ -230,6 +239,7 @@ mod tests {
 
     use super::*;
     use crate::datasource::object_store::ObjectStoreUrl;
+    use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
     use crate::prelude::SessionContext;
     use crate::{
         error::Result,
@@ -276,7 +286,9 @@ mod tests {
             table_partition_cols: vec![],
         };
 
-        let file_stream = FileStream::new(&config, 0, ctx.task_ctx(), reader).unwrap();
+        let metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+        let file_stream =
+            FileStream::new(&config, 0, ctx.task_ctx(), reader, metrics).unwrap();
 
         file_stream
             .map(|b| b.expect("No error expected in stream"))
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index bdea9515e..bb2f09f42 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -27,6 +27,7 @@ use crate::physical_plan::file_format::delimited_stream::newline_delimited_strea
 use crate::physical_plan::file_format::file_stream::{
     FileStream, FormatReader, ReaderFuture,
 };
+use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
 use crate::physical_plan::{
     DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
@@ -48,6 +49,8 @@ pub struct NdJsonExec {
     base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl NdJsonExec {
@@ -59,6 +62,7 @@ impl NdJsonExec {
             base_config,
             projected_schema,
             projected_statistics,
+            metrics: ExecutionPlanMetricsSet::new(),
         }
     }
 }
@@ -117,7 +121,13 @@ impl ExecutionPlan for NdJsonExec {
             options,
         };
 
-        let stream = FileStream::new(&self.base_config, partition, context, opener)?;
+        let stream = FileStream::new(
+            &self.base_config,
+            partition,
+            context,
+            opener,
+            BaselineMetrics::new(&self.metrics, partition),
+        )?;
 
         Ok(Box::pin(stream) as SendableRecordBatchStream)
     }
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index b2239876f..40f2e3304 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -51,6 +51,7 @@ use crate::datasource::listing::FileRange;
 use crate::physical_plan::file_format::file_stream::{
     FileStream, FormatReader, ReaderFuture,
 };
+use crate::physical_plan::metrics::BaselineMetrics;
 use crate::{
     error::{DataFusionError, Result},
     execution::context::{SessionState, TaskContext},
@@ -223,8 +224,13 @@ impl ExecutionPlan for ParquetExec {
             metrics: self.metrics.clone(),
         };
 
-        let stream =
-            FileStream::new(&self.base_config, partition_index, context, opener)?;
+        let stream = FileStream::new(
+            &self.base_config,
+            partition_index,
+            context,
+            opener,
+            BaselineMetrics::new(&self.metrics, partition_index),
+        )?;
 
         Ok(Box::pin(stream))
     }

Reply via email to