Hi, FeatZhang. Thanks for driving this discussion. I've read through the full 
FLIP and the mailing list context, and I have a few questions:
1. If I understand correctly, in a Layered Data Architecture, silver_events 
would typically be a table, a materialized view, or a materialized table. From 
the mailing list discussion, it seems like no consensus was reached on this 
point. I think we still need to consider whether APPLY_WATERMARK should be 
allowed on (non-materialized) views.
2. In the Planner Changes section under Logical Plan, could you elaborate on 
why monotonicity guarantees need to be ensured for the watermark expression 
validation?
3. (nit) In the Watermark Override part under Planner Changes, shouldn't the 
override of the upstream watermark happen at runtime rather than at the planner 
level?
4. I feel that APPLY_WATERMARK is quite similar to TO_CHANGELOG and 
FROM_CHANGELOG. Is what we actually need just a BuiltInProcessTableFunction? 
That way, we would only need to further extend ProcessTableFunction to support 
this.
5. If we choose to translate APPLY_WATERMARK into a specialized ExecNode 
(similar to VECTOR_SEARCH and ML_PREDICT), would the existing 
StreamExecWatermarkAssigner be sufficient for this purpose?
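
To make question 1 concrete, here is a rough sketch of the case I have in mind. It assumes the APPLY_WATERMARK syntax as proposed in the FLIP (a proposal, not an existing Flink function) and reuses silver_events and bronze_events from the original use case. Whether the second statement should be allowed when silver_events is a non-materialized view is the open point:

```sql
-- Sketch only: APPLY_WATERMARK follows the syntax proposed in this thread
-- and is not an existing Flink built-in.

-- silver_events as a plain (non-materialized) view, with no watermark attached
CREATE VIEW silver_events AS
SELECT
    CAST(JSON_VALUE(raw_data, '$.event_time') AS TIMESTAMP(3)) AS event_time,
    JSON_VALUE(raw_data, '$.user_id') AS user_id
FROM bronze_events
WHERE JSON_VALUE(raw_data, '$.event_time') IS NOT NULL;

-- Watermark assigned at query time on top of the view
SELECT window_start, window_end, COUNT(*)
FROM TABLE(TUMBLE(
    TABLE APPLY_WATERMARK(silver_events, DESCRIPTOR(event_time),
        event_time - INTERVAL '10' SECOND),
    DESCRIPTOR(event_time), INTERVAL '1' HOUR))
GROUP BY window_start, window_end;
```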





--

    Best!
    Xuyang



At 2026-04-21 22:22:33, "FeatZhang" <[email protected]> wrote:
>Hi everyone,
>
>Thank you for the feedback and discussions on the initial proposal. I've
>revised the FLIP based on the community's input and would like to share
>the updated version.
>
>
> FLIP-XXX: Support Flexible Watermark Assignment via Built-in Function
><https://drive.google.com/open?id=17PXYAi6Pb91OqFhVVK7tRULiaHAb6wiX79jjC0NaaDA>
>
>
>KEY UPDATES
>===========
>
>The proposal has evolved from "Support Watermark Definition in SQL Views"
>to a more flexible and powerful approach: FLIP-XXX: Support Flexible
>Watermark Assignment via APPLY_WATERMARK Function.
>
>What's Changed:
>
>1. Broader Scope: Instead of limiting watermark definitions to SQL views
>   only, the new proposal introduces a built-in table function
>   APPLY_WATERMARK that works with:
>   - Base tables
>   - Views (both regular and materialized)
>   - Subqueries
>   - Any table-valued expressions
>
>2. More Flexible Design: The function-based approach provides:
>   - Dynamic watermark assignment at query time without modifying catalog
>     metadata
>   - Override capability for existing watermark strategies
>   - Composability with other SQL operations
>   - No need for DDL changes or catalog write permissions
>
>3. Better SQL Semantics: Using a table function follows SQL standard
>   patterns and integrates naturally with Flink's existing function
>   ecosystem.
>
>UPDATED FLIP DOCUMENT
>=====================
>
>The revised FLIP is now available at:
>https://iwiki.woa.com/p/4019879693
>
>Key sections include:
>- Motivation and use cases
>- Public interfaces and SQL syntax
>- Implementation plan
>- Compatibility analysis
>- Test plan
>
>EXAMPLE USAGE
>=============
>
>-- Apply watermark to a view
>SELECT * FROM APPLY_WATERMARK(my_view, DESCRIPTOR(event_time),
>    event_time - INTERVAL '5' SECOND);
>
>-- Override existing watermark strategy
>SELECT * FROM APPLY_WATERMARK(my_table_with_watermark, DESCRIPTOR(ts),
>    ts - INTERVAL '10' SECOND -- Different from DDL watermark
>);
>
>-- Use in complex queries
>SELECT window_start,
>       window_end,
>       COUNT(*)
>FROM TABLE(TUMBLE(
>    TABLE APPLY_WATERMARK(orders, DESCRIPTOR(order_time),
>        order_time - INTERVAL '5' SECOND),
>    DESCRIPTOR(order_time), INTERVAL '1' HOUR))
>GROUP BY window_start,
>         window_end;
>
>
>IMPLEMENTATION PROGRESS
>=======================
>
>I've also opened a draft PR #27984 with the initial implementation:
>- Core built-in function definition
>- SQL-to-RelNode conversion rules
>- Physical plan integration
>- Unit tests and documentation (English + Chinese)
>
>The PR is available at:
>https://github.com/apache/flink/pull/27984
>
>REQUEST FOR FEEDBACK
>====================
>
>I would appreciate your thoughts on:
>
>1. Function naming: Is APPLY_WATERMARK clear and intuitive?
>   (Alternatives considered: WITH_WATERMARK, SET_WATERMARK)
>
>2. DESCRIPTOR syntax: Using DESCRIPTOR(column_name) to specify the
>   rowtime column—does this align well with Flink's existing patterns?
>
>3. Override behavior: Should APPLY_WATERMARK always override existing
>   watermarks, or should we provide a mode parameter
>   (e.g., OVERRIDE, MERGE)?
>
>4. Performance considerations: Any concerns about the function-based
>   approach vs. catalog-level watermark definitions?
>
>
>
>
>
>Looking forward to your valuable feedback!
>
>Best regards,
>FeatZhang
>
>On Thu, Feb 12, 2026 at 6:24 PM Timo Walther <[email protected]> wrote:
>
>> Hi everyone,
>>
>> I think we all agree that we clearly want this functionality, just the
>> "how" needs to be discussed. I also like Lincoln’s suggestion of
>> introducing a built-in PTF for this, I had similar ideas in mind.
>>
>> There are two issues with an APPLY_WATERMARK function, but both are on the
>> short-term roadmap:
>>
>> 1) This function would need to be a function that takes an expression.
>> Ideally as a lambda function. Newer Calcite versions already have lambda
>> expression support. At Confluent we were planning to work on a Calcite
>> upgrade this quarter especially to get lambda support in and improve
>> built-in functions that work on collections.
>>
>> 2) User-defined PTFs are currently not able to emit watermarks. We could
>> introduce a new interface WatermarkFunction (similar to
>> ChangelogFunction) that would offer this to everyone. Alternatively, we
>> could only use the PTF signature, but translate to a specialized
>> ExecNode similar to how we do it for VECTOR_SEARCH and ML_PREDICT.
>>
>> In any case, even if we go with the function approach, we definitely
>> need a full FLIP on this.
>>
>> Thanks,
>> Timo
>>
>> On 12.02.26 08:25, Gyula Fóra wrote:
>> > Hi All!
>> > I would like to chime in here quickly from a slightly different angle.
>> > While I am the first to admit that I cannot grasp all the planning /
>> > conceptual implications, I also feel the need for more flexible watermark
>> > handling as suggested by Feat.
>> >
>> > Anything that can only be applied to base/catalog tables is very limiting
>> > from a usability perspective. Watermarks feel like they should be a simple
>> > function that you can apply on a column/table as part of a query/view. For
>> > example: extract timestamp from a string -> convert to TS -> apply
>> > watermark etc.
>> >
>> > Users often receive the tables/catalogs as given and can only write
>> > queries.
>> >
>> > Fixing this would eliminate a long-standing disconnect between the
>> > DataStream API's flexible watermark handling and the currently very
>> > restrictive SQL approach.
>> >
>> > Cheers
>> > Gyula
>> >
>> > On Thu, Feb 12, 2026 at 7:54 AM FeatZhang <[email protected]> wrote:
>> >
>> >> Hi Timo, Lincoln,
>> >>
>> >> Thank you both for the detailed feedback.
>> >>
>> >> I agree with the concern that non-materialized SQL views should remain a
>> >> pure logical abstraction. Introducing watermark definitions directly
>> >> into CREATE VIEW or ALTER VIEW could blur the boundary between logical
>> >> aliasing and physical planning semantics, especially considering
>> >> optimization barriers and watermark propagation behavior.
>> >>
>> >> Lincoln’s suggestion of introducing a built-in function such as:
>> >>
>> >> APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column), watermark_expression)
>> >>
>> >> is a cleaner direction. It keeps watermark definition as an explicit
>> >> relational transformation rather than attaching additional semantics to
>> >> views.
>> >>
>> >> However, to fully address the original use cases (especially logical reuse
>> >> and layered lakehouse architectures), I propose that the table parameter
>> >> should support:
>> >>
>> >>     - Base tables
>> >>     - Non-materialized views
>> >>
>> >> If APPLY_WATERMARK can accept both, we can:
>> >>
>> >>     - Preserve the conceptual purity of SQL views
>> >>     - Avoid redefining view semantics
>> >>     - Still enable logical reuse via views
>> >>     - Allow different watermark strategies over the same logical relation
>> >>
>> >> In other words, watermark definition becomes an explicit relational
>> >> operator applied on top of any logical relation, instead of being embedded
>> >> into the view definition itself.
>> >>
>> >>  From a planner perspective, this keeps the model consistent:
>> >>
>> >>     - The function expands into a relational node
>> >>     - No optimization barrier is introduced by views
>> >>     - Watermark handling remains part of the logical plan transformation
>> >>
>> >> I will prepare a PR to prototype APPLY_WATERMARK with support for both base
>> >> tables and non-materialized views, and share it for further discussion.
>> >>
>> >> Looking forward to your thoughts.
>> >>
>> >> Best,
>> >> Feat
>> >>
>> >>
>> >> Lincoln Lee <[email protected]> wrote on Thu, Feb 12, 2026 at 12:19:
>> >>
>> >>> Agree with Timo’s point regarding the conceptual semantics. We should not
>> >>> directly extend non-materialized views with additional watermark
>> >>> definitions.
>> >>>
>> >>> Regarding the use case mentioned by Feat, defining different watermark
>> >>> strategies for the same data source, especially in the case of catalog
>> >>> tables, we are exploring a possible solution introducing a built-in
>> >>> function:
>> >>> ```sql
>> >>> APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column), watermark_expression)
>> >>> ```
>> >>> This function would only support base tables as input and would not
>> >>> support views, subqueries, or derived relations.
>> >>>
>> >>> This would address a meaningful subset of the identified use cases without
>> >>> redefining the role of SQL views.
>> >>>
>> >>>
>> >>> Best,
>> >>> Lincoln Lee
>> >>>
>> >>>
>> >>> Timo Walther <[email protected]> wrote on Wed, Feb 11, 2026 at 23:29:
>> >>>
>> >>>> Hi Feat,
>> >>>>
>> >>>> thanks for proposing this FLIP. We had similar discussions in the past,
>> >>>> but so far could never reach consensus.
>> >>>>
>> >>>> SQL views are actually a very simple concept, they just give SQL text an
>> >>>> alias. A view has no other properties except for the view definition.
>> >>>> Everything else is dynamically computed when the SQL text is inserted
>> >>>> into the larger plan. A view is never evaluated without the surrounding
>> >>>> plan.
>> >>>>
>> >>>> Watermarks in the middle of a pipeline raise a couple of tricky issues:
>> >>>>
>> >>>> - What if the upstream table is updating, how would you deal with
>> >>>> watermarks in the downstream view?
>> >>>> - What if the upstream table already emits watermarks? Would the view
>> >>>> catch them and discard this information?
>> >>>> - A view usually dissolves into the plan (e.g. via projection or filter
>> >>>> pushdown). Would a watermark definition suddenly introduce an
>> >>>> optimization barrier? If this is an optimization barrier, is this still
>> >>>> a view or a new concept? E.g. a "materialized view" or "pre-planned view"?
>> >>>>
>> >>>> Cheers,
>> >>>> Timo
>> >>>>
>> >>>>
>> >>>>
>> >>>> On 11.02.26 03:51, FeatZhang wrote:
>> >>>>> Hi Flink Community,
>> >>>>>
>> >>>>> I'd like to propose adding watermark support for SQL Views in Flink to
>> >>>>> better support event-time processing scenarios.
>> >>>>> Problem Statement
>> >>>>>
>> >>>>> Currently, Flink SQL views cannot define watermarks. This limitation
>> >>>>> creates several challenges:
>> >>>>>
>> >>>>>      - *Broken Abstraction*: Users must reference underlying table
>> >>>>>      watermarks directly, exposing implementation details
>> >>>>>      - *No Flexibility*: Cannot define different watermark strategies
>> >>>>>      for different use cases on the same data source
>> >>>>>      - *Limited Architecture Support*: Incompatible with modern layered
>> >>>>>      data architectures (Bronze/Silver/Gold medallion pattern)
>> >>>>>
>> >>>>> For example, in multi-tenant scenarios, different tenants may require
>> >>>>> different lateness tolerance, but currently we cannot create views with
>> >>>>> different watermark strategies on the same source table.
>> >>>>> Proposed Solution
>> >>>>>
>> >>>>> I propose adding two SQL syntax options to support watermark
>> >>>>> definitions in views:
>> >>>>>
>> >>>>> *Option 1: CREATE VIEW with WATERMARK*
>> >>>>>
>> >>>>> CREATE VIEW user_activity
>> >>>>> WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
>> >>>>> AS SELECT user_id, event_time, action FROM raw_events;
>> >>>>>
>> >>>>> *Option 2: ALTER VIEW SET WATERMARK*
>> >>>>>
>> >>>>> ALTER VIEW user_activity SET WATERMARK FOR event_time AS event_time -
>> >>>>> INTERVAL '5' SECOND;
>> >>>>>
>> >>>>> Key Design Aspects
>> >>>>>
>> >>>>>      - *Backward Compatibility*: Watermark stored as optional metadata
>> >>>>>      in view options; existing views continue to work unchanged
>> >>>>>      - *Validation*: Watermark column must exist in view schema and be
>> >>>>>      of TIMESTAMP/TIMESTAMP_LTZ type
>> >>>>>      - *Storage*: Watermark metadata persists in catalog options map
>> >>>>>      (works with all catalog implementations)
>> >>>>>      - *Propagation*: Follows existing Flink watermark propagation rules
>> >>>>>      in joins and nested views
>> >>>>>
>> >>>>> Use Case Example: Data Lakehouse Architecture
>> >>>>>
>> >>>>> -- Bronze: Raw data (no watermark)
>> >>>>> CREATE TABLE bronze_events (
>> >>>>>     raw_data STRING,
>> >>>>>     ingestion_time TIMESTAMP(3)
>> >>>>> ) WITH (...);
>> >>>>>
>> >>>>> -- Silver: Cleaned data with watermark
>> >>>>> CREATE VIEW silver_events
>> >>>>> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
>> >>>>> AS SELECT
>> >>>>>     CAST(JSON_VALUE(raw_data, '$.event_time') AS TIMESTAMP(3)) AS event_time,
>> >>>>>     JSON_VALUE(raw_data, '$.user_id') AS user_id
>> >>>>> FROM bronze_events
>> >>>>> WHERE JSON_VALUE(raw_data, '$.event_time') IS NOT NULL;
>> >>>>>
>> >>>>> -- Gold: Aggregations
>> >>>>> SELECT TUMBLE_START(event_time, INTERVAL '1' HOUR), COUNT(*)
>> >>>>> FROM silver_events
>> >>>>> GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
>> >>>>>
>> >>>>> Reference Materials
>> >>>>>
>> >>>>>      - FLIP Document: FLIP-XXX: Support Watermark in Flink SQL View
>> >>>>>      <https://docs.google.com/document/d/1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y/edit?usp=sharing>
>> >>>>>      - JIRA Issue: https://issues.apache.org/jira/browse/FLINK-39062
>> >>>>>      - Implementation POC:
>> >>>>>      - [FLINK-39062][table] Support WATERMARK clause in CREATE VIEW statement
>> >>>>>      <https://github.com/apache/flink/pull/27571>
>> >>>>>      - [FLINK-39062][table] Support ALTER VIEW SET WATERMARK syntax
>> >>>>>      <https://github.com/apache/flink/pull/27570>
>> >>>>>
>> >>>>> Implementation Timeline
>> >>>>>
>> >>>>> Estimated 6-8 weeks covering parser layer, planner layer, catalog
>> >>>>> integration, and comprehensive testing.
>> >>>>> Request for Feedback
>> >>>>>
>> >>>>> This enhancement would significantly improve Flink's support for layered
>> >>>>> data architectures and flexible event-time processing. I'm happy to
>> >>>>> provide more details or start a formal FLIP process if the community sees
>> >>>>> value in this proposal.
>> >>>>>
>> >>>>> Looking forward to the community's feedback!
>> >>>>>
>> >>>>> Best regards,
>> >>>>>
>> >>>>> Feat Zhang
>> >>>>>
>> >>>>>    FLIP-XXX: Support Watermark in Flink SQL View
>> >>>>> <https://drive.google.com/open?id=1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y>
>> >>>>>
>> >>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>>
