This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new e676f3c11 [BugFix] fix file stream time scanning metrics bug (#5020)
e676f3c11 is described below
commit e676f3c114ce00972b4bfb68c4e0a87e500a2286
Author: xyz <[email protected]>
AuthorDate: Sat Jan 28 21:26:24 2023 +0800
[BugFix] fix file stream time scanning metrics bug (#5020)
Currently, the file stream's scanning timer is 'start()'ed only once,
but may be 'stop()'ped many times. After the first call to 'stop()',
the timer's self.start value is reset to its default value by
'take()', so subsequent calls to 'stop()' have no effect.
In this PR, we 'start()' the scanning timer again if the scan of the
current batch succeeds.
Signed-off-by: xyz <[email protected]>
---
.../src/physical_plan/file_format/file_stream.rs | 28 +++++++++++++++-------
1 file changed, 20 insertions(+), 8 deletions(-)
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 265ff7a4f..b336b19ef 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -127,7 +127,9 @@ struct FileStreamMetrics {
/// Time elapsed for file opening
pub time_opening: StartableTime,
/// Time elapsed for file scanning + first record batch of decompression + decoding
- pub time_scanning: StartableTime,
+ pub time_scanning_until_data: StartableTime,
+ /// Total elapsed time for scanning + record batch decompression / decoding
+ pub time_scanning_total: StartableTime,
/// Time elapsed for data decompression + decoding
pub time_processing: StartableTime,
}
@@ -140,9 +142,15 @@ impl FileStreamMetrics {
start: None,
};
- let time_scanning = StartableTime {
+ let time_scanning_until_data = StartableTime {
metrics: MetricBuilder::new(metrics)
- .subset_time("time_elapsed_scanning", partition),
+ .subset_time("time_elapsed_scanning_until_data", partition),
+ start: None,
+ };
+
+ let time_scanning_total = StartableTime {
+ metrics: MetricBuilder::new(metrics)
+ .subset_time("time_elapsed_scanning_total", partition),
start: None,
};
@@ -154,7 +162,8 @@ impl FileStreamMetrics {
Self {
time_opening,
- time_scanning,
+ time_scanning_until_data,
+ time_scanning_total,
time_processing,
}
}
@@ -231,7 +240,8 @@ impl<F: FileOpener> FileStream<F> {
} => match ready!(future.poll_unpin(cx)) {
Ok(reader) => {
self.file_stream_metrics.time_opening.stop();
- self.file_stream_metrics.time_scanning.start();
+ self.file_stream_metrics.time_scanning_until_data.start();
+ self.file_stream_metrics.time_scanning_total.start();
self.state = FileStreamState::Scan {
partition_values: std::mem::take(partition_values),
reader,
@@ -247,7 +257,8 @@ impl<F: FileOpener> FileStream<F> {
partition_values,
} => match ready!(reader.poll_next_unpin(cx)) {
Some(result) => {
- self.file_stream_metrics.time_scanning.stop();
+ self.file_stream_metrics.time_scanning_until_data.stop();
+ self.file_stream_metrics.time_scanning_total.stop();
let result = result
.and_then(|b| self.pc_projector.project(b, partition_values))
.map(|batch| match &mut self.remain {
@@ -268,11 +279,12 @@ impl<F: FileOpener> FileStream<F> {
if result.is_err() {
self.state = FileStreamState::Error
}
-
+ self.file_stream_metrics.time_scanning_total.start();
return Poll::Ready(Some(result));
}
None => {
- self.file_stream_metrics.time_scanning.stop();
+ self.file_stream_metrics.time_scanning_until_data.stop();
+ self.file_stream_metrics.time_scanning_total.stop();
self.state = FileStreamState::Idle;
}
},