Hi everyone,
I think we all agree that we want this functionality; only the
"how" still needs to be discussed. I also like Lincoln’s suggestion of
introducing a built-in PTF for this; I had similar ideas in mind.
There are two issues with an APPLY_WATERMARK function, but both are on the
short-term roadmap:
1) The function would need to take an expression as an argument, ideally
as a lambda function. Newer Calcite versions already support lambda
expressions. At Confluent, we were planning to work on a Calcite upgrade
this quarter, specifically to get lambda support in and to improve
built-in functions that work on collections.
2) User-defined PTFs are currently not able to emit watermarks. We could
introduce a new interface WatermarkFunction (similar to
ChangelogFunction) that would offer this to everyone. Alternatively, we
could use only the PTF signature but translate it to a specialized
ExecNode, similar to how we do it for VECTOR_SEARCH and ML_PREDICT.
In any case, even if we go with the function approach, we definitely
need a full FLIP on this.
Thanks,
Timo
On 12.02.26 08:25, Gyula Fóra wrote:
Hi All!
I would like to chime in here quickly from a slightly different angle.
While I am the first to admit that I cannot grasp all the planning /
conceptual implications, I also feel the need for more flexible watermark
handling as suggested by Feat.
Anything that can only be applied to base/catalog tables is very limiting
from a usability perspective. Watermarks feel like they should be a simple
function that you can apply on a column/table as part of a query/view. For
example: extract a timestamp from a string, convert it to TIMESTAMP, then
apply a watermark.
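To make the "extract, convert, apply watermark" sequence concrete, here is a minimal plain-Python model. It is not Flink API. It assumes each row is a JSON string with an ISO-8601 `event_time` field. The watermark expression `event_time - INTERVAL '5' SECOND` is modelled as a bounded out-of-orderness strategy whose emitted watermark is the running maximum. `extract_event_time` and `with_watermarks` are hypothetical helper names.

```python
import json
from datetime import datetime, timedelta
from typing import Iterable, Iterator, Tuple


def extract_event_time(raw: str) -> datetime:
    """Steps 1 and 2: pull 'event_time' out of a JSON string and convert it to a timestamp."""
    return datetime.fromisoformat(json.loads(raw)["event_time"])


def with_watermarks(raws: Iterable[str], delay: timedelta) -> Iterator[Tuple[datetime, datetime]]:
    """Step 3: bounded out-of-orderness watermark, i.e. `event_time - delay`.

    The emitted watermark is the maximum computed so far, so it never moves backwards.
    Yields (event_time, current_watermark) pairs.
    """
    watermark = datetime.min
    for raw in raws:
        event_time = extract_event_time(raw)
        watermark = max(watermark, event_time - delay)
        yield event_time, watermark


raw_rows = [
    '{"event_time": "2026-02-12T10:00:07", "user_id": "u1"}',
    '{"event_time": "2026-02-12T10:00:03", "user_id": "u2"}',  # arrives out of order
    '{"event_time": "2026-02-12T10:00:12", "user_id": "u1"}',
]
for event_time, watermark in with_watermarks(raw_rows, timedelta(seconds=5)):
    print(event_time.time(), watermark.time())
```

The out-of-order row does not move the watermark backwards. This monotonicity is what makes it safe to attach the watermark step after arbitrary per-row projections.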
Users often receive the tables/catalogs as given and can only write
queries.
Fixing this would eliminate a long-standing disconnect between the
flexible watermark handling of the DataStream API and the currently very
restrictive SQL approach.
Cheers
Gyula
On Thu, Feb 12, 2026 at 7:54 AM FeatZhang wrote:
Hi Timo, Lincoln,
Thank you both for the detailed feedback.
I agree with the concern that non-materialized SQL views should remain a
purely logical abstraction. Introducing watermark definitions directly
into CREATE VIEW or ALTER VIEW could blur the boundary between logical
aliasing and physical planning semantics, especially considering
optimization barriers and watermark propagation behavior.
Lincoln’s suggestion of introducing a built-in function is a cleaner
direction:
```sql
APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column), watermark_expression)
```
It keeps the watermark definition as an explicit relational transformation
rather than attaching additional semantics to views.
However, to fully address the original use cases (especially logical reuse
and layered lakehouse architectures), I propose that the table parameter
should support:
- Base tables
- Non-materialized views
If APPLY_WATERMARK can accept both, we can:
- Preserve the conceptual purity of SQL views
- Avoid redefining view semantics
- Still enable logical reuse via views
- Allow different watermark strategies over the same logical relation
In other words, watermark definition becomes an explicit relational
operator applied on top of any logical relation, instead of being embedded
into the view definition itself.
From a planner perspective, this keeps the model consistent:
- The function expands into a relational node
- No optimization barrier is introduced by views
- Watermark handling remains part of the logical plan transformation
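The idea of a watermark operator applied on top of any logical relation, with different strategies over the same relation, can be illustrated with a small plain-Python model. This is neither Flink API nor the proposed implementation. `Row`, `Watermark`, `apply_watermark`, `late_rows`, and `tenant_view` are hypothetical names. Timestamps are epoch milliseconds, and lateness is simplified to "event time below the most recent watermark".

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, List, Optional, Union


@dataclass(frozen=True)
class Row:
    tenant: str
    event_time: int  # epoch milliseconds


@dataclass(frozen=True)
class Watermark:
    timestamp: int


Element = Union[Row, Watermark]


def apply_watermark(relation: Iterable[Row], delay_ms: int) -> Iterator[Element]:
    """Hypothetical operator: accepts ANY iterable of rows (a base table scan or the
    output of a view) and emits a watermark of `event_time - delay_ms` whenever it advances."""
    current: Optional[int] = None
    for row in relation:
        yield row
        candidate = row.event_time - delay_ms
        if current is None or candidate > current:
            current = candidate
            yield Watermark(current)


def late_rows(stream: Iterable[Element]) -> List[Row]:
    """Rows whose event time is below the most recent watermark seen before them."""
    current: Optional[int] = None
    late: List[Row] = []
    for element in stream:
        if isinstance(element, Watermark):
            current = element.timestamp
        elif current is not None and element.event_time < current:
            late.append(element)
    return late


def tenant_view(rows: Iterable[Row], tenant: str) -> Iterator[Row]:
    """A 'view' is just a logical relation: here, a filter over the source."""
    return (r for r in rows if r.tenant == tenant)


source = [Row("a", 10_000), Row("b", 20_000), Row("a", 12_000), Row("b", 14_000)]

# Two watermark strategies over the same logical relation.
print(late_rows(apply_watermark(source, delay_ms=2_000)))   # strict: two late rows
print(late_rows(apply_watermark(source, delay_ms=10_000)))  # relaxed: no late rows

# Filter first (watermark on the view) vs. watermark first, then filter.
print(late_rows(apply_watermark(tenant_view(source, "a"), 2_000)))
wm_then_filter = [e for e in apply_watermark(source, 2_000)
                  if isinstance(e, Watermark) or e.tenant == "a"]
print(late_rows(wm_then_filter))
```

In this model, watermarking the filtered view reports no late rows for tenant `a`. Watermarking the full source first and then filtering reports `Row("a", 12_000)` as late. The position of the watermark relative to a filter therefore changes the result, which is one way to see why Timo asks whether a watermark would act as an optimization barrier.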
I will prepare a PR to prototype APPLY_WATERMARK with support for both base
tables and non-materialized views, and share it for further discussion.
Looking forward to your thoughts.
Best,
Feat
On Thu, Feb 12, 2026 at 12:19 PM Lincoln Lee wrote:
I agree with Timo’s point regarding the conceptual semantics. We should not
directly extend non-materialized views with additional watermark
definitions.
Regarding the use case mentioned by Feat (defining different watermark
strategies for the same data source, especially for catalog tables), we
are exploring a possible solution that introduces a built-in function:
```sql
APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column), watermark_expression)
```
This function would only support base tables as input, not views,
subqueries, or derived relations.
This would address a meaningful subset of the identified use cases
without redefining the role of SQL views.
Best,
Lincoln Lee
On Wed, Feb 11, 2026 at 11:29 PM Timo Walther wrote:
Hi Feat,
thanks for proposing this FLIP. We had similar discussions in the past,
but so far could never reach consensus.
SQL views are actually a very simple concept: they just give SQL text an
alias. A view has no other properties except for the view definition.
Everything else is dynamically computed when the SQL text is inserted
into the larger plan. A view is never evaluated without the surrounding
plan.
Watermarks in the middle of a pipeline raise a couple of tricky issues:
- What if the upstream table is updating, how would you deal with
watermarks in the downstream view?
- What if the upstream table already emits watermarks? Would the view
catch them and discard this information?
- A view usually dissolves into the plan (e.g. via projection or filter
pushdown). Would a watermark definition suddenly introduce an
optimization barrier? If this is an optimization barrier, is this still
a view or a new concept? E.g. a "materialized view" or "pre-planned
view"?
Cheers,
Timo
On 11.02.26 03:51, FeatZhang wrote:
Hi Flink Community,
I'd like to propose adding watermark support for SQL views in Flink to
better support event-time processing scenarios.
Problem Statement
Currently, Flink SQL views cannot define watermarks. This limitation
creates several challenges:
- *Broken Abstraction*: Users must reference underlying table watermarks directly, exposing implementation details
- *No Flexibility*: Users cannot define different watermark strategies for different use cases on the same data source
- *Limited Architecture Support*: Views are incompatible with modern layered data architectures (Bronze/Silver/Gold medallion pattern)
For example, in multi-tenant scenarios, different tenants may require
different lateness tolerances, but currently we cannot create views with
different watermark strategies on the same source table.
Proposed Solution
I propose adding two SQL syntax options to support watermark definitions
in views:
*Option 1: CREATE VIEW with WATERMARK*
```sql
CREATE VIEW user_activity
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
AS SELECT user_id, event_time, action FROM raw_events;
```
*Option 2: ALTER VIEW SET WATERMARK*
```sql
ALTER VIEW user_activity SET WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND;
```
Key Design Aspects
- *Backward Compatibility*: The watermark is stored as optional metadata in the view options; existing views continue to work unchanged
- *Validation*: The watermark column must exist in the view schema and be of type TIMESTAMP or TIMESTAMP_LTZ
- *Storage*: Watermark metadata is persisted in the catalog options map (works with all catalog implementations)
- *Propagation*: Follows the existing Flink watermark propagation rules in joins and nested views
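The *Validation*, *Storage*, and *Backward Compatibility* points can be sketched together in plain Python. This is an illustration under assumptions, not the POC code. Schema types are simplified to strings. The option keys `watermark.rowtime` and `watermark.expression` are hypothetical because the proposal does not name the actual keys. The helper functions are hypothetical as well.

```python
import re
from typing import Dict, Optional, Tuple

# Hypothetical option keys: the proposal does not name the actual keys.
WATERMARK_ROWTIME_KEY = "watermark.rowtime"
WATERMARK_EXPRESSION_KEY = "watermark.expression"

# Simplified type strings such as TIMESTAMP(3), TIMESTAMP_LTZ(3), TIMESTAMP(3) NOT NULL
# (spaces are stripped before matching).
_TIMESTAMP_TYPE = re.compile(r"TIMESTAMP(_LTZ)?(\(\d\))?(NOTNULL)?")


def validate_watermark(view_schema: Dict[str, str], rowtime: str) -> None:
    """Validation: the column must exist in the view schema and be TIMESTAMP/TIMESTAMP_LTZ."""
    if rowtime not in view_schema:
        raise ValueError(f"Watermark column '{rowtime}' does not exist in the view schema")
    normalized = view_schema[rowtime].upper().replace(" ", "")
    if not _TIMESTAMP_TYPE.fullmatch(normalized):
        raise ValueError(
            f"Watermark column '{rowtime}' must be TIMESTAMP or TIMESTAMP_LTZ, "
            f"not {view_schema[rowtime]}"
        )


def store_watermark(options: Dict[str, str], rowtime: str, expression: str) -> Dict[str, str]:
    """Storage: return a copy of the view options with the watermark as plain strings."""
    return {**options, WATERMARK_ROWTIME_KEY: rowtime, WATERMARK_EXPRESSION_KEY: expression}


def load_watermark(options: Dict[str, str]) -> Optional[Tuple[str, str]]:
    """Backward compatibility: views stored without these keys simply have no watermark."""
    if WATERMARK_ROWTIME_KEY not in options:
        return None
    return options[WATERMARK_ROWTIME_KEY], options[WATERMARK_EXPRESSION_KEY]


schema = {"user_id": "STRING", "event_time": "TIMESTAMP(3)", "action": "STRING"}
validate_watermark(schema, "event_time")
options = store_watermark({}, "event_time", "event_time - INTERVAL '5' SECOND")
print(load_watermark(options))
```

Keeping everything as string-to-string entries is what would make this work with any catalog that can persist an options map.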
Use Case Example: Data Lakehouse Architecture
```sql
-- Bronze: Raw data (no watermark)
CREATE TABLE bronze_events (raw_data STRING, ingestion_time TIMESTAMP(3)) WITH (...);

-- Silver: Cleaned data with watermark
CREATE VIEW silver_events
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
AS SELECT
  CAST(JSON_VALUE(raw_data, '$.event_time') AS TIMESTAMP(3)) AS event_time,
  JSON_VALUE(raw_data, '$.user_id') AS user_id
FROM bronze_events
WHERE JSON_VALUE(raw_data, '$.event_time') IS NOT NULL;

-- Gold: Aggregations
SELECT TUMBLE_START(event_time, INTERVAL '1' HOUR), COUNT(*)
FROM silver_events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
```
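The way the Silver watermark would drive the Gold aggregation can be modelled in a few lines of plain Python. This is a simplified, hypothetical model of tumbling-window semantics, not Flink's operator code. A window fires once the watermark reaches its last millisecond, allowed lateness is zero, and windows still open at end of input are not flushed. `tumbling_hourly_counts` is an illustrative name.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple

HOUR_MS = 60 * 60 * 1000


def tumbling_hourly_counts(event_times: Iterable[int], delay_ms: int) -> List[Tuple[int, int]]:
    """Count rows per 1-hour tumbling window, driven by an `event_time - delay_ms` watermark.

    A window [start, start + 1h) fires once the watermark reaches its last timestamp
    (start + 1h - 1 ms). Rows for an already-fired window are dropped (no allowed
    lateness). Windows still open when the input ends are not emitted in this model.
    Returns (window_start, count) pairs in firing order.
    """
    watermark: Optional[int] = None
    counts: Dict[int, int] = defaultdict(int)
    fired: List[Tuple[int, int]] = []
    for ts in event_times:
        start = ts - ts % HOUR_MS
        if watermark is not None and start + HOUR_MS - 1 <= watermark:
            continue  # late row: its window has already fired
        counts[start] += 1
        candidate = ts - delay_ms
        if watermark is None or candidate > watermark:
            watermark = candidate
        for window_start in sorted(counts):
            if window_start + HOUR_MS - 1 <= watermark:
                fired.append((window_start, counts.pop(window_start)))
    return fired


# Event times in epoch milliseconds; the delay matches INTERVAL '10' SECOND.
events = [0, 1_800_000, 3_605_000, 3_620_000, 1_000_000, 7_215_000]
print(tumbling_hourly_counts(events, delay_ms=10_000))
```

With the 10-second delay, the row at 1,000,000 ms arrives after the first hour has already fired, so it is dropped. The third window stays open because no later row advances the watermark far enough.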
Reference Materials
- FLIP Document: FLIP-XXX: Support Watermark in Flink SQL View
<https://docs.google.com/document/d/1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y/edit?usp=sharing>
- JIRA Issue: https://issues.apache.org/jira/browse/FLINK-39062
- Implementation POC:
  - [FLINK-39062][table] Support WATERMARK clause in CREATE VIEW statement <https://github.com/apache/flink/pull/27571>
  - [FLINK-39062][table] Support ALTER VIEW SET WATERMARK syntax <https://github.com/apache/flink/pull/27570>
Implementation Timeline
Estimated 6-8 weeks covering parser layer, planner layer, catalog
integration, and comprehensive testing.
Request for Feedback
This enhancement would significantly improve Flink's support for layered
data architectures and flexible event-time processing. I'm happy to provide
more details or start a formal FLIP process if the community sees value in
this proposal.
Looking forward to the community's feedback!
Best regards,
Feat Zhang