Hi Xuyang,

Thanks a lot for the confirmation, and for the very thorough back-and-forth throughout this discussion. It really helped sharpen the proposal, especially around the "arbitrary position" semantics, the planner-side contract, and the catalog persistence story. Much appreciated!
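For readers catching up, the positional semantics settled in this thread can be sketched with a small toy model. This is not Flink code: `assign_watermarks`, `watermarks`, and the `("row", ts)` / `("wm", ts)` element encoding are hypothetical names made up for illustration, the per-row emission is a simplification, and the rule that an assigner does not forward upstream watermarks is an assumption of the sketch. It only illustrates two points: emitted watermarks never decrease, and with two chained assigners the downstream consumer sees the watermarks of the later one.

```python
from typing import Callable, Iterable, Iterator, List, Optional, Tuple

# A stream element is ("row", event_time_ms) or ("wm", watermark_ms).
Element = Tuple[str, int]


def assign_watermarks(
    stream: Iterable[Element], expr: Callable[[int], int]
) -> Iterator[Element]:
    """Hypothetical stand-in for one watermark assigner in a pipeline."""
    current: Optional[int] = None
    for kind, value in stream:
        if kind == "wm":
            # Assumption of this toy model: watermarks produced by an
            # upstream assigner are not forwarded, so the latest assigner
            # in the pipeline decides what downstream operators observe.
            continue
        yield (kind, value)
        candidate = expr(value)
        # Runtime contract: emit a watermark only when it advances.
        if current is None or candidate > current:
            current = candidate
            yield ("wm", current)


def watermarks(stream: Iterable[Element]) -> List[int]:
    """Collect only the watermark values from a stream."""
    return [value for kind, value in stream if kind == "wm"]


rows: List[Element] = [("row", 1000), ("row", 3000), ("row", 2000), ("row", 6000)]

# First assigner: e.g. a DDL watermark of ts - 5 seconds.
upstream = assign_watermarks(rows, lambda ts: ts - 5000)
# Second assigner: e.g. an APPLY_WATERMARK of ts - 1 second placed downstream.
downstream = assign_watermarks(upstream, lambda ts: ts - 1000)

observed = watermarks(downstream)
print(observed)
```

The out-of-order row at 2000 does not pull the watermark back, and the values observed after the second assigner come only from its own expression, which mirrors calling `assignTimestampsAndWatermarks()` twice in a DataStream pipeline.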
@all: with Xuyang's last round of feedback addressed, the FLIP has stabilized. If anyone (Timo, Sergey, Yunfeng, Jingsong, Ron, Shengkai, and others who have been following the thread) still has open concerns, please let me know in the next ~3 days. Otherwise I'll start the VOTE thread.

Thanks again to everyone for the reviews!

Best,
FeatZhang

On Mon, May 18, 2026 at 5:20 PM Xuyang <[email protected]> wrote:

> Hi FeatZhang,
>
> Thanks for the update and the revised proposal! I don't have further
> questions. Looking forward to seeing this move forward.
>
> --
>
> Best!
> Xuyang
>
> At 2026-05-16 13:44:20, "FeatZhang" <[email protected]> wrote:
> >Hi Xuyang,
> >
> >Thanks a lot for the careful follow-ups - these are exactly the right
> >questions to ask before we move to a vote. After re-checking the code
> >paths you pointed at, I want to revise several statements I made in my
> >previous reply, and tighten the FLIP accordingly. I'll go through them
> >one by one.
> >
> >1. Semantics at any position in the pipeline
> >=============================================
> >Fully agree. To make this concrete, I'll add a new sub-section
> >"Semantics at Different Pipeline Positions" to the FLIP, covering:
> >
> >  - APPLY_WATERMARK on a base table (with or without an existing
> >    DDL watermark)
> >  - APPLY_WATERMARK on top of a non-materialized view / sub-query
> >  - APPLY_WATERMARK applied multiple times in the same query
> >  - Interaction with TUMBLE / HOP / SESSION / CUMULATE
> >  - Interaction with joins (regular / interval / temporal)
> >
> >The mental model will be explicitly aligned with the DataStream API:
> >each APPLY_WATERMARK in SQL corresponds to one
> >`assignTimestampsAndWatermarks(...)` call in the DataStream pipeline,
> >applied in the order they appear.
> >
> >2. Monotonicity validation
> >==========================
> >You're right, and I want to correct my earlier reply. Today's
> >`CREATE TABLE ... WATERMARK FOR ... AS ...` does NOT enforce
> >monotonicity at the planner level - the planner only checks that:
> >
> >  - the rowtime column exists and is of type TIMESTAMP / TIMESTAMP_LTZ,
> >  - the watermark expression is a valid scalar expression over the
> >    table's schema and resolves to a comparable type.
> >
> >Monotonicity is a runtime contract: the WatermarkAssignerOperator
> >emits watermarks that are guaranteed to be non-decreasing.
> >
> >APPLY_WATERMARK will follow exactly the same contract - no stricter
> >planner-level monotonicity check. I'll update the FLIP's "Planner
> >Changes → Validation" section to reflect this.
> >
> >3. Override timing - clarification
> >==================================
> >I think this was a wording issue on my side rather than a real
> >design disagreement. Let me restate it:
> >
> >  - APPLY_WATERMARK introduces a dedicated WatermarkAssigner node in
> >    the plan. Whether the input already carries a watermark or not,
> >    the plan ends up with the new assigner positioned downstream of
> >    any existing one.
> >  - At runtime there is no "merge" or "reconciliation": each
> >    WatermarkAssigner operator independently emits its own watermark
> >    stream; downstream operators simply observe the watermark from
> >    the most recent upstream assigner.
> >
> >This is the same model as calling `assignTimestampsAndWatermarks()`
> >twice in DataStream - the second call wins because it sits later in
> >the operator chain, not because of any planner-level magic.
> >
> >So "planner-level override" was a poor choice of words. The correct
> >description is: **the planner decides the operator topology; the
> >runtime emits watermarks according to that topology**, exactly like
> >DataStream. I'll rephrase the FLIP accordingly and drop the
> >"override" terminology.
> >
> >4. "Watermark expression evaluation: needs to support arbitrary
> >   expressions"
> >====================================================================
> >Apologies, this statement was inaccurate. After re-checking,
> >StreamExecWatermarkAssigner already evaluates the watermark
> >expression through the standard `ExprCodeGenerator`, which supports
> >the same scalar expressions as DDL today (arithmetic on TIMESTAMP /
> >TIMESTAMP_LTZ, INTERVAL arithmetic, scalar UDFs, etc.).
> >
> >What APPLY_WATERMARK actually needs from the ExecNode is:
> >
> >  - resolving the rowtime column index from the DESCRIPTOR, since the
> >    input may be a non-base-table (view / sub-query / projected
> >    relation),
> >  - wiring the watermark expression's input row to the upstream
> >    operator's output row instead of a TableScan output.
> >
> >No new expression capability is required. I'll fix this in the FLIP.
> >
> >5. State management
> >===================
> >You're right, this should be removed. In the scope of this FLIP the
> >APPLY_WATERMARK ExecNode is **stateless**, identical to the existing
> >`WatermarkAssignerOperator`. It does not buffer rows and does not
> >evict late data; late-data handling remains the responsibility of
> >the downstream window / join operators, exactly as it works today.
> >
> >The "state management" bullet in my previous reply was speculation
> >about future watermark strategies (idle source detection, etc.) and
> >does not belong in this FLIP. I'll drop it.
> >
> >------------------------------------------------------------
> >Updated summary of the design after this round
> >------------------------------------------------------------
> >
> >  - Scope: base tables, non-materialized views, sub-queries; any
> >    relation position in the query.
> >  - Semantics: aligned with DataStream API
> >    `assignTimestampsAndWatermarks()`; multiple applications in the
> >    same query are positional, not "overriding".
> >  - Validation: same contract as today's DDL - scalar expression on
> >    a TIMESTAMP / TIMESTAMP_LTZ rowtime column; no planner-level
> >    monotonicity check.
> >  - ExecNode: stateless, reuses StreamExecWatermarkAssigner with
> >    minor wiring changes (rowtime column resolution + non-TableScan
> >    input handling).
> >  - Out of scope: WatermarkFunction interface, idle-source state,
> >    runtime-level merge of multiple watermark strategies.
> >
> >I'll update the FLIP document and PR #27984 to match the points
> >above, and post a diff summary here once it's done.
> >
> >Thanks again for pushing on these - the FLIP is much cleaner after
> >this round.
> >
> >Best regards,
> >FeatZhang
> >
> >On Mon, May 11, 2026 at 10:45 AM Xuyang <[email protected]> wrote:
> >
> >> Hi FeatZhang. Thanks for the detailed responses. I have a few
> >> follow-up comments and questions:
> >>
> >> 1. Support for APPLY_WATERMARK at any node/position
> >> I generally agree with the direction that APPLY_WATERMARK should be
> >> applicable at any node and any position (one or more times), similar
> >> to how the DataStream API allows watermark assignment. However, I
> >> think we need to clearly articulate the precise behavior/semantics in
> >> each scenario to reduce user confusion. Aligning the mental model with
> >> the DataStream API (where users can call
> >> assignTimestampsAndWatermarks() at arbitrary points in the pipeline)
> >> would also help lower the learning curve.
> >> 2. Monotonicity validation of watermark expressions
> >> Why do we need to enforce monotonicity guarantees on the watermark
> >> expression at the planner level? As far as I know, Flink SQL currently
> >> does NOT perform such validation at the DDL level for CREATE TABLE ...
> >> WATERMARK FOR ... AS .... What it actually does is ensure at runtime
> >> that emitted watermarks are non-decreasing. If the existing DDL path
> >> does not validate monotonicity at planning time, why should
> >> APPLY_WATERMARK introduce a stricter contract?
> >> 3. Planner-level watermark override
> >> Could you elaborate more on why the watermark override must happen at
> >> the planner level? If I understand correctly, in the DataStream API,
> >> users can define different watermark strategies at different nodes in
> >> the same pipeline, and the runtime handles watermark propagation
> >> naturally.
> >> 4. "Watermark expression evaluation: Needs to support arbitrary
> >> expressions"
> >> You mentioned that StreamExecWatermarkAssigner currently has
> >> limitations in watermark expression evaluation and "needs to support
> >> arbitrary expressions." Could you clarify what the current limitations
> >> are exactly? Today's CREATE TABLE ... WATERMARK FOR ts AS ts -
> >> INTERVAL '5' SECOND already supports scalar expressions; what
> >> additional expression types does APPLY_WATERMARK require that are not
> >> already supported?
> >> 5. State management
> >> You mentioned "State management". Are we proposing to introduce a
> >> stateful watermark assigner node that evicts late data? This sounds
> >> like a significant change that goes well beyond the scope of this
> >> FLIP. The current WatermarkAssigner is stateless; it simply computes
> >> and emits watermarks without buffering data.
> >> Looking forward to your clarification!
> >>
> >> --
> >>
> >> Best!
> >> Xuyang
> >>
> >> At 2026-05-09 19:08:32, "熊饶饶" <[email protected]> wrote:
> >> >Hi Feat Zhang,
> >> >
> >> >+1 (non-binding)
> >> >
> >> >This is a well-motivated proposal that addresses a real pain point
> >> >in Flink SQL. The inability to define watermarks in views currently
> >> >forces users to either:
> >> >
> >> >- Reference underlying table watermarks directly, breaking
> >> >  encapsulation
> >> >- Create separate physical tables for each watermark strategy,
> >> >  leading to data duplication
> >> >
> >> >The proposed syntax CREATE VIEW ... WATERMARK FOR col AS expr is
> >> >intuitive and aligns naturally with existing DDL watermark semantics.
> >> >The backward-compatible design (storing watermark metadata in
> >> >catalog options) is also a smart choice - it avoids breaking
> >> >existing views while enabling the new capability.
> >> >
> >> >The Data Lakehouse / medallion architecture use case is particularly
> >> >compelling. Being able to define watermark strategies at the
> >> >Silver/Gold layer while keeping Bronze as raw data would
> >> >significantly simplify pipeline design for many teams.
> >> >
> >> >Looking forward to seeing this move to a formal FLIP!
> >> >
> >> >Best regards,
> >> >Raorao Xiong
> >> >
> >> >> On May 7, 2026, at 20:56, FeatZhang <[email protected]> wrote:
> >> >>
> >> >> Hi Xuyang,
> >> >>
> >> >> Thank you for the thorough review and thoughtful questions.
> >> >>
> >> >> *Problem this FLIP aims to solve*: The core goal of this FLIP is to
> >> >> allow watermarks to be defined on *computed columns (and, more
> >> >> generally, on any column produced inside a SQL query) directly in
> >> >> SQL statements*. Today, watermarks in Flink SQL can only be declared
> >> >> at the CREATE TABLE level via WATERMARK FOR ... AS ..., which means
> >> >> the time attribute must be a column visible at the base table DDL.
> >> >> This makes it impossible to attach a watermark to a timestamp
> >> >> derived inside a query (for example, one computed by string parsing,
> >> >> JSON extraction, or any expression inside a view or subquery)
> >> >> without pushing that computation back down into the source table
> >> >> DDL. By introducing APPLY_WATERMARK as an explicit relational
> >> >> operator that can be applied to *base tables, non-materialized
> >> >> views, and subqueries*, users can assign watermark semantics to any
> >> >> (computed) column produced by a query, which also addresses the
> >> >> broader motivations listed in the FLIP: broken layered-pipeline
> >> >> abstractions, lack of per-query / multi-tenant watermark strategies,
> >> >> and the current gap between SQL and the DataStream API.
> >> >>
> >> >> Now let me address each of your points:
> >> >>
> >> >> Support for Non-materialized Views
> >> >>
> >> >> You raised an excellent point about the scope of APPLY_WATERMARK in
> >> >> layered architectures.
> >> >>
> >> >> *My position*: APPLY_WATERMARK should support base tables,
> >> >> non-materialized views, *and subqueries* (this matches the Goals
> >> >> section of the FLIP). Here's why:
> >> >>
> >> >>   - Non-materialized views dissolve into the surrounding plan during
> >> >>     optimization (inline expansion)
> >> >>   - There's no physical "view" node in the execution plan, just a
> >> >>     logical alias
> >> >>   - The watermark becomes a relational transformation applied on top
> >> >>     of the view's / subquery's output
> >> >>
> >> >> The key design principle is: *watermark definition is an explicit
> >> >> relational operator, not attached metadata*.
> >> >>
> >> >> I'd also like to clarify the positions in the prior thread to avoid
> >> >> confusion:
> >> >>
> >> >>   - *Lincoln* originally proposed APPLY_WATERMARK(table,
> >> >>     DESCRIPTOR(col), expr) scoped to base tables only.
> >> >>   - *Timo* raised the concern about blurring the view abstraction
> >> >>     ("*A view usually dissolves into the plan … would a watermark
> >> >>     definition suddenly introduce an optimization barrier? If this
> >> >>     is an optimization barrier, is this still a view or a new
> >> >>     concept?*"). This is exactly why this FLIP does *not* attach
> >> >>     watermarks to views via CREATE VIEW / ALTER VIEW, and keeps
> >> >>     views as pure logical aliases.
> >> >>   - *Gyula* emphasized that watermark assignment should be available
> >> >>     on views and subqueries too, consistent with the DataStream API.
> >> >>
> >> >> To address Timo's concern concretely:
> >> >>
> >> >>   - Watermark semantics are applied at query planning time via an
> >> >>     explicit relational operator (APPLY_WATERMARK), not hidden in
> >> >>     view/catalog metadata.
> >> >>   - No watermark information is persisted into the catalog for
> >> >>     views; the catalog stays unchanged (see FLIP "Catalog Changes":
> >> >>     *No catalog changes are required*).
> >> >>   - Views continue to dissolve transparently into the plan; the
> >> >>     optimization barrier only appears where APPLY_WATERMARK is
> >> >>     explicitly used.
> >> >>
> >> >> Monotonicity Validation
> >> >>
> >> >> Great question! Monotonicity guarantees are essential for watermark
> >> >> correctness:
> >> >>
> >> >>   - Watermarks define the boundary of "late" data
> >> >>   - If the watermark expression is not monotonically non-decreasing,
> >> >>     the watermark could move backward
> >> >>   - This would cause data that was previously considered "on-time"
> >> >>     to be treated as late (or vice-versa), breaking event-time
> >> >>     semantics
> >> >>
> >> >> *Validation requirement*: In line with the FLIP's Planner Changes
> >> >> section, the planner validates that watermark_expression is a valid
> >> >> *scalar* expression over columns of the input schema, and (as with
> >> >> today's CREATE TABLE ... WATERMARK FOR ... AS ...) the expression
> >> >> must produce a monotonically non-decreasing value relative to the
> >> >> designated rowtime column.
> >> >>
> >> >> Typical valid forms are the same as what's allowed in DDL today,
> >> >> for example:
> >> >>
> >> >> -- Bounded out-of-orderness (most common)
> >> >> APPLY_WATERMARK(t, DESCRIPTOR(ts), ts - INTERVAL '5' SECOND)
> >> >> -- Strictly ascending
> >> >> APPLY_WATERMARK(t, DESCRIPTOR(ts), ts)
> >> >>
> >> >> Note: watermark_expression is a *scalar expression* per the FLIP
> >> >> (not an aggregate / window function). Richer forms such as
> >> >> user-defined watermark strategies are explicitly out of scope for
> >> >> this FLIP and are tracked as a future WatermarkFunction interface,
> >> >> which also depends on the Calcite lambda upgrade mentioned by Timo.
> >> >>
> >> >> Override Timing (Planner vs Runtime)
> >> >>
> >> >> You raised a valid concern. Let me clarify the design, which aligns
> >> >> with the FLIP's "Planner Changes → Interaction with Existing Table
> >> >> Watermarks" section:
> >> >>
> >> >> *Current proposal*: Planner-level override
> >> >>
> >> >>   - During query compilation, when the input to APPLY_WATERMARK
> >> >>     already carries a watermark (e.g., from CREATE TABLE ...
> >> >>     WATERMARK), the LogicalWatermarkAssigner node produced by
> >> >>     APPLY_WATERMARK *overrides* the upstream watermark strategy.
> >> >>   - When the input has no watermark (e.g., a view or a subquery),
> >> >>     APPLY_WATERMARK introduces a new one.
> >> >>   - This makes override behavior explicit in the plan and keeps
> >> >>     room for standard optimizations.
> >> >>
> >> >> *Why not runtime-level override*:
> >> >>
> >> >>   1. Planner-level override keeps watermark semantics a first-class,
> >> >>      visible part of the plan (consistent with how VECTOR_SEARCH /
> >> >>      ML_PREDICT are modeled as specialized ExecNodes in the FLIP).
> >> >>   2. The override point is deterministic and inspectable via
> >> >>      EXPLAIN.
> >> >>   3. Simpler execution model: no dual-watermark reconciliation at
> >> >>      runtime.
> >> >>
> >> >> If concrete use cases for a runtime-level override surface later,
> >> >> we could revisit this via a hint (e.g., /*+ RUNTIME_OVERRIDE */),
> >> >> but it's not part of this FLIP.
> >> >>
> >> >> Relationship with BuiltInProcessTableFunction
> >> >>
> >> >> Good observation. APPLY_WATERMARK is declared as a built-in PTF at
> >> >> the SQL surface, similar in spirit to TO_CHANGELOG /
> >> >> FROM_CHANGELOG, but the FLIP intentionally maps it to a
> >> >> *specialized ExecNode* rather than a generic PTF runtime, the same
> >> >> pattern used by VECTOR_SEARCH and ML_PREDICT.
> >> >>
> >> >> *Option A: Reuse the generic BuiltInProcessTableFunction runtime*
> >> >>
> >> >>   - Pros: Consistent with other built-in PTFs at the runtime layer.
> >> >>   - Cons: Watermark assignment is not a row-transforming PTF; it
> >> >>     changes stream metadata (time attribute + watermark strategy).
> >> >>     Forcing it through the generic PTF runtime would require
> >> >>     extending the PTF contract with watermark semantics.
> >> >>
> >> >> *Option B: Dedicated LogicalWatermarkAssigner + specialized
> >> >> ExecNode (the FLIP's choice)*
> >> >>
> >> >>   - Pros: Keeps watermark semantics a first-class citizen in the
> >> >>     planner; cleanly integrates with watermark propagation rules;
> >> >>     no need to overload the PTF contract; same pattern as
> >> >>     VECTOR_SEARCH / ML_PREDICT already established in Flink.
> >> >>   - Cons: A new dedicated node, though that cost is small compared
> >> >>     to the semantic clarity.
> >> >>
> >> >> *Current decision*: Option B, as stated in the FLIP
> >> >> ("*APPLY_WATERMARK compiles to a specialized ExecNode --- similar
> >> >> to how VECTOR_SEARCH and ML_PREDICT are handled*"). Open to
> >> >> revisiting based on community feedback.
> >> >>
> >> >> StreamExecWatermarkAssigner Sufficiency
> >> >>
> >> >> For the physical implementation:
> >> >>
> >> >> *Yes, StreamExecWatermarkAssigner should be sufficient*, with some
> >> >> modifications:
> >> >>
> >> >>   1. *Input handling*: Currently assumes direct table scan; needs
> >> >>      to handle APPLY_WATERMARK's column mapping
> >> >>   2. *Watermark expression evaluation*: Needs to support arbitrary
> >> >>      expressions (currently limited)
> >> >>   3. *State management*: May need additional state for handling
> >> >>      out-of-order events
> >> >>
> >> >> The key insight is that APPLY_WATERMARK conceptually translates to:
> >> >>
> >> >> TableScan -> Calc (expression evaluation) -> WatermarkAssigner
> >> >>
> >> >> StreamExecWatermarkAssigner handles the last step; the Calc step
> >> >> handles the expression.
> >> >> ------------------------------
> >> >> Summary of Proposed Responses
> >> >>
> >> >>   - *Scope of input*: Support base tables, non-materialized views
> >> >>     *and subqueries* (per FLIP Goals); views/catalog semantics stay
> >> >>     unchanged.
> >> >>   - *Monotonicity*: Validate watermark_expression as a scalar
> >> >>     expression; same monotonicity contract as today's DDL
> >> >>     watermarks.
> >> >>   - *Override timing*: Planner-level override at
> >> >>     LogicalWatermarkAssigner; potential /*+ RUNTIME_OVERRIDE */
> >> >>     hint as future work.
> >> >>   - *PTF reuse*: Dedicated LogicalWatermarkAssigner + specialized
> >> >>     ExecNode (same pattern as VECTOR_SEARCH / ML_PREDICT).
> >> >>   - *ExecNode sufficiency*: StreamExecWatermarkAssigner is
> >> >>     sufficient with minor modifications (input handling +
> >> >>     expression evaluation).
> >> >>
> >> >> ------------------------------
> >> >>
> >> >> Looking forward to further discussion!
> >> >>
> >> >> Best regards,
> >> >> FeatZhang
> >> >>
> >> >>
> >> >> On Wed, May 6, 2026 at 4:39 PM Xuyang <[email protected]> wrote:
> >> >>
> >> >>> Hi, FeatZhang. Thanks for driving this discussion. I've read
> >> >>> through the full FLIP and the mailing list context, and I have a
> >> >>> few questions:
> >> >>> 1. If I understand correctly, in a Layered Data Architecture,
> >> >>> silver_events would typically be a table, a materialized view, or
> >> >>> a materialized table. From the mailing list discussion, it seems
> >> >>> like no consensus was reached on this point. I think we still need
> >> >>> to consider whether APPLY_WATERMARK should be allowed on
> >> >>> (non-materialized) views.
> >> >>> 2. In the Planner Changes section under Logical Plan, could you
> >> >>> elaborate on why monotonicity guarantees need to be ensured for
> >> >>> the watermark expression validation?
> >> >>> 3. (nit) In the Watermark Override part under Planner Changes,
> >> >>> shouldn't the override of the upstream watermark happen at runtime
> >> >>> rather than at the planner level?
> >> >>> 4. I feel that APPLY_WATERMARK is quite similar to TO_CHANGELOG
> >> >>> and FROM_CHANGELOG. Is what we actually need just a
> >> >>> BuiltInProcessTableFunction? That way, we would only need to
> >> >>> further extend ProcessTableFunction to support this.
> >> >>> 5. If we choose to translate APPLY_WATERMARK into a specialized
> >> >>> ExecNode (similar to VECTOR_SEARCH and ML_PREDICT), would the
> >> >>> existing StreamExecWatermarkAssigner be sufficient for this
> >> >>> purpose?
> >> >>>
> >> >>> --
> >> >>>
> >> >>> Best!
> >> >>> Xuyang
> >> >>>
> >> >>> At 2026-04-21 22:22:33, "FeatZhang" <[email protected]> wrote:
> >> >>>> Hi everyone,
> >> >>>>
> >> >>>> Thank you for the feedback and discussions on the initial
> >> >>>> proposal. I've revised the FLIP based on the community's input
> >> >>>> and would like to share the updated version.
> >> >>>>
> >> >>>> FLIP-XXX: Support Flexible Watermark Assignment via Built-in
> >> >>>> Function
> >> >>>> <https://drive.google.com/open?id=17PXYAi6Pb91OqFhVVK7tRULiaHAb6wiX79jjC0NaaDA>
> >> >>>>
> >> >>>> KEY UPDATES
> >> >>>> ===========
> >> >>>>
> >> >>>> The proposal has evolved from "Support Watermark Definition in
> >> >>>> SQL Views" to a more flexible and powerful approach: FLIP-XXX:
> >> >>>> Support Flexible Watermark Assignment via APPLY_WATERMARK
> >> >>>> Function.
> >> >>>>
> >> >>>> What's Changed:
> >> >>>>
> >> >>>> 1. Broader Scope: Instead of limiting watermark definitions to
> >> >>>>    SQL views only, the new proposal introduces a built-in table
> >> >>>>    function APPLY_WATERMARK that works with:
> >> >>>>    - Base tables
> >> >>>>    - Views (both regular and materialized)
> >> >>>>    - Subqueries
> >> >>>>    - Any table-valued expressions
> >> >>>>
> >> >>>> 2. More Flexible Design: The function-based approach provides:
> >> >>>>    - Dynamic watermark assignment at query time without
> >> >>>>      modifying catalog metadata
> >> >>>>    - Override capability for existing watermark strategies
> >> >>>>    - Composability with other SQL operations
> >> >>>>    - No need for DDL changes or catalog write permissions
> >> >>>>
> >> >>>> 3. Better SQL Semantics: Using a table function follows SQL
> >> >>>>    standard patterns and integrates naturally with Flink's
> >> >>>>    existing function ecosystem.
> >> >>>>
> >> >>>> UPDATED FLIP DOCUMENT
> >> >>>> =====================
> >> >>>>
> >> >>>> The revised FLIP is now available at:
> >> >>>> https://iwiki.woa.com/p/4019879693
> >> >>>>
> >> >>>> Key sections include:
> >> >>>> - Motivation and use cases
> >> >>>> - Public interfaces and SQL syntax
> >> >>>> - Implementation plan
> >> >>>> - Compatibility analysis
> >> >>>> - Test plan
> >> >>>>
> >> >>>> EXAMPLE USAGE
> >> >>>> =============
> >> >>>>
> >> >>>> -- Apply watermark to a view
> >> >>>> SELECT * FROM APPLY_WATERMARK(my_view, DESCRIPTOR(event_time),
> >> >>>>   event_time - INTERVAL '5' SECOND);
> >> >>>> -- Override existing watermark strategy
> >> >>>> SELECT * FROM APPLY_WATERMARK(my_table_with_watermark,
> >> >>>>   DESCRIPTOR(ts),
> >> >>>>   ts - INTERVAL '10' SECOND -- Different from DDL watermark
> >> >>>> );
> >> >>>> -- Use in complex queries
> >> >>>> SELECT window_start,
> >> >>>>        window_end,
> >> >>>>        COUNT(*)
> >> >>>> FROM TABLE(TUMBLE(TABLE APPLY_WATERMARK(orders,
> >> >>>>   DESCRIPTOR(order_time), order_time - INTERVAL '5' SECOND),
> >> >>>>   DESCRIPTOR(order_time), INTERVAL '1' HOUR))
> >> >>>> GROUP BY window_start, window_end;
> >> >>>>
> >> >>>>
> >> >>>> IMPLEMENTATION PROGRESS
> >> >>>> =======================
> >> >>>>
> >> >>>> I've also opened a draft PR #27984 with the initial
> >> >>>> implementation:
> >> >>>> - Core built-in function definition
> >> >>>> - SQL-to-RelNode conversion rules
> >> >>>> - Physical plan integration
> >> >>>> - Unit tests and documentation (English + Chinese)
> >> >>>>
> >> >>>> The PR is available at:
> >> >>>> https://github.com/apache/flink/pull/27984
> >> >>>>
> >> >>>> REQUEST FOR FEEDBACK
> >> >>>> ====================
> >> >>>>
> >> >>>> I would appreciate your thoughts on:
> >> >>>>
> >> >>>> 1. Function naming: Is APPLY_WATERMARK clear and intuitive?
> >> >>>>    (Alternatives considered: WITH_WATERMARK, SET_WATERMARK)
> >> >>>>
> >> >>>> 2. DESCRIPTOR syntax: Using DESCRIPTOR(column_name) to specify
> >> >>>>    the rowtime column: does this align well with Flink's
> >> >>>>    existing patterns?
> >> >>>>
> >> >>>> 3. Override behavior: Should APPLY_WATERMARK always override
> >> >>>>    existing watermarks, or should we provide a mode parameter
> >> >>>>    (e.g., OVERRIDE, MERGE)?
> >> >>>>
> >> >>>> 4. Performance considerations: Any concerns about the
> >> >>>>    function-based approach vs. catalog-level watermark
> >> >>>>    definitions?
> >> >>>>
> >> >>>> Looking forward to your valuable feedback!
> >> >>>>
> >> >>>> Best regards,
> >> >>>> FeatZhang
> >> >>>>
> >> >>>> On Thu, Feb 12, 2026 at 6:24 PM Timo Walther <[email protected]>
> >> >>>> wrote:
> >> >>>>
> >> >>>>> Hi everyone,
> >> >>>>>
> >> >>>>> I think we all agree that we clearly want this functionality,
> >> >>>>> just the "how" needs to be discussed. I also like Lincoln’s
> >> >>>>> suggestion of introducing a built-in PTF for this, I had
> >> >>>>> similar ideas in mind.
> >> >>>>>
> >> >>>>> There are two issues with an APPLY_WATERMARK function, but both
> >> >>>>> are on the short-term roadmap:
> >> >>>>>
> >> >>>>> 1) This function would need to be a function that takes an
> >> >>>>> expression. Ideally as a lambda function. Newer Calcite
> >> >>>>> versions already have lambda expression support. At Confluent
> >> >>>>> we were planning to work on a Calcite upgrade this quarter
> >> >>>>> especially to get lambda support in and improve built-in
> >> >>>>> functions that work on collections.
> >> >>>>>
> >> >>>>> 2) User-defined PTFs are currently not able to emit watermarks.
> >> >>>>> We could introduce a new interface WatermarkFunction (similar
> >> >>>>> to ChangelogFunction) that would offer this to everyone.
> >> >>>>> Alternatively, we could only use the PTF signature, but
> >> >>>>> translate to a specialized ExecNode similar to how we do it for
> >> >>>>> VECTOR_SEARCH and ML_PREDICT.
> >> >>>>>
> >> >>>>> In any case, even if we go with the function approach, we
> >> >>>>> definitely need a full FLIP on this.
> >> >>>>>
> >> >>>>> Thanks,
> >> >>>>> Timo
> >> >>>>>
> >> >>>>> On 12.02.26 08:25, Gyula Fóra wrote:
> >> >>>>>> Hi All!
> >> >>>>>> I would like to chime in here quickly from a slightly
> >> >>>>>> different angle. While I am the first to admit that I cannot
> >> >>>>>> grasp all the planning / conceptual implications, I also feel
> >> >>>>>> the need for more flexible watermark handling as suggested by
> >> >>>>>> Feat.
> >> >>>>>>
> >> >>>>>> Anything that can only be applied to base/catalog tables is
> >> >>>>>> very limiting from a usability perspective. Watermarks feel
> >> >>>>>> like they should be a simple function that you can apply on a
> >> >>>>>> column/table as part of a query/view. For example: extract a
> >> >>>>>> timestamp from a string, convert it to TS -> apply watermark,
> >> >>>>>> etc.
> >> >>>>>>
> >> >>>>>> Users often receive the tables/catalogs as given and can only
> >> >>>>>> write queries.
> >> >>>>>>
> >> >>>>>> Fixing this would eliminate a long-standing disconnect between
> >> >>>>>> the DataStream API's flexible watermark handling and the
> >> >>>>>> currently very restrictive SQL approach.
> >> >>>>>>
> >> >>>>>> Cheers
> >> >>>>>> Gyula
> >> >>>>>>
> >> >>>>>> On Thu, Feb 12, 2026 at 7:54 AM FeatZhang <[email protected]>
> >> >>>>>> wrote:
> >> >>>>>>
> >> >>>>>>> Hi Timo, Lincoln,
> >> >>>>>>>
> >> >>>>>>> Thank you both for the detailed feedback.
> >> >>>>>>>
> >> >>>>>>> I agree with the concern that non-materialized SQL views
> >> >>>>>>> should remain a pure logical abstraction. Introducing
> >> >>>>>>> watermark definitions directly into CREATE VIEW or ALTER VIEW
> >> >>>>>>> could blur the boundary between logical aliasing and physical
> >> >>>>>>> planning semantics, especially considering optimization
> >> >>>>>>> barriers and watermark propagation behavior.
> >> >>>>>>>
> >> >>>>>>> Lincoln’s suggestion of introducing a built-in function such
> >> >>>>>>> as:
> >> >>>>>>>
> >> >>>>>>> APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column),
> >> >>>>>>> watermark_expression)
> >> >>>>>>>
> >> >>>>>>> is a cleaner direction. It keeps watermark definition as an
> >> >>>>>>> explicit relational transformation rather than attaching
> >> >>>>>>> additional semantics to views.
> >> >>>>>>>
> >> >>>>>>> However, to fully address the original use cases (especially
> >> >>>>>>> logical reuse and layered lakehouse architectures), I propose
> >> >>>>>>> that the table parameter should support:
> >> >>>>>>>
> >> >>>>>>>   - Base tables
> >> >>>>>>>   - Non-materialized views
> >> >>>>>>>
> >> >>>>>>> If APPLY_WATERMARK can accept both, we can:
> >> >>>>>>>
> >> >>>>>>>   - Preserve the conceptual purity of SQL views
> >> >>>>>>>   - Avoid redefining view semantics
> >> >>>>>>>   - Still enable logical reuse via views
> >> >>>>>>>   - Allow different watermark strategies over the same
> >> >>>>>>>     logical relation
> >> >>>>>>>
> >> >>>>>>> In other words, watermark definition becomes an explicit
> >> >>>>>>> relational operator applied on top of any logical relation,
> >> >>>>>>> instead of being embedded into the view definition itself.
> >> >>>>>>>
> >> >>>>>>> From a planner perspective, this keeps the model consistent:
> >> >>>>>>>
> >> >>>>>>>   - The function expands into a relational node
> >> >>>>>>>   - No optimization barrier is introduced by views
> >> >>>>>>>   - Watermark handling remains part of the logical plan
> >> >>>>>>>     transformation
> >> >>>>>>>
> >> >>>>>>> I will prepare a PR to prototype APPLY_WATERMARK with support
> >> >>>>>>> for both base tables and non-materialized views, and share it
> >> >>>>>>> for further discussion.
> >> >>>>>>>
> >> >>>>>>> Looking forward to your thoughts.
> >> >>>>>>>
> >> >>>>>>> Best,
> >> >>>>>>> Feat
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>>> On Thu, Feb 12, 2026 at 12:19, Lincoln Lee
> >> >>>>>>> <[email protected]> wrote:
> >> >>>>>>>
> >> >>>>>>>> Agree with Timo’s point regarding the conceptual semantics.
> >> >>>>>>>> We should not directly extend non-materialized views with
> >> >>>>>>>> additional watermark definitions.
> >> >>>>>>>>
> >> >>>>>>>> Regarding the use case mentioned by Feat, defining different
> >> >>>>>>>> watermark strategies for the same data source, especially in
> >> >>>>>>>> the case of catalog tables, we are exploring a possible
> >> >>>>>>>> solution introducing a built-in function:
> >> >>>>>>>> ```sql
> >> >>>>>>>> APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column),
> >> >>>>>>>>   watermark_expression)
> >> >>>>>>>> ```
> >> >>>>>>>> This function only supports base tables as input and does
> >> >>>>>>>> not support views, subqueries or derived relations.
> >> >>>>>>>>
> >> >>>>>>>> This would address a meaningful subset of the identified use
> >> >>>>>>>> cases without redefining the role of SQL views.
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> Best,
> >> >>>>>>>> Lincoln Lee
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> On Wed, Feb 11, 2026 at 23:29, Timo Walther <[email protected]> wrote:
> >> >>>>>>>>
> >> >>>>>>>>> Hi Feat,
> >> >>>>>>>>>
> >> >>>>>>>>> thanks for proposing this FLIP. We had similar discussions in the past,
> >> >>>>>>>>> but so far could never reach consensus.
> >> >>>>>>>>>
> >> >>>>>>>>> SQL views are actually a very simple concept: they just give SQL text an
> >> >>>>>>>>> alias. A view has no other properties except for the view definition.
> >> >>>>>>>>> Everything else is dynamically computed when the SQL text is inserted
> >> >>>>>>>>> into the larger plan. A view is never evaluated without the surrounding
> >> >>>>>>>>> plan.
> >> >>>>>>>>>
> >> >>>>>>>>> Watermarks in the middle of a pipeline raise a couple of tricky issues:
> >> >>>>>>>>>
> >> >>>>>>>>> - If the upstream table is updating, how would you deal with
> >> >>>>>>>>> watermarks in the downstream view?
> >> >>>>>>>>> - What if the upstream table already emits watermarks? Would the view
> >> >>>>>>>>> catch them and discard this information?
> >> >>>>>>>>> - A view usually dissolves into the plan (e.g. via projection or filter
> >> >>>>>>>>> pushdown). Would a watermark definition suddenly introduce an
> >> >>>>>>>>> optimization barrier? If this is an optimization barrier, is this still
> >> >>>>>>>>> a view or a new concept? E.g. a "materialized view" or "pre-planned
> >> >>>>>>>>> view"?
> >> >>>>>>>>>
> >> >>>>>>>>> Cheers,
> >> >>>>>>>>> Timo
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> On 11.02.26 03:51, FeatZhang wrote:
> >> >>>>>>>>>> Hi Flink Community,
> >> >>>>>>>>>>
> >> >>>>>>>>>> I'd like to propose adding watermark support for SQL Views in Flink to
> >> >>>>>>>>>> better support event-time processing scenarios.
> >> >>>>>>>>>> Problem Statement
> >> >>>>>>>>>>
> >> >>>>>>>>>> Currently, Flink SQL views cannot define watermarks. This limitation
> >> >>>>>>>>>> creates several challenges:
> >> >>>>>>>>>>
> >> >>>>>>>>>>    - *Broken Abstraction*: Users must reference underlying table
> >> >>>>>>>>>>    watermarks directly, exposing implementation details
> >> >>>>>>>>>>    - *No Flexibility*: Cannot define different watermark strategies for
> >> >>>>>>>>>>    different use cases on the same data source
> >> >>>>>>>>>>    - *Limited Architecture Support*: Incompatible with modern layered
> >> >>>>>>>>>>    data architectures (Bronze/Silver/Gold medallion pattern)
> >> >>>>>>>>>>
> >> >>>>>>>>>> For example, in multi-tenant scenarios, different tenants may require
> >> >>>>>>>>>> different lateness tolerance, but currently we cannot create views with
> >> >>>>>>>>>> different watermark strategies on the same source table.
> >> >>>>>>>>>> Proposed Solution
> >> >>>>>>>>>>
> >> >>>>>>>>>> I propose adding two SQL syntax options to support watermark definitions
> >> >>>>>>>>>> in views:
> >> >>>>>>>>>>
> >> >>>>>>>>>> *Option 1: CREATE VIEW with WATERMARK*
> >> >>>>>>>>>>
> >> >>>>>>>>>> CREATE VIEW user_activity
> >> >>>>>>>>>> WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> >> >>>>>>>>>> AS SELECT user_id, event_time, action FROM raw_events;
> >> >>>>>>>>>>
> >> >>>>>>>>>> *Option 2: ALTER VIEW SET WATERMARK*
> >> >>>>>>>>>>
> >> >>>>>>>>>> ALTER VIEW user_activity SET WATERMARK FOR event_time AS
> >> >>>>>>>>>> event_time - INTERVAL '5' SECOND;
> >> >>>>>>>>>>
> >> >>>>>>>>>> Key Design Aspects
> >> >>>>>>>>>>
> >> >>>>>>>>>>    - *Backward Compatibility*: Watermark stored as optional metadata in
> >> >>>>>>>>>>    view options; existing views continue to work unchanged
> >> >>>>>>>>>>    - *Validation*: Watermark column must exist in view schema and be of
> >> >>>>>>>>>>    TIMESTAMP/TIMESTAMP_LTZ type
> >> >>>>>>>>>>    - *Storage*: Watermark metadata persists in catalog options map
> >> >>>>>>>>>>    (works with all catalog implementations)
> >> >>>>>>>>>>    - *Propagation*: Follows existing Flink watermark propagation rules
> >> >>>>>>>>>>    in joins and nested views
> >> >>>>>>>>>>
> >> >>>>>>>>>> Use Case Example: Data Lakehouse Architecture
> >> >>>>>>>>>>
> >> >>>>>>>>>> -- Bronze: Raw data (no watermark)
> >> >>>>>>>>>> CREATE TABLE bronze_events (raw_data STRING, ingestion_time TIMESTAMP(3))
> >> >>>>>>>>>> WITH (...);
> >> >>>>>>>>>> -- Silver: Cleaned data with watermark
> >> >>>>>>>>>> CREATE VIEW silver_events
> >> >>>>>>>>>> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> >> >>>>>>>>>> AS SELECT
> >> >>>>>>>>>>     CAST(JSON_VALUE(raw_data, '$.event_time') AS TIMESTAMP(3)) AS event_time,
> >> >>>>>>>>>>     JSON_VALUE(raw_data, '$.user_id') AS user_id
> >> >>>>>>>>>> FROM bronze_events
> >> >>>>>>>>>> WHERE JSON_VALUE(raw_data, '$.event_time') IS NOT NULL;
> >> >>>>>>>>>> -- Gold: Aggregations
> >> >>>>>>>>>> SELECT TUMBLE_START(event_time, INTERVAL '1' HOUR), COUNT(*)
> >> >>>>>>>>>> FROM silver_events
> >> >>>>>>>>>> GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
> >> >>>>>>>>>>
> >> >>>>>>>>>> Reference Materials
> >> >>>>>>>>>>
> >> >>>>>>>>>>    - FLIP Document: FLIP-XXX: Support Watermark in Flink SQL View
> >> >>>>>>>>>>    <https://docs.google.com/document/d/1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y/edit?usp=sharing>
> >> >>>>>>>>>>    - JIRA Issue: https://issues.apache.org/jira/browse/FLINK-39062
> >> >>>>>>>>>>    - Implementation POC:
> >> >>>>>>>>>>       - [FLINK-39062][table] Support WATERMARK clause in CREATE VIEW
> >> >>>>>>>>>>       statement <https://github.com/apache/flink/pull/27571>
> >> >>>>>>>>>>       - [FLINK-39062][table] Support ALTER VIEW SET WATERMARK syntax
> >> >>>>>>>>>>       <https://github.com/apache/flink/pull/27570>
> >> >>>>>>>>>>
> >> >>>>>>>>>> Implementation Timeline
> >> >>>>>>>>>>
> >> >>>>>>>>>> Estimated 6-8 weeks covering parser layer, planner layer, catalog
> >> >>>>>>>>>> integration, and comprehensive testing.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Request for Feedback
> >> >>>>>>>>>>
> >> >>>>>>>>>> This enhancement would significantly improve Flink's support for layered
> >> >>>>>>>>>> data architectures and flexible event-time processing. I'm happy to
> >> >>>>>>>>>> provide more details or start a formal FLIP process if the community sees
> >> >>>>>>>>>> value in this proposal.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Looking forward to the community's feedback!
> >> >>>>>>>>>>
> >> >>>>>>>>>> Best regards,
> >> >>>>>>>>>>
> >> >>>>>>>>>> Feat Zhang
> >> >>>>>>>>>>
> >> >>>>>>>>>> FLIP-XXX: Support Watermark in Flink SQL View
> >> >>>>>>>>>> <https://drive.google.com/open?id=1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y>
