This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e28f8953 perf(reader): Pass data file size and delete file size to reader to avoid `stat()` calls (#2175)
2e28f8953 is described below

commit 2e28f895326fa5218ae256c6bb1a9ce9cddf1eb2
Author: Matt Butrovich <[email protected]>
AuthorDate: Wed Feb 25 19:50:36 2026 -0500

    perf(reader): Pass data file size and delete file size to reader to avoid `stat()` calls (#2175)
    
    ## Which issue does this PR close?
    
    
    - Partially addresses #2172.
    - In a workload I tested, calling `stat()` takes over 17% of the CPU
    time in `create_record_batch_stream_builder`:
    <img width="1929" height="492" alt="Screenshot 2026-02-24 at 2 03 07 PM" src="https://github.com/user-attachments/assets/b8a49db6-9d9b-45ac-863b-e9effa12bc71" />
    
    ## What changes are included in this PR?
    
    - Pass through the data file size to `FileScanTask` (see the sketch after
    this list). Iceberg Java does this by wrapping a reference to a `DataFile`
    in its `FileScanTask`; in this case we're just cherry-picking some fields
    until we decide we need more.
    - Remove redundant data file creation code in tests.
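    
    To illustrate the idea, here is a minimal, self-contained sketch of the
    pattern (the `ScanTaskSketch` type and `footer_fetch_size` helper are
    hypothetical stand-ins, not this crate's API): the size recorded in the
    manifest entry travels with the scan task, so the reader can bound its
    Parquet footer read without a `stat()`/HEAD call.
    
    ```rust
    /// Hypothetical, simplified stand-in for a scan task that carries the
    /// file size copied from the manifest entry.
    struct ScanTaskSketch {
        /// Path of the Parquet file to read.
        file_path: String,
        /// Size taken from the manifest entry, so no stat()/HEAD is needed.
        file_size_in_bytes: u64,
    }
    
    /// Decide how many trailing bytes to fetch for the Parquet footer,
    /// using the size carried on the task instead of std::fs::metadata().
    fn footer_fetch_size(task: &ScanTaskSketch, hint: Option<u64>) -> u64 {
        // Never request more bytes than the file actually contains.
        hint.unwrap_or(64 * 1024).min(task.file_size_in_bytes)
    }
    
    fn main() {
        let task = ScanTaskSketch {
            file_path: "s3://bucket/table/data/1.parquet".to_string(),
            file_size_in_bytes: 8_388_608, // copied from the manifest entry
        };
        // No filesystem or object-store metadata call happens here.
        println!(
            "fetch last {} bytes of {}",
            footer_fetch_size(&task, None),
            task.file_path
        );
    }
    ```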
    
    ## Are these changes tested?
    
    Existing tests.
---
 .../src/arrow/caching_delete_file_loader.rs        |  21 +++-
 crates/iceberg/src/arrow/delete_file_loader.rs     |   6 +-
 crates/iceberg/src/arrow/delete_filter.rs          |  22 ++++
 crates/iceberg/src/arrow/reader.rs                 |  68 ++++++++--
 crates/iceberg/src/scan/context.rs                 |   1 +
 crates/iceberg/src/scan/mod.rs                     | 137 ++++-----------------
 crates/iceberg/src/scan/task.rs                    |   7 ++
 7 files changed, 135 insertions(+), 127 deletions(-)

diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index 5d0b1da71..dfee7ed87 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -235,7 +235,7 @@ impl CachingDeleteFileLoader {
                     PosDelLoadAction::Load => Ok(DeleteFileContext::PosDels {
                         file_path: task.file_path.clone(),
                         stream: basic_delete_file_loader
-                            .parquet_to_batch_stream(&task.file_path)
+                            .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
                             .await?,
                     }),
                 }
@@ -254,7 +254,7 @@ impl CachingDeleteFileLoader {
                 let equality_ids_vec = task.equality_ids.clone().unwrap();
                 let evolved_stream = BasicDeleteFileLoader::evolve_schema(
                     basic_delete_file_loader
-                        .parquet_to_batch_stream(&task.file_path)
+                        .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
                         .await?,
                     schema,
                     &equality_ids_vec,
@@ -614,7 +614,10 @@ mod tests {
 
         let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
         let record_batch_stream = basic_delete_file_loader
-            .parquet_to_batch_stream(&eq_delete_file_path)
+            .parquet_to_batch_stream(
+                &eq_delete_file_path,
+                std::fs::metadata(&eq_delete_file_path).unwrap().len(),
+            )
             .await
             .expect("could not get batch stream");
 
@@ -811,7 +814,10 @@ mod tests {
         let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
 
         let batch_stream = basic_delete_file_loader
-            .parquet_to_batch_stream(&delete_file_path)
+            .parquet_to_batch_stream(
+                &delete_file_path,
+                std::fs::metadata(&delete_file_path).unwrap().len(),
+            )
             .await
             .unwrap();
 
@@ -913,7 +919,8 @@ mod tests {
 
         // Create FileScanTask with BOTH positional and equality deletes
         let pos_del = FileScanTaskDeleteFile {
-            file_path: pos_del_path,
+            file_path: pos_del_path.clone(),
+            file_size_in_bytes: std::fs::metadata(&pos_del_path).unwrap().len(),
             file_type: DataContentType::PositionDeletes,
             partition_spec_id: 0,
             equality_ids: None,
@@ -921,12 +928,14 @@ mod tests {
 
         let eq_del = FileScanTaskDeleteFile {
             file_path: eq_delete_path.clone(),
+            file_size_in_bytes: std::fs::metadata(&eq_delete_path).unwrap().len(),
             file_type: DataContentType::EqualityDeletes,
             partition_spec_id: 0,
             equality_ids: Some(vec![2, 3]), // Only use field IDs that exist in both schemas
         };
 
         let file_scan_task = FileScanTask {
+            file_size_in_bytes: 0,
             start: 0,
             length: 0,
             record_count: None,
@@ -993,7 +1002,7 @@ mod tests {
 
         let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
         let record_batch_stream = basic_delete_file_loader
-            .parquet_to_batch_stream(&path)
+            .parquet_to_batch_stream(&path, std::fs::metadata(&path).unwrap().len())
             .await
             .expect("could not get batch stream");
 
diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs
index fa47076fe..33744d876 100644
--- a/crates/iceberg/src/arrow/delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/delete_file_loader.rs
@@ -54,6 +54,7 @@ impl BasicDeleteFileLoader {
     pub(crate) async fn parquet_to_batch_stream(
         &self,
         data_file_path: &str,
+        file_size_in_bytes: u64,
     ) -> Result<ArrowRecordBatchStream> {
         /*
            Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly
@@ -65,6 +66,7 @@ impl BasicDeleteFileLoader {
             false,
             None,
             None,
+            file_size_in_bytes,
         )
         .await?
         .build()?
@@ -102,7 +104,9 @@ impl DeleteFileLoader for BasicDeleteFileLoader {
         task: &FileScanTaskDeleteFile,
         schema: SchemaRef,
     ) -> Result<ArrowRecordBatchStream> {
-        let raw_batch_stream = self.parquet_to_batch_stream(&task.file_path).await?;
+        let raw_batch_stream = self
+            .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
+            .await?;
 
         // For equality deletes, only evolve the equality_ids columns.
         // For positional deletes (equality_ids is None), use all field IDs.
diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs
index 4af9f6b6f..3d074ca32 100644
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs
@@ -378,6 +378,12 @@ pub(crate) mod tests {
 
         let pos_del_1 = FileScanTaskDeleteFile {
             file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()),
+            file_size_in_bytes: std::fs::metadata(format!(
+                "{}/pos-del-1.parquet",
+                table_location.to_str().unwrap()
+            ))
+            .unwrap()
+            .len(),
             file_type: DataContentType::PositionDeletes,
             partition_spec_id: 0,
             equality_ids: None,
@@ -385,6 +391,12 @@ pub(crate) mod tests {
 
         let pos_del_2 = FileScanTaskDeleteFile {
             file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()),
+            file_size_in_bytes: std::fs::metadata(format!(
+                "{}/pos-del-2.parquet",
+                table_location.to_str().unwrap()
+            ))
+            .unwrap()
+            .len(),
             file_type: DataContentType::PositionDeletes,
             partition_spec_id: 0,
             equality_ids: None,
@@ -392,6 +404,12 @@ pub(crate) mod tests {
 
         let pos_del_3 = FileScanTaskDeleteFile {
             file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()),
+            file_size_in_bytes: std::fs::metadata(format!(
+                "{}/pos-del-3.parquet",
+                table_location.to_str().unwrap()
+            ))
+            .unwrap()
+            .len(),
             file_type: DataContentType::PositionDeletes,
             partition_spec_id: 0,
             equality_ids: None,
@@ -399,6 +417,7 @@ pub(crate) mod tests {
 
         let file_scan_tasks = vec![
             FileScanTask {
+                file_size_in_bytes: 0,
                 start: 0,
                 length: 0,
                 record_count: None,
@@ -414,6 +433,7 @@ pub(crate) mod tests {
                 case_sensitive: false,
             },
             FileScanTask {
+                file_size_in_bytes: 0,
                 start: 0,
                 length: 0,
                 record_count: None,
@@ -464,6 +484,7 @@ pub(crate) mod tests {
 
         // ---------- fake FileScanTask ----------
         let task = FileScanTask {
+            file_size_in_bytes: 0,
             start: 0,
             length: 0,
             record_count: None,
@@ -474,6 +495,7 @@ pub(crate) mod tests {
             predicate: None,
             deletes: vec![FileScanTaskDeleteFile {
                 file_path: "eq-del.parquet".to_string(),
+                file_size_in_bytes: 1, // never read; this test fails before opening the file
                 file_type: DataContentType::EqualityDeletes,
                 partition_spec_id: 0,
                 equality_ids: None,
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index c4c2fa003..93dbdaa35 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -33,7 +33,7 @@ use arrow_string::like::starts_with;
 use bytes::Bytes;
 use fnv::FnvHashSet;
 use futures::future::BoxFuture;
-use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
+use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{
     ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
 };
@@ -237,6 +237,7 @@ impl ArrowReader {
             should_load_page_index,
             None,
             metadata_size_hint,
+            task.file_size_in_bytes,
         )
         .await?;
 
@@ -290,6 +291,7 @@ impl ArrowReader {
                 should_load_page_index,
                 Some(options),
                 metadata_size_hint,
+                task.file_size_in_bytes,
             )
             .await?
         } else {
@@ -494,16 +496,21 @@ impl ArrowReader {
         should_load_page_index: bool,
         arrow_reader_options: Option<ArrowReaderOptions>,
         metadata_size_hint: Option<usize>,
+        file_size_in_bytes: u64,
     ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
         // Get the metadata for the Parquet file we need to read and build
         // a reader for the data within
         let parquet_file = file_io.new_input(data_file_path)?;
-        let (parquet_metadata, parquet_reader) =
-            try_join!(parquet_file.metadata(), parquet_file.reader())?;
-        let mut parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader)
-            .with_preload_column_index(true)
-            .with_preload_offset_index(true)
-            .with_preload_page_index(should_load_page_index);
+        let parquet_reader = parquet_file.reader().await?;
+        let mut parquet_file_reader = ArrowFileReader::new(
+            FileMetadata {
+                size: file_size_in_bytes,
+            },
+            parquet_reader,
+        )
+        .with_preload_column_index(true)
+        .with_preload_offset_index(true)
+        .with_preload_page_index(should_load_page_index);
 
         if let Some(hint) = metadata_size_hint {
             parquet_file_reader = parquet_file_reader.with_metadata_size_hint(hint);
@@ -2121,6 +2128,9 @@ message schema {
     ) -> Vec<Option<String>> {
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
+                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
+                    .unwrap()
+                    .len(),
                 start: 0,
                 length: 0,
                 record_count: None,
@@ -2443,6 +2453,7 @@ message schema {
 
         // Task 1: read only the first row group
         let task1 = FileScanTask {
+            file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
             start: rg0_start,
             length: row_group_0.compressed_size() as u64,
             record_count: Some(100),
@@ -2460,6 +2471,7 @@ message schema {
 
         // Task 2: read the second and third row groups
         let task2 = FileScanTask {
+            file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
             start: rg1_start,
             length: file_end - rg1_start,
             record_count: Some(200),
@@ -2588,6 +2600,9 @@ message schema {
         let reader = ArrowReaderBuilder::new(file_io).build();
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
+                file_size_in_bytes: std::fs::metadata(format!("{table_location}/old_file.parquet"))
+                    .unwrap()
+                    .len(),
                 start: 0,
                 length: 0,
                 record_count: None,
@@ -2755,6 +2770,7 @@ message schema {
         let reader = ArrowReaderBuilder::new(file_io).build();
 
         let task = FileScanTask {
+            file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(),
             start: 0,
             length: 0,
             record_count: Some(200),
@@ -2764,6 +2780,7 @@ message schema {
             project_field_ids: vec![1],
             predicate: None,
             deletes: vec![FileScanTaskDeleteFile {
+                file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(),
                 file_path: delete_file_path,
                 file_type: DataContentType::PositionDeletes,
                 partition_spec_id: 0,
@@ -2973,6 +2990,7 @@ message schema {
 
         // Create FileScanTask that reads ONLY row group 1 via byte range filtering
         let task = FileScanTask {
+            file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(),
             start: rg1_start,
             length: rg1_length,
             record_count: Some(100), // Row group 1 has 100 rows
@@ -2982,6 +3000,7 @@ message schema {
             project_field_ids: vec![1],
             predicate: None,
             deletes: vec![FileScanTaskDeleteFile {
+                file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(),
                 file_path: delete_file_path,
                 file_type: DataContentType::PositionDeletes,
                 partition_spec_id: 0,
@@ -3184,6 +3203,7 @@ message schema {
 
         // Create FileScanTask that reads ONLY row group 1 via byte range filtering
         let task = FileScanTask {
+            file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(),
             start: rg1_start,
             length: rg1_length,
             record_count: Some(100), // Row group 1 has 100 rows
@@ -3193,6 +3213,7 @@ message schema {
             project_field_ids: vec![1],
             predicate: None,
             deletes: vec![FileScanTaskDeleteFile {
+                file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(),
                 file_path: delete_file_path,
                 file_type: DataContentType::PositionDeletes,
                 partition_spec_id: 0,
@@ -3293,6 +3314,9 @@ message schema {
 
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
+                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
+                    .unwrap()
+                    .len(),
                 start: 0,
                 length: 0,
                 record_count: None,
@@ -3391,6 +3415,9 @@ message schema {
 
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
+                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
+                    .unwrap()
+                    .len(),
                 start: 0,
                 length: 0,
                 record_count: None,
@@ -3478,6 +3505,9 @@ message schema {
 
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
+                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
+                    .unwrap()
+                    .len(),
                 start: 0,
                 length: 0,
                 record_count: None,
@@ -3579,6 +3609,9 @@ message schema {
 
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
+                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
+                    .unwrap()
+                    .len(),
                 start: 0,
                 length: 0,
                 record_count: None,
@@ -3709,6 +3742,9 @@ message schema {
 
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
+                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
+                    .unwrap()
+                    .len(),
                 start: 0,
                 length: 0,
                 record_count: None,
@@ -3806,6 +3842,9 @@ message schema {
 
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
+                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
+                    .unwrap()
+                    .len(),
                 start: 0,
                 length: 0,
                 record_count: None,
@@ -3916,6 +3955,9 @@ message schema {
 
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
+                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
+                    .unwrap()
+                    .len(),
                 start: 0,
                 length: 0,
                 record_count: None,
@@ -4007,6 +4049,9 @@ message schema {
         // Create tasks in a specific order: file_0, file_1, file_2
         let tasks = vec![
             Ok(FileScanTask {
+                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_0.parquet"))
+                    .unwrap()
+                    .len(),
                 start: 0,
                 length: 0,
                 record_count: None,
@@ -4022,6 +4067,9 @@ message schema {
                 case_sensitive: false,
             }),
             Ok(FileScanTask {
+                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_1.parquet"))
+                    .unwrap()
+                    .len(),
                 start: 0,
                 length: 0,
                 record_count: None,
@@ -4037,6 +4085,9 @@ message schema {
                 case_sensitive: false,
             }),
             Ok(FileScanTask {
+                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_2.parquet"))
+                    .unwrap()
+                    .len(),
                 start: 0,
                 length: 0,
                 record_count: None,
@@ -4216,6 +4267,9 @@ message schema {
         let reader = ArrowReaderBuilder::new(file_io).build();
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
+                file_size_in_bytes: std::fs::metadata(format!("{table_location}/data.parquet"))
+                    .unwrap()
+                    .len(),
                 start: 0,
                 length: 0,
                 record_count: None,
diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs
index 169d8e640..aa28ffd5a 100644
--- a/crates/iceberg/src/scan/context.rs
+++ b/crates/iceberg/src/scan/context.rs
@@ -117,6 +117,7 @@ impl ManifestEntryContext {
             .await;
 
         Ok(FileScanTask {
+            file_size_in_bytes: self.manifest_entry.file_size_in_bytes(),
             start: 0,
             length: self.manifest_entry.file_size_in_bytes(),
             record_count: Some(self.manifest_entry.record_count()),
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index c055c12c9..2a685cd17 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -756,7 +756,9 @@ pub mod tests {
             let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
             let current_partition_spec = self.table.metadata().default_partition_spec();
 
-            // Write data files
+            // Write the data files first, then use the file size in the manifest entries
+            let parquet_file_size = self.write_parquet_data_files();
+
             let mut writer = ManifestWriterBuilder::new(
                 self.next_manifest_file(),
                 Some(current_snapshot.snapshot_id()),
@@ -775,7 +777,7 @@ pub mod tests {
                                 .content(DataContentType::Data)
                                 .file_path(format!("{}/1.parquet", &self.table_location))
                                 .file_format(DataFileFormat::Parquet)
-                                .file_size_in_bytes(100)
+                                .file_size_in_bytes(parquet_file_size)
                                 .record_count(1)
                                 .partition(Struct::from_iter([Some(Literal::long(100))]))
                                 .key_metadata(None)
@@ -798,7 +800,7 @@ pub mod tests {
                                 .content(DataContentType::Data)
                                 .file_path(format!("{}/2.parquet", &self.table_location))
                                 .file_format(DataFileFormat::Parquet)
-                                .file_size_in_bytes(100)
+                                .file_size_in_bytes(parquet_file_size)
                                 .record_count(1)
                                 .partition(Struct::from_iter([Some(Literal::long(200))]))
                                 .build()
@@ -820,7 +822,7 @@ pub mod tests {
                                 .content(DataContentType::Data)
                                 .file_path(format!("{}/3.parquet", &self.table_location))
                                 .file_format(DataFileFormat::Parquet)
-                                .file_size_in_bytes(100)
+                                .file_size_in_bytes(parquet_file_size)
                                 .record_count(1)
                                 .partition(Struct::from_iter([Some(Literal::long(300))]))
                                 .build()
@@ -845,8 +847,13 @@ pub mod tests {
                 .add_manifests(vec![data_file_manifest].into_iter())
                 .unwrap();
             manifest_list_write.close().await.unwrap();
+        }
+
+        /// Writes identical Parquet data files (1.parquet, 2.parquet, 3.parquet)
+        /// and returns the file size in bytes.
+        fn write_parquet_data_files(&self) -> u64 {
+            std::fs::create_dir_all(&self.table_location).unwrap();
 
-            // prepare data
             let schema = {
                 let fields = vec![
                     arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
@@ -958,6 +965,10 @@ pub mod tests {
                 // writer must be closed to write footer
                 writer.close().unwrap();
             }
+
+            std::fs::metadata(format!("{}/1.parquet", &self.table_location))
+                .unwrap()
+                .len()
         }
 
         pub async fn setup_unpartitioned_manifest_files(&mut self) {
@@ -968,6 +979,9 @@ pub mod tests {
             let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
             let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());
 
+            // Write the data files first, then use the file size in the manifest entries
+            let parquet_file_size = self.write_parquet_data_files();
+
             // Write data files using an empty partition for unpartitioned tables.
             let mut writer = ManifestWriterBuilder::new(
                 self.next_manifest_file(),
@@ -991,7 +1005,7 @@ pub mod tests {
                                 .content(DataContentType::Data)
                                 .file_path(format!("{}/1.parquet", &self.table_location))
                                 .file_format(DataFileFormat::Parquet)
-                                .file_size_in_bytes(100)
+                                .file_size_in_bytes(parquet_file_size)
                                 .record_count(1)
                                 .partition(empty_partition.clone())
                                 .key_metadata(None)
@@ -1015,7 +1029,7 @@ pub mod tests {
                                 .content(DataContentType::Data)
                                 .file_path(format!("{}/2.parquet", &self.table_location))
                                 .file_format(DataFileFormat::Parquet)
-                                .file_size_in_bytes(100)
+                                .file_size_in_bytes(parquet_file_size)
                                 .record_count(1)
                                 .partition(empty_partition.clone())
                                 .build()
@@ -1038,7 +1052,7 @@ pub mod tests {
                                 .content(DataContentType::Data)
                                 .file_path(format!("{}/3.parquet", &self.table_location))
                                 .file_format(DataFileFormat::Parquet)
-                                .file_size_in_bytes(100)
+                                .file_size_in_bytes(parquet_file_size)
                                 .record_count(1)
                                 .partition(empty_partition.clone())
                                 .build()
@@ -1064,111 +1078,6 @@ pub mod tests {
                 .add_manifests(vec![data_file_manifest].into_iter())
                 .unwrap();
             manifest_list_write.close().await.unwrap();
-
-            // prepare data for parquet files
-            let schema = {
-                let fields = vec![
-                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
-                        .with_metadata(HashMap::from([(
-                            PARQUET_FIELD_ID_META_KEY.to_string(),
-                            "1".to_string(),
-                        )])),
-                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
-                        .with_metadata(HashMap::from([(
-                            PARQUET_FIELD_ID_META_KEY.to_string(),
-                            "2".to_string(),
-                        )])),
-                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
-                        .with_metadata(HashMap::from([(
-                            PARQUET_FIELD_ID_META_KEY.to_string(),
-                            "3".to_string(),
-                        )])),
-                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
-                        .with_metadata(HashMap::from([(
-                            PARQUET_FIELD_ID_META_KEY.to_string(),
-                            "4".to_string(),
-                        )])),
-                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
-                        .with_metadata(HashMap::from([(
-                            PARQUET_FIELD_ID_META_KEY.to_string(),
-                            "5".to_string(),
-                        )])),
-                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
-                        .with_metadata(HashMap::from([(
-                            PARQUET_FIELD_ID_META_KEY.to_string(),
-                            "6".to_string(),
-                        )])),
-                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
-                        .with_metadata(HashMap::from([(
-                            PARQUET_FIELD_ID_META_KEY.to_string(),
-                            "7".to_string(),
-                        )])),
-                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
-                        .with_metadata(HashMap::from([(
-                            PARQUET_FIELD_ID_META_KEY.to_string(),
-                            "8".to_string(),
-                        )])),
-                ];
-                Arc::new(arrow_schema::Schema::new(fields))
-            };
-
-            // Build the arrays for the RecordBatch
-            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
-
-            let mut values = vec![2; 512];
-            values.append(vec![3; 200].as_mut());
-            values.append(vec![4; 300].as_mut());
-            values.append(vec![5; 12].as_mut());
-            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
-
-            let mut values = vec![3; 512];
-            values.append(vec![4; 512].as_mut());
-            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
-
-            let mut values = vec!["Apache"; 512];
-            values.append(vec!["Iceberg"; 512].as_mut());
-            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
-
-            let mut values = vec![100.0f64; 512];
-            values.append(vec![150.0f64; 12].as_mut());
-            values.append(vec![200.0f64; 500].as_mut());
-            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
-
-            let mut values = vec![100i32; 512];
-            values.append(vec![150i32; 12].as_mut());
-            values.append(vec![200i32; 500].as_mut());
-            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
-
-            let mut values = vec![100i64; 512];
-            values.append(vec![150i64; 12].as_mut());
-            values.append(vec![200i64; 500].as_mut());
-            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
-
-            let mut values = vec![false; 512];
-            values.append(vec![true; 512].as_mut());
-            let values: BooleanArray = values.into();
-            let col8 = Arc::new(values) as ArrayRef;
-
-            let to_write = RecordBatch::try_new(schema.clone(), vec![
-                col1, col2, col3, col4, col5, col6, col7, col8,
-            ])
-            .unwrap();
-
-            // Write the Parquet files
-            let props = WriterProperties::builder()
-                .set_compression(Compression::SNAPPY)
-                .build();
-
-            for n in 1..=3 {
-                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
-                let mut writer =
-                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
-
-                writer.write(&to_write).expect("Writing batch");
-
-                // writer must be closed to write footer
-                writer.close().unwrap();
-            }
         }
 
         pub async fn setup_deadlock_manifests(&mut self) {
@@ -1874,6 +1783,7 @@ pub mod tests {
         );
         let task = FileScanTask {
             data_file_path: "data_file_path".to_string(),
+            file_size_in_bytes: 0,
             start: 0,
             length: 100,
             project_field_ids: vec![1, 2, 3],
@@ -1892,6 +1802,7 @@ pub mod tests {
         // with predicate
         let task = FileScanTask {
             data_file_path: "data_file_path".to_string(),
+            file_size_in_bytes: 0,
             start: 0,
             length: 100,
             project_field_ids: vec![1, 2, 3],
diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs
index 5349a9bdd..67615c351 100644
--- a/crates/iceberg/src/scan/task.rs
+++ b/crates/iceberg/src/scan/task.rs
@@ -51,6 +51,9 @@ where D: serde::Deserializer<'de> {
 /// A task to scan part of file.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub struct FileScanTask {
+    /// The total size of the data file in bytes, from the manifest entry.
+    /// Used to skip a stat/HEAD request when reading Parquet footers.
+    pub file_size_in_bytes: u64,
     /// The start offset of the file to scan.
     pub start: u64,
     /// The length of the file to scan.
@@ -146,6 +149,7 @@ impl From<&DeleteFileContext> for FileScanTaskDeleteFile {
     fn from(ctx: &DeleteFileContext) -> Self {
         FileScanTaskDeleteFile {
             file_path: ctx.manifest_entry.file_path().to_string(),
+            file_size_in_bytes: ctx.manifest_entry.file_size_in_bytes(),
             file_type: ctx.manifest_entry.content_type(),
             partition_spec_id: ctx.partition_spec_id,
             equality_ids: ctx.manifest_entry.data_file.equality_ids.clone(),
@@ -159,6 +163,9 @@ pub struct FileScanTaskDeleteFile {
     /// The delete file path
     pub file_path: String,
 
+    /// The total size of the delete file in bytes, from the manifest entry.
+    pub file_size_in_bytes: u64,
+
     /// delete file type
     pub file_type: DataContentType,
 
