herbherbherb opened a new issue, #15783:
URL: https://github.com/apache/iceberg/issues/15783

   ### Query engine
   
   Flink
   
   ### Feature Request / Improvement
   
   Add a `WriteObserver` plugin interface to `IcebergSink` (Sink V2) that 
observes each record written and produces per-checkpoint metadata that flows 
through the entire sink pipeline to the Iceberg snapshot summary.
   
   ### Motivation
   
   Users need to extract per-record metadata (e.g., watermark timestamps, data 
quality scores) at the writer level and attach it to the committed Iceberg 
snapshot. Currently there is no way to propagate custom metadata from the 
writer through the aggregator to the committer without subclassing multiple 
internal classes (`IcebergSinkWriter`, `IcebergWriteAggregator`, 
`IcebergCommittable`, `IcebergCommitter`).
   
   The proposed change adds a `WriteObserver` that is called for each record in 
`IcebergSinkWriter.write()`. At checkpoint time, the observer's accumulated 
metadata is carried across the serialization boundary via a ThreadLocal 
holder and serialized alongside `WriteResult` and `IcebergCommittable`. The 
aggregator then merges it across parallel writer subtasks, and the committer 
applies it as additional Iceberg snapshot properties.
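
The merge across writer subtasks might look roughly like the sketch below. The issue does not state how conflicting keys from different subtasks are resolved, so the resolver argument and the numeric-max policy are assumptions. `ObserverMetadataMerger` and its methods are hypothetical names used for illustration and are not part of Iceberg.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Hypothetical helper, not an Iceberg class: folds the per-subtask metadata
// maps into one map that could be attached to a single committable.
class ObserverMetadataMerger {

  // Merges all maps; when two subtasks report the same key, the resolver
  // decides which value survives. Null maps (no observer output) are skipped.
  static Map<String, String> merge(
      List<Map<String, String>> perSubtask, BinaryOperator<String> resolver) {
    Map<String, String> merged = new TreeMap<>();
    for (Map<String, String> metadata : perSubtask) {
      if (metadata == null) {
        continue;
      }
      metadata.forEach((key, value) -> merged.merge(key, value, resolver));
    }
    return merged;
  }

  // Assumed conflict policy for numeric values such as timestamps:
  // keep whichever value is larger when parsed as a long.
  static String maxAsLong(String left, String right) {
    return Long.parseLong(left) >= Long.parseLong(right) ? left : right;
  }
}
```

A different resolver (for example a minimum, or string concatenation) could be passed for keys whose values are not numeric; which policy is appropriate depends on what the observer records.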
   
   ### Changes
   
   - New: `WriteObserver.java` -- interface with `observe(RowData, 
SinkWriter.Context)` and `default Map<String, String> snapshotMetadata()`
   - New: `WriteObserverMetadataHolder.java` -- ThreadLocal holder for passing 
metadata through serialization boundaries
   - Modified: `IcebergSinkWriter` -- calls observer per-record, collects 
metadata at checkpoint via `prepareCommit()`
   - Modified: `WriteResultSerializer` -- v2 format carries observer metadata 
alongside WriteResult bytes
   - Modified: `IcebergWriteAggregator` -- reads metadata from deserialized 
WriteResults, merges across writer subtasks, passes to IcebergCommittable
   - Modified: `IcebergCommittable` + `IcebergCommittableSerializer` -- new 
`observerMetadata` field with v2 serialization format (backward-compatible with 
v1)
   - Modified: `IcebergCommitter` -- merges observer metadata from committables 
and applies as snapshot properties
   - Modified: `IcebergSink.Builder` -- new `writeObserver()` method
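
To make the shape of the interface concrete, here is a minimal, self-contained sketch. It is not the proposed code: the real `observe` takes Flink's `RowData` and `SinkWriter.Context`, which are replaced here by a generic row type and a plain timestamp so the example compiles without Flink on the classpath. `WriteObserverSketch`, `MaxTimestampObserver`, and the `observer.*` property keys are hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Stand-in for the proposed WriteObserver (assumption: simplified signature).
interface WriteObserverSketch<R> {
  // Called once per record written.
  void observe(R row, long eventTimestampMillis);

  // Metadata to attach to the snapshot; empty by default, mirroring the
  // default method described in the issue.
  default Map<String, String> snapshotMetadata() {
    return Collections.emptyMap();
  }
}

// Example observer: tracks the largest event timestamp and the record count.
class MaxTimestampObserver implements WriteObserverSketch<String> {
  private long maxTimestamp = Long.MIN_VALUE;
  private long recordCount = 0;

  @Override
  public void observe(String row, long eventTimestampMillis) {
    maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
    recordCount++;
  }

  @Override
  public Map<String, String> snapshotMetadata() {
    if (recordCount == 0) {
      return Collections.emptyMap();
    }
    Map<String, String> metadata = new TreeMap<>();
    metadata.put("observer.max-event-timestamp", Long.toString(maxTimestamp));
    metadata.put("observer.record-count", Long.toString(recordCount));
    return metadata;
  }
}
```

The sketch does not reset its state between checkpoints; the issue does not say whether the observer or the writer is responsible for that.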
   
   ### Compatibility
   
   - No behavioral change when no observer is set (the default is `null`)
   - `IcebergCommittableSerializer` v2 can deserialize v1 payloads (backward 
compatible)
   - `WriteResultSerializer` v2 can deserialize v1 payloads (backward 
compatible)
   - No changes to public API signatures of existing methods
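
The backward-compatibility claim for the two serializers can be illustrated with a version-dispatching reader. This is a sketch under assumptions: the byte layout (length-prefixed body followed by a counted list of UTF key/value pairs) is invented for the example and is not the actual wire format of `WriteResultSerializer` or `IcebergCommittableSerializer`. `VersionedPayloadSketch` is a hypothetical class; only the idea of passing the version into `deserialize` follows Flink's `SimpleVersionedSerializer` contract.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;

class VersionedPayloadSketch {
  static final int V1 = 1;
  static final int V2 = 2;

  // Decoded payload: the opaque body plus observer metadata (empty for v1).
  static final class Decoded {
    final byte[] body;
    final Map<String, String> observerMetadata;

    Decoded(byte[] body, Map<String, String> observerMetadata) {
      this.body = body;
      this.observerMetadata = observerMetadata;
    }
  }

  // v1 layout (assumed): int length, then the body bytes.
  static byte[] serializeV1(byte[] body) {
    return serialize(body, null);
  }

  // v2 layout (assumed): the v1 layout, then int entry count and UTF pairs.
  static byte[] serializeV2(byte[] body, Map<String, String> metadata) {
    return serialize(body, metadata);
  }

  private static byte[] serialize(byte[] body, Map<String, String> metadata) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(body.length);
      out.write(body);
      if (metadata != null) {
        out.writeInt(metadata.size());
        for (Map.Entry<String, String> entry : metadata.entrySet()) {
          out.writeUTF(entry.getKey());
          out.writeUTF(entry.getValue());
        }
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Reads either version; a v1 payload simply yields empty metadata.
  static Decoded deserialize(int version, byte[] serialized) {
    if (version != V1 && version != V2) {
      throw new IllegalArgumentException("Unknown version: " + version);
    }
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
      byte[] body = new byte[in.readInt()];
      in.readFully(body);
      Map<String, String> metadata = new TreeMap<>();
      if (version == V2) {
        int entries = in.readInt();
        for (int i = 0; i < entries; i++) {
          String key = in.readUTF();
          String value = in.readUTF();
          metadata.put(key, value);
        }
      }
      return new Decoded(body, metadata);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because the v2 layout only appends fields after the v1 body, state written before an upgrade can still be restored, which is the property the compatibility notes above rely on.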
   
   ### Use cases
   
   - Per-record watermark extraction for downstream freshness tracking
   - Data quality score aggregation per checkpoint
   - Custom metadata that should appear in Iceberg snapshot summaries


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

