SteveStevenpoor opened a new issue, #16756:
URL: https://github.com/apache/iceberg/issues/16756

   ### Proposed Change
   
   ## Motivation
   
   Many streaming SQL engines support metadata that is not part of the table 
schema but is required for query planning and execution. Two notable examples 
are:
   
   - Computed columns
   - Watermark definitions
   
   For example, Apache Flink supports:
   
   ```sql
   CREATE TABLE orders (
     order_id BIGINT,
     order_time TIMESTAMP(3),
     event_time AS order_time,
     WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
   );
   ```
   
   Today, Iceberg catalogs generally preserve only the physical schema and 
table properties. As a result, to use watermarks, a user have to make a Flink 
Table on top of Iceberg Table:
   
   ```sql
   CREATE CATALOG iceberg_catalog WITH (
       'type'='iceberg',
       'catalog-type'='hive',
       'uri'='thrift://host:9083',
       'warehouse'='hdfs://host:9009/user/hive/warehouse/'));
   
   CREATE TABLE  iceberg_table (
       order_id    STRING,
       price       DECIMAL(32,2), 
       currency    STRING,
       origin_order_time TIMESTAMP(6)
   ) WITH (
       'format.version' = '2'));
   
   CREATE TABLE  default_catalog.default_database.flink_table (
       order_id    STRING,
       price       DECIMAL(32,2),
       currency    STRING,
       origin_order_time TIMESTAMP(6),
       order_time as cast(origin_order_time as TIMESTAMP(3)),
       WATERMARK FOR order_time AS order_time - INTERVAL '5' MINUTE
   ) WITH (
       'connector' = 'iceberg',
       'catalog-name' = 'iceberg_catalog',
       'catalog-database' = 'iceberg_db',
       'catalog-table' = 'iceberg_origin',
       'uri'='thrift://host:9083',
       'warehouse'='hdfs://host:9009/user/hive/warehouse/'));
   ```
   
   While this workflow is manageable for experienced users, it introduces 
additional complexity and makes Iceberg-backed tables behave differently from 
native Flink tables. Users frequently expect watermark and computed-column to 
work out of the box.
   
   This limitation has already been discussed multiple times:
   - https://github.com/apache/iceberg/pull/3681 and 
https://github.com/apache/iceberg/pull/4994
     -  stored watermark/computed-column defs in table properties and were 
rejected on the same design objection - watermarks are job-scoped, not 
table-scoped 
   - https://github.com/apache/iceberg/pull/4625
     - added computed columns to Iceberg's own schema/API and was rejected on 
the objection - iceberg tables should not store engine specific expressions
   
   The goal of this proposal is to revisit the problem with a clearer 
separation between:
   
   1. Iceberg table metadata
   2. Engine-specific metadata extensions
   
   ## Background
   
   Iceberg already acts as the authoritative metadata layer for:
   
   - Schemas
   - Partition specifications
   - Sort orders
   - Snapshots
   - Table properties
   
   The specification intentionally separates physical storage concerns from 
execution semantics.
   
   However, several integrations already require metadata that is not 
representable within the Iceberg schema model.
   
   An example is proposal https://github.com/apache/iceberg/issues/5000, in 
which computed columns were discussed as a mechanism for expressing partition 
transforms through Flink SQL:
   
   ```sql
   CREATE TABLE tb (
     ts TIMESTAMP,
     par_ts AS days(ts)
   )
   PARTITIONED BY (par_ts);
   ```
   
   Computed-column support would simplify these use cases while preserving a 
user-friendly SQL interface.
   
   ## Proposal
   
   Add FlinkCatalog-managed metadata, stored separately from Iceberg table 
metadata, that preserves Flink-specific concepts such as computed columns and 
watermark definitions. Unlike previous proposals, this proposal does not 
attempt to make watermark or computed-column metadata part of the Iceberg table 
model.
   
   Instead, the goal is to allow FlinkCatalog to preserve Flink-specific 
