Hi Xuyang,

Thanks a lot for the confirmation, and for the very thorough back-and-forth
throughout this discussion — it really helped sharpen the proposal,
especially around the "arbitrary position" semantics, the planner-side
contract, and the catalog persistence story. Much appreciated!

@all: with Xuyang's last round of feedback addressed, the FLIP has
stabilized.

If anyone — Timo, Sergey, Yunfeng, Jingsong, Ron, Shengkai, and others
who have been following the thread — still has open concerns, please let
me know in the next ~3 days. Otherwise I'll start the VOTE thread.

Thanks again to everyone for the reviews!

Best,
FeatZhang

On Mon, May 18, 2026 at 5:20 PM Xuyang <[email protected]> wrote:

> Hi FeatZhang,
>
> Thanks for the update and the revised proposal! I don't have further
> questions. Looking forward to seeing this move forward.
>
>
>
> --
>
>     Best!
>     Xuyang
>
>
>
> 在 2026-05-16 13:44:20,"FeatZhang" <[email protected]> 写道:
> >Hi Xuyang,
> >
> >Thanks a lot for the careful follow-ups — these are exactly the right
> >questions to ask before we move to a vote. After re-checking the code
> >paths you pointed at, I want to revise several statements I made in my
> >previous reply, and tighten the FLIP accordingly. I'll go through them
> >one by one.
> >
> >1. Semantics at any position in the pipeline
> >=============================================
> >Fully agree. To make this concrete, I'll add a new sub-section
> >"Semantics at Different Pipeline Positions" to the FLIP, covering:
> >
> >  - APPLY_WATERMARK on a base table (with or without an existing
> >    DDL watermark)
> >  - APPLY_WATERMARK on top of a non-materialized view / sub-query
> >  - APPLY_WATERMARK applied multiple times in the same query
> >  - Interaction with TUMBLE / HOP / SESSION / CUMULATE
> >  - Interaction with joins (regular / interval / temporal)
> >
> >The mental model will be explicitly aligned with the DataStream API:
> >each APPLY_WATERMARK in SQL corresponds to one
> >`assignTimestampsAndWatermarks(...)` call in the DataStream pipeline,
> >applied in the order they appear.
> >
> >2. Monotonicity validation
> >==========================
> >You're right, and I want to correct my earlier reply. Today's
> >`CREATE TABLE ... WATERMARK FOR ... AS ...` does NOT enforce
> >monotonicity at the planner level — the planner only checks that:
> >
> >  - the rowtime column exists and is of TIMESTAMP / TIMESTAMP_LTZ,
> >  - the watermark expression is a valid scalar expression over the
> >    table's schema and resolves to a comparable type.
> >
> >Monotonicity is a runtime contract: the WatermarkAssignerOperator
> >emits watermarks that are guaranteed to be non-decreasing.
> >
> >APPLY_WATERMARK will follow exactly the same contract — no stricter
> >planner-level monotonicity check. I'll update the FLIP's "Planner
> >Changes → Validation" section to reflect this.
> >
> >3. Override timing — clarification
> >==================================
> >I think this was a wording issue on my side rather than a real
> >design disagreement. Let me restate it:
> >
> >  - APPLY_WATERMARK introduces a dedicated WatermarkAssigner node in
> >    the plan. Whether the input already carries a watermark or not,
> >    the plan ends up with the new assigner positioned downstream of
> >    the existing one.
> >  - At runtime there is no "merge" or "reconciliation": each
> >    WatermarkAssigner operator independently emits its own watermark
> >    stream; downstream operators simply observe the watermark from
> >    the most recent upstream assigner.
> >
> >This is the same model as calling `assignTimestampsAndWatermarks()`
> >twice in DataStream — the second call wins because it sits later in
> >the operator chain, not because of any planner-level magic.
> >
> >So "planner-level override" was a poor choice of words. The correct
> >description is: **the planner decides the operator topology; the
> >runtime emits watermarks according to that topology**, exactly like
> >DataStream. I'll rephrase the FLIP accordingly and drop the
> >"override" terminology.
> >
> >4. "Watermark expression evaluation: needs to support arbitrary
> >   expressions"
> >====================================================================
> >Apologies, this statement was inaccurate. After re-checking,
> >StreamExecWatermarkAssigner already evaluates the watermark
> >expression through the standard `ExprCodeGenerator`, which supports
> >the same scalar expressions as DDL today (arithmetic on TIMESTAMP /
> >TIMESTAMP_LTZ, INTERVAL arithmetic, scalar UDFs, etc.).
> >
> >What APPLY_WATERMARK actually needs from the ExecNode is:
> >
> >  - resolving the rowtime column index from the DESCRIPTOR, since the
> >    input may be a non-base-table (view / sub-query / projected
> >    relation),
> >  - wiring the watermark expression's input row to the upstream
> >    operator's output row instead of a TableScan output.
> >
> >No new expression capability is required. I'll fix this in the FLIP.
> >
> >5. State management
> >===================
> >You're right, this should be removed. In the scope of this FLIP the
> >APPLY_WATERMARK ExecNode is **stateless**, identical to the existing
> >`WatermarkAssignerOperator`. It does not buffer rows and does not
> >evict late data; late-data handling remains the responsibility of
> >the downstream window / join operators, exactly as it works today.
> >
> >The "state management" bullet in my previous reply was speculation
> >about future watermark strategies (idle source detection, etc.) and
> >does not belong in this FLIP. I'll drop it.
> >
> >------------------------------------------------------------
> >Updated summary of the design after this round
> >------------------------------------------------------------
> >
> >  - Scope: base tables, non-materialized views, sub-queries; any
> >    relation position in the query.
> >  - Semantics: aligned with DataStream API
> >    `assignTimestampsAndWatermarks()`; multiple applications in the
> >    same query are positional, not "overriding".
> >  - Validation: same contract as today's DDL — scalar expression on
> >    a TIMESTAMP / TIMESTAMP_LTZ rowtime column; no planner-level
> >    monotonicity check.
> >  - ExecNode: stateless, reuses StreamExecWatermarkAssigner with
> >    minor wiring changes (rowtime column resolution + non-TableScan
> >    input handling).
> >  - Out of scope: WatermarkFunction interface, idle-source state,
> >    runtime-level merge of multiple watermark strategies.
> >
> >I'll update the FLIP document and PR #27984 to match the points
> >above, and post a diff summary here once it's done.
> >
> >Thanks again for pushing on these — the FLIP is much cleaner after
> >this round.
> >
> >Best regards,
> >FeatZhang
> >
> >On Mon, May 11, 2026 at 10:45 AM Xuyang <[email protected]> wrote:
> >
> >> Hi FeatZhang. Thanks for the detailed responses. I have a few follow-up
> >> comments and questions:
> >>
> >>
> >> 1. Support for APPLY_WATERMARK at any node/position
> >> I generally agree with the direction that APPLY_WATERMARK should be
> >> applicable at any node and any position — one or more times — similar to
> >> how the DataStream API allows watermark assignment. However, I think we
> >> need to clearly articulate the precise behavior/semantics in each
> scenario
> >> to reduce user confusion. Aligning the mental model with the DataStream
> API
> >> (where users can call assignTimestampsAndWatermarks() at arbitrary
> points
> >> in the pipeline) would also help lower the learning curve.
> >> 2. Monotonicity validation of watermark expressions
> >> Why do we need to enforce monotonicity guarantees on the watermark
> >> expression at the planner level? As far as I know, Flink SQL currently
> does
> >> NOT perform such validation at the DDL level for CREATE TABLE ...
> WATERMARK
> >> FOR ... AS .... What it actually does is ensure at runtime that emitted
> >> watermarks are non-decreasing. If the existing DDL path does not
> validate
> >> monotonicity at planning time, why should APPLY_WATERMARK introduce a
> >> stricter contract?
> >> 3. Planner-level watermark override
> >> Could you elaborate more on why the watermark override must happen at
> the
> >> planner level? If I understand correctly, in the DataStream API, users
> can
> >> define different watermark strategies at different nodes in the same
> >> pipeline, and the runtime handles watermark propagation naturally.
> >> 4. "Watermark expression evaluation: Needs to support arbitrary
> >> expressions"
> >> You mentioned that StreamExecWatermarkAssigner currently has limitations
> >> in watermark expression evaluation and "needs to support arbitrary
> >> expressions." Could you clarify what the current limitations are
> exactly?
> >> Today's CREATE TABLE ... WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
> >> already supports scalar expressions, what additional expression types
> does
> >> APPLY_WATERMARK require that are not already supported?
> >> 5. State management
> >> You mentioned "State management". Are we proposing to introduce a
> stateful
> >> watermark assigner node that evicts late data? This sounds like a
> >> significant change that goes well beyond the scope of this FLIP. The
> >> current WatermarkAssigner is stateless, it simply computes and emits
> >> watermarks without buffering data.
> >> Looking forward to your clarification!
> >>
> >>
> >>
> >>
> >>
> >> --
> >>
> >>     Best!
> >>     Xuyang
> >>
> >>
> >>
> >> At 2026-05-09 19:08:32, "熊饶饶" <[email protected]> wrote:
> >> >Hi Feat Zhang,
> >> >
> >> >+1 (non-binding)
> >> >
> >> >This is a well-motivated proposal that addresses a real pain point in
> >> Flink SQL. The inability to define watermarks in views currently forces
> >> users to either:
> >> >
> >> >Reference underlying table watermarks directly, breaking encapsulation
> >> >Create separate physical tables for each watermark strategy, leading to
> >> data duplication
> >> >The proposed syntax CREATE VIEW ... WATERMARK FOR col AS expr is
> >> intuitive and aligns naturally with existing DDL watermark semantics.
> The
> >> backward-compatible design (storing watermark metadata in catalog
> options)
> >> is also a smart choice — it avoids breaking existing views while
> enabling
> >> the new capability.
> >> >
> >> >The Data Lakehouse / medallion architecture use case is particularly
> >> compelling. Being able to define watermark strategies at the Silver/Gold
> >> layer while keeping Bronze as raw data would significantly simplify
> >> pipeline design for many teams.
> >> >
> >> >Looking forward to seeing this move to a formal FLIP!
> >> >
> >> >Best regards,
> >> >Raorao Xiong
> >> >
> >> >> 2026年5月7日 20:56,FeatZhang <[email protected]> 写道:
> >> >>
> >> >> Hi Xuyang,
> >> >>
> >> >> Thank you for the thorough review and thoughtful questions.
> >> >>
> >> >> *Problem this FLIP aims to solve*: The core goal of this FLIP is to
> >> allow
> >> >> watermarks to be defined on *computed columns (and, more generally,
> on
> >> any
> >> >> column produced inside a SQL query) directly in SQL statements*.
> Today,
> >> >> watermarks in Flink SQL can only be declared at the CREATE TABLE
> level
> >> >> via WATERMARK
> >> >> FOR ... AS ..., which means the time attribute must be a column
> visible
> >> at
> >> >> the base table DDL. This makes it impossible to attach a watermark
> to a
> >> >> timestamp derived inside a query — for example, one computed by
> string
> >> >> parsing, JSON extraction, or any expression inside a view or
> subquery —
> >> >> without pushing that computation back down into the source table
> DDL. By
> >> >> introducing APPLY_WATERMARK as an explicit relational operator that
> can
> >> be
> >> >> applied to *base tables, non-materialized views, and subqueries*,
> users
> >> can
> >> >> assign watermark semantics to any (computed) column produced by a
> query,
> >> >> which also addresses the broader motivations listed in the FLIP:
> broken
> >> >> layered-pipeline abstractions, lack of per-query / multi-tenant
> >> watermark
> >> >> strategies, and the current gap between SQL and the DataStream API.
> >> >>
> >> >> Now let me address each of your points:
> >> >> Support for Non-materialized Views
> >> >>
> >> >> You raised an excellent point about the scope of APPLY_WATERMARK in
> >> layered
> >> >> architectures.
> >> >>
> >> >> *My position*: APPLY_WATERMARK should support base tables,
> >> non-materialized
> >> >> views, *and subqueries* (this matches the Goals section of the FLIP).
> >> >> Here's why:
> >> >>
> >> >>   - Non-materialized views dissolve into the surrounding plan during
> >> >>   optimization (inline expansion)
> >> >>   - There's no physical "view" node in the execution plan—just a
> logical
> >> >>   alias
> >> >>   - The watermark becomes a relational transformation applied on top
> of
> >> >>   the view's / subquery's output
> >> >>
> >> >> The key design principle is: *watermark definition is an explicit
> >> >> relational operator, not attached metadata*.
> >> >>
> >> >> I'd also like to clarify the positions in the prior thread to avoid
> >> >> confusion:
> >> >>
> >> >>   - *Lincoln* originally proposed APPLY_WATERMARK(table,
> >> DESCRIPTOR(col),
> >> >>   expr) scoped to base tables only.
> >> >>   - *Timo* raised the concern about blurring the view abstraction
> ("*A
> >> >>   view usually dissolves into the plan … would a watermark definition
> >> >>   suddenly introduce an optimization barrier? If this is an
> optimization
> >> >>   barrier, is this still a view or a new concept?*"). This is exactly
> >> why
> >> >>   this FLIP does *not* attach watermarks to views via CREATE VIEW /
> >> ALTER
> >> >>   VIEW, and keeps views as pure logical aliases.
> >> >>   - *Gyula* emphasized that watermark assignment should be available
> on
> >> >>   views and subqueries too, consistent with the DataStream API.
> >> >>
> >> >> To address Timo's concern concretely:
> >> >>
> >> >>   - Watermark semantics are applied at query planning time via an
> >> explicit
> >> >>   relational operator (APPLY_WATERMARK), not hidden in view/catalog
> >> metadata.
> >> >>   - No watermark information is persisted into the catalog for views
> —
> >> the
> >> >>   catalog stays unchanged (see FLIP "Catalog Changes": *No catalog
> >> changes
> >> >>   are required*).
> >> >>   - Views continue to dissolve transparently into the plan; the
> >> >>   optimization barrier only appears where APPLY_WATERMARK is
> explicitly
> >> used.
> >> >>
> >> >> Monotonicity Validation
> >> >>
> >> >> Great question! Monotonicity guarantees are essential for watermark
> >> >> correctness:
> >> >>
> >> >>   - Watermarks define the boundary of "late" data
> >> >>   - If the watermark expression is not monotonically non-decreasing,
> the
> >> >>   watermark could move backward
> >> >>   - This would cause data that was previously considered "on-time"
> to be
> >> >>   treated as late (or vice-versa), breaking event-time semantics
> >> >>
> >> >> *Validation requirement*: In line with the FLIP's Planner Changes
> >> section,
> >> >> the planner validates that watermark_expression is a valid *scalar*
> >> expression
> >> >> over columns of the input schema, and (as with today's CREATE TABLE
> ...
> >> >> WATERMARK FOR ... AS ...) the expression must produce a monotonically
> >> >> non-decreasing value relative to the designated rowtime column.
> >> >>
> >> >> Typical valid forms are the same as what's allowed in DDL today, for
> >> >> example:
> >> >>
> >> >> -- Bounded out-of-orderness (most common)
> >> >> APPLY_WATERMARK(t, DESCRIPTOR(ts), ts - INTERVAL '5' SECOND)
> >> >> -- Strictly ascending
> >> >> APPLY_WATERMARK(t, DESCRIPTOR(ts), ts)
> >> >>
> >> >> Note: watermark_expression is a *scalar expression* per the FLIP
> (not an
> >> >> aggregate / window function). Richer forms such as user-defined
> >> watermark
> >> >> strategies are explicitly out of scope for this FLIP and are tracked
> as
> >> a
> >> >> future WatermarkFunction interface, which also depends on the Calcite
> >> >> lambda upgrade mentioned by Timo.
> >> >> Override Timing (Planner vs Runtime)
> >> >>
> >> >> You raised a valid concern. Let me clarify the design, which aligns
> with
> >> >> the FLIP's "Planner Changes → Interaction with Existing Table
> >> Watermarks"
> >> >> section:
> >> >>
> >> >> *Current proposal*: Planner-level override
> >> >>
> >> >>   - During query compilation, when the input to APPLY_WATERMARK
> already
> >> >>   carries a watermark (e.g., from CREATE TABLE ... WATERMARK), the
> >> >>   LogicalWatermarkAssigner node produced by APPLY_WATERMARK
> *overrides*
> >> the
> >> >>   upstream watermark strategy.
> >> >>   - When the input has no watermark (e.g., a view or a subquery),
> >> >>   APPLY_WATERMARK introduces a new one.
> >> >>   - This makes override behavior explicit in the plan and keeps room
> for
> >> >>   standard optimizations.
> >> >>
> >> >> *Why not runtime-level override*:
> >> >>
> >> >>   1. Planner-level override keeps watermark semantics a first-class,
> >> >>   visible part of the plan (consistent with how VECTOR_SEARCH /
> >> ML_PREDICT
> >> >>   are modeled as specialized ExecNodes in the FLIP).
> >> >>   2. The override point is deterministic and inspectable via EXPLAIN.
> >> >>   3. Simpler execution model — no dual-watermark reconciliation at
> >> runtime.
> >> >>
> >> >> If concrete use cases for a runtime-level override surface later, we
> >> could
> >> >> revisit this via a hint (e.g., /*+ RUNTIME_OVERRIDE */), but it's not
> >> part
> >> >> of this FLIP.
> >> >> Relationship with BuiltInProcessTableFunction
> >> >>
> >> >> Good observation. APPLY_WATERMARK is declared as a built-in PTF at
> the
> >> SQL
> >> >> surface, similar in spirit to TO_CHANGELOG / FROM_CHANGELOG, but the
> >> FLIP
> >> >> intentionally maps it to a *specialized ExecNode* rather than a
> generic
> >> PTF
> >> >> runtime — the same pattern used by VECTOR_SEARCH and ML_PREDICT.
> >> >>
> >> >> *Option A: Reuse the generic BuiltInProcessTableFunction runtime*
> >> >>
> >> >>   - Pros: Consistent with other built-in PTFs at the runtime layer.
> >> >>   - Cons: Watermark assignment is not a row-transforming PTF — it
> >> changes
> >> >>   stream metadata (time attribute + watermark strategy). Forcing it
> >> through
> >> >>   the generic PTF runtime would require extending the PTF contract
> with
> >> >>   watermark semantics.
> >> >>
> >> >> *Option B: Dedicated LogicalWatermarkAssigner + specialized ExecNode
> >> (the
> >> >> FLIP's choice)*
> >> >>
> >> >>   - Pros: Keeps watermark semantics a first-class citizen in the
> >> planner;
> >> >>   cleanly integrates with watermark propagation rules; no need to
> >> overload
> >> >>   the PTF contract; same pattern as VECTOR_SEARCH / ML_PREDICT
> already
> >> >>   established in Flink.
> >> >>   - Cons: A new dedicated node, though that cost is small compared to
> >> the
> >> >>   semantic clarity.
> >> >>
> >> >> *Current decision*: Option B, as stated in the FLIP
> ("*APPLY_WATERMARK
> >> >> compiles to a specialized ExecNode --- similar to how VECTOR_SEARCH
> and
> >> >> ML_PREDICT are handled*"). Open to revisiting based on community
> >> feedback.
> >> >> StreamExecWatermarkAssigner Sufficiency
> >> >>
> >> >> For the physical implementation:
> >> >>
> >> >> *Yes, StreamExecWatermarkAssigner should be sufficient*, with some
> >> >> modifications:
> >> >>
> >> >>   1. *Input handling*: Currently assumes direct table scan; needs to
> >> >>   handle APPLY_WATERMARK's column mapping
> >> >>   2. *Watermark expression evaluation*: Needs to support arbitrary
> >> >>   expressions (currently limited)
> >> >>   3. *State management*: May need additional state for handling
> >> >>   out-of-order events
> >> >>
> >> >> The key insight is that APPLY_WATERMARK conceptually translates to:
> >> >>
> >> >> TableScan -> Calc (expression evaluation) -> WatermarkAssigner
> >> >>
> >> >> StreamExecWatermarkAssigner handles the last step; the Calc step
> handles
> >> >> the expression.
> >> >> ------------------------------
> >> >> Summary of Proposed Responses
> >> >>
> >> >>   - *Scope of input*: Support base tables, non-materialized views
> *and
> >> >>   subqueries* (per FLIP Goals); views/catalog semantics stay
> unchanged.
> >> >>   - *Monotonicity*: Validate watermark_expression as a scalar
> >> expression;
> >> >>   same monotonicity contract as today's DDL watermarks.
> >> >>   - *Override timing*: Planner-level override at
> >> LogicalWatermarkAssigner;
> >> >>   potential /*+ RUNTIME_OVERRIDE */ hint as future work.
> >> >>   - *PTF reuse*: Dedicated LogicalWatermarkAssigner + specialized
> >> ExecNode
> >> >>   (same pattern as VECTOR_SEARCH / ML_PREDICT).
> >> >>   - *ExecNode sufficiency*: StreamExecWatermarkAssigner is sufficient
> >> with
> >> >>   minor modifications (input handling + expression evaluation).
> >> >>
> >> >> ------------------------------
> >> >>
> >> >> Looking forward to further discussion!
> >> >>
> >> >> Best regards,
> >> >> FeatZhang
> >> >>
> >> >>
> >> >> On Wed, May 6, 2026 at 4:39 PM Xuyang <[email protected]> wrote:
> >> >>
> >> >>> Hi, FeatZhang. Thanks for driving this discussion. I've read through
> >> the
> >> >>> full FLIP and the mailing list context, and I have a few questions:
> >> >>> 1. If I understand correctly, in a Layered Data Architecture,
> >> >>> silver_events would typically be a table, a materialized view, or a
> >> >>> materialized table. From the mailing list discussion, it seems like
> no
> >> >>> consensus was reached on this point. I think we still need to
> consider
> >> >>> whether APPLY_WATERMARK should be allowed on (non-materialized)
> views.
> >> >>> 2. In the Planner Changes section under Logical Plan, could you
> >> elaborate
> >> >>> on why monotonicity guarantees need to be ensured for the watermark
> >> >>> expression validation?
> >> >>> 3. (nit) In the Watermark Override part under Planner Changes,
> >> shouldn't
> >> >>> the override of the upstream watermark happen at runtime rather than
> >> at the
> >> >>> planner level?
> >> >>> 4. I feel that APPLY_WATERMARK is quite similar to TO_CHANGELOG and
> >> >>> FROM_CHANGELOG. Is what we actually need just a
> >> >>> BuiltInProcessTableFunction? That way, we would only need to further
> >> extend
> >> >>> ProcessTableFunction to support this.
> >> >>> 5. If we choose to translate APPLY_WATERMARK into a specialized
> >> ExecNode
> >> >>> (similar to VECTOR_SEARCH and ML_PREDICT), would the existing
> >> >>> StreamExecWatermarkAssigner be sufficient for this purpose?
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> --
> >> >>>
> >> >>>    Best!
> >> >>>    Xuyang
> >> >>>
> >> >>>
> >> >>>
> >> >>> At 2026-04-21 22:22:33, "FeatZhang" <[email protected]> wrote:
> >> >>>> Hi everyone,
> >> >>>>
> >> >>>> Thank you for the feedback and discussions on the initial proposal.
> >> I've
> >> >>>> revised the FLIP based on the community's input and would like to
> >> share
> >> >>>> the updated version.
> >> >>>>
> >> >>>>
> >> >>>> FLIP-XXX: Support Flexible Watermark Assignment via Built-in
> Function
> >> >>>> <
> >> >>>
> >>
> https://drive.google.com/open?id=17PXYAi6Pb91OqFhVVK7tRULiaHAb6wiX79jjC0NaaDA
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>> KEY UPDATES
> >> >>>> ===========
> >> >>>>
> >> >>>> The proposal has evolved from "Support Watermark Definition in SQL
> >> Views"
> >> >>>> to a more flexible and powerful approach: FLIP-XXX: Support
> Flexible
> >> >>>> Watermark Assignment via APPLY_WATERMARK Function.
> >> >>>>
> >> >>>> What's Changed:
> >> >>>>
> >> >>>> 1. Broader Scope: Instead of limiting watermark definitions to SQL
> >> views
> >> >>>>  only, the new proposal introduces a built-in table function
> >> >>>>  APPLY_WATERMARK that works with:
> >> >>>>  - Base tables
> >> >>>>  - Views (both regular and materialized)
> >> >>>>  - Subqueries
> >> >>>>  - Any table-valued expressions
> >> >>>>
> >> >>>> 2. More Flexible Design: The function-based approach provides:
> >> >>>>  - Dynamic watermark assignment at query time without modifying
> >> catalog
> >> >>>>    metadata
> >> >>>>  - Override capability for existing watermark strategies
> >> >>>>  - Composability with other SQL operations
> >> >>>>  - No need for DDL changes or catalog write permissions
> >> >>>>
> >> >>>> 3. Better SQL Semantics: Using a table function follows SQL
> standard
> >> >>>>  patterns and integrates naturally with Flink's existing function
> >> >>>>  ecosystem.
> >> >>>>
> >> >>>> UPDATED FLIP DOCUMENT
> >> >>>> =====================
> >> >>>>
> >> >>>> The revised FLIP is now available at:
> >> >>>> https://iwiki.woa.com/p/4019879693
> >> >>>>
> >> >>>> Key sections include:
> >> >>>> - Motivation and use cases
> >> >>>> - Public interfaces and SQL syntax
> >> >>>> - Implementation plan
> >> >>>> - Compatibility analysis
> >> >>>> - Test plan
> >> >>>>
> >> >>>> EXAMPLE USAGE
> >> >>>> =============
> >> >>>>
> >> >>>> -- Apply watermark to a view
> >> >>>> SELECT *FROM APPLY_WATERMARK(my_view, DESCRIPTOR(event_time),
> >> >>>> event_time - INTERVAL '5' SECOND);
> >> >>>> -- Override existing watermark strategy
> >> >>>> SELECT *FROM APPLY_WATERMARK(my_table_with_watermark,
> DESCRIPTOR(ts),
> >> >>>> ts - INTERVAL '10' SECOND -- Different from DDL watermark
> >> >>>> );
> >> >>>> -- Use in complex queries
> >> >>>> SELECT window_start,
> >> >>>>      window_end,
> >> >>>>      COUNT(*)FROM TABLE(TUMBLE(TABLE APPLY_WATERMARK(orders,
> >> >>>> DESCRIPTOR(order_time), order_time - INTERVAL '5' SECOND),
> >> >>>> DESCRIPTOR(order_time), INTERVAL '1' HOUR))GROUP BY window_start,
> >> >>>>        window_end;
> >> >>>>
> >> >>>>
> >> >>>> IMPLEMENTATION PROGRESS
> >> >>>> =======================
> >> >>>>
> >> >>>> I've also opened a draft PR #27984 with the initial implementation:
> >> >>>> - Core built-in function definition
> >> >>>> - SQL-to-RelNode conversion rules
> >> >>>> - Physical plan integration
> >> >>>> - Unit tests and documentation (English + Chinese)
> >> >>>>
> >> >>>> The PR is available at:
> >> >>>> https://github.com/apache/flink/pull/27984
> >> >>>>
> >> >>>> REQUEST FOR FEEDBACK
> >> >>>> ====================
> >> >>>>
> >> >>>> I would appreciate your thoughts on:
> >> >>>>
> >> >>>> 1. Function naming: Is APPLY_WATERMARK clear and intuitive?
> >> >>>>  (Alternative considered: WITH_WATERMARK, SET_WATERMARK)
> >> >>>>
> >> >>>> 2. DESCRIPTOR syntax: Using DESCRIPTOR(column_name) to specify the
> >> >>>>  rowtime column—does this align well with Flink's existing
> patterns?
> >> >>>>
> >> >>>> 3. Override behavior: Should APPLY_WATERMARK always override
> existing
> >> >>>>  watermarks, or should we provide a mode parameter
> >> >>>>  (e.g., OVERRIDE, MERGE)?
> >> >>>>
> >> >>>> 4. Performance considerations: Any concerns about the
> function-based
> >> >>>>  approach vs. catalog-level watermark definitions?
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>> Looking forward to your valuable feedback!
> >> >>>>
> >> >>>> Best regards,
> >> >>>> FeatZhang
> >> >>>>
> >> >>>> On Thu, Feb 12, 2026 at 6:24 PM Timo Walther <[email protected]>
> >> wrote:
> >> >>>>
> >> >>>>> Hi everyone,
> >> >>>>>
> >> >>>>> I think we all agree that we clearly want this functionality, just
> >> the
> >> >>>>> "how" needs to be discussed. I also like Lincoln’s suggestion of
> >> >>>>> introducing a built-in PTF for this, I had similar ideas in mind.
> >> >>>>>
> >> >>>>> There are two issues with a APPLY_WATERMARK function, but both on
> the
> >> >>>>> short-term roadmap:
> >> >>>>>
> >> >>>>> 1) This function would need to be a function that takes an
> >> expression.
> >> >>>>> Ideally as a lambda function. Newer Calcite versions have already
> >> lambda
> >> >>>>> expression support. At Confluent we were planning to work on a
> >> Calcite
> >> >>>>> upgrade this quarter especially to get lambda support in and
> improve
> >> >>>>> built-in functions that work on collections.
> >> >>>>>
> >> >>>>> 2) User-defined PTFs are currently not able to emit watermarks. We
> >> could
> >> >>>>> introduce a new interface WatermarkFunction (similar to
> >> >>>>> ChangelogFunction) that would offer this to everyone.
> Alternatively,
> >> we
> >> >>>>> could only use the PTF signature, but translate to a specialized
> >> >>>>> ExecNode similar how we do it for VECTOR_SEARCH and ML_PREDICT.
> >> >>>>>
> >> >>>>> In any case, even if we go with the function approach, we
> definitely
> >> >>>>> need a full FLIP on this.
> >> >>>>>
> >> >>>>> Thanks,
> >> >>>>> Timo
> >> >>>>>
> >> >>>>> On 12.02.26 08:25, Gyula Fóra wrote:
> >> >>>>>> Hi All!
> >> >>>>>> I would like to chime in here quickly from a slightly different
> >> angle.
> >> >>>>>> While I am the first to admit that I cannot grasp all the
> planning /
> >> >>>>>> conceptual implications, I also feel the need for more flexible
> >> >>> watermark
> >> >>>>>> handling as suggested by Feat.
> >> >>>>>>
> >> >>>>>> Anything that can only be applied to base/catalog tables is very
> >> >>> limiting
> >> >>>>>> from a usability perspective. Watermarks feel like they should
> be a
> >> >>>>> simple
> >> >>>>>> function that you can apply on a column/table as part of a
> >> query/view.
> >> >>>>> For
> >> >>>>>> example extract timestamp from a string convert to TS -> apply
> >> >>> watermark
> >> >>>>>> etc.
> >> >>>>>>
> >> >>>>>> Users often receive the tables/catalogs as given and can only
> write
> >> >>>>>> queries.
> >> >>>>>>
> >> >>>>>> Fixing this would eliminate a long standing disconnect between
> the
> >> >>>>>> datastream api flexible watermark handling compared to the
> currently
> >> >>> very
> >> >>>>>> restrictive SQL approach.
> >> >>>>>>
> >> >>>>>> Cheers
> >> >>>>>> Gyula
> >> >>>>>>
> >> >>>>>> On Thu, Feb 12, 2026 at 7:54 AM FeatZhang <[email protected]>
> >> >>> wrote:
> >> >>>>>>
> >> >>>>>>> Hi Timo, Lincoln,
> >> >>>>>>>
> >> >>>>>>> Thank you both for the detailed feedback.
> >> >>>>>>>
> >> >>>>>>> I agree with the concern that non-materialized SQL views should
> >> >>> remain a
> >> >>>>>>> pure logical abstraction. Introducing watermark definitions
> >> directly
> >> >>>>>>> into CREATE
> >> >>>>>>> VIEW or ALTER VIEW could blur the boundary between logical
> aliasing
> >> >>> and
> >> >>>>>>> physical planning semantics, especially considering optimization
> >> >>>>> barriers
> >> >>>>>>> and watermark propagation behavior.
> >> >>>>>>>
> >> >>>>>>> Lincoln’s suggestion of introducing a built-in function such as:
> >> >>>>>>>
> >> >>>>>>> APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column),
> >> >>> watermark_expression)
> >> >>>>>>>
> >> >>>>>>> is a cleaner direction. It keeps watermark definition as an
> >> explicit
> >> >>>>>>> relational transformation rather than attaching additional
> >> semantics
> >> >>> to
> >> >>>>>>> views.
> >> >>>>>>>
> >> >>>>>>> However, to fully address the original use cases (especially
> >> logical
> >> >>>>> reuse
> >> >>>>>>> and layered lakehouse architectures), I propose that the table
> >> >>> parameter
> >> >>>>>>> should support:
> >> >>>>>>>
> >> >>>>>>>    - Base tables
> >> >>>>>>>    - Non-materialized views
> >> >>>>>>>
> >> >>>>>>> If APPLY_WATERMARK can accept both, we can:
> >> >>>>>>>
> >> >>>>>>>    - Preserve the conceptual purity of SQL views
> >> >>>>>>>    - Avoid redefining view semantics
> >> >>>>>>>    - Still enable logical reuse via views
> >> >>>>>>>    - Allow different watermark strategies over the same logical
> >> >>>>> relation
> >> >>>>>>>
> >> >>>>>>> In other words, watermark definition becomes an explicit
> relational
> >> >>>>>>> operator applied on top of any logical relation, instead of
> being
> >> >>>>> embedded
> >> >>>>>>> into the view definition itself.
> >> >>>>>>>
> >> >>>>>>> From a planner perspective, this keeps the model consistent:
> >> >>>>>>>
> >> >>>>>>>    - The function expands into a relational node
> >> >>>>>>>    - No optimization barrier is introduced by views
> >> >>>>>>>    - Watermark handling remains part of the logical plan
> >> >>> transformation
> >> >>>>>>>
> >> >>>>>>> I will prepare a PR to prototype APPLY_WATERMARK with support
> for
> >> >>> both
> >> >>>>> base
> >> >>>>>>> tables and non-materialized views, and share it for further
> >> >>> discussion.
> >> >>>>>>>
> >> >>>>>>> Looking forward to your thoughts.
> >> >>>>>>>
> >> >>>>>>> Best,
> >> >>>>>>> Feat
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>>> Lincoln Lee <[email protected]> 于2026年2月12日周四 12:19写道:
> >> >>>>>>>
> >> >>>>>>>> Agree with Timo’s point regarding the conceptual semantics. We
> >> >>> should
> >> >>>>> not
> >> >>>>>>>> directly extend non-materialized views with additional
> watermark
> >> >>>>>>>> definitions.
> >> >>>>>>>>
> >> >>>>>>>> Regarding the use case mentioned by Feat, defining different
> >> >>> watermark
> >> >>>>>>>> strategies
> >> >>>>>>>> for the same data source, especially in the case of catalog
> >> tables,
> >> >>> we
> >> >>>>>>> are
> >> >>>>>>>> exploring a possible solution introducing a built-in function:
> >> >>>>>>>> ```sql
> >> >>>>>>>> APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column),
> >> >>>>> watermark_expression)
> >> >>>>>>>> ```
> >> >>>>>>>> This function only support base tables as input and not support
> >> >>> views,
> >> >>>>>>>> subqueries
> >> >>>>>>>> or derived relations.
> >> >>>>>>>>
> >> >>>>>>>> This would address a meaningful subset of the identified use
> cases
> >> >>>>>>> without
> >> >>>>>>>> redefining the role of SQL views.
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> Best,
> >> >>>>>>>> Lincoln Lee
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> Timo Walther <[email protected]> 于2026年2月11日周三 23:29写道:
> >> >>>>>>>>
> >> >>>>>>>>> Hi Feat,
> >> >>>>>>>>>
> >> >>>>>>>>> thanks for proposing this FLIP. We had similar discussions in
> the
> >> >>>>> past,
> >> >>>>>>>>> but so far could never reach consensus.
> >> >>>>>>>>>
> >> >>>>>>>>> SQL views are actually a very simple concept, they just give
> SQL
> >> >>> text
> >> >>>>>>> an
> >> >>>>>>>>> alias. A view has no other properties except for the view
> >> >>> definition.
> >> >>>>>>>>> Everything else is dynamically computed when the SQL text is
> >> >>> inserted
> >> >>>>>>>>> into the larger plan. A view is never evaluated without the
> >> >>>>> surrounding
> >> >>>>>>>>> plan.
> >> >>>>>>>>>
> >> >>>>>>>>> Watermarks in the middle of a pipeline raise a couple of
> tricky
> >> >>>>> issues:
> >> >>>>>>>>>
> >> >>>>>>>>> - What if the upstream table is updating, how would you deal
> with
> >> >>>>>>>>> watermarks in the downstream view?
> >> >>>>>>>>> - What if the upstream table emits already watermarks? Would
> the
> >> >>> view
> >> >>>>>>>>> catch them and discard this information?
> >> >>>>>>>>> - A view usually dissolves into the plan (e.g. via projection
> or
> >> >>>>> filter
> >> >>>>>>>>> pushdown). Would a watermark definition suddenly introduce an
> >> >>>>>>>>> optimization barrier? If this is an optimization barrier, is
> this
> >> >>>>> still
> >> >>>>>>>>> a view or a new concept? E.g. a "materialized view" or
> >> "pre-planned
> >> >>>>>>>> view"?
> >> >>>>>>>>>
> >> >>>>>>>>> Cheers,
> >> >>>>>>>>> Timo
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> On 11.02.26 03:51, FeatZhang wrote:
> >> >>>>>>>>>> Hi Flink Community,
> >> >>>>>>>>>>
> >> >>>>>>>>>> I'd like to propose adding watermark support for SQL Views in
> >> >>> Flink
> >> >>>>>>> to
> >> >>>>>>>>>> better support event-time processing scenarios.
> >> >>>>>>>>>> Problem Statement
> >> >>>>>>>>>>
> >> >>>>>>>>>> Currently, Flink SQL views cannot define watermarks. This
> >> >>> limitation
> >> >>>>>>>>>> creates several challenges:
> >> >>>>>>>>>>
> >> >>>>>>>>>>     - *Broken Abstraction*: Users must reference underlying
> >> table
> >> >>>>>>>>> watermarks
> >> >>>>>>>>>>     directly, exposing implementation details
> >> >>>>>>>>>>     - *No Flexibility*: Cannot define different watermark
> >> >>> strategies
> >> >>>>>>>> for
> >> >>>>>>>>>>     different use cases on the same data source
> >> >>>>>>>>>>     - *Limited Architecture Support*: Incompatible with
> modern
> >> >>>>>>> layered
> >> >>>>>>>>> data
> >> >>>>>>>>>>     architectures (Bronze/Silver/Gold medallion pattern)
> >> >>>>>>>>>>
> >> >>>>>>>>>> For example, in multi-tenant scenarios, different tenants may
> >> >>> require
> >> >>>>>>>>>> different lateness tolerance, but currently we cannot create
> >> views
> >> >>>>>>> with
> >> >>>>>>>>>> different watermark strategies on the same source table.
> >> >>>>>>>>>> Proposed Solution
> >> >>>>>>>>>>
> >> >>>>>>>>>> I propose adding two SQL syntax options to support watermark
> >> >>>>>>>> definitions
> >> >>>>>>>>> in
> >> >>>>>>>>>> views:
> >> >>>>>>>>>>
> >> >>>>>>>>>> *Option 1: CREATE VIEW with WATERMARK*
> >> >>>>>>>>>>
> >> >>>>>>>>>> CREATE VIEW user_activity
> >> >>>>>>>>>> WATERMARK FOR event_time AS event_time - INTERVAL '5'
> SECONDAS
> >> >>> SELECT
> >> >>>>>>>>>> user_id, event_time, action FROM raw_events;
> >> >>>>>>>>>>
> >> >>>>>>>>>> *Option 2: ALTER VIEW SET WATERMARK*
> >> >>>>>>>>>>
> >> >>>>>>>>>> ALTER VIEW user_activity SET WATERMARK FOR event_time AS
> >> >>> event_time -
> >> >>>>>>>>>> INTERVAL '5' SECOND;
> >> >>>>>>>>>>
> >> >>>>>>>>>> Key Design Aspects
> >> >>>>>>>>>>
> >> >>>>>>>>>>     - *Backward Compatibility*: Watermark stored as optional
> >> >>>>> metadata
> >> >>>>>>>> in
> >> >>>>>>>>>>     view options; existing views continue to work unchanged
> >> >>>>>>>>>>     - *Validation*: Watermark column must exist in view
> schema
> >> >>> and
> >> >>>>> be
> >> >>>>>>>> of
> >> >>>>>>>>>>     TIMESTAMP/TIMESTAMP_LTZ type
> >> >>>>>>>>>>     - *Storage*: Watermark metadata persists in catalog
> options
> >> >>> map
> >> >>>>>>>>> (works
> >> >>>>>>>>>>     with all catalog implementations)
> >> >>>>>>>>>>     - *Propagation*: Follows existing Flink watermark
> >> propagation
> >> >>>>>>> rules
> >> >>>>>>>>> in
> >> >>>>>>>>>>     joins and nested views
> >> >>>>>>>>>>
> >> >>>>>>>>>> Use Case Example: Data Lakehouse Architecture
> >> >>>>>>>>>>
> >> >>>>>>>>>> -- Bronze: Raw data (no watermark)CREATE TABLE bronze_events
> >> >>>>>>> (raw_data
> >> >>>>>>>>>> STRING, ingestion_time TIMESTAMP(3)) WITH (...);
> >> >>>>>>>>>> -- Silver: Cleaned data with watermarkCREATE VIEW
> silver_events
> >> >>>>>>>>>> WATERMARK FOR event_time AS event_time - INTERVAL '10'
> SECONDAS
> >> >>>>>>> SELECT
> >> >>>>>>>>>>      CAST(JSON_VALUE(raw_data, '$.event_time') AS
> TIMESTAMP(3))
> >> >>> AS
> >> >>>>>>>>> event_time,
> >> >>>>>>>>>>      JSON_VALUE(raw_data, '$.user_id') AS user_idFROM
> >> >>>>>>>>>> bronze_eventsWHERE JSON_VALUE(raw_data, '$.event_time') IS
> NOT
> >> >>> NULL;
> >> >>>>>>>>>> -- Gold: AggregationsSELECT TUMBLE_START(event_time, INTERVAL
> >> '1'
> >> >>>>>>>>>> HOUR), COUNT(*)FROM silver_eventsGROUP BY TUMBLE(event_time,
> >> >>> INTERVAL
> >> >>>>>>>>>> '1' HOUR);
> >> >>>>>>>>>>
> >> >>>>>>>>>> Reference Materials
> >> >>>>>>>>>>
> >> >>>>>>>>>>     - FLIP Document: FLIP-XXX: Support Watermark in Flink SQL
> >> >>> View
> >> >>>>>>>>>>     <
> >> >>>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>
> >> >>>>>
> >> >>>
> >>
> https://docs.google.com/document/d/1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y/edit?usp=sharing
> >> >>>>>>>>>>
> >> >>>>>>>>>>     - JIRA Issue:
> >> >>> https://issues.apache.org/jira/browse/FLINK-39062
> >> >>>>>>>>>>     - Implementation POC:
> >> >>>>>>>>>>     - [FLINK-39062][table] Support WATERMARK clause in CREATE
> >> >>> VIEW
> >> >>>>>>>>> statement
> >> >>>>>>>>>>     <https://github.com/apache/flink/pull/27571>
> >> >>>>>>>>>>     - [FLINK-39062][table] Support ALTER VIEW SET WATERMARK
> >> >>> syntax
> >> >>>>>>>>>>     <https://github.com/apache/flink/pull/27570>
> >> >>>>>>>>>>
> >> >>>>>>>>>> Implementation Timeline
> >> >>>>>>>>>>
> >> >>>>>>>>>> Estimated 6-8 weeks covering parser layer, planner layer,
> >> catalog
> >> >>>>>>>>>> integration, and comprehensive testing.
> >> >>>>>>>>>> Request for Feedback
> >> >>>>>>>>>>
> >> >>>>>>>>>> This enhancement would significantly improve Flink's support
> for
> >> >>>>>>>> layered
> >> >>>>>>>>>> data architectures and flexible event-time processing. I'm
> happy
> >> >>> to
> >> >>>>>>>>> provide
> >> >>>>>>>>>> more details or start a formal FLIP process if the community
> >> sees
> >> >>>>>>> value
> >> >>>>>>>>> in
> >> >>>>>>>>>> this proposal.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Looking forward to the community's feedback!
> >> >>>>>>>>>>
> >> >>>>>>>>>> Best regards,
> >> >>>>>>>>>>
> >> >>>>>>>>>> Feat Zhang
> >> >>>>>>>>>>
> >> >>>>>>>>>>   FLIP-XXX: Support Watermark in Flink SQL View
> >> >>>>>>>>>> <
> >> >>>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>
> >> >>>>>
> >> >>>
> >>
> https://drive.google.com/open?id=1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>
> >> >>>>>>
> >> >>>>>
> >> >>>>>
> >> >>>
> >>
>

Reply via email to