Hi FeatZhang,

I have some last minute feedback on this:

1) No custom syntax

The FLIP should use the lambda expression syntax introduced by Calcite [2]:

(order_time) -> order_time - INTERVAL '5' SECOND

Changes to the Calcite parser should not be necessary for this FLIP. With Sergey's recent Calcite upgrade [1], both PTFs and lambda expressions are now fully supported by the Calcite stack. Lambdas are not yet fully integrated into Flink's type system, but this will likely follow soon. Until the BridgingSqlFunction/FunctionDefinition stack is ready to take lambdas, APPLY_WATERMARK can be implemented using Calcite's SqlOperator class stack.
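To make the lambda form concrete: the lambda parameter is bound to the rowtime value of each row, and the body computes that row's watermark candidate. The Python sketch below models this outside of Flink; `evaluate` and `bounded_5s` are hypothetical names for illustration, not Calcite or Flink APIs.

```python
from datetime import datetime, timedelta
from typing import Callable

# A watermark expression modeled as a function of the rowtime value,
# mirroring the lambda (order_time) -> order_time - INTERVAL '5' SECOND
WatermarkExpr = Callable[[datetime], datetime]

bounded_5s: WatermarkExpr = lambda order_time: order_time - timedelta(seconds=5)


def evaluate(expr: WatermarkExpr, row: dict, rowtime: str) -> datetime:
    """Bind the lambda parameter to the row's rowtime column and evaluate the body."""
    return expr(row[rowtime])


row = {"order_id": 1, "order_time": datetime(2026, 5, 20, 10, 0, 30)}
print(evaluate(bounded_5s, row, "order_time"))  # 2026-05-20 10:00:25
```

Because the expression is an ordinary function value, the same row can be evaluated against different strategies without touching the table definition.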

2) Custom options

Could there be a need to customize APPLY_WATERMARK with configuration options? Should we prepare for this already? Maybe an optional `config => MAP<STRING, STRING>` parameter, similar to the VECTOR_SEARCH PTF?

Otherwise I'm +1 on this FLIP.

Cheers,
Timo


[1] https://github.com/apache/flink/pull/28152
[2] https://github.com/apache/calcite/pull/1733

On 20.05.26 10:29, FeatZhang wrote:
Hi Xuyang,

Thanks a lot for the confirmation, and for the very thorough back-and-forth
throughout this discussion — it really helped sharpen the proposal,
especially around the "arbitrary position" semantics, the planner-side
contract, and the catalog persistence story. Much appreciated!

@all: with Xuyang's last round of feedback addressed, the FLIP has
stabilized.

If anyone — Timo, Sergey, Yunfeng, Jingsong, Ron, Shengkai, and others
who have been following the thread — still has open concerns, please let
me know in the next ~3 days. Otherwise I'll start the VOTE thread.

Thanks again to everyone for the reviews!

Best,
FeatZhang

On Mon, May 18, 2026 at 5:20 PM Xuyang <[email protected]> wrote:

Hi FeatZhang,

Thanks for the update and the revised proposal! I don't have further
questions. Looking forward to seeing this move forward.



--

     Best!
     Xuyang



On 2026-05-16 13:44:20, "FeatZhang" <[email protected]> wrote:
Hi Xuyang,

Thanks a lot for the careful follow-ups — these are exactly the right
questions to ask before we move to a vote. After re-checking the code
paths you pointed at, I want to revise several statements I made in my
previous reply, and tighten the FLIP accordingly. I'll go through them
one by one.

1. Semantics at any position in the pipeline
=============================================
Fully agree. To make this concrete, I'll add a new sub-section
"Semantics at Different Pipeline Positions" to the FLIP, covering:

  - APPLY_WATERMARK on a base table (with or without an existing
    DDL watermark)
  - APPLY_WATERMARK on top of a non-materialized view / sub-query
  - APPLY_WATERMARK applied multiple times in the same query
  - Interaction with TUMBLE / HOP / SESSION / CUMULATE
  - Interaction with joins (regular / interval / temporal)

The mental model will be explicitly aligned with the DataStream API:
each APPLY_WATERMARK in SQL corresponds to one
`assignTimestampsAndWatermarks(...)` call in the DataStream pipeline,
applied in the order they appear.
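One way to read "applied in the order they appear" for nested calls: the innermost APPLY_WATERMARK sits closest to the source, just like the first `assignTimestampsAndWatermarks(...)` call in a DataStream job. Below is a toy Python model of that flattening, assuming a hypothetical `Scan` / `ApplyWatermark` representation of the query (not Flink's RelNode classes).

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class Scan:
    table: str


@dataclass
class ApplyWatermark:
    # Hypothetical model of APPLY_WATERMARK(source, DESCRIPTOR(rowtime), expr)
    source: "Relation"
    rowtime: str
    expr: str


Relation = Union[Scan, ApplyWatermark]


def to_operator_chain(rel: Relation) -> List[str]:
    """Flatten nested APPLY_WATERMARK calls into an upstream-to-downstream chain.

    The innermost call ends up closest to the source, like the first
    assignTimestampsAndWatermarks(...) call in a DataStream pipeline.
    """
    if isinstance(rel, Scan):
        return [f"Scan({rel.table})"]
    upstream = to_operator_chain(rel.source)
    return upstream + [f"WatermarkAssigner({rel.rowtime}: {rel.expr})"]


query = ApplyWatermark(
    ApplyWatermark(Scan("orders"), "ts", "ts - INTERVAL '5' SECOND"),
    "ts",
    "ts - INTERVAL '10' SECOND",
)
print(" -> ".join(to_operator_chain(query)))
# Scan(orders) -> WatermarkAssigner(ts: ts - INTERVAL '5' SECOND) -> WatermarkAssigner(ts: ts - INTERVAL '10' SECOND)
```

Under this model, a downstream TUMBLE window would observe the watermark of the last assigner in the chain.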

2. Monotonicity validation
==========================
You're right, and I want to correct my earlier reply. Today's
`CREATE TABLE ... WATERMARK FOR ... AS ...` does NOT enforce
monotonicity at the planner level — the planner only checks that:

  - the rowtime column exists and is of type TIMESTAMP / TIMESTAMP_LTZ,
  - the watermark expression is a valid scalar expression over the
    table's schema and resolves to a comparable type.

Monotonicity is a runtime contract: the WatermarkAssignerOperator
emits watermarks that are guaranteed to be non-decreasing.

APPLY_WATERMARK will follow exactly the same contract — no stricter
planner-level monotonicity check. I'll update the FLIP's "Planner
Changes → Validation" section to reflect this.
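Here is a simplified Python model of this runtime contract. The expression may produce a smaller value for an out-of-order row, but the assigner only keeps the largest value seen so far, so the emitted sequence never decreases without any planner-level proof of monotonicity. The class and helper names are hypothetical. Also, unlike Flink, which emits watermarks on a periodic interval, this sketch emits eagerly whenever the watermark advances.

```python
from typing import Iterable, List, Optional

MIN_WATERMARK = -(2**63)  # Long.MIN_VALUE: no watermark emitted yet


class MonotonicWatermarkAssigner:
    """Keeps only the largest watermark candidate seen so far.

    No rows are buffered: the only thing remembered is a single long value.
    """

    def __init__(self, bound_ms: int) -> None:
        self.bound_ms = bound_ms
        self.current = MIN_WATERMARK

    def on_row(self, rowtime_ms: int) -> Optional[int]:
        # Evaluate the watermark expression, e.g. ts - INTERVAL '5' SECOND
        candidate = rowtime_ms - self.bound_ms
        if candidate > self.current:
            self.current = candidate
            return candidate  # watermark advances and is emitted
        return None  # smaller candidate from an out-of-order row: nothing emitted


def run(rowtimes_ms: Iterable[int], bound_ms: int) -> List[int]:
    assigner = MonotonicWatermarkAssigner(bound_ms)
    emitted = []
    for ts in rowtimes_ms:
        wm = assigner.on_row(ts)
        if wm is not None:
            emitted.append(wm)
    return emitted


# The row at 9s arrives out of order; its candidate (4s) is not emitted
print(run([10_000, 12_000, 9_000, 15_000], bound_ms=5_000))  # [5000, 7000, 10000]
```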

3. Override timing — clarification
==================================
I think this was a wording issue on my side rather than a real
design disagreement. Let me restate it:

  - APPLY_WATERMARK introduces a dedicated WatermarkAssigner node in
    the plan. Whether the input already carries a watermark or not,
    the plan ends up with the new assigner positioned downstream of
    the existing one.
  - At runtime there is no "merge" or "reconciliation": each
    WatermarkAssigner operator independently emits its own watermark
    stream; downstream operators simply observe the watermark from
    the most recent upstream assigner.

This is the same model as calling `assignTimestampsAndWatermarks()`
twice in DataStream — the second call wins because it sits later in
the operator chain, not because of any planner-level magic.

So "planner-level override" was a poor choice of words. The correct
description is: **the planner decides the operator topology; the
runtime emits watermarks according to that topology**, exactly like
DataStream. I'll rephrase the FLIP accordingly and drop the
"override" terminology.
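The positional semantics can be illustrated with a small Python model. It assumes that an assigner behaves the way we understand Flink's existing assigner operators to behave: rows pass through unchanged, incoming upstream watermarks are dropped except the end-of-input one (Long.MAX_VALUE), and the assigner emits its own watermarks computed from the rows it sees. `Row`, `Watermark` and `assigner` are hypothetical names, and emission is eager rather than periodic.

```python
from dataclasses import dataclass
from typing import Callable, List, Union

MAX_WATERMARK = 2**63 - 1  # Long.MAX_VALUE, used to signal end of input
MIN_WATERMARK = -(2**63)   # Long.MIN_VALUE, "no watermark yet"


@dataclass
class Row:
    ts: int  # rowtime in epoch millis


@dataclass
class Watermark:
    ts: int


Element = Union[Row, Watermark]


def assigner(expr: Callable[[int], int]) -> Callable[[List[Element]], List[Element]]:
    """Build one watermark-assigner stage for a chain of stages."""

    def process(stream: List[Element]) -> List[Element]:
        out: List[Element] = []
        current = MIN_WATERMARK
        for element in stream:
            if isinstance(element, Row):
                out.append(element)  # rows are forwarded unchanged
                candidate = expr(element.ts)
                if candidate > current:  # only ever advance
                    current = candidate
                    out.append(Watermark(candidate))
            elif element.ts == MAX_WATERMARK and current != MAX_WATERMARK:
                current = MAX_WATERMARK
                out.append(element)  # end of input is always propagated
            # any other upstream watermark is swallowed here
        return out

    return process


# DDL watermark (ts - 1s) followed by APPLY_WATERMARK (ts - 5s)
ddl_stage = assigner(lambda ts: ts - 1_000)
apply_stage = assigner(lambda ts: ts - 5_000)
source = [Row(10_000), Row(12_000), Watermark(MAX_WATERMARK)]
downstream = apply_stage(ddl_stage(source))
print([e.ts for e in downstream if isinstance(e, Watermark)])
# [5000, 7000, 9223372036854775807]
```

The DDL watermarks (9000 and 11000) never reach the downstream operators. The later assigner wins purely because of its position in the chain.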

4. "Watermark expression evaluation: needs to support arbitrary
   expressions"
====================================================================
Apologies, this statement was inaccurate. After re-checking,
StreamExecWatermarkAssigner already evaluates the watermark
expression through the standard `ExprCodeGenerator`, which supports
the same scalar expressions as DDL today (arithmetic on TIMESTAMP /
TIMESTAMP_LTZ, INTERVAL arithmetic, scalar UDFs, etc.).

What APPLY_WATERMARK actually needs from the ExecNode is:

  - resolving the rowtime column index from the DESCRIPTOR, since the
    input may be a non-base-table (view / sub-query / projected
    relation),
  - wiring the watermark expression's input row to the upstream
    operator's output row instead of a TableScan output.

No new expression capability is required. I'll fix this in the FLIP.
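As a rough illustration of the first wiring change, the rowtime index has to be looked up in the output schema of whatever relation feeds APPLY_WATERMARK, rather than in a table's DDL schema. The Python helper below is hypothetical; a real implementation would presumably work on the input RelNode's row type. It also applies the TIMESTAMP / TIMESTAMP_LTZ check from the validation contract above.

```python
from typing import List, Tuple

# Type roots accepted for a rowtime column (same rule as today's DDL watermarks)
TIME_ATTRIBUTE_TYPES = {"TIMESTAMP", "TIMESTAMP_LTZ"}


def resolve_rowtime_index(schema: List[Tuple[str, str]], descriptor_col: str) -> int:
    """Return the position of DESCRIPTOR(descriptor_col) in the input's output schema.

    `schema` lists (field name, type root) pairs of whatever relation feeds
    APPLY_WATERMARK: a table scan, a view, a sub-query or a projection.
    """
    for index, (name, type_root) in enumerate(schema):
        if name == descriptor_col:
            if type_root not in TIME_ATTRIBUTE_TYPES:
                raise ValueError(
                    f"Rowtime column '{name}' must be TIMESTAMP or TIMESTAMP_LTZ, got {type_root}"
                )
            return index
    names = [name for name, _ in schema]
    raise ValueError(f"Column '{descriptor_col}' not found in input schema {names}")


# Output schema of a sub-query that derives event_time from a JSON string
subquery_schema = [("user_id", "STRING"), ("event_time", "TIMESTAMP"), ("action", "STRING")]
print(resolve_rowtime_index(subquery_schema, "event_time"))  # 1
```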

5. State management
===================
You're right, this should be removed. In the scope of this FLIP the
APPLY_WATERMARK ExecNode is **stateless**, identical to the existing
`WatermarkAssignerOperator`. It does not buffer rows and does not
evict late data; late-data handling remains the responsibility of
the downstream window / join operators, exactly as it works today.

The "state management" bullet in my previous reply was speculation
about future watermark strategies (idle source detection, etc.) and
does not belong in this FLIP. I'll drop it.

------------------------------------------------------------
Updated summary of the design after this round
------------------------------------------------------------

  - Scope: base tables, non-materialized views, sub-queries; any
    relation position in the query.
  - Semantics: aligned with DataStream API
    `assignTimestampsAndWatermarks()`; multiple applications in the
    same query are positional, not "overriding".
  - Validation: same contract as today's DDL — scalar expression on
    a TIMESTAMP / TIMESTAMP_LTZ rowtime column; no planner-level
    monotonicity check.
  - ExecNode: stateless, reuses StreamExecWatermarkAssigner with
    minor wiring changes (rowtime column resolution + non-TableScan
    input handling).
  - Out of scope: WatermarkFunction interface, idle-source state,
    runtime-level merge of multiple watermark strategies.

I'll update the FLIP document and PR #27984 to match the points
above, and post a diff summary here once it's done.

Thanks again for pushing on these — the FLIP is much cleaner after
this round.

Best regards,
FeatZhang

On Mon, May 11, 2026 at 10:45 AM Xuyang <[email protected]> wrote:

Hi FeatZhang. Thanks for the detailed responses. I have a few follow-up
comments and questions:


1. Support for APPLY_WATERMARK at any node/position
I generally agree with the direction that APPLY_WATERMARK should be
applicable at any node and any position, one or more times, similar to
how the DataStream API allows watermark assignment. However, I think we
need to clearly articulate the precise behavior/semantics in each
scenario to reduce user confusion. Aligning the mental model with the
DataStream API (where users can call assignTimestampsAndWatermarks() at
arbitrary points in the pipeline) would also help lower the learning
curve.

2. Monotonicity validation of watermark expressions
Why do we need to enforce monotonicity guarantees on the watermark
expression at the planner level? As far as I know, Flink SQL currently
does NOT perform such validation at the DDL level for CREATE TABLE ...
WATERMARK FOR ... AS .... What it actually does is ensure at runtime
that emitted watermarks are non-decreasing. If the existing DDL path
does not validate monotonicity at planning time, why should
APPLY_WATERMARK introduce a stricter contract?

3. Planner-level watermark override
Could you elaborate on why the watermark override must happen at the
planner level? If I understand correctly, in the DataStream API, users
can define different watermark strategies at different nodes in the
same pipeline, and the runtime handles watermark propagation naturally.

4. "Watermark expression evaluation: Needs to support arbitrary
expressions"
You mentioned that StreamExecWatermarkAssigner currently has
limitations in watermark expression evaluation and "needs to support
arbitrary expressions." Could you clarify what exactly the current
limitations are? Today's
CREATE TABLE ... WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
already supports scalar expressions. What additional expression types
does APPLY_WATERMARK require that are not already supported?

5. State management
You mentioned "State management". Are we proposing to introduce a
stateful watermark assigner node that evicts late data? This sounds
like a significant change that goes well beyond the scope of this FLIP.
The current WatermarkAssigner is stateless: it simply computes and
emits watermarks without buffering data.

Looking forward to your clarification!





--

     Best!
     Xuyang



At 2026-05-09 19:08:32, "熊饶饶" <[email protected]> wrote:
Hi Feat Zhang,

+1 (non-binding)

This is a well-motivated proposal that addresses a real pain point in
Flink SQL. The inability to define watermarks in views currently forces
users to either:

- reference underlying table watermarks directly, breaking
  encapsulation, or
- create separate physical tables for each watermark strategy, leading
  to data duplication.

The proposed syntax CREATE VIEW ... WATERMARK FOR col AS expr is
intuitive and aligns naturally with existing DDL watermark semantics.
The backward-compatible design (storing watermark metadata in catalog
options) is also a smart choice: it avoids breaking existing views
while enabling the new capability.

The Data Lakehouse / medallion architecture use case is particularly
compelling. Being able to define watermark strategies at the
Silver/Gold layer while keeping Bronze as raw data would significantly
simplify pipeline design for many teams.

Looking forward to seeing this move to a formal FLIP!

Best regards,
Raorao Xiong

On May 7, 2026, at 20:56, FeatZhang <[email protected]> wrote:

Hi Xuyang,

Thank you for the thorough review and thoughtful questions.

*Problem this FLIP aims to solve*: The core goal of this FLIP is to
allow watermarks to be defined on *computed columns (and, more
generally, on any column produced inside a SQL query) directly in SQL
statements*. Today, watermarks in Flink SQL can only be declared at the
CREATE TABLE level via WATERMARK FOR ... AS ..., which means the time
attribute must be a column visible in the base table DDL. This makes it
impossible to attach a watermark to a timestamp derived inside a query
(for example, one computed by string parsing, JSON extraction, or any
expression inside a view or subquery) without pushing that computation
back down into the source table DDL. By introducing APPLY_WATERMARK as
an explicit relational operator that can be applied to *base tables,
non-materialized views, and subqueries*, users can assign watermark
semantics to any (computed) column produced by a query. This also
addresses the broader motivations listed in the FLIP: broken
layered-pipeline abstractions, lack of per-query / multi-tenant
watermark strategies, and the current gap between SQL and the
DataStream API.

Now let me address each of your points:

Support for Non-materialized Views

You raised an excellent point about the scope of APPLY_WATERMARK in
layered architectures.

*My position*: APPLY_WATERMARK should support base tables,
non-materialized views, *and subqueries* (this matches the Goals
section of the FLIP). Here's why:

   - Non-materialized views dissolve into the surrounding plan during
     optimization (inline expansion).
   - There's no physical "view" node in the execution plan, just a
     logical alias.
   - The watermark becomes a relational transformation applied on top
     of the view's / subquery's output.

The key design principle is: *watermark definition is an explicit
relational operator, not attached metadata*.

I'd also like to clarify the positions in the prior thread to avoid
confusion:

   - *Lincoln* originally proposed APPLY_WATERMARK(table,
     DESCRIPTOR(col), expr) scoped to base tables only.
   - *Timo* raised the concern about blurring the view abstraction ("*A
     view usually dissolves into the plan … would a watermark definition
     suddenly introduce an optimization barrier? If this is an
     optimization barrier, is this still a view or a new concept?*").
     This is exactly why this FLIP does *not* attach watermarks to views
     via CREATE VIEW / ALTER VIEW, and keeps views as pure logical
     aliases.
   - *Gyula* emphasized that watermark assignment should be available
     on views and subqueries too, consistent with the DataStream API.

To address Timo's concern concretely:

   - Watermark semantics are applied at query planning time via an
     explicit relational operator (APPLY_WATERMARK), not hidden in
     view/catalog metadata.
   - No watermark information is persisted into the catalog for views;
     the catalog stays unchanged (see FLIP "Catalog Changes": *No
     catalog changes are required*).
   - Views continue to dissolve transparently into the plan; the
     optimization barrier only appears where APPLY_WATERMARK is
     explicitly used.

Monotonicity Validation

Great question! Monotonicity guarantees are essential for watermark
correctness:

   - Watermarks define the boundary of "late" data.
   - If the watermark expression is not monotonically non-decreasing,
     the watermark could move backward.
   - This would cause data that was previously considered "on-time" to
     be treated as late (or vice versa), breaking event-time semantics.

*Validation requirement*: In line with the FLIP's Planner Changes
section, the planner validates that watermark_expression is a valid
*scalar* expression over columns of the input schema, and (as with
today's CREATE TABLE ... WATERMARK FOR ... AS ...) the expression must
produce a monotonically non-decreasing value relative to the designated
rowtime column.

Typical valid forms are the same as what's allowed in DDL today, for
example:

-- Bounded out-of-orderness (most common)
APPLY_WATERMARK(t, DESCRIPTOR(ts), ts - INTERVAL '5' SECOND)
-- Strictly ascending
APPLY_WATERMARK(t, DESCRIPTOR(ts), ts)

Note: watermark_expression is a *scalar expression* per the FLIP (not
an aggregate / window function). Richer forms such as user-defined
watermark strategies are explicitly out of scope for this FLIP and are
tracked as a future WatermarkFunction interface, which also depends on
the Calcite lambda upgrade mentioned by Timo.

Override Timing (Planner vs Runtime)

You raised a valid concern. Let me clarify the design, which aligns
with the FLIP's "Planner Changes → Interaction with Existing Table
Watermarks" section:

*Current proposal*: Planner-level override

   - During query compilation, when the input to APPLY_WATERMARK
     already carries a watermark (e.g., from CREATE TABLE ...
     WATERMARK), the LogicalWatermarkAssigner node produced by
     APPLY_WATERMARK *overrides* the upstream watermark strategy.
   - When the input has no watermark (e.g., a view or a subquery),
     APPLY_WATERMARK introduces a new one.
   - This makes override behavior explicit in the plan and keeps room
     for standard optimizations.

*Why not runtime-level override*:

   1. Planner-level override keeps watermark semantics a first-class,
      visible part of the plan (consistent with how VECTOR_SEARCH /
      ML_PREDICT are modeled as specialized ExecNodes in the FLIP).
   2. The override point is deterministic and inspectable via EXPLAIN.
   3. Simpler execution model: no dual-watermark reconciliation at
      runtime.

If concrete use cases for a runtime-level override surface later, we
could revisit this via a hint (e.g., /*+ RUNTIME_OVERRIDE */), but it's
not part of this FLIP.

Relationship with BuiltInProcessTableFunction

Good observation. APPLY_WATERMARK is declared as a built-in PTF at the
SQL surface, similar in spirit to TO_CHANGELOG / FROM_CHANGELOG, but the
FLIP intentionally maps it to a *specialized ExecNode* rather than a
generic PTF runtime, the same pattern used by VECTOR_SEARCH and
ML_PREDICT.

*Option A: Reuse the generic BuiltInProcessTableFunction runtime*

   - Pros: Consistent with other built-in PTFs at the runtime layer.
   - Cons: Watermark assignment is not a row-transforming PTF; it
     changes stream metadata (time attribute + watermark strategy).
     Forcing it through the generic PTF runtime would require extending
     the PTF contract with watermark semantics.

*Option B: Dedicated LogicalWatermarkAssigner + specialized ExecNode
(the FLIP's choice)*

   - Pros: Keeps watermark semantics a first-class citizen in the
     planner; cleanly integrates with watermark propagation rules; no
     need to overload the PTF contract; same pattern as VECTOR_SEARCH /
     ML_PREDICT already established in Flink.
   - Cons: A new dedicated node, though that cost is small compared to
     the semantic clarity.

*Current decision*: Option B, as stated in the FLIP ("*APPLY_WATERMARK
compiles to a specialized ExecNode --- similar to how VECTOR_SEARCH and
ML_PREDICT are handled*"). Open to revisiting based on community
feedback.

StreamExecWatermarkAssigner Sufficiency

For the physical implementation:

*Yes, StreamExecWatermarkAssigner should be sufficient*, with some
modifications:

   1. *Input handling*: Currently assumes direct table scan; needs to
   handle APPLY_WATERMARK's column mapping
   2. *Watermark expression evaluation*: Needs to support arbitrary
   expressions (currently limited)
   3. *State management*: May need additional state for handling
   out-of-order events

The key insight is that APPLY_WATERMARK conceptually translates to:

TableScan -> Calc (expression evaluation) -> WatermarkAssigner

StreamExecWatermarkAssigner handles the last step; the Calc step
handles the expression.
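The TableScan -> Calc -> WatermarkAssigner decomposition can be sketched as three chained Python generators. The sketch assumes a JSON payload whose `ts` field holds the event time (the field names are made up for the example). It shows the division of labor: Calc derives the rowtime column, and the assigner only computes a watermark over an already-projected column.

```python
import json
from datetime import datetime, timedelta
from typing import Dict, Iterator, List, Tuple


def table_scan(raw_rows: List[str]) -> Iterator[str]:
    """TableScan: emit raw payloads as they are stored."""
    yield from raw_rows


def calc(rows: Iterator[str]) -> Iterator[Dict]:
    """Calc: evaluate expressions, here deriving the rowtime column from JSON."""
    for raw in rows:
        doc = json.loads(raw)
        yield {"user_id": doc["user_id"], "ts": datetime.fromisoformat(doc["ts"])}


def watermark_assigner(rows: Iterator[Dict], delay: timedelta) -> Iterator[Tuple[Dict, datetime]]:
    """WatermarkAssigner: pass rows through and track max(ts - delay)."""
    current = datetime.min
    for row in rows:
        current = max(current, row["ts"] - delay)
        yield row, current


raw = [
    '{"user_id": "a", "ts": "2026-05-07T10:00:10"}',
    '{"user_id": "b", "ts": "2026-05-07T10:00:08"}',  # out of order by 2s
]
for row, wm in watermark_assigner(calc(table_scan(raw)), timedelta(seconds=5)):
    print(row["user_id"], wm.time())
# a 10:00:05
# b 10:00:05
```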
------------------------------
Summary of Proposed Responses

   - *Scope of input*: Support base tables, non-materialized views *and
     subqueries* (per FLIP Goals); views/catalog semantics stay
     unchanged.
   - *Monotonicity*: Validate watermark_expression as a scalar
     expression; same monotonicity contract as today's DDL watermarks.
   - *Override timing*: Planner-level override at
     LogicalWatermarkAssigner; potential /*+ RUNTIME_OVERRIDE */ hint as
     future work.
   - *PTF reuse*: Dedicated LogicalWatermarkAssigner + specialized
     ExecNode (same pattern as VECTOR_SEARCH / ML_PREDICT).
   - *ExecNode sufficiency*: StreamExecWatermarkAssigner is sufficient
     with minor modifications (input handling + expression evaluation).

------------------------------

Looking forward to further discussion!

Best regards,
FeatZhang


On Wed, May 6, 2026 at 4:39 PM Xuyang <[email protected]> wrote:

Hi, FeatZhang. Thanks for driving this discussion. I've read through the
full FLIP and the mailing list context, and I have a few questions:

1. If I understand correctly, in a Layered Data Architecture,
silver_events would typically be a table, a materialized view, or a
materialized table. From the mailing list discussion, it seems like no
consensus was reached on this point. I think we still need to consider
whether APPLY_WATERMARK should be allowed on (non-materialized) views.
2. In the Planner Changes section under Logical Plan, could you
elaborate on why monotonicity guarantees need to be ensured for the
watermark expression validation?
3. (nit) In the Watermark Override part under Planner Changes, shouldn't
the override of the upstream watermark happen at runtime rather than at
the planner level?
4. I feel that APPLY_WATERMARK is quite similar to TO_CHANGELOG and
FROM_CHANGELOG. Is what we actually need just a
BuiltInProcessTableFunction? That way, we would only need to further
extend ProcessTableFunction to support this.
5. If we choose to translate APPLY_WATERMARK into a specialized ExecNode
(similar to VECTOR_SEARCH and ML_PREDICT), would the existing
StreamExecWatermarkAssigner be sufficient for this purpose?





--

    Best!
    Xuyang



At 2026-04-21 22:22:33, "FeatZhang" <[email protected]> wrote:
Hi everyone,

Thank you for the feedback and discussions on the initial proposal. I've
revised the FLIP based on the community's input and would like to share
the updated version.

FLIP-XXX: Support Flexible Watermark Assignment via Built-in Function
<https://drive.google.com/open?id=17PXYAi6Pb91OqFhVVK7tRULiaHAb6wiX79jjC0NaaDA>


KEY UPDATES
===========

The proposal has evolved from "Support Watermark Definition in SQL
Views" to a more flexible and powerful approach: FLIP-XXX: Support
Flexible Watermark Assignment via APPLY_WATERMARK Function.

What's Changed:

1. Broader Scope: Instead of limiting watermark definitions to SQL
  views only, the new proposal introduces a built-in table function
  APPLY_WATERMARK that works with:
  - Base tables
  - Views (both regular and materialized)
  - Subqueries
  - Any table-valued expressions

2. More Flexible Design: The function-based approach provides:
  - Dynamic watermark assignment at query time without modifying
    catalog metadata
  - Override capability for existing watermark strategies
  - Composability with other SQL operations
  - No need for DDL changes or catalog write permissions

3. Better SQL Semantics: Using a table function follows SQL standard
  patterns and integrates naturally with Flink's existing function
  ecosystem.

UPDATED FLIP DOCUMENT
=====================

The revised FLIP is now available at:
https://iwiki.woa.com/p/4019879693

Key sections include:
- Motivation and use cases
- Public interfaces and SQL syntax
- Implementation plan
- Compatibility analysis
- Test plan

EXAMPLE USAGE
=============

-- Apply watermark to a view
SELECT * FROM APPLY_WATERMARK(my_view, DESCRIPTOR(event_time),
    event_time - INTERVAL '5' SECOND);

-- Override existing watermark strategy
SELECT * FROM APPLY_WATERMARK(my_table_with_watermark,
    DESCRIPTOR(ts),
    ts - INTERVAL '10' SECOND  -- Different from DDL watermark
);

-- Use in complex queries
SELECT window_start,
       window_end,
       COUNT(*)
FROM TABLE(TUMBLE(
    TABLE APPLY_WATERMARK(orders, DESCRIPTOR(order_time), order_time - INTERVAL '5' SECOND),
    DESCRIPTOR(order_time),
    INTERVAL '1' HOUR))
GROUP BY window_start, window_end;


IMPLEMENTATION PROGRESS
=======================

I've also opened a draft PR #27984 with the initial implementation:
- Core built-in function definition
- SQL-to-RelNode conversion rules
- Physical plan integration
- Unit tests and documentation (English + Chinese)

The PR is available at:
https://github.com/apache/flink/pull/27984

REQUEST FOR FEEDBACK
====================

I would appreciate your thoughts on:

1. Function naming: Is APPLY_WATERMARK clear and intuitive?
  (Alternatives considered: WITH_WATERMARK, SET_WATERMARK)

2. DESCRIPTOR syntax: Using DESCRIPTOR(column_name) to specify the
  rowtime column: does this align well with Flink's existing patterns?

3. Override behavior: Should APPLY_WATERMARK always override existing
  watermarks, or should we provide a mode parameter (e.g., OVERRIDE,
  MERGE)?

4. Performance considerations: Any concerns about the function-based
  approach vs. catalog-level watermark definitions?

Looking forward to your valuable feedback!

Best regards,
FeatZhang

On Thu, Feb 12, 2026 at 6:24 PM Timo Walther <[email protected]> wrote:

Hi everyone,

I think we all agree that we clearly want this functionality; just the
"how" needs to be discussed. I also like Lincoln's suggestion of
introducing a built-in PTF for this; I had similar ideas in mind.

There are two issues with an APPLY_WATERMARK function, but both are on
the short-term roadmap:

1) This function would need to be a function that takes an expression,
ideally as a lambda function. Newer Calcite versions already have lambda
expression support. At Confluent we were planning to work on a Calcite
upgrade this quarter, especially to get lambda support in and improve
built-in functions that work on collections.

2) User-defined PTFs are currently not able to emit watermarks. We could
introduce a new interface WatermarkFunction (similar to
ChangelogFunction) that would offer this to everyone. Alternatively, we
could only use the PTF signature, but translate to a specialized
ExecNode, similar to how we do it for VECTOR_SEARCH and ML_PREDICT.

In any case, even if we go with the function approach, we definitely
need a full FLIP on this.

Thanks,
Timo

On 12.02.26 08:25, Gyula Fóra wrote:
Hi All!
I would like to chime in here quickly from a slightly different angle.
While I am the first to admit that I cannot grasp all the planning /
conceptual implications, I also feel the need for more flexible
watermark handling as suggested by Feat.

Anything that can only be applied to base/catalog tables is very
limiting from a usability perspective. Watermarks feel like they should
be a simple function that you can apply on a column/table as part of a
query/view. For example: extract a timestamp from a string, convert it
to TS, then apply a watermark, etc.

Users often receive the tables/catalogs as given and can only write
queries.

Fixing this would eliminate a long-standing disconnect between the
DataStream API's flexible watermark handling and the currently very
restrictive SQL approach.

Cheers
Gyula

On Thu, Feb 12, 2026 at 7:54 AM FeatZhang <[email protected]> wrote:

Hi Timo, Lincoln,

Thank you both for the detailed feedback.

I agree with the concern that non-materialized SQL views should remain
a pure logical abstraction. Introducing watermark definitions directly
into CREATE VIEW or ALTER VIEW could blur the boundary between logical
aliasing and physical planning semantics, especially considering
optimization barriers and watermark propagation behavior.

Lincoln's suggestion of introducing a built-in function such as:

APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column), watermark_expression)

is a cleaner direction. It keeps watermark definition as an explicit
relational transformation rather than attaching additional semantics to
views.

However, to fully address the original use cases (especially logical
reuse and layered lakehouse architectures), I propose that the table
parameter should support:

    - Base tables
    - Non-materialized views

If APPLY_WATERMARK can accept both, we can:

    - Preserve the conceptual purity of SQL views
    - Avoid redefining view semantics
    - Still enable logical reuse via views
    - Allow different watermark strategies over the same logical
      relation

In other words, watermark definition becomes an explicit relational
operator applied on top of any logical relation, instead of being
embedded into the view definition itself.

From a planner perspective, this keeps the model consistent:

    - The function expands into a relational node
    - No optimization barrier is introduced by views
    - Watermark handling remains part of the logical plan
      transformation

I will prepare a PR to prototype APPLY_WATERMARK with support for both
base tables and non-materialized views, and share it for further
discussion.

Looking forward to your thoughts.

Best,
Feat


On Thu, Feb 12, 2026 at 12:19, Lincoln Lee <[email protected]> wrote:

Agree with Timo's point regarding the conceptual semantics. We should
not directly extend non-materialized views with additional watermark
definitions.

Regarding the use case mentioned by Feat, defining different watermark
strategies for the same data source, especially in the case of catalog
tables, we are exploring a possible solution introducing a built-in
function:
```sql
APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column),
watermark_expression)
```
This function only supports base tables as input and does not support
views, subqueries or derived relations.

This would address a meaningful subset of the identified use cases
without redefining the role of SQL views.


Best,
Lincoln Lee


On Wed, Feb 11, 2026 at 23:29, Timo Walther <[email protected]> wrote:

Hi Feat,

thanks for proposing this FLIP. We had similar discussions in
the
past,
but so far could never reach consensus.

SQL views are actually a very simple concept, they just give
SQL
text
an
alias. A view has no other properties except for the view
definition.
Everything else is dynamically computed when the SQL text is
inserted
into the larger plan. A view is never evaluated without the
surrounding
plan.

Watermarks in the middle of a pipeline raise a couple of
tricky
issues:

- What if the upstream table is updating, how would you deal
with
watermarks in the downstream view?
- What if the upstream table emits already watermarks? Would
the
view
catch them and discard this information?
- A view usually dissolves into the plan (e.g. via projection
or
filter
pushdown). Would a watermark definition suddenly introduce an
optimization barrier? If this is an optimization barrier, is
this
still
a view or a new concept? E.g. a "materialized view" or
"pre-planned
view"?

Cheers,
Timo



On 11.02.26 03:51, FeatZhang wrote:
Hi Flink Community,

I'd like to propose adding watermark support for SQL Views in Flink to
better support event-time processing scenarios.

Problem Statement

Currently, Flink SQL views cannot define watermarks. This limitation
creates several challenges:

     - *Broken Abstraction*: Users must reference underlying table
       watermarks directly, exposing implementation details
     - *No Flexibility*: Cannot define different watermark strategies
       for different use cases on the same data source
     - *Limited Architecture Support*: Incompatible with modern layered
       data architectures (Bronze/Silver/Gold medallion pattern)

For example, in multi-tenant scenarios, different tenants may require
different lateness tolerance, but currently we cannot create views with
different watermark strategies on the same source table.
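To illustrate why tenants may want different strategies over the same rows, take a simple definition in which a row counts as late when its rowtime is at or behind the watermark already reached. Under that definition, a 5-second bound and a 10-second bound classify the same stream differently. Below is a Python sketch; `late_rows` is a hypothetical helper, not a Flink API.

```python
from typing import List

MIN_WATERMARK = -(2**63)


def late_rows(rowtimes_ms: List[int], bound_ms: int) -> List[int]:
    """Rows whose rowtime is at or behind the watermark reached before they arrived.

    The watermark follows a bounded out-of-orderness strategy:
    max(rowtime seen so far) - bound_ms.
    """
    watermark = MIN_WATERMARK
    late = []
    for ts in rowtimes_ms:
        if ts <= watermark:
            late.append(ts)
        watermark = max(watermark, ts - bound_ms)
    return late


# The same source rows (epoch millis, in arrival order), judged per tenant
events = [10_000, 20_000, 14_000, 30_000, 26_000]
print(late_rows(events, bound_ms=5_000))   # tenant A, 5s tolerance:  [14000]
print(late_rows(events, bound_ms=10_000))  # tenant B, 10s tolerance: []
```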

Proposed Solution

I propose adding two SQL syntax options to support watermark
definitions in views:

*Option 1: CREATE VIEW with WATERMARK*

CREATE VIEW user_activity
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
AS SELECT user_id, event_time, action FROM raw_events;

*Option 2: ALTER VIEW SET WATERMARK*

ALTER VIEW user_activity SET WATERMARK FOR event_time AS
event_time - INTERVAL '5' SECOND;

Key Design Aspects

     - *Backward Compatibility*: Watermark stored as optional metadata
       in view options; existing views continue to work unchanged
     - *Validation*: Watermark column must exist in view schema and be
       of TIMESTAMP/TIMESTAMP_LTZ type
     - *Storage*: Watermark metadata persists in catalog options map
       (works with all catalog implementations)
     - *Propagation*: Follows existing Flink watermark propagation rules
       in joins and nested views

Use Case Example: Data Lakehouse Architecture

-- Bronze: Raw data (no watermark)
CREATE TABLE bronze_events (raw_data STRING, ingestion_time TIMESTAMP(3)) WITH (...);

-- Silver: Cleaned data with watermark
CREATE VIEW silver_events
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
AS SELECT
    CAST(JSON_VALUE(raw_data, '$.event_time') AS TIMESTAMP(3)) AS event_time,
    JSON_VALUE(raw_data, '$.user_id') AS user_id
FROM bronze_events
WHERE JSON_VALUE(raw_data, '$.event_time') IS NOT NULL;

-- Gold: Aggregations
SELECT TUMBLE_START(event_time, INTERVAL '1' HOUR), COUNT(*)
FROM silver_events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
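The following Python sketch shows how the Silver-layer watermark drives the Gold-layer hourly window. A 1-hour tumbling window fires once the watermark (max event time minus 10 seconds) reaches the window's last millisecond, and rows that arrive for an already-fired window are dropped as late. This is a simplified model with hypothetical names. Real window operators also deal with state, allowed lateness and end-of-input, which the sketch ignores.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

HOUR_MS = 3_600_000


def tumbling_counts(rowtimes_ms: List[int], delay_ms: int,
                    size_ms: int = HOUR_MS) -> List[Tuple[int, int]]:
    """Return (window_start, count) for every window fired by the watermark."""
    open_windows: Dict[int, int] = defaultdict(int)
    fired: List[Tuple[int, int]] = []
    watermark = -(2**63)
    for ts in rowtimes_ms:
        start = ts - ts % size_ms
        if start + size_ms - 1 <= watermark:
            continue  # window already fired: late row is dropped
        open_windows[start] += 1
        # watermark = event_time - INTERVAL '10' SECOND, never moving backwards
        watermark = max(watermark, ts - delay_ms)
        for window_start in sorted(open_windows):
            # window [start, start + size) is complete once the watermark
            # reaches its last millisecond
            if window_start + size_ms - 1 <= watermark:
                fired.append((window_start, open_windows.pop(window_start)))
    return fired


# 00:00:05, 00:30:00, 01:00:05, 00:59:58, 01:00:20, 00:59:59 (arrival order)
rows = [5_000, 1_800_000, 3_605_000, 3_598_000, 3_620_000, 3_599_000]
print(tumbling_counts(rows, delay_ms=10_000))  # [(0, 3)]
```

The row at 00:59:58 arrives after 01:00:05 but is still within the 10-second bound, so the first window counts it. The row at 00:59:59 arrives after the watermark has passed 01:00:00 and is dropped.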

Reference Materials

     - FLIP Document: FLIP-XXX: Support Watermark in Flink SQL View
       <https://docs.google.com/document/d/1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y/edit?usp=sharing>
     - JIRA Issue: https://issues.apache.org/jira/browse/FLINK-39062
     - Implementation POC:
       - [FLINK-39062][table] Support WATERMARK clause in CREATE VIEW
         statement <https://github.com/apache/flink/pull/27571>
       - [FLINK-39062][table] Support ALTER VIEW SET WATERMARK syntax
         <https://github.com/apache/flink/pull/27570>

Implementation Timeline

Estimated 6-8 weeks covering parser layer, planner layer, catalog
integration, and comprehensive testing.

Request for Feedback

This enhancement would significantly improve Flink's support for
layered data architectures and flexible event-time processing. I'm
happy to provide more details or start a formal FLIP process if the
community sees value in this proposal.

Looking forward to the community's feedback!

Best regards,

Feat Zhang

FLIP-XXX: Support Watermark in Flink SQL View
<https://drive.google.com/open?id=1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y>