alamb commented on code in PR #3070:
URL: https://github.com/apache/arrow-datafusion/pull/3070#discussion_r940203006


##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -164,6 +206,12 @@ impl<F: FileOpener> FileStream<F> {
                     partition_values,
                 } => match ready!(future.poll_unpin(cx)) {
                     Ok(reader) => {
+                        if let Some(instant) =

Review Comment:
   I was wondering if it would be possible to use a `ScopedTimerGuard` rather than recording the starting instant yourself.
   
   
https://docs.rs/datafusion/10.0.0/datafusion/physical_plan/metrics/struct.ScopedTimerGuard.html
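To illustrate the RAII idea behind that suggestion, here is a minimal, std-only sketch. The `Time` and `ScopedTimerGuard` types below are simplified stand-ins written for illustration, not DataFusion's actual definitions (see the docs link above for the real API). The guard captures its own start `Instant` and adds the elapsed time to the metric when it is stopped or dropped, so the stream state would not need to carry an `Option<Instant>` next to each metric.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

/// Simplified stand-in for a cumulative time metric (hypothetical, not DataFusion's `Time`).
#[derive(Clone, Default, Debug)]
pub struct Time {
    nanos: Arc<AtomicU64>,
}

impl Time {
    /// Add the time elapsed since `start` to the metric.
    pub fn add_elapsed(&self, start: Instant) {
        let elapsed = start.elapsed().as_nanos() as u64;
        self.nanos.fetch_add(elapsed, Ordering::Relaxed);
    }

    /// Total time recorded so far.
    pub fn value(&self) -> Duration {
        Duration::from_nanos(self.nanos.load(Ordering::Relaxed))
    }

    /// Start a timer that records into this metric when stopped or dropped.
    pub fn timer(&self) -> ScopedTimerGuard<'_> {
        ScopedTimerGuard {
            inner: self,
            start: Some(Instant::now()),
        }
    }
}

/// Records elapsed time into `inner` on `stop()` or on drop, whichever comes first.
pub struct ScopedTimerGuard<'a> {
    inner: &'a Time,
    start: Option<Instant>,
}

impl<'a> ScopedTimerGuard<'a> {
    /// Stop timing; later calls (and the eventual drop) are no-ops.
    pub fn stop(&mut self) {
        if let Some(start) = self.start.take() {
            self.inner.add_elapsed(start);
        }
    }
}

impl Drop for ScopedTimerGuard<'_> {
    fn drop(&mut self) {
        // Only records if `stop()` was not already called.
        self.stop();
    }
}
```

Usage: `let _guard = time_opening.timer();` at the start of the open phase is enough; the interval ends when `_guard` goes out of scope or `stop()` is called.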



##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -104,13 +109,48 @@ enum FileStreamState {
     Limit,
 }
 
+struct FileStreamMetrics {
+    /// Time elapsed for file opening

Review Comment:
   I recommend also documenting what the `Option<Instant>` is
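One way to make the `Option<Instant>` self-documenting is to replace the `(metrics::Time, Option<Instant>)` tuple with a small named struct whose fields carry their own doc comments. The sketch below assumes a hypothetical `StartableTime` type and uses a plain `Duration` in place of `metrics::Time` so that it runs with `std` only.

```rust
use std::time::{Duration, Instant};

/// A cumulative timer that can be started and stopped repeatedly.
/// Hypothetical sketch: named fields replace the `(metrics::Time, Option<Instant>)`
/// tuple so each part can be documented.
#[derive(Debug, Default)]
pub struct StartableTime {
    /// Total time accumulated across all completed start/stop intervals
    /// (stands in for `metrics::Time` in this sketch).
    pub total: Duration,
    /// `Some(instant)` while an interval is in progress (set by `start`);
    /// `None` when idle. `stop` takes it and adds the elapsed time to `total`.
    pub start: Option<Instant>,
}

impl StartableTime {
    /// Begin a new interval. Calling this while already running discards
    /// the in-progress interval and starts a fresh one.
    pub fn start(&mut self) {
        self.start = Some(Instant::now());
    }

    /// End the current interval, if any, and add its duration to `total`.
    /// Returns `true` if an interval was actually recorded.
    pub fn stop(&mut self) -> bool {
        match self.start.take() {
            Some(start) => {
                self.total += start.elapsed();
                true
            }
            None => false,
        }
    }
}
```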
   
   



##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -104,13 +109,48 @@ enum FileStreamState {
     Limit,
 }
 
+struct FileStreamMetrics {
+    /// Time elapsed for file opening
+    pub time_opening: (metrics::Time, Option<Instant>),
+    /// Time elapsed for file scanning + first record batch of decompression + decoding
+    pub time_scanning: (metrics::Time, Option<Instant>),
+    /// Time elapsed for data decompression + decoding
+    pub time_processing: (metrics::Time, Option<Instant>),
+}
+
+impl FileStreamMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        let time_opening = (
+            MetricBuilder::new(metrics).subset_time("time_elapsed_opening", partition),
+            None,
+        );
+
+        let time_scanning = (
+            MetricBuilder::new(metrics)
+                .subset_time("time_elapsed_scanning_processing", partition),

Review Comment:
   I recommend keeping this name consistent with the field (`FileStreamMetrics::time_scanning`). If `processing` is important here, then perhaps we can rename the field instead.
   
   ```suggestion
                   .subset_time("time_elapsed_scanning", partition),
   ```
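A sketch of the naming convention being suggested: every field is registered as `time_elapsed_<suffix>`, where the suffix matches the field name without its `time_` prefix. `MetricsRegistry` is a hypothetical std-only stand-in for `ExecutionPlanMetricsSet` + `MetricBuilder`, and the `"time_elapsed_processing"` name is an assumption that follows the same pattern, since that part of the diff is not shown.

```rust
use std::collections::BTreeMap;
use std::time::Duration;

/// Hypothetical stand-in for a per-partition metrics registry.
#[derive(Default, Debug)]
pub struct MetricsRegistry {
    timers: BTreeMap<(String, usize), Duration>,
}

impl MetricsRegistry {
    /// Register a zeroed timer under `name` for `partition` and return its key.
    pub fn subset_time(&mut self, name: &str, partition: usize) -> (String, usize) {
        let key = (name.to_string(), partition);
        self.timers.entry(key.clone()).or_insert(Duration::ZERO);
        key
    }

    /// Registered metric names, in sorted order.
    pub fn names(&self) -> Vec<&str> {
        self.timers.keys().map(|(name, _)| name.as_str()).collect()
    }
}

/// In this sketch each field holds the registry key of its timer.
pub struct FileStreamMetrics {
    pub time_opening: (String, usize),
    pub time_scanning: (String, usize),
    pub time_processing: (String, usize),
}

impl FileStreamMetrics {
    pub fn new(registry: &mut MetricsRegistry, partition: usize) -> Self {
        Self {
            // Metric name suffix == field name minus the `time_` prefix.
            time_opening: registry.subset_time("time_elapsed_opening", partition),
            time_scanning: registry.subset_time("time_elapsed_scanning", partition),
            time_processing: registry.subset_time("time_elapsed_processing", partition),
        }
    }
}
```

Keeping the two names aligned means someone reading the metrics output can find the corresponding field with a plain text search.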



##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -179,6 +227,14 @@ impl<F: FileOpener> FileStream<F> {
                     partition_values,
                 } => match ready!(reader.poll_next_unpin(cx)) {
                     Some(result) => {
+                        if let Some(instant) =
+                            self.file_stream_metrics.time_scanning.1.take()
+                        {
+                            self.file_stream_metrics
+                                .time_scanning
+                                .0
+                                .add_elapsed(instant);
+                        }

Review Comment:
   I wonder: does it matter whether we update these metrics on plan failure? For example, if the stream is never ready, the 'file opening time' will never get updated. Maybe that is ok 🤔 
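A drop-based guard is one way to sidestep this: because the elapsed time is recorded in `Drop`, it is also recorded on early returns and error paths, not only when the success branch runs. The sketch below is std-only; `Time`, `TimerGuard`, and `open_file` are hypothetical names used for illustration. It assumes that, in the real stream, such a guard would be stored in the stream state so that dropping the stream (for example after an error or cancellation) also drops the guard.

```rust
use std::cell::Cell;
use std::time::{Duration, Instant};

/// Hypothetical cumulative metric (stand-in for `metrics::Time`).
#[derive(Default)]
pub struct Time {
    total: Cell<Duration>,
    samples: Cell<u32>,
}

impl Time {
    fn add_elapsed(&self, start: Instant) {
        self.total.set(self.total.get() + start.elapsed());
        self.samples.set(self.samples.get() + 1);
    }
}

/// Adds the elapsed time to the metric when dropped, including on early returns.
pub struct TimerGuard<'a> {
    time: &'a Time,
    start: Instant,
}

impl Drop for TimerGuard<'_> {
    fn drop(&mut self) {
        self.time.add_elapsed(self.start);
    }
}

/// Simulated file open that may fail.
pub fn open_file(time_opening: &Time, fail: bool) -> Result<&'static str, String> {
    let _guard = TimerGuard { time: time_opening, start: Instant::now() };
    if fail {
        // `_guard` is dropped on this path too, so the interval is still recorded.
        return Err("failed to open file".to_string());
    }
    Ok("reader")
}
```

Whether failed attempts *should* count toward the metric is a separate design decision; the guard simply makes "always record" the default.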
   


