This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 028e351595 Add files_processed and files_scanned metrics to 
FileStreamMetrics (#20592)
028e351595 is described below

commit 028e351595654a67e3a70097cf4131cbbcb9fa8e
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Wed Mar 4 18:53:32 2026 +0100

    Add files_processed and files_scanned metrics to FileStreamMetrics (#20592)
    
    ## Summary
    - Add `files_processed` counter to `FileStreamMetrics`, incremented for
    every file assigned to the partition — whether it was opened, pruned
    (returned an empty stream), or skipped due to a LIMIT. When the stream
    completes, this equals the total number of files in the partition.
    - Add `files_opened` counter to `FileStreamMetrics`, incremented as soon
    as a file is considered for processing — whether it is actually opened,
    or discarded because of a LIMIT, statistics pruning, etc.
    
    ## Motivation
    
    These metrics enable **tracking query progress** during long-running
    scans. Today, there is no way to monitor how far along a file scan is.
    The existing `FileStreamMetrics` only provide:
    
    - **Timing metrics** (`time_elapsed_opening`,
    `time_elapsed_scanning_total`, etc.) — these measure duration but don't
    indicate progress. You can't tell whether a scan is 10% or 90% done from
    elapsed time alone.
    - **Error counters** (`file_open_errors`, `file_scan_errors`) — these
    only count failures, not successful progress.
    - **`output_rows`** or **`bytes_scanned`** (from `BaselineMetrics`) —
    these count rows/bytes emitted, but since the total number of rows to be
    emitted is not known upfront they make poor progress metrics: they never
    converge to 100% when filters are present, etc.
    
    In contrast, `files_processed` and `files_opened` combined with the
    known number of files in `file_groups` give a clear progress indicator:
    `files_processed / total_files`. This is the most natural and reliable
    way to track scan progress since the file count is known at plan time.
    Depending on what users plan to do with the metric they can pick
    `files_opened / total_files` (leading metric) or `files_processed /
    total_files` (lagging metric).
    
    ## Test plan
    - [x] Existing `file_stream` tests pass (8/8)
    - [x] `cargo check -p datafusion-datasource` compiles cleanly
    
    🤖 Generated with [Claude Code](https://claude.com/claude-code)
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 datafusion/datasource/src/file_stream.rs | 70 ++++++++++++++++++++++++--------
 1 file changed, 54 insertions(+), 16 deletions(-)

diff --git a/datafusion/datasource/src/file_stream.rs 
b/datafusion/datasource/src/file_stream.rs
index c809038209..514a7e0a0b 100644
--- a/datafusion/datasource/src/file_stream.rs
+++ b/datafusion/datasource/src/file_stream.rs
@@ -124,6 +124,7 @@ impl FileStream {
                 }
                 FileStreamState::Open { future } => match 
ready!(future.poll_unpin(cx)) {
                     Ok(reader) => {
+                        self.file_stream_metrics.files_opened.add(1);
                         // include time needed to start opening in 
`start_next_file`
                         self.file_stream_metrics.time_opening.stop();
                         let next = self.start_next_file().transpose();
@@ -150,6 +151,7 @@ impl FileStream {
                         self.file_stream_metrics.file_open_errors.add(1);
                         match self.on_error {
                             OnError::Skip => {
+                                
self.file_stream_metrics.files_processed.add(1);
                                 self.file_stream_metrics.time_opening.stop();
                                 self.state = FileStreamState::Idle
                             }
@@ -179,6 +181,15 @@ impl FileStream {
                                         batch
                                     } else {
                                         let batch = batch.slice(0, *remain);
+                                        // Count this file, the prefetched 
next file
+                                        // (if any), and all remaining files 
we will
+                                        // never open.
+                                        let done = 1
+                                            + self.file_iter.len()
+                                            + usize::from(next.is_some());
+                                        self.file_stream_metrics
+                                            .files_processed
+                                            .add(done);
                                         self.state = FileStreamState::Limit;
                                         *remain = 0;
                                         batch
@@ -196,26 +207,29 @@ impl FileStream {
 
                             match self.on_error {
                                 // If `OnError::Skip` we skip the file as soon 
as we hit the first error
-                                OnError::Skip => match mem::take(next) {
-                                    Some(future) => {
-                                        
self.file_stream_metrics.time_opening.start();
-
-                                        match future {
-                                            NextOpen::Pending(future) => {
-                                                self.state =
-                                                    FileStreamState::Open { 
future }
-                                            }
-                                            NextOpen::Ready(reader) => {
-                                                self.state = 
FileStreamState::Open {
-                                                    future: 
Box::pin(std::future::ready(
-                                                        reader,
-                                                    )),
+                                OnError::Skip => {
+                                    
self.file_stream_metrics.files_processed.add(1);
+                                    match mem::take(next) {
+                                        Some(future) => {
+                                            
self.file_stream_metrics.time_opening.start();
+
+                                            match future {
+                                                NextOpen::Pending(future) => {
+                                                    self.state =
+                                                        FileStreamState::Open 
{ future }
+                                                }
+                                                NextOpen::Ready(reader) => {
+                                                    self.state = 
FileStreamState::Open {
+                                                        future: Box::pin(
+                                                            
std::future::ready(reader),
+                                                        ),
+                                                    }
                                                 }
                                             }
                                         }
+                                        None => return Poll::Ready(None),
                                     }
-                                    None => return Poll::Ready(None),
-                                },
+                                }
                                 OnError::Fail => {
                                     self.state = FileStreamState::Error;
                                     return Poll::Ready(Some(Err(err)));
@@ -223,6 +237,7 @@ impl FileStream {
                             }
                         }
                         None => {
+                            self.file_stream_metrics.files_processed.add(1);
                             
self.file_stream_metrics.time_scanning_until_data.stop();
                             
self.file_stream_metrics.time_scanning_total.stop();
 
@@ -399,6 +414,22 @@ pub struct FileStreamMetrics {
     /// If using `OnError::Skip` this will provide a count of the number of 
files
     /// which were skipped and will not be included in the scan results.
     pub file_scan_errors: Count,
+    /// Count of files successfully opened or evaluated for processing.
+    /// At t=end (completion of a query) this is equal to `files_opened`, and 
both values are equal
+    /// to the total number of files in the query; unless the query itself 
fails.
+    /// This value will always be greater than or equal to `files_open`.
+    /// Note that this value does *not* mean the file was actually scanned.
+    /// We increment this value for any processing of a file, even if that 
processing is
+    /// discarding it because we hit a `LIMIT` (in this case `files_opened` 
and `files_processed` are both incremented at the same time).
+    pub files_opened: Count,
+    /// Count of files completely processed / closed (opened, pruned, or 
skipped due to limit).
+    /// At t=0 (the beginning of a query) this is 0.
+    /// At t=end (completion of a query) this is equal to `files_opened`, and 
both values are equal
+    /// to the total number of files in the query; unless the query itself 
fails.
+    /// This value will always be less than or equal to `files_open`.
+    /// We increment this value for any processing of a file, even if that 
processing is
+    /// discarding it because we hit a `LIMIT` (in this case `files_opened` 
and `files_processed` are both incremented at the same time).
+    pub files_processed: Count,
 }
 
 impl FileStreamMetrics {
@@ -433,6 +464,11 @@ impl FileStreamMetrics {
         let file_scan_errors =
             MetricBuilder::new(metrics).counter("file_scan_errors", partition);
 
+        let files_opened = MetricBuilder::new(metrics).counter("files_opened", 
partition);
+
+        let files_processed =
+            MetricBuilder::new(metrics).counter("files_processed", partition);
+
         Self {
             time_opening,
             time_scanning_until_data,
@@ -440,6 +476,8 @@ impl FileStreamMetrics {
             time_processing,
             file_open_errors,
             file_scan_errors,
+            files_opened,
+            files_processed,
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to