metadata while keeping ownership, interpretation, and validation entirely 
within Flink.
   
   Example:
   
   ```json
   {
       "flink": {
         "computed_columns": [
           {
             "name": "event_time",
             "expression": "order_time"
           }
         ],
         "watermarks": [
           {
             "column": "event_time",
             "expression": "event_time - INTERVAL '5' SECOND"
           }
         ]
       }
   }
   ```
   
   This metadata:
   
   - Does not affect Iceberg readers
   - Does not participate in schema evolution
   - Does not affect partition planning
   - Is ignored by engines that do not understand it
   - Can be restored by the originating engine
   
   ## Design Goals
   
   ### Goal 1: Preserve Engine Metadata
   
   Allow engine-specific definitions to survive catalog round-trips.
   
   For example:
   
   ```sql
   SHOW CREATE TABLE orders;
   ```
   
   should be able to reconstruct the original DDL, including computed columns 
and watermark definitions.
   
   ### Goal 2: Avoid Iceberg Semantic Ownership
   
   Iceberg should not validate:
   
   - Watermark expressions
   - Watermark strategies
   - Computed column semantics
   
   These remain responsibilities of the engine that created the metadata.
   
   ### Goal 3: Enable Future Extensions
   
   The same mechanism could support metadata from other engines.
   
   ## Alternatives Considered
   
   ### Approach 1: Ignore Watermarks and Computed Columns (Current State)
   #### Pros
   
   - No metadata changes
   - Maximum engine neutrality
   - No additional implementation
   
   #### Cons
   
   - Computed-column-based partitioning remains difficult
   - Users must manage metadata externally
   - May be counter intuitive on user side
   
   ### Approach 2: Store Definitions as Table Properties
   
   #### Description
   
   Persist metadata as table properties. This was made in 
https://github.com/apache/iceberg/pull/3681 and 
https://github.com/apache/iceberg/pull/4625
   
   Example:
   
   ```json
   {
     "flink.watermark.event_time":
       "event_time - INTERVAL '5' SECOND"
   }
   ```
   
   #### Pros
   
   - No metadata format changes
   - Easy implementation
   - Backward compatible
   
   #### Cons
   
   - No structure or validation
   - Naming conventions become engine-specific
   - Engine-specific metadata becomes mixed with Iceberg-managed metadata
   
   ### Approach 3: Extend the Iceberg Schema Model
   
   #### Description
   
   Add computed columns and watermarks directly to the Iceberg schema 
specification. This was made in https://github.com/apache/iceberg/pull/4994
   
   #### Pros
   
   - First-class metadata model
   - Standardized representation
   
   #### Cons
   
   - Makes engine-specific concepts part of the Iceberg specification
   - Watermarks are execution-engine concepts rather than table-format concepts
   - Impacts all engines
   
   ### Approach 4: FlinkCatalog-managed metadata  (Proposed)
   
   #### Description
   
   Introduce a dedicated metadata for flink-owned metadata.
   
   #### Pros
   
   - Preserves metadata
   - Maintains engine neutrality
   - No changes to Iceberg schema semantics
   - Could serve as a model for future engine-specific metadata solutions
   
   #### Cons
   
   - New FlinkCatalog-specific API required
   - Requires coordination across catalog implementations.
   
   ## Compatibility
   
   - Existing Iceberg tables remain unchanged
   - No schema evolution rules are modified
   - Engines and catalogs that do not support Flink metadata continue to 
operate exactly as they do today
   
   The exact storage location is intentionally left open and could be 
implemented as a separate metadata file referenced by FlinkCatalog.
   
   ## Discussion
   
   This proposal intentionally reframes the discussion from:
   
   > Should Iceberg understand watermarks and computed columns?
   
   to:
   
   > Should FlinkCatalog provide a mechanism for preserving Flink-owned 
metadata?
   
   This proposal intentionally avoids extending the Iceberg specification or 
schema model. Instead, it focuses on improving the Flink integration by 
allowing Flink-specific metadata to survive catalog round-trips while remaining 
outside Iceberg-managed metadata.
   
   This allows Iceberg to remain engine-neutral while enabling Flink to persist 
and recover metadata required for its planning and execution semantics.
   
   ### Proposal document
   
   _No response_
   
   ### Specifications
   
   - [ ] Table
   - [ ] View
   - [ ] REST
   - [ ] Puffin
   - [ ] Encryption
   - [ ] Other


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to