mbutrovich opened a new issue, #2172:
URL: https://github.com/apache/iceberg-rust/issues/2172

   ### What feature are you trying to implement?
   
   When running Iceberg workloads via DataFusion Comet, we're seeing 
significant per-file overhead in iceberg-rust's `ArrowReader`. Comet is an 
accelerator for Spark workloads that relies on Iceberg Java for scan planning. 
Iceberg Java generates `FileScanTask`s that are split by byte range and 
bin-packed across partitions for parallelism. Comet serializes these over to a 
native operator backed by iceberg-rust's `ArrowReader`.
   
   iceberg-rust's `ArrowReader` treats each `FileScanTask` as a fully 
independent operation, with no reuse of I/O clients, metadata, or operators 
across tasks. With many thousands of tasks (common for large tables, or 
degenerate cases of partitioned tables with thousands of small data files), the 
per-task overhead becomes a significant fraction of total CPU time. Flame 
graphs show ~30% of executor CPU in per-file overhead: operator creation, 
`stat()` calls, TLS handshakes, and credential provider initialization.
   
   I currently have a 
[branch](https://github.com/mbutrovich/iceberg-rust/tree/reader_perf) with all 
of these optimizations, and have seen good benefits for our workloads 
([DataFusion Comet PR](https://github.com/apache/datafusion-comet/pull/3551)). 
The optimizations below are independent of one another; I will submit them as 
separate PRs and benchmark each one individually.
   
   ### 1. OpenDAL operator caching
   
   **Problem:** 
[`create_operator()`](https://github.com/apache/iceberg-rust/blob/88fdfedf/crates/iceberg/src/io/storage/opendal/mod.rs#L271-L364)
 builds a new `Operator` on every call. Every storage operation (`metadata()`, 
`reader()`, etc.) calls `create_operator()`, so each `FileScanTask` creates at 
least 2 operators. Each operator construction involves credential provider 
initialization, signer creation, and wrapping in a 
[`RetryLayer`](https://github.com/apache/iceberg-rust/blob/88fdfedf/crates/iceberg/src/io/storage/opendal/mod.rs#L362).
 These are all discarded after a single use.
   
   **Fix:** Cache operators by bucket/container name within `OpenDalStorage` 
(*e.g.*, in a `HashMap<String, Operator>` behind a `Mutex`). The operator is 
stateless with respect to individual files — it's scoped to a bucket — so it's 
safe to reuse across concurrent tasks.
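   A minimal sketch of the caching shape, using only the standard library. `Operator` here is a stand-in for `opendal::Operator`, and `OperatorCache`/`get_or_create` are illustrative names, not the actual iceberg-rust API:

   ```rust
   use std::collections::HashMap;
   use std::sync::Mutex;

   // Stand-in for opendal::Operator; in iceberg-rust this would be the real type.
   #[derive(Clone)]
   struct Operator;

   // Per-bucket operator cache. The expensive construction (credential provider
   // initialization, signer creation, RetryLayer wrapping) happens once per
   // bucket instead of once per storage call.
   struct OperatorCache {
       by_bucket: Mutex<HashMap<String, Operator>>,
   }

   impl OperatorCache {
       fn new() -> Self {
           Self { by_bucket: Mutex::new(HashMap::new()) }
       }

       // Return the cached operator for `bucket`, building it on first use.
       fn get_or_create(&self, bucket: &str, build: impl FnOnce() -> Operator) -> Operator {
           let mut map = self.by_bucket.lock().unwrap();
           map.entry(bucket.to_string()).or_insert_with(build).clone()
       }
   }

   fn main() {
       let cache = OperatorCache::new();
       let mut builds = 0;
       for _ in 0..4 {
           // Four storage operations against the same bucket reuse one operator.
           let _op = cache.get_or_create("warehouse-bucket", || {
               builds += 1;
               Operator
           });
       }
       assert_eq!(builds, 1);
       println!("operators built: {builds}");
   }
   ```

   Cloning an `Operator` is cheap in OpenDAL (it is internally reference-counted), which is what makes handing out clones from a shared cache reasonable.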
   
   **Scope:** 
[`crates/iceberg/src/io/storage/opendal/mod.rs`](https://github.com/apache/iceberg-rust/blob/88fdfedf/crates/iceberg/src/io/storage/opendal/mod.rs)
 — the `OpenDalStorage` enum variants (S3, Gcs, Oss, Azdls) and 
`create_operator()`.
   
   ### 2. File size passthrough via `FileScanTask`
   
   **Problem:** 
[`create_parquet_record_batch_stream_builder()`](https://github.com/apache/iceberg-rust/blob/88fdfedf/crates/iceberg/src/arrow/reader.rs#L472-L493)
 calls [`try_join!(parquet_file.metadata(), 
parquet_file.reader())`](https://github.com/apache/iceberg-rust/blob/88fdfedf/crates/iceberg/src/arrow/reader.rs#L481-L482).
 The `metadata()` call issues a `stat()` / HEAD request to object storage just 
to get the file size. This file size is already known from the manifest entry's 
`file_size_in_bytes` field, but 
[`FileScanTask`](https://github.com/apache/iceberg-rust/blob/88fdfedf/crates/iceberg/src/scan/task.rs#L52-L110)
 doesn't carry it.
   
   **Fix:** Add `file_size_in_bytes: u64` to `FileScanTask` (populated from the 
manifest entry during planning), and pass it through to skip the `stat()` call. 
When present, construct `FileMetadata { size }` directly instead of issuing a 
storage round-trip.
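   A simplified model of the fallback logic (the real `FileScanTask` in `crates/iceberg/src/scan/task.rs` carries many more fields; modeling the new field as an `Option` here is just to make the fallback path explicit):

   ```rust
   // Hypothetical, pared-down FileScanTask with the proposed field.
   struct FileScanTask {
       data_file_path: String,
       // Proposed: size copied from the manifest entry's file_size_in_bytes.
       file_size_in_bytes: Option<u64>,
   }

   struct FileMetadata {
       size: u64,
   }

   // Resolve the file size, issuing a stat()/HEAD round-trip only when the
   // planner did not provide it. `stat_calls` counts simulated round-trips.
   fn resolve_metadata(task: &FileScanTask, stat_calls: &mut u32) -> FileMetadata {
       match task.file_size_in_bytes {
           Some(size) => FileMetadata { size },
           None => {
               *stat_calls += 1; // stands in for parquet_file.metadata()
               FileMetadata { size: 0 } // placeholder value from "storage"
           }
       }
   }

   fn main() {
       let mut stat_calls = 0;
       let task = FileScanTask {
           data_file_path: "s3://bucket/data/00000.parquet".to_string(),
           file_size_in_bytes: Some(4_096),
       };
       let md = resolve_metadata(&task, &mut stat_calls);
       assert_eq!(md.size, 4_096);
       assert_eq!(stat_calls, 0); // no storage round-trip needed
       println!("stat calls: {stat_calls}");
   }
   ```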
   
   **Scope:** 
[`crates/iceberg/src/scan/task.rs`](https://github.com/apache/iceberg-rust/blob/88fdfedf/crates/iceberg/src/scan/task.rs)
 (add field), 
[`crates/iceberg/src/arrow/reader.rs`](https://github.com/apache/iceberg-rust/blob/88fdfedf/crates/iceberg/src/arrow/reader.rs#L472-L493)
 (use it), and scan planning code that constructs `FileScanTask`s.
   
   ### 3. Eliminate double-open for migrated tables
   
   **Problem:** 
[`process_file_scan_task()`](https://github.com/apache/iceberg-rust/blob/88fdfedf/crates/iceberg/src/arrow/reader.rs#L201-L279)
 calls 
[`create_parquet_record_batch_stream_builder()`](https://github.com/apache/iceberg-rust/blob/88fdfedf/crates/iceberg/src/arrow/reader.rs#L217-L223)
 once to inspect the Parquet schema for embedded field IDs. If the file lacks 
field IDs 
([`missing_field_ids`](https://github.com/apache/iceberg-rust/blob/88fdfedf/crates/iceberg/src/arrow/reader.rs#L228-L233)
 is true), it calls [`create_parquet_record_batch_stream_builder()` a *second* 
time](https://github.com/apache/iceberg-rust/blob/88fdfedf/crates/iceberg/src/arrow/reader.rs#L269-L275)
 with `ArrowReaderOptions` to apply name mapping or fallback IDs. Each call 
issues `metadata()` + `reader()` to storage. For migrated tables, this means 4 
operator creations and 2 full metadata fetches per task.
   
   **Fix:** Separate metadata loading from stream builder construction. Load 
`ParquetMetaData` once, inspect it in-memory for field IDs using 
`ArrowReaderMetadata::try_new()`, then construct a single 
`ParquetRecordBatchStreamBuilder` via `new_with_metadata()` with the correct 
options.
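   An abstract sketch of the restructure, with storage I/O reduced to a counter (`fetch_metadata` stands in for the `metadata()` + `reader()` round-trips, and the field-ID check stands in for the `ArrowReaderMetadata` inspection):

   ```rust
   // Stand-in for ParquetMetaData with only the property we inspect.
   struct ParquetMetaData {
       has_field_ids: bool,
   }

   fn fetch_metadata(io_ops: &mut u32, has_field_ids: bool) -> ParquetMetaData {
       *io_ops += 1; // one metadata()+reader() trip to object storage
       ParquetMetaData { has_field_ids }
   }

   // Stand-in for ParquetRecordBatchStreamBuilder.
   struct StreamBuilder {
       apply_name_mapping: bool,
   }

   // After the restructure: load metadata once, inspect it in memory, and build
   // exactly one reader (mirroring new_with_metadata), instead of invoking the
   // combined fetch+build helper a second time for migrated tables.
   fn build_once(io_ops: &mut u32, file_has_field_ids: bool) -> StreamBuilder {
       let md = fetch_metadata(io_ops, file_has_field_ids);
       StreamBuilder { apply_name_mapping: !md.has_field_ids }
   }

   fn main() {
       let mut io_ops = 0;
       // Migrated table: the file lacks embedded field IDs.
       let builder = build_once(&mut io_ops, false);
       assert!(builder.apply_name_mapping);
       assert_eq!(io_ops, 1); // previously 2 full metadata fetches in this case
       println!("metadata fetches: {io_ops}");
   }
   ```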
   
   **Scope:** 
[`crates/iceberg/src/arrow/reader.rs`](https://github.com/apache/iceberg-rust/blob/88fdfedf/crates/iceberg/src/arrow/reader.rs#L201-L279)
 — restructure `process_file_scan_task()` and 
`create_parquet_record_batch_stream_builder()`.
   
   ### 4. Parquet metadata prefetch with size hint
   
   **Problem:** Without a metadata size hint, arrow-rs's Parquet reader does 
two round-trips to read the footer: first an 8-byte read to get the magic bytes 
and footer length, then a second read for the actual footer. On object storage, 
each round-trip has significant latency.
   
   **Fix:** Use 
[`ArrowFileReader::with_metadata_size_hint()`](https://github.com/apache/iceberg-rust/blob/88fdfedf/crates/iceberg/src/arrow/reader.rs#L1720-L1723)
 (which wraps arrow-rs's 
[`ParquetMetaDataReader::with_prefetch_hint()`](https://github.com/apache/arrow-rs/blob/main/parquet/src/file/metadata/reader.rs#L182-L184))
 to speculatively read a larger chunk from the end of the file in a single 
request. DataFusion [uses 512KB as the 
default](https://github.com/apache/datafusion/blob/17d770d6e53400db0c3d46bde330bf0cdffcce6e/datafusion/common/src/config.rs#L706).
 The exact value can be discussed in the PR.
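   A toy model of why the hint saves a round-trip. A Parquet file ends with the footer metadata followed by a 4-byte footer length and the 4-byte `PAR1` magic; the numbers below are illustrative, not measurements:

   ```rust
   // Round-trips needed to read the footer, under a simplified model of the
   // arrow-rs metadata reader.
   fn footer_round_trips(footer_len: u64, prefetch_hint: Option<u64>) -> u32 {
       match prefetch_hint {
           // With a hint, one speculative tail read; a second read is needed
           // only if the footer turns out larger than the hinted chunk.
           Some(hint) if hint >= footer_len + 8 => 1,
           Some(_) => 2,
           // Without a hint: read the trailing 8 bytes to learn the footer
           // length, then issue a second read for the footer itself.
           None => 2,
       }
   }

   fn main() {
       let footer_len = 64 * 1024; // footers are often tens of KB
       assert_eq!(footer_round_trips(footer_len, None), 2);
       assert_eq!(footer_round_trips(footer_len, Some(512 * 1024)), 1);
       println!("hinted round trips: 1");
   }
   ```

   The trade-off is over-reading when the footer is much smaller than the hint, which is why the exact default value is worth discussing in the PR.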
   
   **Scope:** 
[`crates/iceberg/src/arrow/reader.rs`](https://github.com/apache/iceberg-rust/blob/88fdfedf/crates/iceberg/src/arrow/reader.rs#L472-L493)
 — in `create_parquet_record_batch_stream_builder()` when constructing 
`ArrowFileReader`.
   
   ### 5. Parquet metadata caching across same-file splits
   
   **Problem:** When Iceberg Java splits a large file into multiple 
`FileScanTask`s (different `start`/`length` byte ranges of the same file), 
iceberg-rust re-fetches the Parquet metadata for each split independently. The 
metadata is identical across splits of the same file.
   
   **Fix:** Cache `ParquetMetaData` by `(file_path, should_load_page_index)` 
within a `read()` invocation using a bounded LRU cache (e.g., moka). Subsequent 
tasks for the same file get a cache hit and skip all metadata I/O. Use 
`ParquetRecordBatchStreamBuilder::new_with_metadata()` to pass the cached 
metadata to the builder.
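   A minimal sketch of the cache shape, using a plain `HashMap` where the real proposal would use a bounded cache such as moka; `ParquetMetadataCache` and `get_or_fetch` are illustrative names:

   ```rust
   use std::collections::HashMap;
   use std::sync::{Arc, Mutex};

   // Stand-in for parquet::file::metadata::ParquetMetaData.
   struct ParquetMetaData;

   // Cache keyed by (file_path, should_load_page_index). A real implementation
   // would bound this (e.g. an LRU via moka) rather than grow without limit.
   struct ParquetMetadataCache {
       inner: Mutex<HashMap<(String, bool), Arc<ParquetMetaData>>>,
   }

   impl ParquetMetadataCache {
       fn new() -> Self {
           Self { inner: Mutex::new(HashMap::new()) }
       }

       // Return cached metadata for (path, page_index), fetching on first use.
       fn get_or_fetch(
           &self,
           path: &str,
           page_index: bool,
           fetch: impl FnOnce() -> ParquetMetaData,
       ) -> Arc<ParquetMetaData> {
           let mut map = self.inner.lock().unwrap();
           map.entry((path.to_string(), page_index))
               .or_insert_with(|| Arc::new(fetch()))
               .clone()
       }
   }

   fn main() {
       let cache = ParquetMetadataCache::new();
       let mut fetches = 0;
       // Three byte-range splits of the same file share one metadata fetch.
       for _split in 0..3 {
           cache.get_or_fetch("s3://bucket/data/00000.parquet", false, || {
               fetches += 1;
               ParquetMetaData
           });
       }
       assert_eq!(fetches, 1);
       println!("metadata fetches: {fetches}");
   }
   ```

   Returning `Arc<ParquetMetaData>` matters because `new_with_metadata()` lets every split share one parsed footer without copying it.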
   
   **Scope:** 
[`crates/iceberg/src/arrow/reader.rs`](https://github.com/apache/iceberg-rust/blob/88fdfedf/crates/iceberg/src/arrow/reader.rs)
 — new `ParquetMetadataCache` struct shared across tasks via 
`ArrowReader::read()`.
   
   **Note:** The benefit of this optimization depends on how the external 
planner groups tasks. In Spark's default configuration, bin-packing rarely 
places multiple splits of the same file in the same partition, so cache hits 
may be infrequent. However, when iceberg-rust plans its own tasks (whole-file), 
or when external planners explicitly coalesce same-file splits, this provides a 
measurable reduction in metadata I/O.
   
   ### Commentary
   
   2, 3, and 4 seem the least controversial to me. For 1, there was some 
discussion in Iceberg Slack about implementing it after all the `Storage` trait 
work is done (@CTTY). For 5, I added the note above that it might not be very 
effective, and maybe not worth the complication; I will measure it 
independently and see how it does.
   
   ### Willingness to contribute
   
   I can contribute to this feature independently


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

