SteveStevenpoor opened a new issue, #16756:
URL: https://github.com/apache/iceberg/issues/16756
### Proposed Change
## Motivation
Many streaming SQL engines support metadata that is not part of the table
schema but is required for query planning and execution. Two notable examples
are:
- Computed columns
- Watermark definitions
For example, Apache Flink supports:
```sql
CREATE TABLE orders (
order_id BIGINT,
order_time TIMESTAMP(3),
event_time AS order_time,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
);
```
Today, Iceberg catalogs generally preserve only the physical schema and
table properties. As a result, to use watermarks, a user has to define a
separate Flink table on top of the Iceberg table:
```sql
-- Iceberg catalog backed by a Hive Metastore
CREATE CATALOG iceberg_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://host:9083',
'warehouse'='hdfs://host:9009/user/hive/warehouse/');
CREATE DATABASE IF NOT EXISTS iceberg_catalog.iceberg_db;
-- Iceberg table: stores only the physical schema
CREATE TABLE iceberg_catalog.iceberg_db.iceberg_table (
order_id STRING,
price DECIMAL(32,2),
currency STRING,
origin_order_time TIMESTAMP(6)
) WITH (
'format-version' = '2');
-- Flink table over the same Iceberg table: adds the computed column and watermark
CREATE TABLE default_catalog.default_database.flink_table (
order_id STRING,
price DECIMAL(32,2),
currency STRING,
origin_order_time TIMESTAMP(6),
order_time as cast(origin_order_time as TIMESTAMP(3)),
WATERMARK FOR order_time AS order_time - INTERVAL '5' MINUTE
) WITH (
'connector' = 'iceberg',
'catalog-name' = 'iceberg_catalog',
'catalog-database' = 'iceberg_db',
'catalog-table' = 'iceberg_table',
'uri'='thrift://host:9083',
'warehouse'='hdfs://host:9009/user/hive/warehouse/');
```
While this workflow is manageable for experienced users, it introduces
additional complexity and makes Iceberg-backed tables behave differently from
native Flink tables. Users frequently expect watermarks and computed columns to
work out of the box.
This limitation has already been discussed multiple times:
- https://github.com/apache/iceberg/pull/3681 and
  https://github.com/apache/iceberg/pull/4994 stored watermark/computed-column
  definitions in table properties and were rejected on the same design
  objection: watermarks are job-scoped, not table-scoped.
- https://github.com/apache/iceberg/pull/4625 added computed columns to
  Iceberg's own schema/API and was rejected on the objection that Iceberg
  tables should not store engine-specific expressions.
The goal of this proposal is to revisit the problem with a clearer
separation between:
1. Iceberg table metadata
2. Engine-specific metadata extensions
## Background
Iceberg already acts as the authoritative metadata layer for:
- Schemas
- Partition specifications
- Sort orders
- Snapshots
- Table properties
The specification intentionally separates physical storage concerns from
execution semantics.
However, several integrations already require metadata that is not
representable within the Iceberg schema model.
An example is proposal https://github.com/apache/iceberg/issues/5000, in
which computed columns were discussed as a mechanism for expressing partition
transforms through Flink SQL:
```sql
CREATE TABLE tb (
ts TIMESTAMP,
par_ts AS days(ts)
)
PARTITIONED BY (par_ts);
```
Computed-column support would simplify these use cases while preserving a
user-friendly SQL interface.
## Proposal
Add FlinkCatalog-managed metadata, stored separately from Iceberg table
metadata, that preserves Flink-specific concepts such as computed columns and
watermark definitions. Unlike previous proposals, this proposal does not
attempt to make watermark or computed-column metadata part of the Iceberg table
model.
Instead, the goal is to allow FlinkCatalog to preserve Flink-specific
metadata while keeping ownership, interpretation, and validation entirely
within Flink.
Example:
```json
{
"flink": {
"computed_columns": [
{
"name": "event_time",
"expression": "order_time"
}
],
"watermarks": [
{
"column": "event_time",
"expression": "event_time - INTERVAL '5' SECOND"
}
]
}
}
```
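To make the shape of this document concrete, the following Java sketch models it as plain records and serializes it to the JSON layout shown above. The record names (`FlinkTableMetadata`, `ComputedColumn`, `Watermark`) are hypothetical and not part of any existing Iceberg or Flink API. Expressions are carried as opaque strings, so nothing outside Flink ever needs to parse them.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of the Flink-owned metadata; field names mirror the JSON keys.
record ComputedColumn(String name, String expression) {}

record Watermark(String column, String expression) {}

record FlinkTableMetadata(List<ComputedColumn> computedColumns, List<Watermark> watermarks) {

  // Minimal JSON string escaping (quotes, backslashes, control characters).
  // The expression text itself is stored verbatim and never interpreted.
  static String quote(String s) {
    StringBuilder sb = new StringBuilder("\"");
    for (char c : s.toCharArray()) {
      switch (c) {
        case '"' -> sb.append("\\\"");
        case '\\' -> sb.append("\\\\");
        case '\n' -> sb.append("\\n");
        default -> {
          if (c < 0x20) {
            sb.append(String.format("\\u%04x", (int) c));
          } else {
            sb.append(c);
          }
        }
      }
    }
    return sb.append('"').toString();
  }

  // Serializes to the {"flink": {...}} layout, namespaced under the engine name.
  String toJson() {
    String cols = computedColumns.stream()
        .map(c -> "{\"name\":" + quote(c.name()) + ",\"expression\":" + quote(c.expression()) + "}")
        .collect(Collectors.joining(","));
    String wms = watermarks.stream()
        .map(w -> "{\"column\":" + quote(w.column()) + ",\"expression\":" + quote(w.expression()) + "}")
        .collect(Collectors.joining(","));
    return "{\"flink\":{\"computed_columns\":[" + cols + "],\"watermarks\":[" + wms + "]}}";
  }
}
```

Namespacing everything under a top-level `"flink"` key is what would let other engines add their own sections later (Goal 3) without colliding.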
This metadata:
- Does not affect Iceberg readers
- Does not participate in schema evolution
- Does not affect partition planning
- Is ignored by engines that do not understand it
- Can be restored by the originating engine
## Design Goals
### Goal 1: Preserve Engine Metadata
Allow engine-specific definitions to survive catalog round-trips.
For example:
```sql
SHOW CREATE TABLE orders;
```
should be able to reconstruct the original DDL, including computed columns
and watermark definitions.
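A minimal sketch of that reconstruction follows. It assumes FlinkCatalog already has the physical columns (from the Iceberg schema, mapped to Flink types) and the stored Flink metadata. `PhysicalColumn`, `DdlRenderer`, and the other types are hypothetical illustrations. The usage in the test reuses the columns from the workaround example above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical inputs: physical columns come from the Iceberg schema,
// computed columns and watermarks from the Flink-owned metadata.
record PhysicalColumn(String name, String flinkType) {}

record ComputedColumn(String name, String expression) {}

record Watermark(String column, String expression) {}

final class DdlRenderer {
  private DdlRenderer() {}

  // Rebuilds a CREATE TABLE statement: physical columns first, then computed
  // columns, then watermark clauses. Expressions are emitted exactly as stored.
  static String render(String table, List<PhysicalColumn> physical,
                       List<ComputedColumn> computed, List<Watermark> watermarks) {
    List<String> lines = new ArrayList<>();
    for (PhysicalColumn c : physical) {
      lines.add("  " + c.name() + " " + c.flinkType());
    }
    for (ComputedColumn c : computed) {
      lines.add("  " + c.name() + " AS " + c.expression());
    }
    for (Watermark w : watermarks) {
      lines.add("  WATERMARK FOR " + w.column() + " AS " + w.expression());
    }
    return "CREATE TABLE " + table + " (\n" + String.join(",\n", lines) + "\n)";
  }
}
```

This sketch always appends computed columns after the physical ones. To preserve the original interleaving of physical and computed columns, the stored metadata would also need to record each computed column's position.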
### Goal 2: Avoid Iceberg Semantic Ownership
Iceberg should not validate:
- Watermark expressions
- Watermark strategies
- Computed column semantics
These remain responsibilities of the engine that created the metadata.
### Goal 3: Enable Future Extensions
The same mechanism could support metadata from other engines.
## Alternatives Considered
### Approach 1: Ignore Watermarks and Computed Columns (Current State)
#### Pros
- No metadata changes
- Maximum engine neutrality
- No additional implementation
#### Cons
- Computed-column-based partitioning remains difficult
- Users must manage metadata externally
- Counterintuitive for users, who expect Iceberg-backed tables to behave like native Flink tables
### Approach 2: Store Definitions as Table Properties
#### Description
Persist metadata as table properties. This approach was taken in
https://github.com/apache/iceberg/pull/3681 and
https://github.com/apache/iceberg/pull/4625
Example:
```json
{
"flink.watermark.event_time":
"event_time - INTERVAL '5' SECOND"
}
```
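The following sketch shows what reading this convention back looks like. It assumes keys of the form `flink.watermark.<column>`, as in the example. `WatermarkProperties` is a hypothetical helper. It has to scan the same map that holds Iceberg-managed properties such as `format-version`, and it recovers values by string prefix alone.

```java
import java.util.Map;
import java.util.TreeMap;

final class WatermarkProperties {
  // Hypothetical key convention taken from the example: flink.watermark.<column> = <expression>.
  static final String PREFIX = "flink.watermark.";

  private WatermarkProperties() {}

  // Returns column -> expression for every property that follows the convention.
  // Parsing relies only on the key prefix: there is no schema for the value and
  // nothing reserves the prefix for Flink, which is the "no structure" drawback.
  static Map<String, String> parse(Map<String, String> tableProperties) {
    Map<String, String> watermarks = new TreeMap<>();
    tableProperties.forEach((key, value) -> {
      if (key.startsWith(PREFIX) && key.length() > PREFIX.length()) {
        watermarks.put(key.substring(PREFIX.length()), value);
      }
    });
    return watermarks;
  }
}
```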
#### Pros
- No metadata format changes
- Easy implementation
- Backward compatible
#### Cons
- No structure or validation
- Naming conventions become engine-specific
- Engine-specific metadata becomes mixed with Iceberg-managed metadata
### Approach 3: Extend the Iceberg Schema Model
#### Description
Add computed columns and watermarks directly to the Iceberg schema
specification. This approach was taken in https://github.com/apache/iceberg/pull/4994
#### Pros
- First-class metadata model
- Standardized representation
#### Cons
- Makes engine-specific concepts part of the Iceberg specification
- Watermarks are execution-engine concepts rather than table-format concepts
- Impacts all engines
### Approach 4: FlinkCatalog-managed metadata (Proposed)
#### Description
Introduce a dedicated, FlinkCatalog-managed store for Flink-owned metadata, kept separate from Iceberg table metadata.
#### Pros
- Preserves Flink metadata across catalog round-trips
- Maintains engine neutrality
- No changes to Iceberg schema semantics
- Could serve as a model for future engine-specific metadata solutions
#### Cons
- New FlinkCatalog-specific API required
- Requires coordination across catalog implementations
## Compatibility
- Existing Iceberg tables remain unchanged
- No schema evolution rules are modified
- Engines and catalogs that do not support Flink metadata continue to
operate exactly as they do today
The exact storage location is intentionally left open and could be
implemented as a separate metadata file referenced by FlinkCatalog.
## Discussion
This proposal intentionally reframes the discussion from:
> Should Iceberg understand watermarks and computed columns?
to:
> Should FlinkCatalog provide a mechanism for preserving Flink-owned
metadata?
The proposal does not extend the Iceberg specification or schema model.
Instead, it focuses on improving the Flink integration by allowing
Flink-specific metadata to survive catalog round-trips while remaining
outside Iceberg-managed metadata.
This allows Iceberg to remain engine-neutral while enabling Flink to persist
and recover metadata required for its planning and execution semantics.
### Proposal document
_No response_
### Specifications
- [ ] Table
- [ ] View
- [ ] REST
- [ ] Puffin
- [ ] Encryption
- [ ] Other