This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 2e28f8953 perf(reader): Pass data file size and delete file size to
reader to avoid `stat()` calls (#2175)
2e28f8953 is described below
commit 2e28f895326fa5218ae256c6bb1a9ce9cddf1eb2
Author: Matt Butrovich <[email protected]>
AuthorDate: Wed Feb 25 19:50:36 2026 -0500
perf(reader): Pass data file size and delete file size to reader to avoid
`stat()` calls (#2175)
## Which issue does this PR close?
- Partially addresses #2172.
- In a workload I tested, calling `stat()` is taking over 17% of the CPU
time in `create_record_batch_stream_builder`:
<img width="1929" height="492" alt="Screenshot 2026-02-24 at 2 03 07 PM"
src="https://github.com/user-attachments/assets/b8a49db6-9d9b-45ac-863b-e9effa12bc71"
/>
## What changes are included in this PR?
<!--
Provide a summary of the modifications in this PR. List the main changes
such as new features, bug fixes, refactoring, or any other updates.
-->
- Pass through data file size to `FileScanTask`. Iceberg Java does this
by wrapping a reference to a `DataFile` in its `FileScanTask`. In this
case we're just cherry-picking some fields until we decide we need more.
- Remove redundant data file creation code in tests.
## Are these changes tested?
<!--
Specify which tests cover these changes (unit test, integration test, etc.).
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
Existing tests.
---
.../src/arrow/caching_delete_file_loader.rs | 21 +++-
crates/iceberg/src/arrow/delete_file_loader.rs | 6 +-
crates/iceberg/src/arrow/delete_filter.rs | 22 ++++
crates/iceberg/src/arrow/reader.rs | 68 ++++++++--
crates/iceberg/src/scan/context.rs | 1 +
crates/iceberg/src/scan/mod.rs | 137 ++++-----------------
crates/iceberg/src/scan/task.rs | 7 ++
7 files changed, 135 insertions(+), 127 deletions(-)
diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index 5d0b1da71..dfee7ed87 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -235,7 +235,7 @@ impl CachingDeleteFileLoader {
PosDelLoadAction::Load => Ok(DeleteFileContext::PosDels {
file_path: task.file_path.clone(),
stream: basic_delete_file_loader
- .parquet_to_batch_stream(&task.file_path)
+ .parquet_to_batch_stream(&task.file_path,
task.file_size_in_bytes)
.await?,
}),
}
@@ -254,7 +254,7 @@ impl CachingDeleteFileLoader {
let equality_ids_vec = task.equality_ids.clone().unwrap();
let evolved_stream = BasicDeleteFileLoader::evolve_schema(
basic_delete_file_loader
- .parquet_to_batch_stream(&task.file_path)
+ .parquet_to_batch_stream(&task.file_path,
task.file_size_in_bytes)
.await?,
schema,
&equality_ids_vec,
@@ -614,7 +614,10 @@ mod tests {
let basic_delete_file_loader =
BasicDeleteFileLoader::new(file_io.clone());
let record_batch_stream = basic_delete_file_loader
- .parquet_to_batch_stream(&eq_delete_file_path)
+ .parquet_to_batch_stream(
+ &eq_delete_file_path,
+ std::fs::metadata(&eq_delete_file_path).unwrap().len(),
+ )
.await
.expect("could not get batch stream");
@@ -811,7 +814,10 @@ mod tests {
let basic_delete_file_loader =
BasicDeleteFileLoader::new(file_io.clone());
let batch_stream = basic_delete_file_loader
- .parquet_to_batch_stream(&delete_file_path)
+ .parquet_to_batch_stream(
+ &delete_file_path,
+ std::fs::metadata(&delete_file_path).unwrap().len(),
+ )
.await
.unwrap();
@@ -913,7 +919,8 @@ mod tests {
// Create FileScanTask with BOTH positional and equality deletes
let pos_del = FileScanTaskDeleteFile {
- file_path: pos_del_path,
+ file_path: pos_del_path.clone(),
+ file_size_in_bytes:
std::fs::metadata(&pos_del_path).unwrap().len(),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
@@ -921,12 +928,14 @@ mod tests {
let eq_del = FileScanTaskDeleteFile {
file_path: eq_delete_path.clone(),
+ file_size_in_bytes:
std::fs::metadata(&eq_delete_path).unwrap().len(),
file_type: DataContentType::EqualityDeletes,
partition_spec_id: 0,
equality_ids: Some(vec![2, 3]), // Only use field IDs that exist
in both schemas
};
let file_scan_task = FileScanTask {
+ file_size_in_bytes: 0,
start: 0,
length: 0,
record_count: None,
@@ -993,7 +1002,7 @@ mod tests {
let basic_delete_file_loader =
BasicDeleteFileLoader::new(file_io.clone());
let record_batch_stream = basic_delete_file_loader
- .parquet_to_batch_stream(&path)
+ .parquet_to_batch_stream(&path,
std::fs::metadata(&path).unwrap().len())
.await
.expect("could not get batch stream");
diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs
b/crates/iceberg/src/arrow/delete_file_loader.rs
index fa47076fe..33744d876 100644
--- a/crates/iceberg/src/arrow/delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/delete_file_loader.rs
@@ -54,6 +54,7 @@ impl BasicDeleteFileLoader {
pub(crate) async fn parquet_to_batch_stream(
&self,
data_file_path: &str,
+ file_size_in_bytes: u64,
) -> Result<ArrowRecordBatchStream> {
/*
Essentially a super-cut-down ArrowReader. We can't use ArrowReader
directly
@@ -65,6 +66,7 @@ impl BasicDeleteFileLoader {
false,
None,
None,
+ file_size_in_bytes,
)
.await?
.build()?
@@ -102,7 +104,9 @@ impl DeleteFileLoader for BasicDeleteFileLoader {
task: &FileScanTaskDeleteFile,
schema: SchemaRef,
) -> Result<ArrowRecordBatchStream> {
- let raw_batch_stream =
self.parquet_to_batch_stream(&task.file_path).await?;
+ let raw_batch_stream = self
+ .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
+ .await?;
// For equality deletes, only evolve the equality_ids columns.
// For positional deletes (equality_ids is None), use all field IDs.
diff --git a/crates/iceberg/src/arrow/delete_filter.rs
b/crates/iceberg/src/arrow/delete_filter.rs
index 4af9f6b6f..3d074ca32 100644
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs
@@ -378,6 +378,12 @@ pub(crate) mod tests {
let pos_del_1 = FileScanTaskDeleteFile {
file_path: format!("{}/pos-del-1.parquet",
table_location.to_str().unwrap()),
+ file_size_in_bytes: std::fs::metadata(format!(
+ "{}/pos-del-1.parquet",
+ table_location.to_str().unwrap()
+ ))
+ .unwrap()
+ .len(),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
@@ -385,6 +391,12 @@ pub(crate) mod tests {
let pos_del_2 = FileScanTaskDeleteFile {
file_path: format!("{}/pos-del-2.parquet",
table_location.to_str().unwrap()),
+ file_size_in_bytes: std::fs::metadata(format!(
+ "{}/pos-del-2.parquet",
+ table_location.to_str().unwrap()
+ ))
+ .unwrap()
+ .len(),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
@@ -392,6 +404,12 @@ pub(crate) mod tests {
let pos_del_3 = FileScanTaskDeleteFile {
file_path: format!("{}/pos-del-3.parquet",
table_location.to_str().unwrap()),
+ file_size_in_bytes: std::fs::metadata(format!(
+ "{}/pos-del-3.parquet",
+ table_location.to_str().unwrap()
+ ))
+ .unwrap()
+ .len(),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
@@ -399,6 +417,7 @@ pub(crate) mod tests {
let file_scan_tasks = vec![
FileScanTask {
+ file_size_in_bytes: 0,
start: 0,
length: 0,
record_count: None,
@@ -414,6 +433,7 @@ pub(crate) mod tests {
case_sensitive: false,
},
FileScanTask {
+ file_size_in_bytes: 0,
start: 0,
length: 0,
record_count: None,
@@ -464,6 +484,7 @@ pub(crate) mod tests {
// ---------- fake FileScanTask ----------
let task = FileScanTask {
+ file_size_in_bytes: 0,
start: 0,
length: 0,
record_count: None,
@@ -474,6 +495,7 @@ pub(crate) mod tests {
predicate: None,
deletes: vec![FileScanTaskDeleteFile {
file_path: "eq-del.parquet".to_string(),
+ file_size_in_bytes: 1, // never read; this test fails before
opening the file
file_type: DataContentType::EqualityDeletes,
partition_spec_id: 0,
equality_ids: None,
diff --git a/crates/iceberg/src/arrow/reader.rs
b/crates/iceberg/src/arrow/reader.rs
index c4c2fa003..93dbdaa35 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -33,7 +33,7 @@ use arrow_string::like::starts_with;
use bytes::Bytes;
use fnv::FnvHashSet;
use futures::future::BoxFuture;
-use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
+use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use parquet::arrow::arrow_reader::{
ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
};
@@ -237,6 +237,7 @@ impl ArrowReader {
should_load_page_index,
None,
metadata_size_hint,
+ task.file_size_in_bytes,
)
.await?;
@@ -290,6 +291,7 @@ impl ArrowReader {
should_load_page_index,
Some(options),
metadata_size_hint,
+ task.file_size_in_bytes,
)
.await?
} else {
@@ -494,16 +496,21 @@ impl ArrowReader {
should_load_page_index: bool,
arrow_reader_options: Option<ArrowReaderOptions>,
metadata_size_hint: Option<usize>,
+ file_size_in_bytes: u64,
) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
// Get the metadata for the Parquet file we need to read and build
// a reader for the data within
let parquet_file = file_io.new_input(data_file_path)?;
- let (parquet_metadata, parquet_reader) =
- try_join!(parquet_file.metadata(), parquet_file.reader())?;
- let mut parquet_file_reader = ArrowFileReader::new(parquet_metadata,
parquet_reader)
- .with_preload_column_index(true)
- .with_preload_offset_index(true)
- .with_preload_page_index(should_load_page_index);
+ let parquet_reader = parquet_file.reader().await?;
+ let mut parquet_file_reader = ArrowFileReader::new(
+ FileMetadata {
+ size: file_size_in_bytes,
+ },
+ parquet_reader,
+ )
+ .with_preload_column_index(true)
+ .with_preload_offset_index(true)
+ .with_preload_page_index(should_load_page_index);
if let Some(hint) = metadata_size_hint {
parquet_file_reader =
parquet_file_reader.with_metadata_size_hint(hint);
@@ -2121,6 +2128,9 @@ message schema {
) -> Vec<Option<String>> {
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
+ file_size_in_bytes:
std::fs::metadata(format!("{table_location}/1.parquet"))
+ .unwrap()
+ .len(),
start: 0,
length: 0,
record_count: None,
@@ -2443,6 +2453,7 @@ message schema {
// Task 1: read only the first row group
let task1 = FileScanTask {
+ file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
start: rg0_start,
length: row_group_0.compressed_size() as u64,
record_count: Some(100),
@@ -2460,6 +2471,7 @@ message schema {
// Task 2: read the second and third row groups
let task2 = FileScanTask {
+ file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
start: rg1_start,
length: file_end - rg1_start,
record_count: Some(200),
@@ -2588,6 +2600,9 @@ message schema {
let reader = ArrowReaderBuilder::new(file_io).build();
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
+ file_size_in_bytes:
std::fs::metadata(format!("{table_location}/old_file.parquet"))
+ .unwrap()
+ .len(),
start: 0,
length: 0,
record_count: None,
@@ -2755,6 +2770,7 @@ message schema {
let reader = ArrowReaderBuilder::new(file_io).build();
let task = FileScanTask {
+ file_size_in_bytes:
std::fs::metadata(&data_file_path).unwrap().len(),
start: 0,
length: 0,
record_count: Some(200),
@@ -2764,6 +2780,7 @@ message schema {
project_field_ids: vec![1],
predicate: None,
deletes: vec![FileScanTaskDeleteFile {
+ file_size_in_bytes:
std::fs::metadata(&delete_file_path).unwrap().len(),
file_path: delete_file_path,
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
@@ -2973,6 +2990,7 @@ message schema {
// Create FileScanTask that reads ONLY row group 1 via byte range
filtering
let task = FileScanTask {
+ file_size_in_bytes:
std::fs::metadata(&data_file_path).unwrap().len(),
start: rg1_start,
length: rg1_length,
record_count: Some(100), // Row group 1 has 100 rows
@@ -2982,6 +3000,7 @@ message schema {
project_field_ids: vec![1],
predicate: None,
deletes: vec![FileScanTaskDeleteFile {
+ file_size_in_bytes:
std::fs::metadata(&delete_file_path).unwrap().len(),
file_path: delete_file_path,
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
@@ -3184,6 +3203,7 @@ message schema {
// Create FileScanTask that reads ONLY row group 1 via byte range
filtering
let task = FileScanTask {
+ file_size_in_bytes:
std::fs::metadata(&data_file_path).unwrap().len(),
start: rg1_start,
length: rg1_length,
record_count: Some(100), // Row group 1 has 100 rows
@@ -3193,6 +3213,7 @@ message schema {
project_field_ids: vec![1],
predicate: None,
deletes: vec![FileScanTaskDeleteFile {
+ file_size_in_bytes:
std::fs::metadata(&delete_file_path).unwrap().len(),
file_path: delete_file_path,
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
@@ -3293,6 +3314,9 @@ message schema {
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
+ file_size_in_bytes:
std::fs::metadata(format!("{table_location}/1.parquet"))
+ .unwrap()
+ .len(),
start: 0,
length: 0,
record_count: None,
@@ -3391,6 +3415,9 @@ message schema {
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
+ file_size_in_bytes:
std::fs::metadata(format!("{table_location}/1.parquet"))
+ .unwrap()
+ .len(),
start: 0,
length: 0,
record_count: None,
@@ -3478,6 +3505,9 @@ message schema {
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
+ file_size_in_bytes:
std::fs::metadata(format!("{table_location}/1.parquet"))
+ .unwrap()
+ .len(),
start: 0,
length: 0,
record_count: None,
@@ -3579,6 +3609,9 @@ message schema {
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
+ file_size_in_bytes:
std::fs::metadata(format!("{table_location}/1.parquet"))
+ .unwrap()
+ .len(),
start: 0,
length: 0,
record_count: None,
@@ -3709,6 +3742,9 @@ message schema {
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
+ file_size_in_bytes:
std::fs::metadata(format!("{table_location}/1.parquet"))
+ .unwrap()
+ .len(),
start: 0,
length: 0,
record_count: None,
@@ -3806,6 +3842,9 @@ message schema {
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
+ file_size_in_bytes:
std::fs::metadata(format!("{table_location}/1.parquet"))
+ .unwrap()
+ .len(),
start: 0,
length: 0,
record_count: None,
@@ -3916,6 +3955,9 @@ message schema {
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
+ file_size_in_bytes:
std::fs::metadata(format!("{table_location}/1.parquet"))
+ .unwrap()
+ .len(),
start: 0,
length: 0,
record_count: None,
@@ -4007,6 +4049,9 @@ message schema {
// Create tasks in a specific order: file_0, file_1, file_2
let tasks = vec![
Ok(FileScanTask {
+ file_size_in_bytes:
std::fs::metadata(format!("{table_location}/file_0.parquet"))
+ .unwrap()
+ .len(),
start: 0,
length: 0,
record_count: None,
@@ -4022,6 +4067,9 @@ message schema {
case_sensitive: false,
}),
Ok(FileScanTask {
+ file_size_in_bytes:
std::fs::metadata(format!("{table_location}/file_1.parquet"))
+ .unwrap()
+ .len(),
start: 0,
length: 0,
record_count: None,
@@ -4037,6 +4085,9 @@ message schema {
case_sensitive: false,
}),
Ok(FileScanTask {
+ file_size_in_bytes:
std::fs::metadata(format!("{table_location}/file_2.parquet"))
+ .unwrap()
+ .len(),
start: 0,
length: 0,
record_count: None,
@@ -4216,6 +4267,9 @@ message schema {
let reader = ArrowReaderBuilder::new(file_io).build();
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
+ file_size_in_bytes:
std::fs::metadata(format!("{table_location}/data.parquet"))
+ .unwrap()
+ .len(),
start: 0,
length: 0,
record_count: None,
diff --git a/crates/iceberg/src/scan/context.rs
b/crates/iceberg/src/scan/context.rs
index 169d8e640..aa28ffd5a 100644
--- a/crates/iceberg/src/scan/context.rs
+++ b/crates/iceberg/src/scan/context.rs
@@ -117,6 +117,7 @@ impl ManifestEntryContext {
.await;
Ok(FileScanTask {
+ file_size_in_bytes: self.manifest_entry.file_size_in_bytes(),
start: 0,
length: self.manifest_entry.file_size_in_bytes(),
record_count: Some(self.manifest_entry.record_count()),
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index c055c12c9..2a685cd17 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -756,7 +756,9 @@ pub mod tests {
let current_schema =
current_snapshot.schema(self.table.metadata()).unwrap();
let current_partition_spec =
self.table.metadata().default_partition_spec();
- // Write data files
+ // Write the data files first, then use the file size in the
manifest entries
+ let parquet_file_size = self.write_parquet_data_files();
+
let mut writer = ManifestWriterBuilder::new(
self.next_manifest_file(),
Some(current_snapshot.snapshot_id()),
@@ -775,7 +777,7 @@ pub mod tests {
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet",
&self.table_location))
.file_format(DataFileFormat::Parquet)
- .file_size_in_bytes(100)
+ .file_size_in_bytes(parquet_file_size)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.key_metadata(None)
@@ -798,7 +800,7 @@ pub mod tests {
.content(DataContentType::Data)
.file_path(format!("{}/2.parquet",
&self.table_location))
.file_format(DataFileFormat::Parquet)
- .file_size_in_bytes(100)
+ .file_size_in_bytes(parquet_file_size)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(200))]))
.build()
@@ -820,7 +822,7 @@ pub mod tests {
.content(DataContentType::Data)
.file_path(format!("{}/3.parquet",
&self.table_location))
.file_format(DataFileFormat::Parquet)
- .file_size_in_bytes(100)
+ .file_size_in_bytes(parquet_file_size)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(300))]))
.build()
@@ -845,8 +847,13 @@ pub mod tests {
.add_manifests(vec![data_file_manifest].into_iter())
.unwrap();
manifest_list_write.close().await.unwrap();
+ }
+
+ /// Writes identical Parquet data files (1.parquet, 2.parquet,
3.parquet)
+ /// and returns the file size in bytes.
+ fn write_parquet_data_files(&self) -> u64 {
+ std::fs::create_dir_all(&self.table_location).unwrap();
- // prepare data
let schema = {
let fields = vec![
arrow_schema::Field::new("x",
arrow_schema::DataType::Int64, false)
@@ -958,6 +965,10 @@ pub mod tests {
// writer must be closed to write footer
writer.close().unwrap();
}
+
+ std::fs::metadata(format!("{}/1.parquet", &self.table_location))
+ .unwrap()
+ .len()
}
pub async fn setup_unpartitioned_manifest_files(&mut self) {
@@ -968,6 +979,9 @@ pub mod tests {
let current_schema =
current_snapshot.schema(self.table.metadata()).unwrap();
let current_partition_spec =
Arc::new(PartitionSpec::unpartition_spec());
+ // Write the data files first, then use the file size in the
manifest entries
+ let parquet_file_size = self.write_parquet_data_files();
+
// Write data files using an empty partition for unpartitioned
tables.
let mut writer = ManifestWriterBuilder::new(
self.next_manifest_file(),
@@ -991,7 +1005,7 @@ pub mod tests {
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet",
&self.table_location))
.file_format(DataFileFormat::Parquet)
- .file_size_in_bytes(100)
+ .file_size_in_bytes(parquet_file_size)
.record_count(1)
.partition(empty_partition.clone())
.key_metadata(None)
@@ -1015,7 +1029,7 @@ pub mod tests {
.content(DataContentType::Data)
.file_path(format!("{}/2.parquet",
&self.table_location))
.file_format(DataFileFormat::Parquet)
- .file_size_in_bytes(100)
+ .file_size_in_bytes(parquet_file_size)
.record_count(1)
.partition(empty_partition.clone())
.build()
@@ -1038,7 +1052,7 @@ pub mod tests {
.content(DataContentType::Data)
.file_path(format!("{}/3.parquet",
&self.table_location))
.file_format(DataFileFormat::Parquet)
- .file_size_in_bytes(100)
+ .file_size_in_bytes(parquet_file_size)
.record_count(1)
.partition(empty_partition.clone())
.build()
@@ -1064,111 +1078,6 @@ pub mod tests {
.add_manifests(vec![data_file_manifest].into_iter())
.unwrap();
manifest_list_write.close().await.unwrap();
-
- // prepare data for parquet files
- let schema = {
- let fields = vec![
- arrow_schema::Field::new("x",
arrow_schema::DataType::Int64, false)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "1".to_string(),
- )])),
- arrow_schema::Field::new("y",
arrow_schema::DataType::Int64, false)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "2".to_string(),
- )])),
- arrow_schema::Field::new("z",
arrow_schema::DataType::Int64, false)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "3".to_string(),
- )])),
- arrow_schema::Field::new("a",
arrow_schema::DataType::Utf8, false)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "4".to_string(),
- )])),
- arrow_schema::Field::new("dbl",
arrow_schema::DataType::Float64, false)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "5".to_string(),
- )])),
- arrow_schema::Field::new("i32",
arrow_schema::DataType::Int32, false)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "6".to_string(),
- )])),
- arrow_schema::Field::new("i64",
arrow_schema::DataType::Int64, false)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "7".to_string(),
- )])),
- arrow_schema::Field::new("bool",
arrow_schema::DataType::Boolean, false)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "8".to_string(),
- )])),
- ];
- Arc::new(arrow_schema::Schema::new(fields))
- };
-
- // Build the arrays for the RecordBatch
- let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024]))
as ArrayRef;
-
- let mut values = vec![2; 512];
- values.append(vec![3; 200].as_mut());
- values.append(vec![4; 300].as_mut());
- values.append(vec![5; 12].as_mut());
- let col2 = Arc::new(Int64Array::from_iter_values(values)) as
ArrayRef;
-
- let mut values = vec![3; 512];
- values.append(vec![4; 512].as_mut());
- let col3 = Arc::new(Int64Array::from_iter_values(values)) as
ArrayRef;
-
- let mut values = vec!["Apache"; 512];
- values.append(vec!["Iceberg"; 512].as_mut());
- let col4 = Arc::new(StringArray::from_iter_values(values)) as
ArrayRef;
-
- let mut values = vec![100.0f64; 512];
- values.append(vec![150.0f64; 12].as_mut());
- values.append(vec![200.0f64; 500].as_mut());
- let col5 = Arc::new(Float64Array::from_iter_values(values)) as
ArrayRef;
-
- let mut values = vec![100i32; 512];
- values.append(vec![150i32; 12].as_mut());
- values.append(vec![200i32; 500].as_mut());
- let col6 = Arc::new(Int32Array::from_iter_values(values)) as
ArrayRef;
-
- let mut values = vec![100i64; 512];
- values.append(vec![150i64; 12].as_mut());
- values.append(vec![200i64; 500].as_mut());
- let col7 = Arc::new(Int64Array::from_iter_values(values)) as
ArrayRef;
-
- let mut values = vec![false; 512];
- values.append(vec![true; 512].as_mut());
- let values: BooleanArray = values.into();
- let col8 = Arc::new(values) as ArrayRef;
-
- let to_write = RecordBatch::try_new(schema.clone(), vec![
- col1, col2, col3, col4, col5, col6, col7, col8,
- ])
- .unwrap();
-
- // Write the Parquet files
- let props = WriterProperties::builder()
- .set_compression(Compression::SNAPPY)
- .build();
-
- for n in 1..=3 {
- let file = File::create(format!("{}/{}.parquet",
&self.table_location, n)).unwrap();
- let mut writer =
- ArrowWriter::try_new(file, to_write.schema(),
Some(props.clone())).unwrap();
-
- writer.write(&to_write).expect("Writing batch");
-
- // writer must be closed to write footer
- writer.close().unwrap();
- }
}
pub async fn setup_deadlock_manifests(&mut self) {
@@ -1874,6 +1783,7 @@ pub mod tests {
);
let task = FileScanTask {
data_file_path: "data_file_path".to_string(),
+ file_size_in_bytes: 0,
start: 0,
length: 100,
project_field_ids: vec![1, 2, 3],
@@ -1892,6 +1802,7 @@ pub mod tests {
// with predicate
let task = FileScanTask {
data_file_path: "data_file_path".to_string(),
+ file_size_in_bytes: 0,
start: 0,
length: 100,
project_field_ids: vec![1, 2, 3],
diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs
index 5349a9bdd..67615c351 100644
--- a/crates/iceberg/src/scan/task.rs
+++ b/crates/iceberg/src/scan/task.rs
@@ -51,6 +51,9 @@ where D: serde::Deserializer<'de> {
/// A task to scan part of file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FileScanTask {
+ /// The total size of the data file in bytes, from the manifest entry.
+ /// Used to skip a stat/HEAD request when reading Parquet footers.
+ pub file_size_in_bytes: u64,
/// The start offset of the file to scan.
pub start: u64,
/// The length of the file to scan.
@@ -146,6 +149,7 @@ impl From<&DeleteFileContext> for FileScanTaskDeleteFile {
fn from(ctx: &DeleteFileContext) -> Self {
FileScanTaskDeleteFile {
file_path: ctx.manifest_entry.file_path().to_string(),
+ file_size_in_bytes: ctx.manifest_entry.file_size_in_bytes(),
file_type: ctx.manifest_entry.content_type(),
partition_spec_id: ctx.partition_spec_id,
equality_ids: ctx.manifest_entry.data_file.equality_ids.clone(),
@@ -159,6 +163,9 @@ pub struct FileScanTaskDeleteFile {
/// The delete file path
pub file_path: String,
+ /// The total size of the delete file in bytes, from the manifest entry.
+ pub file_size_in_bytes: u64,
+
/// delete file type
pub file_type: DataContentType,