Hi FeatZhang. Thanks for the detailed responses. I have a few follow-up
comments and questions:
1. Support for APPLY_WATERMARK at any node/position
I generally agree with the direction that APPLY_WATERMARK should be applicable
at any node and any position — one or more times — similar to how the
DataStream API allows watermark assignment. However, I think we need to clearly
articulate the precise behavior/semantics in each scenario to reduce user
confusion. Aligning the mental model with the DataStream API (where users can
call assignTimestampsAndWatermarks() at arbitrary points in the pipeline) would
also help lower the learning curve.
2. Monotonicity validation of watermark expressions
Why do we need to enforce monotonicity guarantees on the watermark expression
at the planner level? As far as I know, Flink SQL currently does NOT perform
such validation at the DDL level for CREATE TABLE ... WATERMARK FOR ... AS ....
What it actually does is ensure at runtime that emitted watermarks are
non-decreasing. If the existing DDL path does not validate monotonicity at
planning time, why should APPLY_WATERMARK introduce a stricter contract?
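To make the runtime behavior I am referring to concrete, here is a minimal Python sketch. It is a simplified model under my own assumptions, not Flink's actual operator code: the class MonotonicWatermarkAssigner and the helper run are hypothetical names, and it emits after every row only for brevity. It shows that the emitted watermark stays non-decreasing on out-of-order input without any planner-side monotonicity check:

```python
from typing import Callable, Iterable, List

MIN_WATERMARK = -(2**63)  # stand-in for "no watermark seen yet"


class MonotonicWatermarkAssigner:
    """Hypothetical model: keeps only the current maximum, buffers no rows."""

    def __init__(self, watermark_expr: Callable[[int], int]) -> None:
        self.watermark_expr = watermark_expr
        self.current_watermark = MIN_WATERMARK

    def on_row(self, rowtime_ms: int) -> None:
        candidate = self.watermark_expr(rowtime_ms)
        # Only advance; a smaller candidate is ignored, never emitted.
        if candidate > self.current_watermark:
            self.current_watermark = candidate

    def emit(self) -> int:
        return self.current_watermark


def run(rowtimes: Iterable[int], expr: Callable[[int], int]) -> List[int]:
    """Feed rows one by one and record the watermark after each row."""
    assigner = MonotonicWatermarkAssigner(expr)
    emitted: List[int] = []
    for ts in rowtimes:
        assigner.on_row(ts)
        emitted.append(assigner.emit())
    return emitted


# Models "ts - INTERVAL '5' SECOND" on out-of-order rowtimes (milliseconds).
watermarks = run([10_000, 7_000, 12_000, 11_000], lambda ts: ts - 5_000)
print(watermarks)  # [5000, 5000, 7000, 7000]
```

The per-row candidates drop twice (2000 after 5000, and 6000 after 7000), but the emitted values never regress.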
3. Planner-level watermark override
Could you elaborate more on why the watermark override must happen at the
planner level? If I understand correctly, in the DataStream API, users can
define different watermark strategies at different nodes in the same pipeline,
and the runtime handles watermark propagation naturally.
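As a rough illustration of that mental model, the Python sketch below chains two assigners. It assumes, as I understand the DataStream behavior, that a downstream assigner generates its own watermarks and does not forward regular upstream ones. The function assign_watermarks, the helper watermarks_of, and the tuple encoding are hypothetical and only for illustration:

```python
from typing import List, Optional, Tuple

# ("record", event_time_ms) or ("watermark", value_ms); a made-up encoding.
Element = Tuple[str, int]


def assign_watermarks(stream: List[Element], max_delay_ms: int) -> List[Element]:
    """Model one assigner node with bounded out-of-orderness.

    Upstream watermarks are dropped; new ones are derived from record
    timestamps. Emission happens right after a record for brevity.
    """
    current: Optional[int] = None
    out: List[Element] = []
    for kind, value in stream:
        if kind == "watermark":
            continue  # the downstream strategy replaces upstream watermarks
        out.append((kind, value))
        candidate = value - max_delay_ms
        if current is None or candidate > current:
            current = candidate
            out.append(("watermark", current))
    return out


def watermarks_of(stream: List[Element]) -> List[int]:
    """Extract only the watermark values from a stream."""
    return [value for kind, value in stream if kind == "watermark"]


source: List[Element] = [("record", 10_000), ("record", 30_000), ("record", 20_000)]
after_first = assign_watermarks(source, max_delay_ms=1_000)
after_second = assign_watermarks(after_first, max_delay_ms=5_000)

print(watermarks_of(after_first))   # [9000, 29000]
print(watermarks_of(after_second))  # [5000, 25000]
```

In this model the override falls out of where the second assigner sits in the pipeline, with no planning-time step involved, which is why I would like to understand what the planner-level override adds.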
4. "Watermark expression evaluation: Needs to support arbitrary expressions"
You mentioned that StreamExecWatermarkAssigner currently has limitations in
watermark expression evaluation and "needs to support arbitrary expressions."
Could you clarify what the current limitations are exactly? Today's CREATE
TABLE ... WATERMARK FOR ts AS ts - INTERVAL '5' SECOND already supports scalar
expressions. What additional expression types does APPLY_WATERMARK require that
are not already supported?
5. State management
You mentioned "State management". Are we proposing to introduce a stateful
watermark assigner node that evicts late data? This sounds like a significant
change that goes well beyond the scope of this FLIP. The current
WatermarkAssigner is stateless: it simply computes and emits watermarks without
buffering data.
Looking forward to your clarification!
--
Best!
Xuyang
At 2026-05-09 19:08:32, "熊饶饶" <[email protected]> wrote:
>Hi Feat Zhang,
>
>+1 (non-binding)
>
>This is a well-motivated proposal that addresses a real pain point in Flink
>SQL. The inability to define watermarks in views currently forces users to
>either:
>
>- Reference underlying table watermarks directly, breaking encapsulation
>- Create separate physical tables for each watermark strategy, leading to data
>  duplication
>
>The proposed syntax CREATE VIEW ... WATERMARK FOR col AS expr is intuitive and
>aligns naturally with existing DDL watermark semantics. The
>backward-compatible design (storing watermark metadata in catalog options) is
>also a smart choice — it avoids breaking existing views while enabling the new
>capability.
>
>The Data Lakehouse / medallion architecture use case is particularly
>compelling. Being able to define watermark strategies at the Silver/Gold layer
>while keeping Bronze as raw data would significantly simplify pipeline design
>for many teams.
>
>Looking forward to seeing this move to a formal FLIP!
>
>Best regards,
>Raorao Xiong
>
>> On May 7, 2026, at 20:56, FeatZhang <[email protected]> wrote:
>>
>> Hi Xuyang,
>>
>> Thank you for the thorough review and thoughtful questions.
>>
>> *Problem this FLIP aims to solve*: The core goal of this FLIP is to allow
>> watermarks to be defined on *computed columns (and, more generally, on any
>> column produced inside a SQL query) directly in SQL statements*. Today,
>> watermarks in Flink SQL can only be declared at the CREATE TABLE level via
>> WATERMARK FOR ... AS ..., which means the time attribute must be a column
>> visible at the base table DDL. This makes it impossible to attach a
>> watermark to a
>> timestamp derived inside a query — for example, one computed by string
>> parsing, JSON extraction, or any expression inside a view or subquery —
>> without pushing that computation back down into the source table DDL. By
>> introducing APPLY_WATERMARK as an explicit relational operator that can be
>> applied to *base tables, non-materialized views, and subqueries*, users can
>> assign watermark semantics to any (computed) column produced by a query,
>> which also addresses the broader motivations listed in the FLIP: broken
>> layered-pipeline abstractions, lack of per-query / multi-tenant watermark
>> strategies, and the current gap between SQL and the DataStream API.
>>
>> Now let me address each of your points:
>>
>> Support for Non-materialized Views
>>
>> You raised an excellent point about the scope of APPLY_WATERMARK in layered
>> architectures.
>>
>> *My position*: APPLY_WATERMARK should support base tables, non-materialized
>> views, *and subqueries* (this matches the Goals section of the FLIP).
>> Here's why:
>>
>> - Non-materialized views dissolve into the surrounding plan during
>> optimization (inline expansion)
>> - There's no physical "view" node in the execution plan—just a logical
>> alias
>> - The watermark becomes a relational transformation applied on top of
>> the view's / subquery's output
>>
>> The key design principle is: *watermark definition is an explicit
>> relational operator, not attached metadata*.
>>
>> I'd also like to clarify the positions in the prior thread to avoid
>> confusion:
>>
>> - *Lincoln* originally proposed APPLY_WATERMARK(table, DESCRIPTOR(col),
>> expr) scoped to base tables only.
>> - *Timo* raised the concern about blurring the view abstraction ("*A
>> view usually dissolves into the plan … would a watermark definition
>> suddenly introduce an optimization barrier? If this is an optimization
>> barrier, is this still a view or a new concept?*"). This is exactly why
>> this FLIP does *not* attach watermarks to views via CREATE VIEW / ALTER
>> VIEW, and keeps views as pure logical aliases.
>> - *Gyula* emphasized that watermark assignment should be available on
>> views and subqueries too, consistent with the DataStream API.
>>
>> To address Timo's concern concretely:
>>
>> - Watermark semantics are applied at query planning time via an explicit
>> relational operator (APPLY_WATERMARK), not hidden in view/catalog metadata.
>> - No watermark information is persisted into the catalog for views — the
>> catalog stays unchanged (see FLIP "Catalog Changes": *No catalog changes
>> are required*).
>> - Views continue to dissolve transparently into the plan; the
>> optimization barrier only appears where APPLY_WATERMARK is explicitly used.
>>
>> Monotonicity Validation
>>
>> Great question! Monotonicity guarantees are essential for watermark
>> correctness:
>>
>> - Watermarks define the boundary of "late" data
>> - If the watermark expression is not monotonically non-decreasing, the
>> watermark could move backward
>> - This would cause data that was previously considered "on-time" to be
>> treated as late (or vice-versa), breaking event-time semantics
>>
>> *Validation requirement*: In line with the FLIP's Planner Changes section,
>> the planner validates that watermark_expression is a valid *scalar* expression
>> over columns of the input schema, and (as with today's CREATE TABLE ...
>> WATERMARK FOR ... AS ...) the expression must produce a monotonically
>> non-decreasing value relative to the designated rowtime column.
>>
>> Typical valid forms are the same as what's allowed in DDL today, for
>> example:
>>
>> -- Bounded out-of-orderness (most common)
>> APPLY_WATERMARK(t, DESCRIPTOR(ts), ts - INTERVAL '5' SECOND)
>> -- Strictly ascending
>> APPLY_WATERMARK(t, DESCRIPTOR(ts), ts)
>>
>> Note: watermark_expression is a *scalar expression* per the FLIP (not an
>> aggregate / window function). Richer forms such as user-defined watermark
>> strategies are explicitly out of scope for this FLIP and are tracked as a
>> future WatermarkFunction interface, which also depends on the Calcite
>> lambda upgrade mentioned by Timo.
>>
>> Override Timing (Planner vs Runtime)
>>
>> You raised a valid concern. Let me clarify the design, which aligns with
>> the FLIP's "Planner Changes → Interaction with Existing Table Watermarks"
>> section:
>>
>> *Current proposal*: Planner-level override
>>
>> - During query compilation, when the input to APPLY_WATERMARK already
>> carries a watermark (e.g., from CREATE TABLE ... WATERMARK), the
>> LogicalWatermarkAssigner node produced by APPLY_WATERMARK *overrides* the
>> upstream watermark strategy.
>> - When the input has no watermark (e.g., a view or a subquery),
>> APPLY_WATERMARK introduces a new one.
>> - This makes override behavior explicit in the plan and keeps room for
>> standard optimizations.
>>
>> *Why not runtime-level override*:
>>
>> 1. Planner-level override keeps watermark semantics a first-class,
>> visible part of the plan (consistent with how VECTOR_SEARCH / ML_PREDICT
>> are modeled as specialized ExecNodes in the FLIP).
>> 2. The override point is deterministic and inspectable via EXPLAIN.
>> 3. Simpler execution model — no dual-watermark reconciliation at runtime.
>>
>> If concrete use cases for a runtime-level override surface later, we could
>> revisit this via a hint (e.g., /*+ RUNTIME_OVERRIDE */), but it's not part
>> of this FLIP.
>>
>> Relationship with BuiltInProcessTableFunction
>>
>> Good observation. APPLY_WATERMARK is declared as a built-in PTF at the SQL
>> surface, similar in spirit to TO_CHANGELOG / FROM_CHANGELOG, but the FLIP
>> intentionally maps it to a *specialized ExecNode* rather than a generic PTF
>> runtime — the same pattern used by VECTOR_SEARCH and ML_PREDICT.
>>
>> *Option A: Reuse the generic BuiltInProcessTableFunction runtime*
>>
>> - Pros: Consistent with other built-in PTFs at the runtime layer.
>> - Cons: Watermark assignment is not a row-transforming PTF — it changes
>> stream metadata (time attribute + watermark strategy). Forcing it through
>> the generic PTF runtime would require extending the PTF contract with
>> watermark semantics.
>>
>> *Option B: Dedicated LogicalWatermarkAssigner + specialized ExecNode (the
>> FLIP's choice)*
>>
>> - Pros: Keeps watermark semantics a first-class citizen in the planner;
>> cleanly integrates with watermark propagation rules; no need to overload
>> the PTF contract; same pattern as VECTOR_SEARCH / ML_PREDICT already
>> established in Flink.
>> - Cons: A new dedicated node, though that cost is small compared to the
>> semantic clarity.
>>
>> *Current decision*: Option B, as stated in the FLIP ("*APPLY_WATERMARK
>> compiles to a specialized ExecNode --- similar to how VECTOR_SEARCH and
>> ML_PREDICT are handled*"). Open to revisiting based on community feedback.
>>
>> StreamExecWatermarkAssigner Sufficiency
>>
>> For the physical implementation:
>>
>> *Yes, StreamExecWatermarkAssigner should be sufficient*, with some
>> modifications:
>>
>> 1. *Input handling*: Currently assumes direct table scan; needs to
>> handle APPLY_WATERMARK's column mapping
>> 2. *Watermark expression evaluation*: Needs to support arbitrary
>> expressions (currently limited)
>> 3. *State management*: May need additional state for handling
>> out-of-order events
>>
>> The key insight is that APPLY_WATERMARK conceptually translates to:
>>
>> TableScan -> Calc (expression evaluation) -> WatermarkAssigner
>>
>> StreamExecWatermarkAssigner handles the last step; the Calc step handles
>> the expression.
>> ------------------------------
>> Summary of Proposed Responses
>>
>> - *Scope of input*: Support base tables, non-materialized views *and
>> subqueries* (per FLIP Goals); views/catalog semantics stay unchanged.
>> - *Monotonicity*: Validate watermark_expression as a scalar expression;
>> same monotonicity contract as today's DDL watermarks.
>> - *Override timing*: Planner-level override at LogicalWatermarkAssigner;
>> potential /*+ RUNTIME_OVERRIDE */ hint as future work.
>> - *PTF reuse*: Dedicated LogicalWatermarkAssigner + specialized ExecNode
>> (same pattern as VECTOR_SEARCH / ML_PREDICT).
>> - *ExecNode sufficiency*: StreamExecWatermarkAssigner is sufficient with
>> minor modifications (input handling + expression evaluation).
>>
>> ------------------------------
>>
>> Looking forward to further discussion!
>>
>> Best regards,
>> FeatZhang
>>
>>
>> On Wed, May 6, 2026 at 4:39 PM Xuyang <[email protected]> wrote:
>>
>>> Hi, FeatZhang. Thanks for driving this discussion. I've read through the
>>> full FLIP and the mailing list context, and I have a few questions:
>>> 1. If I understand correctly, in a Layered Data Architecture,
>>> silver_events would typically be a table, a materialized view, or a
>>> materialized table. From the mailing list discussion, it seems like no
>>> consensus was reached on this point. I think we still need to consider
>>> whether APPLY_WATERMARK should be allowed on (non-materialized) views.
>>> 2. In the Planner Changes section under Logical Plan, could you elaborate
>>> on why monotonicity guarantees need to be ensured for the watermark
>>> expression validation?
>>> 3. (nit) In the Watermark Override part under Planner Changes, shouldn't
>>> the override of the upstream watermark happen at runtime rather than at the
>>> planner level?
>>> 4. I feel that APPLY_WATERMARK is quite similar to TO_CHANGELOG and
>>> FROM_CHANGELOG. Is what we actually need just a
>>> BuiltInProcessTableFunction? That way, we would only need to further extend
>>> ProcessTableFunction to support this.
>>> 5. If we choose to translate APPLY_WATERMARK into a specialized ExecNode
>>> (similar to VECTOR_SEARCH and ML_PREDICT), would the existing
>>> StreamExecWatermarkAssigner be sufficient for this purpose?
>>>
>>>
>>>
>>>
>>>
>>> --
>>>
>>> Best!
>>> Xuyang
>>>
>>>
>>>
>>> At 2026-04-21 22:22:33, "FeatZhang" <[email protected]> wrote:
>>>> Hi everyone,
>>>>
>>>> Thank you for the feedback and discussions on the initial proposal. I've
>>>> revised the FLIP based on the community's input and would like to share
>>>> the updated version.
>>>>
>>>>
>>>> FLIP-XXX: Support Flexible Watermark Assignment via Built-in Function
>>>> <https://drive.google.com/open?id=17PXYAi6Pb91OqFhVVK7tRULiaHAb6wiX79jjC0NaaDA>
>>>>
>>>>
>>>>
>>>> KEY UPDATES
>>>> ===========
>>>>
>>>> The proposal has evolved from "Support Watermark Definition in SQL Views"
>>>> to a more flexible and powerful approach: FLIP-XXX: Support Flexible
>>>> Watermark Assignment via APPLY_WATERMARK Function.
>>>>
>>>> What's Changed:
>>>>
>>>> 1. Broader Scope: Instead of limiting watermark definitions to SQL views
>>>> only, the new proposal introduces a built-in table function
>>>> APPLY_WATERMARK that works with:
>>>> - Base tables
>>>> - Views (both regular and materialized)
>>>> - Subqueries
>>>> - Any table-valued expressions
>>>>
>>>> 2. More Flexible Design: The function-based approach provides:
>>>> - Dynamic watermark assignment at query time without modifying catalog
>>>> metadata
>>>> - Override capability for existing watermark strategies
>>>> - Composability with other SQL operations
>>>> - No need for DDL changes or catalog write permissions
>>>>
>>>> 3. Better SQL Semantics: Using a table function follows SQL standard
>>>> patterns and integrates naturally with Flink's existing function
>>>> ecosystem.
>>>>
>>>> UPDATED FLIP DOCUMENT
>>>> =====================
>>>>
>>>> The revised FLIP is now available at:
>>>> https://iwiki.woa.com/p/4019879693
>>>>
>>>> Key sections include:
>>>> - Motivation and use cases
>>>> - Public interfaces and SQL syntax
>>>> - Implementation plan
>>>> - Compatibility analysis
>>>> - Test plan
>>>>
>>>> EXAMPLE USAGE
>>>> =============
>>>>
>>>> -- Apply watermark to a view
>>>> SELECT *
>>>> FROM APPLY_WATERMARK(my_view, DESCRIPTOR(event_time),
>>>>   event_time - INTERVAL '5' SECOND);
>>>> -- Override existing watermark strategy
>>>> SELECT *
>>>> FROM APPLY_WATERMARK(my_table_with_watermark, DESCRIPTOR(ts),
>>>>   ts - INTERVAL '10' SECOND -- Different from DDL watermark
>>>> );
>>>> -- Use in complex queries
>>>> SELECT window_start,
>>>>   window_end,
>>>>   COUNT(*)
>>>> FROM TABLE(TUMBLE(TABLE APPLY_WATERMARK(orders,
>>>>   DESCRIPTOR(order_time), order_time - INTERVAL '5' SECOND),
>>>>   DESCRIPTOR(order_time), INTERVAL '1' HOUR))
>>>> GROUP BY window_start,
>>>>   window_end;
>>>>
>>>>
>>>> IMPLEMENTATION PROGRESS
>>>> =======================
>>>>
>>>> I've also opened a draft PR #27984 with the initial implementation:
>>>> - Core built-in function definition
>>>> - SQL-to-RelNode conversion rules
>>>> - Physical plan integration
>>>> - Unit tests and documentation (English + Chinese)
>>>>
>>>> The PR is available at:
>>>> https://github.com/apache/flink/pull/27984
>>>>
>>>> REQUEST FOR FEEDBACK
>>>> ====================
>>>>
>>>> I would appreciate your thoughts on:
>>>>
>>>> 1. Function naming: Is APPLY_WATERMARK clear and intuitive?
>>>> (Alternative considered: WITH_WATERMARK, SET_WATERMARK)
>>>>
>>>> 2. DESCRIPTOR syntax: Using DESCRIPTOR(column_name) to specify the
>>>> rowtime column—does this align well with Flink's existing patterns?
>>>>
>>>> 3. Override behavior: Should APPLY_WATERMARK always override existing
>>>> watermarks, or should we provide a mode parameter
>>>> (e.g., OVERRIDE, MERGE)?
>>>>
>>>> 4. Performance considerations: Any concerns about the function-based
>>>> approach vs. catalog-level watermark definitions?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Looking forward to your valuable feedback!
>>>>
>>>> Best regards,
>>>> FeatZhang
>>>>
>>>> On Thu, Feb 12, 2026 at 6:24 PM Timo Walther <[email protected]> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> I think we all agree that we clearly want this functionality, just the
>>>>> "how" needs to be discussed. I also like Lincoln’s suggestion of
>>>>> introducing a built-in PTF for this, I had similar ideas in mind.
>>>>>
>>>>> There are two issues with an APPLY_WATERMARK function, but both are on the
>>>>> short-term roadmap:
>>>>>
>>>>> 1) This function would need to be a function that takes an expression,
>>>>> ideally as a lambda function. Newer Calcite versions already have lambda
>>>>> expression support. At Confluent we were planning to work on a Calcite
>>>>> upgrade this quarter especially to get lambda support in and improve
>>>>> built-in functions that work on collections.
>>>>>
>>>>> 2) User-defined PTFs are currently not able to emit watermarks. We could
>>>>> introduce a new interface WatermarkFunction (similar to
>>>>> ChangelogFunction) that would offer this to everyone. Alternatively, we
>>>>> could only use the PTF signature, but translate to a specialized
>>>>> ExecNode similar to how we do it for VECTOR_SEARCH and ML_PREDICT.
>>>>>
>>>>> In any case, even if we go with the function approach, we definitely
>>>>> need a full FLIP on this.
>>>>>
>>>>> Thanks,
>>>>> Timo
>>>>>
>>>>> On 12.02.26 08:25, Gyula Fóra wrote:
>>>>>> Hi All!
>>>>>> I would like to chime in here quickly from a slightly different angle.
>>>>>> While I am the first to admit that I cannot grasp all the planning /
>>>>>> conceptual implications, I also feel the need for more flexible watermark
>>>>>> handling as suggested by Feat.
>>>>>>
>>>>>> Anything that can only be applied to base/catalog tables is very limiting
>>>>>> from a usability perspective. Watermarks feel like they should be a simple
>>>>>> function that you can apply on a column/table as part of a query/view. For
>>>>>> example: extract a timestamp from a string, convert it to TS -> apply
>>>>>> watermark, etc.
>>>>>>
>>>>>> Users often receive the tables/catalogs as given and can only write
>>>>>> queries.
>>>>>>
>>>>>> Fixing this would eliminate a long-standing disconnect between the
>>>>>> DataStream API's flexible watermark handling and the currently very
>>>>>> restrictive SQL approach.
>>>>>>
>>>>>> Cheers
>>>>>> Gyula
>>>>>>
>>>>>> On Thu, Feb 12, 2026 at 7:54 AM FeatZhang <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Timo, Lincoln,
>>>>>>>
>>>>>>> Thank you both for the detailed feedback.
>>>>>>>
>>>>>>> I agree with the concern that non-materialized SQL views should remain a
>>>>>>> pure logical abstraction. Introducing watermark definitions directly
>>>>>>> into CREATE VIEW or ALTER VIEW could blur the boundary between logical
>>>>>>> aliasing and physical planning semantics, especially considering
>>>>>>> optimization barriers and watermark propagation behavior.
>>>>>>>
>>>>>>> Lincoln’s suggestion of introducing a built-in function such as:
>>>>>>>
>>>>>>> APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column), watermark_expression)
>>>>>>>
>>>>>>> is a cleaner direction. It keeps watermark definition as an explicit
>>>>>>> relational transformation rather than attaching additional semantics to
>>>>>>> views.
>>>>>>>
>>>>>>> However, to fully address the original use cases (especially logical reuse
>>>>>>> and layered lakehouse architectures), I propose that the table parameter
>>>>>>> should support:
>>>>>>>
>>>>>>> - Base tables
>>>>>>> - Non-materialized views
>>>>>>>
>>>>>>> If APPLY_WATERMARK can accept both, we can:
>>>>>>>
>>>>>>> - Preserve the conceptual purity of SQL views
>>>>>>> - Avoid redefining view semantics
>>>>>>> - Still enable logical reuse via views
>>>>>>>    - Allow different watermark strategies over the same logical relation
>>>>>>>
>>>>>>> In other words, watermark definition becomes an explicit relational
>>>>>>> operator applied on top of any logical relation, instead of being embedded
>>>>>>> into the view definition itself.
>>>>>>>
>>>>>>> From a planner perspective, this keeps the model consistent:
>>>>>>>
>>>>>>> - The function expands into a relational node
>>>>>>> - No optimization barrier is introduced by views
>>>>>>>    - Watermark handling remains part of the logical plan transformation
>>>>>>>
>>>>>>> I will prepare a PR to prototype APPLY_WATERMARK with support for both base
>>>>>>> tables and non-materialized views, and share it for further discussion.
>>>>>>>
>>>>>>> Looking forward to your thoughts.
>>>>>>>
>>>>>>> Best,
>>>>>>> Feat
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Feb 12, 2026 at 12:19, Lincoln Lee <[email protected]> wrote:
>>>>>>>
>>>>>>>> Agree with Timo’s point regarding the conceptual semantics. We should not
>>>>>>>> directly extend non-materialized views with additional watermark
>>>>>>>> definitions.
>>>>>>>>
>>>>>>>> Regarding the use case mentioned by Feat, defining different watermark
>>>>>>>> strategies for the same data source, especially in the case of catalog
>>>>>>>> tables, we are exploring a possible solution introducing a built-in
>>>>>>>> function:
>>>>>>>> ```sql
>>>>>>>> APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column), watermark_expression)
>>>>>>>> ```
>>>>>>>> This function only supports base tables as input and does not support
>>>>>>>> views, subqueries, or derived relations.
>>>>>>>>
>>>>>>>> This would address a meaningful subset of the identified use cases without
>>>>>>>> redefining the role of SQL views.
>>>>>>>>
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Lincoln Lee
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Feb 11, 2026 at 23:29, Timo Walther <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Feat,
>>>>>>>>>
>>>>>>>>> thanks for proposing this FLIP. We had similar discussions in the past,
>>>>>>>>> but so far could never reach consensus.
>>>>>>>>>
>>>>>>>>> SQL views are actually a very simple concept: they just give SQL text an
>>>>>>>>> alias. A view has no other properties except for the view definition.
>>>>>>>>> Everything else is dynamically computed when the SQL text is inserted
>>>>>>>>> into the larger plan. A view is never evaluated without the surrounding
>>>>>>>>> plan.
>>>>>>>>>
>>>>>>>>> Watermarks in the middle of a pipeline raise a couple of tricky issues:
>>>>>>>>>
>>>>>>>>> - What if the upstream table is updating, how would you deal with
>>>>>>>>> watermarks in the downstream view?
>>>>>>>>> - What if the upstream table already emits watermarks? Would the view
>>>>>>>>> catch them and discard this information?
>>>>>>>>> - A view usually dissolves into the plan (e.g. via projection or filter
>>>>>>>>> pushdown). Would a watermark definition suddenly introduce an
>>>>>>>>> optimization barrier? If this is an optimization barrier, is this still
>>>>>>>>> a view or a new concept? E.g. a "materialized view" or "pre-planned
>>>>>>>>> view"?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Timo
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 11.02.26 03:51, FeatZhang wrote:
>>>>>>>>>> Hi Flink Community,
>>>>>>>>>>
>>>>>>>>>> I'd like to propose adding watermark support for SQL Views in Flink to
>>>>>>>>>> better support event-time processing scenarios.
>>>>>>>>>>
>>>>>>>>>> Problem Statement
>>>>>>>>>>
>>>>>>>>>> Currently, Flink SQL views cannot define watermarks. This limitation
>>>>>>>>>> creates several challenges:
>>>>>>>>>>
>>>>>>>>>>    - *Broken Abstraction*: Users must reference underlying table
>>>>>>>>>>    watermarks directly, exposing implementation details
>>>>>>>>>>    - *No Flexibility*: Cannot define different watermark strategies for
>>>>>>>>>>    different use cases on the same data source
>>>>>>>>>>    - *Limited Architecture Support*: Incompatible with modern layered data
>>>>>>>>>>    architectures (Bronze/Silver/Gold medallion pattern)
>>>>>>>>>>
>>>>>>>>>> For example, in multi-tenant scenarios, different tenants may require
>>>>>>>>>> different lateness tolerance, but currently we cannot create views with
>>>>>>>>>> different watermark strategies on the same source table.
>>>>>>>>>>
>>>>>>>>>> Proposed Solution
>>>>>>>>>>
>>>>>>>>>> I propose adding two SQL syntax options to support watermark definitions in
>>>>>>>>>> views:
>>>>>>>>>>
>>>>>>>>>> *Option 1: CREATE VIEW with WATERMARK*
>>>>>>>>>>
>>>>>>>>>> CREATE VIEW user_activity
>>>>>>>>>> WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
>>>>>>>>>> AS SELECT user_id, event_time, action FROM raw_events;
>>>>>>>>>>
>>>>>>>>>> *Option 2: ALTER VIEW SET WATERMARK*
>>>>>>>>>>
>>>>>>>>>> ALTER VIEW user_activity SET WATERMARK FOR event_time AS
>>>>>>>>>> event_time - INTERVAL '5' SECOND;
>>>>>>>>>>
>>>>>>>>>> Key Design Aspects
>>>>>>>>>>
>>>>>>>>>>    - *Backward Compatibility*: Watermark stored as optional metadata in
>>>>>>>>>>    view options; existing views continue to work unchanged
>>>>>>>>>>    - *Validation*: Watermark column must exist in view schema and be of
>>>>>>>>>>    TIMESTAMP/TIMESTAMP_LTZ type
>>>>>>>>>>    - *Storage*: Watermark metadata persists in catalog options map (works
>>>>>>>>>>    with all catalog implementations)
>>>>>>>>>>    - *Propagation*: Follows existing Flink watermark propagation rules in
>>>>>>>>>>    joins and nested views
>>>>>>>>>>
>>>>>>>>>> Use Case Example: Data Lakehouse Architecture
>>>>>>>>>>
>>>>>>>>>> -- Bronze: Raw data (no watermark)
>>>>>>>>>> CREATE TABLE bronze_events (
>>>>>>>>>>   raw_data STRING,
>>>>>>>>>>   ingestion_time TIMESTAMP(3)
>>>>>>>>>> ) WITH (...);
>>>>>>>>>> -- Silver: Cleaned data with watermark
>>>>>>>>>> CREATE VIEW silver_events
>>>>>>>>>> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
>>>>>>>>>> AS SELECT
>>>>>>>>>>   CAST(JSON_VALUE(raw_data, '$.event_time') AS TIMESTAMP(3)) AS event_time,
>>>>>>>>>>   JSON_VALUE(raw_data, '$.user_id') AS user_id
>>>>>>>>>> FROM bronze_events
>>>>>>>>>> WHERE JSON_VALUE(raw_data, '$.event_time') IS NOT NULL;
>>>>>>>>>> -- Gold: Aggregations
>>>>>>>>>> SELECT TUMBLE_START(event_time, INTERVAL '1' HOUR), COUNT(*)
>>>>>>>>>> FROM silver_events
>>>>>>>>>> GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
>>>>>>>>>>
>>>>>>>>>> Reference Materials
>>>>>>>>>>
>>>>>>>>>>    - FLIP Document: FLIP-XXX: Support Watermark in Flink SQL View
>>>>>>>>>>    <https://docs.google.com/document/d/1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y/edit?usp=sharing>
>>>>>>>>>>    - JIRA Issue: https://issues.apache.org/jira/browse/FLINK-39062
>>>>>>>>>>    - Implementation POC:
>>>>>>>>>>       - [FLINK-39062][table] Support WATERMARK clause in CREATE VIEW
>>>>>>>>>>       statement <https://github.com/apache/flink/pull/27571>
>>>>>>>>>>       - [FLINK-39062][table] Support ALTER VIEW SET WATERMARK syntax
>>>>>>>>>>       <https://github.com/apache/flink/pull/27570>
>>>>>>>>>>
>>>>>>>>>> Implementation Timeline
>>>>>>>>>>
>>>>>>>>>> Estimated 6-8 weeks covering parser layer, planner layer, catalog
>>>>>>>>>> integration, and comprehensive testing.
>>>>>>>>>>
>>>>>>>>>> Request for Feedback
>>>>>>>>>>
>>>>>>>>>> This enhancement would significantly improve Flink's support for layered
>>>>>>>>>> data architectures and flexible event-time processing. I'm happy to provide
>>>>>>>>>> more details or start a formal FLIP process if the community sees value in
>>>>>>>>>> this proposal.
>>>>>>>>>>
>>>>>>>>>> Looking forward to the community's feedback!
>>>>>>>>>>
>>>>>>>>>> Best regards,
>>>>>>>>>>
>>>>>>>>>> Feat Zhang
>>>>>>>>>>
>>>>>>>>>> FLIP-XXX: Support Watermark in Flink SQL View
>>>>>>>>>> <https://drive.google.com/open?id=1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y>