Hi Flink Community,

I'd like to propose adding watermark support for SQL Views in Flink to
better support event-time processing scenarios.

Problem Statement

Currently, Flink SQL views cannot define watermarks. This limitation
creates several challenges:

   - *Broken Abstraction*: Users must reference underlying table watermarks
   directly, exposing implementation details
   - *No Flexibility*: Cannot define different watermark strategies for
   different use cases on the same data source
   - *Limited Architecture Support*: Incompatible with modern layered data
   architectures (Bronze/Silver/Gold medallion pattern)

For example, in multi-tenant scenarios, different tenants may require
different lateness tolerance, but currently we cannot create views with
different watermark strategies on the same source table.
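
To make the limitation concrete, here is a minimal sketch of what is possible today. The tenant_id column, the view names, and the datagen connector are illustrative assumptions, not part of the proposal; the watermark clause is the existing CREATE TABLE syntax.

```sql
-- Today the watermark can only be declared on the table itself.
-- 'datagen' is used only to keep the sketch self-contained;
-- tenant_id is an illustrative column.
CREATE TABLE raw_events (
    tenant_id  STRING,
    user_id    STRING,
    event_time TIMESTAMP(3),
    action     STRING,
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'datagen'
);

-- Both views pick up the table's 5-second bound; neither can declare its own.
CREATE VIEW tenant_a_events AS
SELECT user_id, event_time, action FROM raw_events WHERE tenant_id = 'a';

CREATE VIEW tenant_b_events AS
SELECT user_id, event_time, action FROM raw_events WHERE tenant_id = 'b';
```

If tenant b needs to tolerate a full minute of lateness, the only options today are to loosen the bound for everyone or to duplicate the table definition.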

Proposed Solution

I propose adding two SQL syntax options to support watermark definitions in
views:

*Option 1: CREATE VIEW with WATERMARK*

CREATE VIEW user_activity
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
AS SELECT user_id, event_time, action FROM raw_events;

*Option 2: ALTER VIEW SET WATERMARK*

ALTER VIEW user_activity
SET WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND;
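
As a sketch of how the multi-tenant case could look under Option 1: the statements below use the proposed syntax, which is not available in any released Flink version. The tenant_id column and the view names are assumptions made for illustration.

```sql
-- PROPOSED syntax (Option 1); not supported by released Flink versions.
-- Assumes raw_events has an illustrative tenant_id column.

-- Tenant a: tight bound, 5 seconds of allowed out-of-orderness.
CREATE VIEW tenant_a_events
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
AS SELECT user_id, event_time, action
FROM raw_events
WHERE tenant_id = 'a';

-- Tenant b: same source table, but 1 minute of allowed out-of-orderness.
CREATE VIEW tenant_b_events
WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE
AS SELECT user_id, event_time, action
FROM raw_events
WHERE tenant_id = 'b';
```

Each view carries its own watermark strategy, so the source table does not need to be duplicated or redefined per tenant.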

Key Design Aspects

   - *Backward Compatibility*: Watermark stored as optional metadata in
   view options; existing views continue to work unchanged
   - *Validation*: Watermark column must exist in view schema and be of
   TIMESTAMP/TIMESTAMP_LTZ type
   - *Storage*: Watermark metadata persists in catalog options map (works
   with all catalog implementations)
   - *Propagation*: Follows existing Flink watermark propagation rules in
   joins and nested views
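
The validation rule could behave roughly as follows. This is a sketch in the proposed syntax; the raw_json_events table, its payload column, and the view names are hypothetical, and the exact error reporting is not specified here.

```sql
-- PROPOSED syntax; raw_json_events(payload STRING) is a hypothetical table.

-- Expected to be rejected: JSON_VALUE returns STRING by default, so
-- event_time is not of TIMESTAMP/TIMESTAMP_LTZ type in the view schema.
CREATE VIEW invalid_events
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
AS SELECT JSON_VALUE(payload, '$.event_time') AS event_time
FROM raw_json_events;

-- Expected to pass: the column exists in the view schema and is TIMESTAMP(3).
CREATE VIEW valid_events
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
AS SELECT CAST(JSON_VALUE(payload, '$.event_time') AS TIMESTAMP(3)) AS event_time
FROM raw_json_events;
```

A watermark that names a column absent from the view's SELECT list would fail the same check.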

Use Case Example: Data Lakehouse Architecture

-- Bronze: Raw data (no watermark)
CREATE TABLE bronze_events (
    raw_data STRING,
    ingestion_time TIMESTAMP(3)
) WITH (...);

-- Silver: Cleaned data with watermark
CREATE VIEW silver_events
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
AS SELECT
    CAST(JSON_VALUE(raw_data, '$.event_time') AS TIMESTAMP(3)) AS event_time,
    JSON_VALUE(raw_data, '$.user_id') AS user_id
FROM bronze_events
WHERE JSON_VALUE(raw_data, '$.event_time') IS NOT NULL;

-- Gold: Aggregations
SELECT TUMBLE_START(event_time, INTERVAL '1' HOUR), COUNT(*)
FROM silver_events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);

Reference Materials

   - FLIP Document: FLIP-XXX: Support Watermark in Flink SQL View
   <https://docs.google.com/document/d/1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y/edit?usp=sharing>
   - JIRA Issue: https://issues.apache.org/jira/browse/FLINK-39062
   - Implementation POC:
      - [FLINK-39062][table] Support WATERMARK clause in CREATE VIEW statement
      <https://github.com/apache/flink/pull/27571>
      - [FLINK-39062][table] Support ALTER VIEW SET WATERMARK syntax
      <https://github.com/apache/flink/pull/27570>

Implementation Timeline

I estimate 6-8 weeks of work, covering the parser layer, planner layer,
catalog integration, and comprehensive testing.

Request for Feedback

This enhancement would significantly improve Flink's support for layered
data architectures and flexible event-time processing. I'm happy to provide
more details or start a formal FLIP process if the community sees value in
this proposal.

Looking forward to the community's feedback!

Best regards,

Feat Zhang
