This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 16f89346f Add baseline_metrics for FileStream to record metrics like
elapsed time, record output, etc (#63) (#2965)
16f89346f is described below
commit 16f89346fc4bdc9e99b6c96cb54e8a2424fda091
Author: Yang Jiang <[email protected]>
AuthorDate: Thu Jul 28 01:52:33 2022 +0800
Add baseline_metrics for FileStream to record metrics like elapsed time,
record output, etc (#63) (#2965)
Co-authored-by: yangzhong <[email protected]>
---
datafusion/core/src/physical_plan/file_format/avro.rs | 16 ++++++++++++++--
datafusion/core/src/physical_plan/file_format/csv.rs | 12 +++++++++++-
.../core/src/physical_plan/file_format/file_stream.rs | 16 ++++++++++++++--
datafusion/core/src/physical_plan/file_format/json.rs | 12 +++++++++++-
datafusion/core/src/physical_plan/file_format/parquet.rs | 10 ++++++++--
5 files changed, 58 insertions(+), 8 deletions(-)
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs
b/datafusion/core/src/physical_plan/file_format/avro.rs
index dc7a72691..f480b9d54 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -24,6 +24,9 @@ use crate::physical_plan::{
use arrow::datatypes::SchemaRef;
use crate::execution::context::TaskContext;
+#[cfg(feature = "avro")]
+use crate::physical_plan::metrics::BaselineMetrics;
+use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use std::any::Any;
use std::sync::Arc;
@@ -31,10 +34,13 @@ use super::FileScanConfig;
/// Execution plan for scanning Avro data source
#[derive(Debug, Clone)]
+#[allow(dead_code)]
pub struct AvroExec {
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_schema: SchemaRef,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
}
impl AvroExec {
@@ -46,6 +52,7 @@ impl AvroExec {
base_config,
projected_schema,
projected_statistics,
+ metrics: ExecutionPlanMetricsSet::new(),
}
}
/// Ref to the base configs
@@ -104,7 +111,6 @@ impl ExecutionPlan for AvroExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
use super::file_stream::FileStream;
-
let config = Arc::new(private::AvroConfig {
schema: Arc::clone(&self.base_config.file_schema),
batch_size: context.session_config().batch_size(),
@@ -112,7 +118,13 @@ impl ExecutionPlan for AvroExec {
});
let opener = private::AvroOpener { config };
- let stream = FileStream::new(&self.base_config, partition, context,
opener)?;
+ let stream = FileStream::new(
+ &self.base_config,
+ partition,
+ context,
+ opener,
+ BaselineMetrics::new(&self.metrics, partition),
+ )?;
Ok(Box::pin(stream))
}
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs
b/datafusion/core/src/physical_plan/file_format/csv.rs
index 9ae634c0c..975c3ae5f 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -29,6 +29,7 @@ use
crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
use crate::physical_plan::file_format::file_stream::{
FileStream, FormatReader, ReaderFuture,
};
+use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use arrow::csv;
use arrow::datatypes::SchemaRef;
use bytes::Buf;
@@ -50,6 +51,8 @@ pub struct CsvExec {
projected_schema: SchemaRef,
has_header: bool,
delimiter: u8,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
}
impl CsvExec {
@@ -63,6 +66,7 @@ impl CsvExec {
projected_statistics,
has_header,
delimiter,
+ metrics: ExecutionPlanMetricsSet::new(),
}
}
@@ -130,7 +134,13 @@ impl ExecutionPlan for CsvExec {
});
let opener = CsvOpener { config };
- let stream = FileStream::new(&self.base_config, partition, context,
opener)?;
+ let stream = FileStream::new(
+ &self.base_config,
+ partition,
+ context,
+ opener,
+ BaselineMetrics::new(&self.metrics, partition),
+ )?;
Ok(Box::pin(stream) as SendableRecordBatchStream)
}
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs
b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index ca57028ab..c3ac064e2 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -39,6 +39,7 @@ use crate::datasource::listing::{FileRange, PartitionedFile};
use crate::error::Result;
use crate::execution::context::TaskContext;
use crate::physical_plan::file_format::{FileScanConfig,
PartitionColumnProjector};
+use crate::physical_plan::metrics::BaselineMetrics;
use crate::physical_plan::RecordBatchStream;
/// A fallible future that resolves to a stream of [`RecordBatch`]
@@ -74,6 +75,8 @@ pub struct FileStream<F: FormatReader> {
object_store: Arc<dyn ObjectStore>,
/// The stream state
state: FileStreamState,
+ /// runtime metrics recording
+ baseline_metrics: BaselineMetrics,
}
enum FileStreamState {
@@ -107,6 +110,7 @@ impl<F: FormatReader> FileStream<F> {
partition: usize,
context: Arc<TaskContext>,
file_reader: F,
+ baseline_metrics: BaselineMetrics,
) -> Result<Self> {
let (projected_schema, _) = config.project();
let pc_projector = PartitionColumnProjector::new(
@@ -128,6 +132,7 @@ impl<F: FormatReader> FileStream<F> {
pc_projector,
object_store,
state: FileStreamState::Idle,
+ baseline_metrics,
})
}
@@ -214,7 +219,11 @@ impl<F: FormatReader> Stream for FileStream<F> {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- self.poll_inner(cx)
+ let cloned_time = self.baseline_metrics.elapsed_compute().clone();
+ let timer = cloned_time.timer();
+ let result = self.poll_inner(cx);
+ timer.done();
+ self.baseline_metrics.record_poll(result)
}
}
@@ -230,6 +239,7 @@ mod tests {
use super::*;
use crate::datasource::object_store::ObjectStoreUrl;
+ use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use crate::prelude::SessionContext;
use crate::{
error::Result,
@@ -276,7 +286,9 @@ mod tests {
table_partition_cols: vec![],
};
- let file_stream = FileStream::new(&config, 0, ctx.task_ctx(),
reader).unwrap();
+ let metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+ let file_stream =
+ FileStream::new(&config, 0, ctx.task_ctx(), reader,
metrics).unwrap();
file_stream
.map(|b| b.expect("No error expected in stream"))
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs
b/datafusion/core/src/physical_plan/file_format/json.rs
index bdea9515e..bb2f09f42 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -27,6 +27,7 @@ use
crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
use crate::physical_plan::file_format::file_stream::{
FileStream, FormatReader, ReaderFuture,
};
+use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
};
@@ -48,6 +49,8 @@ pub struct NdJsonExec {
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_schema: SchemaRef,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
}
impl NdJsonExec {
@@ -59,6 +62,7 @@ impl NdJsonExec {
base_config,
projected_schema,
projected_statistics,
+ metrics: ExecutionPlanMetricsSet::new(),
}
}
}
@@ -117,7 +121,13 @@ impl ExecutionPlan for NdJsonExec {
options,
};
- let stream = FileStream::new(&self.base_config, partition, context,
opener)?;
+ let stream = FileStream::new(
+ &self.base_config,
+ partition,
+ context,
+ opener,
+ BaselineMetrics::new(&self.metrics, partition),
+ )?;
Ok(Box::pin(stream) as SendableRecordBatchStream)
}
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index b2239876f..40f2e3304 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -51,6 +51,7 @@ use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::file_stream::{
FileStream, FormatReader, ReaderFuture,
};
+use crate::physical_plan::metrics::BaselineMetrics;
use crate::{
error::{DataFusionError, Result},
execution::context::{SessionState, TaskContext},
@@ -223,8 +224,13 @@ impl ExecutionPlan for ParquetExec {
metrics: self.metrics.clone(),
};
- let stream =
- FileStream::new(&self.base_config, partition_index, context,
opener)?;
+ let stream = FileStream::new(
+ &self.base_config,
+ partition_index,
+ context,
+ opener,
+ BaselineMetrics::new(&self.metrics, partition_index),
+ )?;
Ok(Box::pin(stream))
}