lintingbin opened a new issue, #15925:
URL: https://github.com/apache/iceberg/issues/15925
## Background
The `DynamicIcebergSink` is designed for multi-table fan-out scenarios where
a single Flink job ingests heterogeneous event streams into different Iceberg
tables. A common production pattern is the **wide-table model**: all event
types (log types) share one large Iceberg table whose schema is the union of
all their columns, while each individual event type only carries a subset of
those columns.
**Example:** A game analytics pipeline ingests 50 distinct log types into
one wide Iceberg table with 500 columns. Log type A produces columns 1–40, log
type B produces columns 200–230, and so on. No single event populates all 500
columns.
## Problem
Without projected schema writes, `DynamicIcebergSink` must write a
**full-width row** for every record. The caller is required to pad missing
columns with `null` before handing the record to the sink. This causes
significant overhead in wide-table scenarios:
### 1. Wasted CPU on null padding
The upstream operator must construct a `RowData` with the full table width
(e.g., 500 fields), setting the vast majority to `null`. For high-throughput
pipelines (millions of records/second), this is measurable CPU waste.
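To make the padding overhead concrete, here is a minimal, hypothetical sketch (plain `Object[]` arrays standing in for Flink's `GenericRowData`; the class and method names are illustrative, not part of the sink API) of what a caller must do today: copy ~40 real values into a 500-slot row and leave the rest null.

```java
import java.util.Arrays;

public class NullPaddingSketch {
  static final int TABLE_WIDTH = 500;

  // Pads an event's values into a full-width row; positions[i] is the
  // target column index of values[i]. Every other slot stays null.
  static Object[] padToFullWidth(Object[] values, int[] positions) {
    Object[] row = new Object[TABLE_WIDTH]; // 500 slots, all null initially
    for (int i = 0; i < values.length; i++) {
      row[positions[i]] = values[i];
    }
    return row;
  }

  public static void main(String[] args) {
    // Log type A only produces columns 0..39.
    Object[] eventValues = new Object[40];
    int[] positions = new int[40];
    for (int i = 0; i < 40; i++) {
      eventValues[i] = "v" + i;
      positions[i] = i;
    }
    Object[] row = padToFullWidth(eventValues, positions);
    long nulls = Arrays.stream(row).filter(v -> v == null).count();
    System.out.println("padding slots: " + nulls); // 460 of 500 slots carry no data
  }
}
```

The allocation, the null-fill, and the later serialization of those 460 empty slots all happen per record, which is where the CPU waste accumulates at millions of records per second.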
### 2. Wasted storage and I/O
Even with Parquet's efficient null encoding, writing explicit null columns
in every data file inflates file sizes. A column chunk is written for every
column in each row group even when all of its values are null, and the
per-column metadata overhead alone adds up at scale. In our production
environment with 1-minute checkpoint intervals, this produces many more small
files than necessary, each larger than it needs to be.
### 3. Inflated serialization cost between Flink operators
`DynamicRecord` carries the full `RowData` across the network (Writer →
Committer topology). A record with 500 columns is serialized even though only
40 of those columns are meaningful. This wastes both CPU and network bandwidth
within the Flink job.
### 4. Poor write isolation between log types
`DynamicWriter` uses a `WriteTarget` as the writer factory key. With
full-width rows, all records for the same table share one writer factory
regardless of their logical log type. For use cases where log types need to be
physically isolated (e.g., different sort orders per log type), this is
inflexible.
## Proposed Solution
Introduce an optional `writeSchema` field on `DynamicRecord` (and its
internal counterpart `DynamicRecordInternal`). When set, `writeSchema` tells
`DynamicWriter` to use only those columns when creating the Parquet
`TaskWriter`, letting Iceberg fill the remaining optional columns with `null`
at read time via its column projection mechanism.
```java
DynamicRecord record = new DynamicRecord(...);
// Only specify the columns this log type actually produces
Schema logTypeASchema =
    tableSchema.select("event_time", "user_id", "level", "score");
record.setWriteSchema(logTypeASchema);
```
The `RowData` inside the record is expected to match `writeSchema` in column
count and order — exactly like the existing schema-matching behavior, but
without triggering schema evolution.
### Key design points
**`writeSchema` vs `schema` (record schema)**
| Field | Purpose |
|-------|---------|
| `schema` | Used for schema matching and evolution (existing behavior, unchanged) |
| `writeSchema` | Used only for Parquet file writing; must be a subset of the resolved table schema |
**`writeSchemaColumnIds` in `WriteTarget`**
`WriteTarget` (the writer factory key) gains an ordered list of field IDs
derived from `writeSchema`. This ensures records with different projected
schemas get independent `TaskWriter` instances, which is required for
correctness (each writer is created with a specific `RowType`).
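The keying idea can be sketched as follows. This is a hypothetical, simplified stand-in for `WriteTarget` (the real class carries more fields); the point is that the ordered projected field IDs participate in `equals`/`hashCode`, so two records with different projections never share a `TaskWriter`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class WriterKeySketch {
  // Simplified writer-factory key: table, branch, and the ordered field IDs
  // of the projected write schema (null means a full-width write).
  static final class WriteTargetKey {
    final String tableName;
    final String branch;
    final List<Integer> writeSchemaColumnIds;

    WriteTargetKey(String tableName, String branch, List<Integer> ids) {
      this.tableName = tableName;
      this.branch = branch;
      this.writeSchemaColumnIds = ids;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof WriteTargetKey)) return false;
      WriteTargetKey k = (WriteTargetKey) o;
      return tableName.equals(k.tableName)
          && branch.equals(k.branch)
          && Objects.equals(writeSchemaColumnIds, k.writeSchemaColumnIds);
    }

    @Override
    public int hashCode() {
      return Objects.hash(tableName, branch, writeSchemaColumnIds);
    }
  }

  public static void main(String[] args) {
    Map<WriteTargetKey, String> writers = new HashMap<>();
    writers.put(new WriteTargetKey("wide_table", "main", List.of(1, 2, 3)), "writerA");
    writers.put(new WriteTargetKey("wide_table", "main", List.of(7, 8)), "writerB");
    // Same table and branch, different projection => two independent writers.
    System.out.println(writers.size()); // prints 2
  }
}
```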
**Validation in `DynamicWriter`**
Before creating a writer factory for a projected schema, the following are
enforced:
- **No required columns may be skipped** — Iceberg cannot fill a `required`
column with `null` at read time; skipping one would produce unreadable files.
- **All partition columns must be included** — the writer needs partition
values to place files correctly.
- **All equality-delete fields must be included** — required for correct
upsert semantics.
- **All field IDs in `writeSchema` must exist in the resolved table schema**
— prevents silent field mismatches.
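The four rules above reduce to set-containment checks over field IDs. The sketch below is hypothetical (field-ID sets stand in for full Iceberg schema objects, and the class and method names are illustrative), but it captures the intended validation logic:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class ProjectionValidationSketch {
  // Returns a list of violations; an empty list means the projected
  // schema is acceptable for writing.
  static List<String> validate(
      Set<Integer> projectedIds,       // field IDs in writeSchema
      Set<Integer> tableIds,           // all field IDs in the resolved table schema
      Set<Integer> requiredIds,        // required (non-nullable) field IDs
      Set<Integer> partitionSourceIds, // partition source field IDs
      Set<Integer> equalityFieldIds) { // equality-delete field IDs
    List<String> errors = new ArrayList<>();
    if (!projectedIds.containsAll(requiredIds)) {
      errors.add("writeSchema skips a required column");
    }
    if (!projectedIds.containsAll(partitionSourceIds)) {
      errors.add("writeSchema skips a partition source column");
    }
    if (!projectedIds.containsAll(equalityFieldIds)) {
      errors.add("writeSchema skips an equality-delete field");
    }
    if (!tableIds.containsAll(projectedIds)) {
      errors.add("writeSchema contains a field ID not in the table schema");
    }
    return errors;
  }

  public static void main(String[] args) {
    Set<Integer> table = Set.of(1, 2, 3, 4, 5);
    // Valid: includes required field 1 and partition source field 2.
    System.out.println(validate(Set.of(1, 2, 4), table, Set.of(1), Set.of(2), Set.of()));
    // Invalid: drops required field 1 and references unknown field 99.
    System.out.println(validate(Set.of(2, 99), table, Set.of(1), Set.of(2), Set.of()));
  }
}
```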
**Aggregation in `DynamicWriteResultAggregator` and `DynamicCommitter`**
`WriteResult`s produced by different projected schemas for the same table
are merged at the table level before committing. A single checkpoint produces
at most one Iceberg transaction per table, regardless of how many distinct
`writeSchema` variants were active. This is consistent with the existing
behavior for non-projected writes.
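The table-level merge can be illustrated with a small grouping sketch. All names here are hypothetical simplifications (real `WriteResult`s carry data files, delete files, and referenced files, not plain strings); the point is that results from different `writeSchema` variants of the same table collapse into a single commit payload per table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ResultAggregationSketch {
  // Simplified stand-in for a per-writer result: which table it belongs to,
  // which projection produced it, and the files it wrote.
  record WriteResult(String table, List<Integer> projectionIds, List<String> dataFiles) {}

  // Groups all files by table, ignoring which projection produced them,
  // so each table gets at most one commit per checkpoint.
  static Map<String, List<String>> aggregateByTable(List<WriteResult> results) {
    Map<String, List<String>> perTable = new LinkedHashMap<>();
    for (WriteResult r : results) {
      perTable.computeIfAbsent(r.table(), t -> new ArrayList<>()).addAll(r.dataFiles());
    }
    return perTable;
  }

  public static void main(String[] args) {
    List<WriteResult> checkpoint = List.of(
        new WriteResult("wide_table", List.of(1, 2), List.of("a.parquet")),
        new WriteResult("wide_table", List.of(7, 8), List.of("b.parquet")),
        new WriteResult("other_table", null, List.of("c.parquet")));
    // Two projections of wide_table merge into one commit payload.
    System.out.println(aggregateByTable(checkpoint));
  }
}
```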
## Serialization Compatibility
Introducing `writeSchema` requires changes to several serializers:
| Serializer | Change |
|---|---|
| `DynamicRecordInternalSerializer` | Version bumped; `writeSchema` serialized as a length-prefixed JSON string (avoids `DataOutputStream.writeUTF`'s 65535-byte limit for large schemas); backward-compatible deserialization preserved |
| `DynamicCommittableSerializer` | Version 1 → 2; V1 deserialization preserved for checkpoint recovery |
| `DynamicWriteResultSerializer` | Version 1 → 2; same backward-compatibility guarantee |
| `WriteTarget` (versioned serializer) | Version bumped to account for the new `writeSchemaColumnIds` field |
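The length-prefixed encoding can be sketched in a few lines. This is an illustrative stand-in, not the serializer's actual code: `DataOutputStream.writeUTF` caps payloads at 65535 bytes of modified UTF-8, so a large schema JSON is instead written as a 4-byte length followed by raw UTF-8 bytes, with a negative length marking an absent (null) `writeSchema`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class LengthPrefixedStringSketch {
  static void writeNullableString(DataOutputStream out, String s) throws IOException {
    if (s == null) {
      out.writeInt(-1); // sentinel: no writeSchema present
      return;
    }
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    out.writeInt(bytes.length); // 4-byte length prefix, no 65535 cap
    out.write(bytes);
  }

  static String readNullableString(DataInputStream in) throws IOException {
    int len = in.readInt();
    if (len < 0) {
      return null;
    }
    byte[] bytes = new byte[len];
    in.readFully(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) throws IOException {
    // A schema JSON far beyond writeUTF's 65535-byte limit round-trips fine.
    String bigSchemaJson = "{\"fields\":\"" + "x".repeat(100_000) + "\"}";
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    writeNullableString(new DataOutputStream(buf), bigSchemaJson);
    String back = readNullableString(
        new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
    System.out.println(bigSchemaJson.equals(back)); // prints true
  }
}
```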
## Performance Impact
In production testing against a wide table (500 columns, 50 log types,
average 40 active columns per log type, Flink 1.20):
- **Write throughput**: up to **4x** improvement for wide schemas
- **Parquet file size**: **several times smaller** with 1-minute checkpoint
intervals (fewer column chunks per file, significantly reduced small-file
problem)
## API Changes
```java
// DynamicRecord.java (public API)
public class DynamicRecord {
  // existing fields unchanged ...

  /**
   * Optional projected schema for writing. When set, the RowData columns
   * must match this schema rather than the full table schema.
   * The remaining optional columns in the table schema will be read back
   * as null by Iceberg at query time.
   */
  @Nullable
  public Schema writeSchema() { ... }

  public void setWriteSchema(@Nullable Schema writeSchema) { ... }
}
```
This is a purely additive, backward-compatible change. Callers that do not
set `writeSchema` see no behavioral difference.
## Alternatives Considered
**A. Null-pad at the source / upstream operator**
This is what the current API forces. It moves the burden to every caller and
does not reduce the serialization or storage cost — it just shifts where the
nulls are constructed.
**B. Separate tables per log type**
This defeats the purpose of the wide-table model (unified querying, single
schema governance) and creates operational overhead (hundreds of tables to
manage).
**C. Rely on Iceberg's existing read-side column projection**
Read-side projection is well-established, but it only reduces the cost of
queries; it does nothing for the write path, where full-width rows would still
be constructed, serialized, and written. This proposal applies the same
projection concept to the write side within `DynamicIcebergSink`, where
per-record write schema variance is the norm.
## Related Issues
- #11536 — original DynamicIcebergSink tracking issue
- #14090 — `DynamicWriteResultAggregator` producing multiple committables
per table/branch/checkpoint; the table-level result aggregation introduced by
this feature also addresses that issue as a side effect
## Implementation Status
A working implementation exists and has been validated in production (Flink
1.20, ~2M records/second, 500-column wide table). We intend to extend support
to Flink 1.19 and 2.0 and submit a PR. We welcome feedback on the API design,
particularly whether `writeSchema` belongs on `DynamicRecord` directly or
should be expressed differently.