gtrettenero opened a new issue, #16675:
URL: https://github.com/apache/iceberg/issues/16675

   ### Proposed Change
   
   # Proposal: Capture and emit aggregated data-file (Parquet footer) metrics at commit time
   
   ### Feature Request / Improvement
   
   Provide an opt-in mechanism to capture aggregated physical/storage 
statistics directly from Parquet footers during a write, and surface them 
through Iceberg's existing event framework at commit time — **without** 
persisting them in table metadata.
   
   ### Motivation
   
   Iceberg infers column-level metrics (`value_counts`, `null_value_counts`, 
`nan_value_counts`, `lower_bounds`, `upper_bounds`) and stores them 
per-`DataFile` in manifests. Because these metrics live in manifest entries, 
collecting them for every column of a wide table bloats manifests and slows 
scan planning. To bound this, Iceberg caps inferred metrics at the first N 
columns:
   
   ```
   write.metadata.metrics.max-inferred-column-defaults = 100 (default)
   ```
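
To make the cap concrete, here is a minimal, hypothetical Java sketch of how a column cap like this could assign metrics modes. Columns are taken in schema order. The first `maxInferred` columns get the default mode, and later columns fall back to `none`. An explicit per-column setting always wins. The sketch illustrates the behavior described above and is not Iceberg's `MetricsConfig` code. The class and method names are made up, and the mode strings are only sample values.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of a per-column metrics cap (not Iceberg's MetricsConfig). */
final class InferredMetricsCap {
  private InferredMetricsCap() {}

  /**
   * Assigns a metrics mode to each column in schema order: the first maxInferred columns
   * get defaultMode, the rest get "none", and an explicit per-column mode overrides both.
   */
  static Map<String, String> assignModes(
      List<String> columnsInSchemaOrder,
      int maxInferred,
      String defaultMode,
      Map<String, String> explicitModes) {
    Map<String, String> modes = new LinkedHashMap<>();
    for (int i = 0; i < columnsInSchemaOrder.size(); i++) {
      String column = columnsInSchemaOrder.get(i);
      String inferred = i < maxInferred ? defaultMode : "none";
      modes.put(column, explicitModes.getOrDefault(column, inferred));
    }
    return modes;
  }
}
```

Take a 150-column table under the default cap of 100. Its 101st through 150th columns end up with `none` unless someone configures them explicitly, which is the coverage gap described in the list below.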
   
   
   This cap is the right trade-off for *query-planning* metadata, but it 
creates a gap for **observability / storage-health** use cases:
   
   1. **Wide tables (>100 columns)** get no inferred metrics for columns beyond 
the cap unless each is configured explicitly. There is no aggregate visibility 
into the storage footprint of those columns.
   2. **Most physical/storage statistics aren't captured in metadata**, e.g. per-column compressed vs. uncompressed size (manifests record only the on-disk `column_sizes`, and only for columns with metrics enabled), encoding/codec usage, dictionary/bloom-filter presence, and file-size distribution (percentiles, small-file counts). Today the only way to get these is to re-open and re-read files after the fact, which is expensive and runs on stale data.
   3. Raising the metrics cap to get more coverage directly **worsens metadata 
bloat and planning latency** — the exact problem the cap exists to prevent.
   
   The key insight: metrics needed for *query planning* and metrics needed for 
*operational observability* have different lifetimes and storage requirements. 
Observability metrics don't need to live in manifests forever; they're most 
valuable **at write time**, aggregated, and emitted to a monitoring system. 
Decoupling the two lets us collect rich, uncapped, aggregate statistics cheaply 
without touching the metadata hot path.
   
   ### Proposed Solution
   
   Add an opt-in write-path feature that, during a Spark write:
   
   1. **Reads Parquet footers** for the data files produced by each task. Only the footer of each file the task just wrote is read back, which avoids a separate pass over the data itself.
   2. **Aggregates statistics map-side**, grouped by partition and by column 
field-id, so per-file payloads never accumulate on the driver.
   3. **Bridges the aggregated result into the commit** and publishes it as a 
new event through `org.apache.iceberg.events.Listeners`, so any deployment can 
register a listener and route the metrics to its own monitoring/observability 
system.
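
The following is a minimal sketch of the listener side of step 3. It assumes a hypothetical `DataFileMetricsEvent` that carries an engine-agnostic serialized payload. `MiniListeners` is a simplified stand-in that keys listeners by event class, so the example runs without Iceberg on the classpath. A real deployment would register with `org.apache.iceberg.events.Listeners` instead. The event's name and fields are still an open question (see below).

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical commit-time event; the real name and fields are not decided. */
final class DataFileMetricsEvent {
  private final String tableName;
  private final long snapshotId;
  private final String payloadJson; // engine-agnostic serialized aggregate

  DataFileMetricsEvent(String tableName, long snapshotId, String payloadJson) {
    this.tableName = tableName;
    this.snapshotId = snapshotId;
    this.payloadJson = payloadJson;
  }

  String tableName() { return tableName; }
  long snapshotId() { return snapshotId; }
  String payloadJson() { return payloadJson; }
}

/** Simplified stand-in for a listener registry keyed by event class (illustrative only). */
final class MiniListeners {
  private static final Map<Class<?>, List<Consumer<Object>>> LISTENERS = new ConcurrentHashMap<>();

  private MiniListeners() {}

  /** Registers a listener that only receives events of exactly eventType. */
  static <E> void register(Class<E> eventType, Consumer<? super E> listener) {
    LISTENERS
        .computeIfAbsent(eventType, type -> new CopyOnWriteArrayList<>())
        .add(event -> listener.accept(eventType.cast(event)));
  }

  /** Synchronously delivers the event to every listener registered for its class. */
  static void publish(Object event) {
    List<Consumer<Object>> listeners = LISTENERS.get(event.getClass());
    if (listeners != null) {
      for (Consumer<Object> listener : listeners) {
        listener.accept(event);
      }
    }
  }
}
```

A deployment would register once at startup, for example `MiniListeners.register(DataFileMetricsEvent.class, e -> monitoringClient.send(e.payloadJson()))`. Here `monitoringClient` is a placeholder for whatever system that deployment already uses. If no listener is registered, publishing is a map lookup and nothing more.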
   
   Because the metrics flow through the event framework rather than into 
`TableMetadata`/manifests, there is **no metadata bloat and no column cap**, 
and the commit path's persisted state is unchanged.
   
   ### High-level implementation details
   
   **Executor-side capture (Spark write path)**
   - A new opt-in table property gates the behavior; when disabled (the 
default), there is zero overhead.
   - After each task's `DataWriter.commit()`, the produced `DataFile`s' Parquet 
footers are read via `FileIO`/`InputFile` (with a small adapter from Iceberg's 
`SeekableInputStream` to Parquet's `InputFile`).
   - For each footer, statistics are extracted per column chunk and mapped back 
to **Iceberg field-ids** (via the Parquet schema's field ids, so the data is 
schema-evolution safe), then folded into a partition-keyed aggregate.
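
A minimal sketch of the folding step follows. It assumes each footer has already been turned into one hypothetical `ChunkStats` per column chunk, with the Iceberg field id resolved from the Parquet schema. Totals are kept per partition key and field id. The codec collapses to `"mixed"` once two different codecs are seen. Dictionary and bloom-filter presence are kept as counts of chunks, which is one possible reading of "presence". All names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stats for one Parquet column chunk, already mapped to an Iceberg field id. */
final class ChunkStats {
  final int fieldId;
  final String codec;
  final long compressedBytes, uncompressedBytes, valueCount, nullCount;
  final boolean hasDictionaryPage, hasBloomFilter;

  ChunkStats(int fieldId, String codec, long compressedBytes, long uncompressedBytes,
      long valueCount, long nullCount, boolean hasDictionaryPage, boolean hasBloomFilter) {
    this.fieldId = fieldId;
    this.codec = codec;
    this.compressedBytes = compressedBytes;
    this.uncompressedBytes = uncompressedBytes;
    this.valueCount = valueCount;
    this.nullCount = nullCount;
    this.hasDictionaryPage = hasDictionaryPage;
    this.hasBloomFilter = hasBloomFilter;
  }
}

/** Running totals for one column within one partition. */
final class ColumnAgg {
  String codec; // null until the first chunk; "mixed" once two codecs have been seen
  long chunks, compressedBytes, uncompressedBytes, valueCount, nullCount;
  long chunksWithDictionary, chunksWithBloomFilter;

  void add(ChunkStats s) {
    codec = mergeCodec(codec, s.codec);
    chunks++;
    compressedBytes += s.compressedBytes;
    uncompressedBytes += s.uncompressedBytes;
    valueCount += s.valueCount;
    nullCount += s.nullCount;
    if (s.hasDictionaryPage) chunksWithDictionary++;
    if (s.hasBloomFilter) chunksWithBloomFilter++;
  }

  void merge(ColumnAgg o) {
    codec = mergeCodec(codec, o.codec);
    chunks += o.chunks;
    compressedBytes += o.compressedBytes;
    uncompressedBytes += o.uncompressedBytes;
    valueCount += o.valueCount;
    nullCount += o.nullCount;
    chunksWithDictionary += o.chunksWithDictionary;
    chunksWithBloomFilter += o.chunksWithBloomFilter;
  }

  private static String mergeCodec(String a, String b) {
    if (a == null) return b;
    if (b == null || a.equals(b)) return a;
    return "mixed";
  }
}

/** Partition key -> Iceberg field id -> column totals; no per-file objects are retained. */
final class PartitionColumnAggregate {
  final Map<String, Map<Integer, ColumnAgg>> byPartition = new HashMap<>();

  void addFooter(String partitionKey, Iterable<ChunkStats> chunks) {
    Map<Integer, ColumnAgg> columns = byPartition.computeIfAbsent(partitionKey, k -> new HashMap<>());
    for (ChunkStats chunk : chunks) {
      columns.computeIfAbsent(chunk.fieldId, id -> new ColumnAgg()).add(chunk);
    }
  }

  void merge(PartitionColumnAggregate other) {
    other.byPartition.forEach((partition, columns) -> {
      Map<Integer, ColumnAgg> mine = byPartition.computeIfAbsent(partition, k -> new HashMap<>());
      columns.forEach((fieldId, agg) -> mine.computeIfAbsent(fieldId, id -> new ColumnAgg()).merge(agg));
    });
  }
}
```

The aggregate is keyed by field id rather than column name. A renamed column therefore keeps accumulating into the same entry.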
   
   **Map-side aggregation via a Spark `AccumulatorV2`**
   - Aggregates are accumulated into a custom accumulator and merged on the 
driver. To avoid driver heap pressure on writes producing very large numbers of 
partitions, the accumulator:
     - merges executor-local aggregates directly (no per-file objects 
retained), and
     - returns an empty snapshot from `value()` so Spark does not retain 
`O(numTasks)` deep copies in `TaskInfo` accumulables; the real data is consumed 
once, at commit, via an explicit drain.
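
The accumulator contract can be sketched without Spark. The class below reuses the method names of Spark's `AccumulatorV2` (`isZero`, `copy`, `reset`, `add`, `merge`, `value`), but it is a plain class with a two-argument `add` and a hypothetical `drain()`. A real implementation would extend `AccumulatorV2<IN, OUT>` with a single input type. To stay short, this version aggregates only bytes per partition.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java stand-in mirroring AccumulatorV2 method names, plus a hypothetical drain(). */
final class PartitionBytesAccumulator {
  private Map<String, Long> bytesByPartition = new HashMap<>();

  boolean isZero() {
    return bytesByPartition.isEmpty();
  }

  PartitionBytesAccumulator copy() {
    PartitionBytesAccumulator c = new PartitionBytesAccumulator();
    c.bytesByPartition.putAll(bytesByPartition);
    return c;
  }

  void reset() {
    bytesByPartition = new HashMap<>();
  }

  /** Executor side: folds one file's size in; the file itself is not retained. */
  void add(String partitionKey, long fileBytes) {
    bytesByPartition.merge(partitionKey, fileBytes, Long::sum);
  }

  /** Driver side: folds another task's executor-local totals in. */
  void merge(PartitionBytesAccumulator other) {
    other.bytesByPartition.forEach((k, v) -> bytesByPartition.merge(k, v, Long::sum));
  }

  /** Deliberately empty, so per-task snapshots of value() stay cheap. */
  Map<String, Long> value() {
    return Collections.emptyMap();
  }

  /** Returns the real aggregate once and resets the accumulator. */
  Map<String, Long> drain() {
    Map<String, Long> result = bytesByPartition;
    bytesByPartition = new HashMap<>();
    return result;
  }
}
```

Returning an empty map from `value()` has a cost: the accumulator shows nothing useful in the Spark UI. `drain()` is the only way to read the data, and because it resets the state, the aggregate is consumed exactly once.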
   
   **Commit-time emission**
   - Just before `commitOperation()`, the merged aggregate is finalized 
(file-size percentiles, small-file counts) and handed to the commit via a 
thread-local bridge, then published as an event during the synchronous commit 
notification.
   - The bridge is cleared on commit success, on `CommitStateUnknownException`, 
and on any other failure, so nothing leaks across commits.
   - The core event type stays free of Parquet/Spark types (the payload is 
carried in a serialized, engine-agnostic form), keeping the engine boundary 
clean.
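
Here is a minimal sketch of the thread-local bridge. It assumes the payload has already been serialized to a `String` (for example JSON), so no Parquet or Spark types cross into core. `CommitMetricsBridge` and its methods are hypothetical. The important part is the `finally` block, which clears the bridge in every case: when the commit succeeds, when it fails with `CommitStateUnknownException`, and when it fails any other way.

```java
/** Hypothetical thread-local hand-off from the write path to commit-time event emission. */
final class CommitMetricsBridge {
  private static final ThreadLocal<String> PENDING = new ThreadLocal<>();

  private CommitMetricsBridge() {}

  /** Returns and removes the current thread's payload, or null if none is staged. */
  static String take() {
    String payload = PENDING.get();
    PENDING.remove();
    return payload;
  }

  /**
   * Stages the payload, runs the commit, and always clears the bridge afterwards,
   * whatever exception (if any) the commit throws.
   */
  static void commitWithMetrics(String serializedPayload, Runnable commit) {
    PENDING.set(serializedPayload);
    try {
      commit.run();
    } finally {
      PENDING.remove();
    }
  }
}
```

The commit's synchronous notification step would call `take()` and, if it gets a payload back, publish the event. Since `take()` removes the value, a later commit on the same thread cannot pick up stale metrics.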
   
   **Metrics captured (aggregated per partition + per column)**
   - Row counts and total record counts.
   - File counts, total file size, and a file-size distribution: average, 
p50/p75/p90/p95/p99, fixed size buckets, and a configurable small-file count.
   - Per-column (by field-id): compressed size, uncompressed size, value count, 
null count, codec (or "mixed"), and dictionary-page / bloom-filter presence.
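
A minimal sketch of driver-side finalization for one partition's file sizes follows. Percentiles use the nearest-rank method. Bucket upper bounds are passed in, since the proposal does not fix them, and must be ascending. A file counts as small when it is strictly below the threshold. All names and the bucket semantics are assumptions. Exact percentiles require every file size. Keeping one `long` per file on the driver is much cheaper than keeping per-file objects, but a mergeable quantile sketch would avoid even that, and the proposal does not say which approach it takes.

```java
import java.util.Arrays;

/** Hypothetical file-size summary for one partition, computed once at commit time. */
final class FileSizeSummary {
  final long fileCount;
  final long totalBytes;
  final double averageBytes;
  final long p50, p75, p90, p95, p99;
  // bucketCounts[i] counts sizes in [bounds[i-1], bounds[i]); the last slot counts sizes >= the last bound.
  final long[] bucketCounts;
  final long smallFileCount;

  FileSizeSummary(long[] fileSizes, long[] bucketUpperBounds, long smallFileThresholdBytes) {
    if (fileSizes.length == 0) {
      throw new IllegalArgumentException("at least one file size is required");
    }
    long[] sorted = fileSizes.clone();
    Arrays.sort(sorted);

    long total = 0;
    long small = 0;
    long[] buckets = new long[bucketUpperBounds.length + 1];
    for (long size : sorted) {
      total += size;
      if (size < smallFileThresholdBytes) {
        small++;
      }
      int b = 0;
      while (b < bucketUpperBounds.length && size >= bucketUpperBounds[b]) {
        b++;
      }
      buckets[b]++;
    }

    fileCount = sorted.length;
    totalBytes = total;
    averageBytes = (double) total / sorted.length;
    p50 = nearestRank(sorted, 50);
    p75 = nearestRank(sorted, 75);
    p90 = nearestRank(sorted, 90);
    p95 = nearestRank(sorted, 95);
    p99 = nearestRank(sorted, 99);
    bucketCounts = buckets;
    smallFileCount = small;
  }

  /** Nearest-rank percentile: the value at 1-based rank ceil(p * n / 100) in sorted order. */
  static long nearestRank(long[] sorted, int p) {
    long rank = (p * (long) sorted.length + 99) / 100; // integer ceiling of p * n / 100
    return sorted[(int) Math.max(rank, 1) - 1];
  }
}
```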
   
   **Configuration (new table properties)**
   ```
   write.data-file-metrics.enabled (default: false)
   write.data-file-metrics.small-file-threshold-bytes (default: 128 KB)
   ```
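
This is a minimal sketch of reading the two proposed properties from a table's property map, using the defaults listed above. The class is hypothetical, and `128 KB` is assumed to mean 128 * 1024 bytes. When `enabled` is false, the write path would skip footer capture entirely; that skip is what makes the default zero-overhead.

```java
import java.util.Map;

/** Hypothetical view of the two proposed table properties, with their proposed defaults. */
final class DataFileMetricsConfig {
  static final String ENABLED = "write.data-file-metrics.enabled";
  static final String SMALL_FILE_THRESHOLD_BYTES =
      "write.data-file-metrics.small-file-threshold-bytes";
  // Assumes "128 KB" in the proposal means 128 * 1024 bytes.
  static final long SMALL_FILE_THRESHOLD_BYTES_DEFAULT = 128L * 1024;

  final boolean enabled;
  final long smallFileThresholdBytes;

  DataFileMetricsConfig(Map<String, String> tableProperties) {
    // Boolean.parseBoolean treats anything other than "true" (case-insensitive) as false.
    this.enabled = Boolean.parseBoolean(tableProperties.getOrDefault(ENABLED, "false"));
    String threshold = tableProperties.get(SMALL_FILE_THRESHOLD_BYTES);
    this.smallFileThresholdBytes =
        threshold == null ? SMALL_FILE_THRESHOLD_BYTES_DEFAULT : Long.parseLong(threshold.trim());
    if (smallFileThresholdBytes < 0) {
      throw new IllegalArgumentException(SMALL_FILE_THRESHOLD_BYTES + " must be >= 0");
    }
  }
}
```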
   
   
   ### Why the event framework instead of metadata
   
   - No manifest/`TableMetadata` changes → no planning-path impact and no spec 
change.
   - No column cap → aggregate coverage for arbitrarily wide tables.
   - Metrics are available in **real time** at commit, computed from footers 
already produced by the write.
   - Fully opt-in and pluggable: deployments that don't register a listener pay 
nothing.
   
   ### Scope / non-goals
   
   - Initial implementation targets the **Spark** write path (`SparkWrite`); 
Flink and others could follow the same pattern.
   - Parquet only initially (footer-based); ORC/Avro could be added later.
   - This does **not** replace Iceberg's existing inferred metrics used for 
pruning — it's a complementary, observability-focused channel.
   
   ### Open questions for discussion
   
   1. Is the **events framework** the right surface, or should this be a 
dedicated metrics/reporting interface (similar in spirit to `MetricsReporter` 
for scans, but for writes)?
   2. Naming/placement of the new property and event type.
   3. Should the aggregation granularity (per-partition, per-column) and the 
file-size bucket boundaries be configurable?
   4. Appetite for extending beyond Parquet, and beyond Spark, in follow-ups.
   
   
   
   ### Proposal document
   
   _No response_
   
   ### Specifications
   
   - [x] Table
   - [ ] View
   - [ ] REST
   - [ ] Puffin
   - [ ] Encryption
   - [ ] Other

