Hi Flink Community,
I'd like to propose adding watermark support to Flink SQL views to better
support event-time processing.
Problem Statement
Currently, Flink SQL views cannot define watermarks. This limitation
creates several challenges:
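- *Broken Abstraction*: Watermarks can only be declared on the underlying
tables, so consumers of a view depend on those tables' watermark
definitions, which exposes implementation details
- *No Flexibility*: It is not possible to define different watermark
strategies for different use cases on the same data source
- *Limited Architecture Support*: Poorly suited to layered data
architectures (e.g., the Bronze/Silver/Gold medallion pattern), where the
event-time column is often derived only in a downstream layer, and no
watermark can currently be declared there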
For example, in multi-tenant scenarios, different tenants may require
different lateness tolerances, but currently we cannot create views with
different watermark strategies on the same source table.
Proposed Solution
I propose adding two SQL syntax options to support watermark definitions in
views:
*Option 1: CREATE VIEW with WATERMARK*
    CREATE VIEW user_activity
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    AS SELECT user_id, event_time, action FROM raw_events;
*Option 2: ALTER VIEW SET WATERMARK*
    ALTER VIEW user_activity
    SET WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND;
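Here is the multi-tenant scenario from the problem statement expressed with the Option 1 syntax. This is a sketch under assumptions. It assumes, hypothetically, that raw_events has a tenant_id column and no watermark of its own. The view names, tenant IDs, and lateness values are illustrative only.

```sql
-- Hypothetical: raw_events is assumed to have a tenant_id column
-- and no watermark declared on the table itself.

-- Tenant A: tolerates up to 5 seconds of lateness.
CREATE VIEW tenant_a_events
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
AS SELECT user_id, event_time, action
FROM raw_events
WHERE tenant_id = 'tenant_a';

-- Tenant B: tolerates up to 10 minutes of lateness on the same source.
CREATE VIEW tenant_b_events
WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE
AS SELECT user_id, event_time, action
FROM raw_events
WHERE tenant_id = 'tenant_b';
```

Both views read the same table. Each one carries its own watermark strategy, so downstream windowed queries on tenant_a_events would close windows much earlier than the same queries on tenant_b_events.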
Key Design Aspects
- *Backward Compatibility*: The watermark is stored as optional metadata in
the view's options; existing views without a watermark continue to work
unchanged
- *Validation*: The watermark column must exist in the view's schema and be
of type TIMESTAMP or TIMESTAMP_LTZ
- *Storage*: Watermark metadata is persisted in the view's options map in
the catalog, so it works with all catalog implementations without
catalog-specific changes
- *Propagation*: Watermarks follow Flink's existing propagation rules in
joins and nested views
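The following sketch shows how these rules might play out with the proposed syntax. It assumes a hypothetical source schema, raw_events(user_id STRING, event_time TIMESTAMP(3), action STRING), with no watermark on the table. The view names are made up, and the exact errors for the rejected statements depend on the POC implementation.

```sql
-- Assumed (hypothetical) source: raw_events(user_id STRING,
-- event_time TIMESTAMP(3), action STRING), no table-level watermark.

-- Backward compatibility: a view without WATERMARK behaves as today;
-- a watermark can be attached later with ALTER VIEW (Option 2).
CREATE VIEW plain_activity AS
SELECT user_id, event_time, action FROM raw_events;

ALTER VIEW plain_activity
SET WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND;

-- Validation: expected to be rejected, ts is not a column of the view.
CREATE VIEW bad_column_activity
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
AS SELECT user_id, event_time, action FROM raw_events;

-- Validation: expected to be rejected, user_id is STRING,
-- not TIMESTAMP or TIMESTAMP_LTZ.
CREATE VIEW bad_type_activity
WATERMARK FOR user_id AS user_id
AS SELECT user_id, event_time, action FROM raw_events;

-- Propagation: a nested view that forwards event_time unchanged is
-- expected to keep it as an event-time attribute under existing rules.
CREATE VIEW click_activity AS
SELECT user_id, event_time FROM plain_activity WHERE action = 'click';
```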
Use Case Example: Data Lakehouse Architecture
    -- Bronze: Raw data (no watermark)
    CREATE TABLE bronze_events (
      raw_data STRING,
      ingestion_time TIMESTAMP(3)
    ) WITH (...);

    -- Silver: Cleaned data with watermark
    CREATE VIEW silver_events
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
    AS SELECT
      CAST(JSON_VALUE(raw_data, '$.event_time') AS TIMESTAMP(3)) AS event_time,
      JSON_VALUE(raw_data, '$.user_id') AS user_id
    FROM bronze_events
    WHERE JSON_VALUE(raw_data, '$.event_time') IS NOT NULL;

    -- Gold: Aggregations
    SELECT TUMBLE_START(event_time, INTERVAL '1' HOUR), COUNT(*)
    FROM silver_events
    GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
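A side note on the Gold layer: Flink's documentation marks group window aggregation (GROUP BY TUMBLE(...) with TUMBLE_START) as deprecated and recommends windowing table-valued functions instead. This sketch assumes silver_events exposes event_time as an event-time attribute, as proposed. On that basis, the same hourly count could be written with the TUMBLE TVF:

```sql
-- Gold: hourly event counts using the TUMBLE windowing TVF.
-- Assumes silver_events.event_time is an event-time attribute (proposed).
SELECT window_start, window_end, COUNT(*) AS event_count
FROM TABLE(
  TUMBLE(TABLE silver_events, DESCRIPTOR(event_time), INTERVAL '1' HOUR))
GROUP BY window_start, window_end;
```

The TVF adds window_start and window_end columns to each row. Grouping by them yields one result row per hour, emitted once the view's watermark passes the end of that window.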
Reference Materials
- FLIP Document: FLIP-XXX: Support Watermark in Flink SQL View
<https://docs.google.com/document/d/1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y/edit?usp=sharing>
- JIRA Issue: https://issues.apache.org/jira/browse/FLINK-39062
- Implementation POC:
  - [FLINK-39062][table] Support WATERMARK clause in CREATE VIEW statement
    <https://github.com/apache/flink/pull/27571>
  - [FLINK-39062][table] Support ALTER VIEW SET WATERMARK syntax
    <https://github.com/apache/flink/pull/27570>
Implementation Timeline
The estimated effort is 6-8 weeks, covering the parser layer, planner layer,
catalog integration, and comprehensive testing.
Request for Feedback
This enhancement would significantly improve Flink's support for layered
data architectures and flexible event-time processing. I'm happy to provide
more details or start a formal FLIP process if the community sees value in
this proposal.
Looking forward to the community's feedback!
Best regards,
Feat Zhang