Hi Timo,

Thanks a lot for the last-minute feedback. This is very helpful.

I agree with both points.

1) Lambda syntax / no custom parser changes

After re-checking the current Calcite state in Flink, I agree that the FLIP
should use Calcite's lambda expression syntax instead of inventing a custom
watermark-expression syntax.

With the recent Calcite upgrade in Flink, the Calcite parser stack already
supports lambda expressions and PTF-related syntax. So this FLIP should not
require any Calcite parser changes.

I will update the public interface from:

  APPLY_WATERMARK(
      input,
      DESCRIPTOR(order_time),
      order_time - INTERVAL '5' SECOND)

to something like:

  APPLY_WATERMARK(
      input,
      DESCRIPTOR(order_time),
      (order_time) -> order_time - INTERVAL '5' SECOND)

The lambda parameter type can be inferred from the DESCRIPTOR column. The
validation rule will remain aligned with today's DDL watermark contract:

  - the descriptor column must be TIMESTAMP / TIMESTAMP_LTZ;
  - the lambda body must produce a valid watermark expression for that time
    attribute;
  - no planner-level monotonicity check is introduced.

Implementation-wise, I will describe this as using Calcite's SqlOperator
stack for now. I agree that Flink's FunctionDefinition /
BridgingSqlFunction path is not the right dependency point until lambda
operands are fully integrated there. APPLY_WATERMARK can still lower to the
dedicated LogicalWatermarkAssigner / ExecNode path as discussed before.

2) Optional config parameter

Good point. I agree it is worth preparing an extension point for future
configuration.

I will add an optional named parameter:

  config => MAP<STRING, STRING>

so the full form becomes:

  APPLY_WATERMARK(
      input,
      DESCRIPTOR(order_time),
      (order_time) -> order_time - INTERVAL '5' SECOND,
      config => MAP['key', 'value'])

For this FLIP, the default is an empty config and no runtime behavior
changes
are introduced by config options. I will also clarify that unsupported
config
keys should fail validation rather than being silently ignored. Future FLIPs
can define concrete options if needed.

I will update the FLIP accordingly:
  - replace the scalar-expression argument with a lambda argument;
  - remove any implication that custom parser changes are needed;
  - clarify that the first implementation can use Calcite SqlOperator until
    Flink's FunctionDefinition stack supports lambda operands;
  - add the optional config parameter as an extension point;
  - keep the existing specialized ExecNode / StreamExecWatermarkAssigner
    implementation strategy.

Thanks again. With these changes, I think the FLIP aligns better with the
current Calcite direction and remains forward-compatible with Flink's
function
stack evolution.

Best,
FeatZhang


On Tue, May 26, 2026 at 4:19 PM Timo Walther <[email protected]> wrote:

> Hi FeatZhang,
>
> I have some last minute feedback on this:
>
> 1) No custom syntax
>
> The FLIP should use the Lambda Expression syntax introduced by Calcite
> here [2]
>
> (order_time) -> order_time - INTERVAL '5' SECOND
>
> Changes to the Calcite parser should not be necessary for this FLIP.
> With Sergey's recent Calcite upgrade [1], both PTFs and Lambda
> Expressions are now fully supported by the Calcite stack. Lambdas are
> not fully integrated into Flink's type system but it is likely that this
> will follow soon. APPLY_WATERMARK can be implemented using Calcite's
> SqlOperator class stack, until the
> BridgingSqlFunction/FunctionDefinition stack is ready to take Lambdas.
>
> 2) Custom options
>
> Could there be a need to customize the APPLY_WATERMARK with
> configuration options? Should we prepare for this already? Maybe a
> `config => MAP<STRING, STRING>` optional parameter similar to
> VECTOR_SEARCH PTF?
>
> Otherwise I'm +1 on this FLIP.
>
> Cheers,
> Timo
>
>
> [1] https://github.com/apache/flink/pull/28152
> [2] https://github.com/apache/calcite/pull/1733
>
> On 20.05.26 10:29, FeatZhang wrote:
> > Hi Xuyang,
> >
> > Thanks a lot for the confirmation, and for the very thorough
> back-and-forth
> > throughout this discussion — it really helped sharpen the proposal,
> > especially around the "arbitrary position" semantics, the planner-side
> > contract, and the catalog persistence story. Much appreciated!
> >
> > @all: with Xuyang's last round of feedback addressed, the FLIP has
> > stabilized.
> >
> > If anyone — Timo, Sergey, Yunfeng, Jingsong, Ron, Shengkai, and others
> > who have been following the thread — still has open concerns, please let
> > me know in the next ~3 days. Otherwise I'll start the VOTE thread.
> >
> > Thanks again to everyone for the reviews!
> >
> > Best,
> > FeatZhang
> >
> > On Mon, May 18, 2026 at 5:20 PM Xuyang <[email protected]> wrote:
> >
> >> Hi FeatZhang,
> >>
> >> Thanks for the update and the revised proposal! I don't have further
> >> questions. Looking forward to seeing this move forward.
> >>
> >>
> >>
> >> --
> >>
> >>      Best!
> >>      Xuyang
> >>
> >>
> >>
> >> 在 2026-05-16 13:44:20,"FeatZhang" <[email protected]> 写道:
> >>> Hi Xuyang,
> >>>
> >>> Thanks a lot for the careful follow-ups — these are exactly the right
> >>> questions to ask before we move to a vote. After re-checking the code
> >>> paths you pointed at, I want to revise several statements I made in my
> >>> previous reply, and tighten the FLIP accordingly. I'll go through them
> >>> one by one.
> >>>
> >>> 1. Semantics at any position in the pipeline
> >>> =============================================
> >>> Fully agree. To make this concrete, I'll add a new sub-section
> >>> "Semantics at Different Pipeline Positions" to the FLIP, covering:
> >>>
> >>>   - APPLY_WATERMARK on a base table (with or without an existing
> >>>     DDL watermark)
> >>>   - APPLY_WATERMARK on top of a non-materialized view / sub-query
> >>>   - APPLY_WATERMARK applied multiple times in the same query
> >>>   - Interaction with TUMBLE / HOP / SESSION / CUMULATE
> >>>   - Interaction with joins (regular / interval / temporal)
> >>>
> >>> The mental model will be explicitly aligned with the DataStream API:
> >>> each APPLY_WATERMARK in SQL corresponds to one
> >>> `assignTimestampsAndWatermarks(...)` call in the DataStream pipeline,
> >>> applied in the order they appear.
> >>>
> >>> 2. Monotonicity validation
> >>> ==========================
> >>> You're right, and I want to correct my earlier reply. Today's
> >>> `CREATE TABLE ... WATERMARK FOR ... AS ...` does NOT enforce
> >>> monotonicity at the planner level — the planner only checks that:
> >>>
> >>>   - the rowtime column exists and is of TIMESTAMP / TIMESTAMP_LTZ,
> >>>   - the watermark expression is a valid scalar expression over the
> >>>     table's schema and resolves to a comparable type.
> >>>
> >>> Monotonicity is a runtime contract: the WatermarkAssignerOperator
> >>> emits watermarks that are guaranteed to be non-decreasing.
> >>>
> >>> APPLY_WATERMARK will follow exactly the same contract — no stricter
> >>> planner-level monotonicity check. I'll update the FLIP's "Planner
> >>> Changes → Validation" section to reflect this.
> >>>
> >>> 3. Override timing — clarification
> >>> ==================================
> >>> I think this was a wording issue on my side rather than a real
> >>> design disagreement. Let me restate it:
> >>>
> >>>   - APPLY_WATERMARK introduces a dedicated WatermarkAssigner node in
> >>>     the plan. Whether the input already carries a watermark or not,
> >>>     the plan ends up with the new assigner positioned downstream of
> >>>     the existing one.
> >>>   - At runtime there is no "merge" or "reconciliation": each
> >>>     WatermarkAssigner operator independently emits its own watermark
> >>>     stream; downstream operators simply observe the watermark from
> >>>     the most recent upstream assigner.
> >>>
> >>> This is the same model as calling `assignTimestampsAndWatermarks()`
> >>> twice in DataStream — the second call wins because it sits later in
> >>> the operator chain, not because of any planner-level magic.
> >>>
> >>> So "planner-level override" was a poor choice of words. The correct
> >>> description is: **the planner decides the operator topology; the
> >>> runtime emits watermarks according to that topology**, exactly like
> >>> DataStream. I'll rephrase the FLIP accordingly and drop the
> >>> "override" terminology.
> >>>
> >>> 4. "Watermark expression evaluation: needs to support arbitrary
> >>>    expressions"
> >>> ====================================================================
> >>> Apologies, this statement was inaccurate. After re-checking,
> >>> StreamExecWatermarkAssigner already evaluates the watermark
> >>> expression through the standard `ExprCodeGenerator`, which supports
> >>> the same scalar expressions as DDL today (arithmetic on TIMESTAMP /
> >>> TIMESTAMP_LTZ, INTERVAL arithmetic, scalar UDFs, etc.).
> >>>
> >>> What APPLY_WATERMARK actually needs from the ExecNode is:
> >>>
> >>>   - resolving the rowtime column index from the DESCRIPTOR, since the
> >>>     input may be a non-base-table (view / sub-query / projected
> >>>     relation),
> >>>   - wiring the watermark expression's input row to the upstream
> >>>     operator's output row instead of a TableScan output.
> >>>
> >>> No new expression capability is required. I'll fix this in the FLIP.
> >>>
> >>> 5. State management
> >>> ===================
> >>> You're right, this should be removed. In the scope of this FLIP the
> >>> APPLY_WATERMARK ExecNode is **stateless**, identical to the existing
> >>> `WatermarkAssignerOperator`. It does not buffer rows and does not
> >>> evict late data; late-data handling remains the responsibility of
> >>> the downstream window / join operators, exactly as it works today.
> >>>
> >>> The "state management" bullet in my previous reply was speculation
> >>> about future watermark strategies (idle source detection, etc.) and
> >>> does not belong in this FLIP. I'll drop it.
> >>>
> >>> ------------------------------------------------------------
> >>> Updated summary of the design after this round
> >>> ------------------------------------------------------------
> >>>
> >>>   - Scope: base tables, non-materialized views, sub-queries; any
> >>>     relation position in the query.
> >>>   - Semantics: aligned with DataStream API
> >>>     `assignTimestampsAndWatermarks()`; multiple applications in the
> >>>     same query are positional, not "overriding".
> >>>   - Validation: same contract as today's DDL — scalar expression on
> >>>     a TIMESTAMP / TIMESTAMP_LTZ rowtime column; no planner-level
> >>>     monotonicity check.
> >>>   - ExecNode: stateless, reuses StreamExecWatermarkAssigner with
> >>>     minor wiring changes (rowtime column resolution + non-TableScan
> >>>     input handling).
> >>>   - Out of scope: WatermarkFunction interface, idle-source state,
> >>>     runtime-level merge of multiple watermark strategies.
> >>>
> >>> I'll update the FLIP document and PR #27984 to match the points
> >>> above, and post a diff summary here once it's done.
> >>>
> >>> Thanks again for pushing on these — the FLIP is much cleaner after
> >>> this round.
> >>>
> >>> Best regards,
> >>> FeatZhang
> >>>
> >>> On Mon, May 11, 2026 at 10:45 AM Xuyang <[email protected]> wrote:
> >>>
> >>>> Hi FeatZhang. Thanks for the detailed responses. I have a few
> follow-up
> >>>> comments and questions:
> >>>>
> >>>>
> >>>> 1. Support for APPLY_WATERMARK at any node/position
> >>>> I generally agree with the direction that APPLY_WATERMARK should be
> >>>> applicable at any node and any position — one or more times — similar
> to
> >>>> how the DataStream API allows watermark assignment. However, I think
> we
> >>>> need to clearly articulate the precise behavior/semantics in each
> >> scenario
> >>>> to reduce user confusion. Aligning the mental model with the
> DataStream
> >> API
> >>>> (where users can call assignTimestampsAndWatermarks() at arbitrary
> >> points
> >>>> in the pipeline) would also help lower the learning curve.
> >>>> 2. Monotonicity validation of watermark expressions
> >>>> Why do we need to enforce monotonicity guarantees on the watermark
> >>>> expression at the planner level? As far as I know, Flink SQL currently
> >> does
> >>>> NOT perform such validation at the DDL level for CREATE TABLE ...
> >> WATERMARK
> >>>> FOR ... AS .... What it actually does is ensure at runtime that
> emitted
> >>>> watermarks are non-decreasing. If the existing DDL path does not
> >> validate
> >>>> monotonicity at planning time, why should APPLY_WATERMARK introduce a
> >>>> stricter contract?
> >>>> 3. Planner-level watermark override
> >>>> Could you elaborate more on why the watermark override must happen at
> >> the
> >>>> planner level? If I understand correctly, in the DataStream API, users
> >> can
> >>>> define different watermark strategies at different nodes in the same
> >>>> pipeline, and the runtime handles watermark propagation naturally.
> >>>> 4. "Watermark expression evaluation: Needs to support arbitrary
> >>>> expressions"
> >>>> You mentioned that StreamExecWatermarkAssigner currently has
> limitations
> >>>> in watermark expression evaluation and "needs to support arbitrary
> >>>> expressions." Could you clarify what the current limitations are
> >> exactly?
> >>>> Today's CREATE TABLE ... WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
> >>>> already supports scalar expressions, what additional expression types
> >> does
> >>>> APPLY_WATERMARK require that are not already supported?
> >>>> 5. State management
> >>>> You mentioned "State management". Are we proposing to introduce a
> >> stateful
> >>>> watermark assigner node that evicts late data? This sounds like a
> >>>> significant change that goes well beyond the scope of this FLIP. The
> >>>> current WatermarkAssigner is stateless, it simply computes and emits
> >>>> watermarks without buffering data.
> >>>> Looking forward to your clarification!
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>>
> >>>>      Best!
> >>>>      Xuyang
> >>>>
> >>>>
> >>>>
> >>>> At 2026-05-09 19:08:32, "熊饶饶" <[email protected]> wrote:
> >>>>> Hi Feat Zhang,
> >>>>>
> >>>>> +1 (non-binding)
> >>>>>
> >>>>> This is a well-motivated proposal that addresses a real pain point in
> >>>> Flink SQL. The inability to define watermarks in views currently
> forces
> >>>> users to either:
> >>>>>
> >>>>> Reference underlying table watermarks directly, breaking
> encapsulation
> >>>>> Create separate physical tables for each watermark strategy, leading
> to
> >>>> data duplication
> >>>>> The proposed syntax CREATE VIEW ... WATERMARK FOR col AS expr is
> >>>> intuitive and aligns naturally with existing DDL watermark semantics.
> >> The
> >>>> backward-compatible design (storing watermark metadata in catalog
> >> options)
> >>>> is also a smart choice — it avoids breaking existing views while
> >> enabling
> >>>> the new capability.
> >>>>>
> >>>>> The Data Lakehouse / medallion architecture use case is particularly
> >>>> compelling. Being able to define watermark strategies at the
> Silver/Gold
> >>>> layer while keeping Bronze as raw data would significantly simplify
> >>>> pipeline design for many teams.
> >>>>>
> >>>>> Looking forward to seeing this move to a formal FLIP!
> >>>>>
> >>>>> Best regards,
> >>>>> Raorao Xiong
> >>>>>
> >>>>>> 2026年5月7日 20:56,FeatZhang <[email protected]> 写道:
> >>>>>>
> >>>>>> Hi Xuyang,
> >>>>>>
> >>>>>> Thank you for the thorough review and thoughtful questions.
> >>>>>>
> >>>>>> *Problem this FLIP aims to solve*: The core goal of this FLIP is to
> >>>> allow
> >>>>>> watermarks to be defined on *computed columns (and, more generally,
> >> on
> >>>> any
> >>>>>> column produced inside a SQL query) directly in SQL statements*.
> >> Today,
> >>>>>> watermarks in Flink SQL can only be declared at the CREATE TABLE
> >> level
> >>>>>> via WATERMARK
> >>>>>> FOR ... AS ..., which means the time attribute must be a column
> >> visible
> >>>> at
> >>>>>> the base table DDL. This makes it impossible to attach a watermark
> >> to a
> >>>>>> timestamp derived inside a query — for example, one computed by
> >> string
> >>>>>> parsing, JSON extraction, or any expression inside a view or
> >> subquery —
> >>>>>> without pushing that computation back down into the source table
> >> DDL. By
> >>>>>> introducing APPLY_WATERMARK as an explicit relational operator that
> >> can
> >>>> be
> >>>>>> applied to *base tables, non-materialized views, and subqueries*,
> >> users
> >>>> can
> >>>>>> assign watermark semantics to any (computed) column produced by a
> >> query,
> >>>>>> which also addresses the broader motivations listed in the FLIP:
> >> broken
> >>>>>> layered-pipeline abstractions, lack of per-query / multi-tenant
> >>>> watermark
> >>>>>> strategies, and the current gap between SQL and the DataStream API.
> >>>>>>
> >>>>>> Now let me address each of your points:
> >>>>>> Support for Non-materialized Views
> >>>>>>
> >>>>>> You raised an excellent point about the scope of APPLY_WATERMARK in
> >>>> layered
> >>>>>> architectures.
> >>>>>>
> >>>>>> *My position*: APPLY_WATERMARK should support base tables,
> >>>> non-materialized
> >>>>>> views, *and subqueries* (this matches the Goals section of the
> FLIP).
> >>>>>> Here's why:
> >>>>>>
> >>>>>>    - Non-materialized views dissolve into the surrounding plan
> during
> >>>>>>    optimization (inline expansion)
> >>>>>>    - There's no physical "view" node in the execution plan—just a
> >> logical
> >>>>>>    alias
> >>>>>>    - The watermark becomes a relational transformation applied on
> top
> >> of
> >>>>>>    the view's / subquery's output
> >>>>>>
> >>>>>> The key design principle is: *watermark definition is an explicit
> >>>>>> relational operator, not attached metadata*.
> >>>>>>
> >>>>>> I'd also like to clarify the positions in the prior thread to avoid
> >>>>>> confusion:
> >>>>>>
> >>>>>>    - *Lincoln* originally proposed APPLY_WATERMARK(table,
> >>>> DESCRIPTOR(col),
> >>>>>>    expr) scoped to base tables only.
> >>>>>>    - *Timo* raised the concern about blurring the view abstraction
> >> ("*A
> >>>>>>    view usually dissolves into the plan … would a watermark
> definition
> >>>>>>    suddenly introduce an optimization barrier? If this is an
> >> optimization
> >>>>>>    barrier, is this still a view or a new concept?*"). This is
> exactly
> >>>> why
> >>>>>>    this FLIP does *not* attach watermarks to views via CREATE VIEW /
> >>>> ALTER
> >>>>>>    VIEW, and keeps views as pure logical aliases.
> >>>>>>    - *Gyula* emphasized that watermark assignment should be
> available
> >> on
> >>>>>>    views and subqueries too, consistent with the DataStream API.
> >>>>>>
> >>>>>> To address Timo's concern concretely:
> >>>>>>
> >>>>>>    - Watermark semantics are applied at query planning time via an
> >>>> explicit
> >>>>>>    relational operator (APPLY_WATERMARK), not hidden in view/catalog
> >>>> metadata.
> >>>>>>    - No watermark information is persisted into the catalog for
> views
> >> —
> >>>> the
> >>>>>>    catalog stays unchanged (see FLIP "Catalog Changes": *No catalog
> >>>> changes
> >>>>>>    are required*).
> >>>>>>    - Views continue to dissolve transparently into the plan; the
> >>>>>>    optimization barrier only appears where APPLY_WATERMARK is
> >> explicitly
> >>>> used.
> >>>>>>
> >>>>>> Monotonicity Validation
> >>>>>>
> >>>>>> Great question! Monotonicity guarantees are essential for watermark
> >>>>>> correctness:
> >>>>>>
> >>>>>>    - Watermarks define the boundary of "late" data
> >>>>>>    - If the watermark expression is not monotonically
> non-decreasing,
> >> the
> >>>>>>    watermark could move backward
> >>>>>>    - This would cause data that was previously considered "on-time"
> >> to be
> >>>>>>    treated as late (or vice-versa), breaking event-time semantics
> >>>>>>
> >>>>>> *Validation requirement*: In line with the FLIP's Planner Changes
> >>>> section,
> >>>>>> the planner validates that watermark_expression is a valid *scalar*
> >>>> expression
> >>>>>> over columns of the input schema, and (as with today's CREATE TABLE
> >> ...
> >>>>>> WATERMARK FOR ... AS ...) the expression must produce a
> monotonically
> >>>>>> non-decreasing value relative to the designated rowtime column.
> >>>>>>
> >>>>>> Typical valid forms are the same as what's allowed in DDL today, for
> >>>>>> example:
> >>>>>>
> >>>>>> -- Bounded out-of-orderness (most common)
> >>>>>> APPLY_WATERMARK(t, DESCRIPTOR(ts), ts - INTERVAL '5' SECOND)
> >>>>>> -- Strictly ascending
> >>>>>> APPLY_WATERMARK(t, DESCRIPTOR(ts), ts)
> >>>>>>
> >>>>>> Note: watermark_expression is a *scalar expression* per the FLIP
> >> (not an
> >>>>>> aggregate / window function). Richer forms such as user-defined
> >>>> watermark
> >>>>>> strategies are explicitly out of scope for this FLIP and are tracked
> >> as
> >>>> a
> >>>>>> future WatermarkFunction interface, which also depends on the
> Calcite
> >>>>>> lambda upgrade mentioned by Timo.
> >>>>>> Override Timing (Planner vs Runtime)
> >>>>>>
> >>>>>> You raised a valid concern. Let me clarify the design, which aligns
> >> with
> >>>>>> the FLIP's "Planner Changes → Interaction with Existing Table
> >>>> Watermarks"
> >>>>>> section:
> >>>>>>
> >>>>>> *Current proposal*: Planner-level override
> >>>>>>
> >>>>>>    - During query compilation, when the input to APPLY_WATERMARK
> >> already
> >>>>>>    carries a watermark (e.g., from CREATE TABLE ... WATERMARK), the
> >>>>>>    LogicalWatermarkAssigner node produced by APPLY_WATERMARK
> >> *overrides*
> >>>> the
> >>>>>>    upstream watermark strategy.
> >>>>>>    - When the input has no watermark (e.g., a view or a subquery),
> >>>>>>    APPLY_WATERMARK introduces a new one.
> >>>>>>    - This makes override behavior explicit in the plan and keeps
> room
> >> for
> >>>>>>    standard optimizations.
> >>>>>>
> >>>>>> *Why not runtime-level override*:
> >>>>>>
> >>>>>>    1. Planner-level override keeps watermark semantics a
> first-class,
> >>>>>>    visible part of the plan (consistent with how VECTOR_SEARCH /
> >>>> ML_PREDICT
> >>>>>>    are modeled as specialized ExecNodes in the FLIP).
> >>>>>>    2. The override point is deterministic and inspectable via
> EXPLAIN.
> >>>>>>    3. Simpler execution model — no dual-watermark reconciliation at
> >>>> runtime.
> >>>>>>
> >>>>>> If concrete use cases for a runtime-level override surface later, we
> >>>> could
> >>>>>> revisit this via a hint (e.g., /*+ RUNTIME_OVERRIDE */), but it's
> not
> >>>> part
> >>>>>> of this FLIP.
> >>>>>> Relationship with BuiltInProcessTableFunction
> >>>>>>
> >>>>>> Good observation. APPLY_WATERMARK is declared as a built-in PTF at
> >> the
> >>>> SQL
> >>>>>> surface, similar in spirit to TO_CHANGELOG / FROM_CHANGELOG, but the
> >>>> FLIP
> >>>>>> intentionally maps it to a *specialized ExecNode* rather than a
> >> generic
> >>>> PTF
> >>>>>> runtime — the same pattern used by VECTOR_SEARCH and ML_PREDICT.
> >>>>>>
> >>>>>> *Option A: Reuse the generic BuiltInProcessTableFunction runtime*
> >>>>>>
> >>>>>>    - Pros: Consistent with other built-in PTFs at the runtime layer.
> >>>>>>    - Cons: Watermark assignment is not a row-transforming PTF — it
> >>>> changes
> >>>>>>    stream metadata (time attribute + watermark strategy). Forcing it
> >>>> through
> >>>>>>    the generic PTF runtime would require extending the PTF contract
> >> with
> >>>>>>    watermark semantics.
> >>>>>>
> >>>>>> *Option B: Dedicated LogicalWatermarkAssigner + specialized ExecNode
> >>>> (the
> >>>>>> FLIP's choice)*
> >>>>>>
> >>>>>>    - Pros: Keeps watermark semantics a first-class citizen in the
> >>>> planner;
> >>>>>>    cleanly integrates with watermark propagation rules; no need to
> >>>> overload
> >>>>>>    the PTF contract; same pattern as VECTOR_SEARCH / ML_PREDICT
> >> already
> >>>>>>    established in Flink.
> >>>>>>    - Cons: A new dedicated node, though that cost is small compared
> to
> >>>> the
> >>>>>>    semantic clarity.
> >>>>>>
> >>>>>> *Current decision*: Option B, as stated in the FLIP
> >> ("*APPLY_WATERMARK
> >>>>>> compiles to a specialized ExecNode --- similar to how VECTOR_SEARCH
> >> and
> >>>>>> ML_PREDICT are handled*"). Open to revisiting based on community
> >>>> feedback.
> >>>>>> StreamExecWatermarkAssigner Sufficiency
> >>>>>>
> >>>>>> For the physical implementation:
> >>>>>>
> >>>>>> *Yes, StreamExecWatermarkAssigner should be sufficient*, with some
> >>>>>> modifications:
> >>>>>>
> >>>>>>    1. *Input handling*: Currently assumes direct table scan; needs
> to
> >>>>>>    handle APPLY_WATERMARK's column mapping
> >>>>>>    2. *Watermark expression evaluation*: Needs to support arbitrary
> >>>>>>    expressions (currently limited)
> >>>>>>    3. *State management*: May need additional state for handling
> >>>>>>    out-of-order events
> >>>>>>
> >>>>>> The key insight is that APPLY_WATERMARK conceptually translates to:
> >>>>>>
> >>>>>> TableScan -> Calc (expression evaluation) -> WatermarkAssigner
> >>>>>>
> >>>>>> StreamExecWatermarkAssigner handles the last step; the Calc step
> >> handles
> >>>>>> the expression.
> >>>>>> ------------------------------
> >>>>>> Summary of Proposed Responses
> >>>>>>
> >>>>>>    - *Scope of input*: Support base tables, non-materialized views
> >> *and
> >>>>>>    subqueries* (per FLIP Goals); views/catalog semantics stay
> >> unchanged.
> >>>>>>    - *Monotonicity*: Validate watermark_expression as a scalar
> >>>> expression;
> >>>>>>    same monotonicity contract as today's DDL watermarks.
> >>>>>>    - *Override timing*: Planner-level override at
> >>>> LogicalWatermarkAssigner;
> >>>>>>    potential /*+ RUNTIME_OVERRIDE */ hint as future work.
> >>>>>>    - *PTF reuse*: Dedicated LogicalWatermarkAssigner + specialized
> >>>> ExecNode
> >>>>>>    (same pattern as VECTOR_SEARCH / ML_PREDICT).
> >>>>>>    - *ExecNode sufficiency*: StreamExecWatermarkAssigner is
> sufficient
> >>>> with
> >>>>>>    minor modifications (input handling + expression evaluation).
> >>>>>>
> >>>>>> ------------------------------
> >>>>>>
> >>>>>> Looking forward to further discussion!
> >>>>>>
> >>>>>> Best regards,
> >>>>>> FeatZhang
> >>>>>>
> >>>>>>
> >>>>>> On Wed, May 6, 2026 at 4:39 PM Xuyang <[email protected]> wrote:
> >>>>>>
> >>>>>>> Hi, FeatZhang. Thanks for driving this discussion. I've read
> through
> >>>> the
> >>>>>>> full FLIP and the mailing list context, and I have a few questions:
> >>>>>>> 1. If I understand correctly, in a Layered Data Architecture,
> >>>>>>> silver_events would typically be a table, a materialized view, or a
> >>>>>>> materialized table. From the mailing list discussion, it seems like
> >> no
> >>>>>>> consensus was reached on this point. I think we still need to
> >> consider
> >>>>>>> whether APPLY_WATERMARK should be allowed on (non-materialized)
> >> views.
> >>>>>>> 2. In the Planner Changes section under Logical Plan, could you
> >>>> elaborate
> >>>>>>> on why monotonicity guarantees need to be ensured for the watermark
> >>>>>>> expression validation?
> >>>>>>> 3. (nit) In the Watermark Override part under Planner Changes,
> >>>> shouldn't
> >>>>>>> the override of the upstream watermark happen at runtime rather
> than
> >>>> at the
> >>>>>>> planner level?
> >>>>>>> 4. I feel that APPLY_WATERMARK is quite similar to TO_CHANGELOG and
> >>>>>>> FROM_CHANGELOG. Is what we actually need just a
> >>>>>>> BuiltInProcessTableFunction? That way, we would only need to
> further
> >>>> extend
> >>>>>>> ProcessTableFunction to support this.
> >>>>>>> 5. If we choose to translate APPLY_WATERMARK into a specialized
> >>>> ExecNode
> >>>>>>> (similar to VECTOR_SEARCH and ML_PREDICT), would the existing
> >>>>>>> StreamExecWatermarkAssigner be sufficient for this purpose?
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>>
> >>>>>>>     Best!
> >>>>>>>     Xuyang
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> At 2026-04-21 22:22:33, "FeatZhang" <[email protected]> wrote:
> >>>>>>>> Hi everyone,
> >>>>>>>>
> >>>>>>>> Thank you for the feedback and discussions on the initial
> proposal.
> >>>> I've
> >>>>>>>> revised the FLIP based on the community's input and would like to
> >>>> share
> >>>>>>>> the updated version.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> FLIP-XXX: Support Flexible Watermark Assignment via Built-in
> >> Function
> >>>>>>>> <
> >>>>>>>
> >>>>
> >>
> https://drive.google.com/open?id=17PXYAi6Pb91OqFhVVK7tRULiaHAb6wiX79jjC0NaaDA
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> KEY UPDATES
> >>>>>>>> ===========
> >>>>>>>>
> >>>>>>>> The proposal has evolved from "Support Watermark Definition in SQL
> >>>> Views"
> >>>>>>>> to a more flexible and powerful approach: FLIP-XXX: Support
> >> Flexible
> >>>>>>>> Watermark Assignment via APPLY_WATERMARK Function.
> >>>>>>>>
> >>>>>>>> What's Changed:
> >>>>>>>>
> >>>>>>>> 1. Broader Scope: Instead of limiting watermark definitions to SQL
> >>>> views
> >>>>>>>>   only, the new proposal introduces a built-in table function
> >>>>>>>>   APPLY_WATERMARK that works with:
> >>>>>>>>   - Base tables
> >>>>>>>>   - Views (both regular and materialized)
> >>>>>>>>   - Subqueries
> >>>>>>>>   - Any table-valued expressions
> >>>>>>>>
> >>>>>>>> 2. More Flexible Design: The function-based approach provides:
> >>>>>>>>   - Dynamic watermark assignment at query time without modifying
> >>>> catalog
> >>>>>>>>     metadata
> >>>>>>>>   - Override capability for existing watermark strategies
> >>>>>>>>   - Composability with other SQL operations
> >>>>>>>>   - No need for DDL changes or catalog write permissions
> >>>>>>>>
> >>>>>>>> 3. Better SQL Semantics: Using a table function follows SQL
> >> standard
> >>>>>>>>   patterns and integrates naturally with Flink's existing function
> >>>>>>>>   ecosystem.
> >>>>>>>>
> >>>>>>>> UPDATED FLIP DOCUMENT
> >>>>>>>> =====================
> >>>>>>>>
> >>>>>>>> The revised FLIP is now available at:
> >>>>>>>> https://iwiki.woa.com/p/4019879693
> >>>>>>>>
> >>>>>>>> Key sections include:
> >>>>>>>> - Motivation and use cases
> >>>>>>>> - Public interfaces and SQL syntax
> >>>>>>>> - Implementation plan
> >>>>>>>> - Compatibility analysis
> >>>>>>>> - Test plan
> >>>>>>>>
> >>>>>>>> EXAMPLE USAGE
> >>>>>>>> =============
> >>>>>>>>
> >>>>>>>> -- Apply watermark to a view
> >>>>>>>> SELECT *FROM APPLY_WATERMARK(my_view, DESCRIPTOR(event_time),
> >>>>>>>> event_time - INTERVAL '5' SECOND);
> >>>>>>>> -- Override existing watermark strategy
> >>>>>>>> SELECT *FROM APPLY_WATERMARK(my_table_with_watermark,
> >> DESCRIPTOR(ts),
> >>>>>>>> ts - INTERVAL '10' SECOND -- Different from DDL watermark
> >>>>>>>> );
> >>>>>>>> -- Use in complex queries
> >>>>>>>> SELECT window_start,
> >>>>>>>>       window_end,
> >>>>>>>>       COUNT(*)FROM TABLE(TUMBLE(TABLE APPLY_WATERMARK(orders,
> >>>>>>>> DESCRIPTOR(order_time), order_time - INTERVAL '5' SECOND),
> >>>>>>>> DESCRIPTOR(order_time), INTERVAL '1' HOUR))GROUP BY window_start,
> >>>>>>>>         window_end;
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> IMPLEMENTATION PROGRESS
> >>>>>>>> =======================
> >>>>>>>>
> >>>>>>>> I've also opened a draft PR #27984 with the initial
> implementation:
> >>>>>>>> - Core built-in function definition
> >>>>>>>> - SQL-to-RelNode conversion rules
> >>>>>>>> - Physical plan integration
> >>>>>>>> - Unit tests and documentation (English + Chinese)
> >>>>>>>>
> >>>>>>>> The PR is available at:
> >>>>>>>> https://github.com/apache/flink/pull/27984
> >>>>>>>>
> >>>>>>>> REQUEST FOR FEEDBACK
> >>>>>>>> ====================
> >>>>>>>>
> >>>>>>>> I would appreciate your thoughts on:
> >>>>>>>>
> >>>>>>>> 1. Function naming: Is APPLY_WATERMARK clear and intuitive?
> >>>>>>>>   (Alternative considered: WITH_WATERMARK, SET_WATERMARK)
> >>>>>>>>
> >>>>>>>> 2. DESCRIPTOR syntax: Using DESCRIPTOR(column_name) to specify the
> >>>>>>>>   rowtime column—does this align well with Flink's existing
> >> patterns?
> >>>>>>>>
> >>>>>>>> 3. Override behavior: Should APPLY_WATERMARK always override
> >> existing
> >>>>>>>>   watermarks, or should we provide a mode parameter
> >>>>>>>>   (e.g., OVERRIDE, MERGE)?
> >>>>>>>>
> >>>>>>>> 4. Performance considerations: Any concerns about the
> >> function-based
> >>>>>>>>   approach vs. catalog-level watermark definitions?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Looking forward to your valuable feedback!
> >>>>>>>>
> >>>>>>>> Best regards,
> >>>>>>>> FeatZhang
> >>>>>>>>
> >>>>>>>> On Thu, Feb 12, 2026 at 6:24 PM Timo Walther <[email protected]>
> >>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi everyone,
> >>>>>>>>>
> >>>>>>>>> I think we all agree that we clearly want this functionality,
> just
> >>>> the
> >>>>>>>>> "how" needs to be discussed. I also like Lincoln’s suggestion of
> >>>>>>>>> introducing a built-in PTF for this, I had similar ideas in mind.
> >>>>>>>>>
> >>>>>>>>> There are two issues with a APPLY_WATERMARK function, but both on
> >> the
> >>>>>>>>> short-term roadmap:
> >>>>>>>>>
> >>>>>>>>> 1) This function would need to be a function that takes an
> >>>> expression.
> >>>>>>>>> Ideally as a lambda function. Newer Calcite versions have already
> >>>> lambda
> >>>>>>>>> expression support. At Confluent we were planning to work on a
> >>>> Calcite
> >>>>>>>>> upgrade this quarter especially to get lambda support in and
> >> improve
> >>>>>>>>> built-in functions that work on collections.
> >>>>>>>>>
> >>>>>>>>> 2) User-defined PTFs are currently not able to emit watermarks.
> We
> >>>> could
> >>>>>>>>> introduce a new interface WatermarkFunction (similar to
> >>>>>>>>> ChangelogFunction) that would offer this to everyone.
> >> Alternatively,
> >>>> we
> >>>>>>>>> could only use the PTF signature, but translate to a specialized
> >>>>>>>>> ExecNode similar how we do it for VECTOR_SEARCH and ML_PREDICT.
> >>>>>>>>>
> >>>>>>>>> In any case, even if we go with the function approach, we
> >> definitely
> >>>>>>>>> need a full FLIP on this.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Timo
> >>>>>>>>>
> >>>>>>>>> On 12.02.26 08:25, Gyula Fóra wrote:
> >>>>>>>>>> Hi All!
> >>>>>>>>>> I would like to chime in here quickly from a slightly different
> >>>> angle.
> >>>>>>>>>> While I am the first to admit that I cannot grasp all the
> >> planning /
> >>>>>>>>>> conceptual implications, I also feel the need for more flexible
> >>>>>>> watermark
> >>>>>>>>>> handling as suggested by Feat.
> >>>>>>>>>>
> >>>>>>>>>> Anything that can only be applied to base/catalog tables is very
> >>>>>>> limiting
> >>>>>>>>>> from a usability perspective. Watermarks feel like they should
> >> be a
> >>>>>>>>> simple
> >>>>>>>>>> function that you can apply on a column/table as part of a
> >>>> query/view.
> >>>>>>>>> For
> >>>>>>>>>> example extract timestamp from a string convert to TS -> apply
> >>>>>>> watermark
> >>>>>>>>>> etc.
> >>>>>>>>>>
> >>>>>>>>>> Users often receive the tables/catalogs as given and can only
> >> write
> >>>>>>>>>> queries.
> >>>>>>>>>>
> >>>>>>>>>> Fixing this would eliminate a long standing disconnect between
> >> the
> >>>>>>>>>> datastream api flexible watermark handling compared to the
> >> currently
> >>>>>>> very
> >>>>>>>>>> restrictive SQL approach.
> >>>>>>>>>>
> >>>>>>>>>> Cheers
> >>>>>>>>>> Gyula
> >>>>>>>>>>
> >>>>>>>>>> On Thu, Feb 12, 2026 at 7:54 AM FeatZhang <[email protected]
> >
> >>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Timo, Lincoln,
> >>>>>>>>>>>
> >>>>>>>>>>> Thank you both for the detailed feedback.
> >>>>>>>>>>>
> >>>>>>>>>>> I agree with the concern that non-materialized SQL views should
> >>>>>>> remain a
> >>>>>>>>>>> pure logical abstraction. Introducing watermark definitions
> >>>> directly
> >>>>>>>>>>> into CREATE
> >>>>>>>>>>> VIEW or ALTER VIEW could blur the boundary between logical
> >> aliasing
> >>>>>>> and
> >>>>>>>>>>> physical planning semantics, especially considering
> optimization
> >>>>>>>>> barriers
> >>>>>>>>>>> and watermark propagation behavior.
> >>>>>>>>>>>
> >>>>>>>>>>> Lincoln’s suggestion of introducing a built-in function such
> as:
> >>>>>>>>>>>
> >>>>>>>>>>> APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column),
> >>>>>>> watermark_expression)
> >>>>>>>>>>>
> >>>>>>>>>>> is a cleaner direction. It keeps watermark definition as an
> >>>> explicit
> >>>>>>>>>>> relational transformation rather than attaching additional
> >>>> semantics
> >>>>>>> to
> >>>>>>>>>>> views.
> >>>>>>>>>>>
> >>>>>>>>>>> However, to fully address the original use cases (especially
> >>>> logical
> >>>>>>>>> reuse
> >>>>>>>>>>> and layered lakehouse architectures), I propose that the table
> >>>>>>> parameter
> >>>>>>>>>>> should support:
> >>>>>>>>>>>
> >>>>>>>>>>>     - Base tables
> >>>>>>>>>>>     - Non-materialized views
> >>>>>>>>>>>
> >>>>>>>>>>> If APPLY_WATERMARK can accept both, we can:
> >>>>>>>>>>>
> >>>>>>>>>>>     - Preserve the conceptual purity of SQL views
> >>>>>>>>>>>     - Avoid redefining view semantics
> >>>>>>>>>>>     - Still enable logical reuse via views
> >>>>>>>>>>>     - Allow different watermark strategies over the same
> logical
> >>>>>>>>> relation
> >>>>>>>>>>>
> >>>>>>>>>>> In other words, watermark definition becomes an explicit
> >> relational
> >>>>>>>>>>> operator applied on top of any logical relation, instead of
> >> being
> >>>>>>>>> embedded
> >>>>>>>>>>> into the view definition itself.
> >>>>>>>>>>>
> >>>>>>>>>>>  From a planner perspective, this keeps the model consistent:
> >>>>>>>>>>>
> >>>>>>>>>>>     - The function expands into a relational node
> >>>>>>>>>>>     - No optimization barrier is introduced by views
> >>>>>>>>>>>     - Watermark handling remains part of the logical plan
> >>>>>>> transformation
> >>>>>>>>>>>
> >>>>>>>>>>> I will prepare a PR to prototype APPLY_WATERMARK with support
> >> for
> >>>>>>> both
> >>>>>>>>> base
> >>>>>>>>>>> tables and non-materialized views, and share it for further
> >>>>>>> discussion.
> >>>>>>>>>>>
> >>>>>>>>>>> Looking forward to your thoughts.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Feat
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Lincoln Lee <[email protected]> 于2026年2月12日周四 12:19写道:
> >>>>>>>>>>>
> >>>>>>>>>>>> Agree with Timo’s point regarding the conceptual semantics. We
> >>>>>>> should
> >>>>>>>>> not
> >>>>>>>>>>>> directly extend non-materialized views with additional
> >> watermark
> >>>>>>>>>>>> definitions.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regarding the use case mentioned by Feat, defining different
> >>>>>>> watermark
> >>>>>>>>>>>> strategies
> >>>>>>>>>>>> for the same data source, especially in the case of catalog
> >>>> tables,
> >>>>>>> we
> >>>>>>>>>>> are
> >>>>>>>>>>>> exploring a possible solution introducing a built-in function:
> >>>>>>>>>>>> ```sql
> >>>>>>>>>>>> APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column),
> >>>>>>>>> watermark_expression)
> >>>>>>>>>>>> ```
> >>>>>>>>>>>> This function only support base tables as input and not
> support
> >>>>>>> views,
> >>>>>>>>>>>> subqueries
> >>>>>>>>>>>> or derived relations.
> >>>>>>>>>>>>
> >>>>>>>>>>>> This would address a meaningful subset of the identified use
> >> cases
> >>>>>>>>>>> without
> >>>>>>>>>>>> redefining the role of SQL views.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Lincoln Lee
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Timo Walther <[email protected]> 于2026年2月11日周三 23:29写道:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Feat,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> thanks for proposing this FLIP. We had similar discussions in
> >> the
> >>>>>>>>> past,
> >>>>>>>>>>>>> but so far could never reach consensus.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> SQL views are actually a very simple concept, they just give
> >> SQL
> >>>>>>> text
> >>>>>>>>>>> an
> >>>>>>>>>>>>> alias. A view has no other properties except for the view
> >>>>>>> definition.
> >>>>>>>>>>>>> Everything else is dynamically computed when the SQL text is
> >>>>>>> inserted
> >>>>>>>>>>>>> into the larger plan. A view is never evaluated without the
> >>>>>>>>> surrounding
> >>>>>>>>>>>>> plan.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Watermarks in the middle of a pipeline raise a couple of
> >> tricky
> >>>>>>>>> issues:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> - What if the upstream table is updating, how would you deal
> >> with
> >>>>>>>>>>>>> watermarks in the downstream view?
> >>>>>>>>>>>>> - What if the upstream table emits already watermarks? Would
> >> the
> >>>>>>> view
> >>>>>>>>>>>>> catch them and discard this information?
> >>>>>>>>>>>>> - A view usually dissolves into the plan (e.g. via projection
> >> or
> >>>>>>>>> filter
> >>>>>>>>>>>>> pushdown). Would a watermark definition suddenly introduce an
> >>>>>>>>>>>>> optimization barrier? If this is an optimization barrier, is
> >> this
> >>>>>>>>> still
> >>>>>>>>>>>>> a view or a new concept? E.g. a "materialized view" or
> >>>> "pre-planned
> >>>>>>>>>>>> view"?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 11.02.26 03:51, FeatZhang wrote:
> >>>>>>>>>>>>>> Hi Flink Community,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I'd like to propose adding watermark support for SQL Views
> in
> >>>>>>> Flink
> >>>>>>>>>>> to
> >>>>>>>>>>>>>> better support event-time processing scenarios.
> >>>>>>>>>>>>>> Problem Statement
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Currently, Flink SQL views cannot define watermarks. This
> >>>>>>> limitation
> >>>>>>>>>>>>>> creates several challenges:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>      - *Broken Abstraction*: Users must reference underlying
> >>>> table
> >>>>>>>>>>>>> watermarks
> >>>>>>>>>>>>>>      directly, exposing implementation details
> >>>>>>>>>>>>>>      - *No Flexibility*: Cannot define different watermark
> >>>>>>> strategies
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>      different use cases on the same data source
> >>>>>>>>>>>>>>      - *Limited Architecture Support*: Incompatible with
> >> modern
> >>>>>>>>>>> layered
> >>>>>>>>>>>>> data
> >>>>>>>>>>>>>>      architectures (Bronze/Silver/Gold medallion pattern)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> For example, in multi-tenant scenarios, different tenants
> may
> >>>>>>> require
> >>>>>>>>>>>>>> different lateness tolerance, but currently we cannot create
> >>>> views
> >>>>>>>>>>> with
> >>>>>>>>>>>>>> different watermark strategies on the same source table.
> >>>>>>>>>>>>>> Proposed Solution
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I propose adding two SQL syntax options to support watermark
> >>>>>>>>>>>> definitions
> >>>>>>>>>>>>> in
> >>>>>>>>>>>>>> views:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> *Option 1: CREATE VIEW with WATERMARK*
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> CREATE VIEW user_activity
> >>>>>>>>>>>>>> WATERMARK FOR event_time AS event_time - INTERVAL '5'
> >> SECONDAS
> >>>>>>> SELECT
> >>>>>>>>>>>>>> user_id, event_time, action FROM raw_events;
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> *Option 2: ALTER VIEW SET WATERMARK*
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> ALTER VIEW user_activity SET WATERMARK FOR event_time AS
> >>>>>>> event_time -
> >>>>>>>>>>>>>> INTERVAL '5' SECOND;
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Key Design Aspects
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>      - *Backward Compatibility*: Watermark stored as
> optional
> >>>>>>>>> metadata
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>>      view options; existing views continue to work unchanged
> >>>>>>>>>>>>>>      - *Validation*: Watermark column must exist in view
> >> schema
> >>>>>>> and
> >>>>>>>>> be
> >>>>>>>>>>>> of
> >>>>>>>>>>>>>>      TIMESTAMP/TIMESTAMP_LTZ type
> >>>>>>>>>>>>>>      - *Storage*: Watermark metadata persists in catalog
> >> options
> >>>>>>> map
> >>>>>>>>>>>>> (works
> >>>>>>>>>>>>>>      with all catalog implementations)
> >>>>>>>>>>>>>>      - *Propagation*: Follows existing Flink watermark
> >>>> propagation
> >>>>>>>>>>> rules
> >>>>>>>>>>>>> in
> >>>>>>>>>>>>>>      joins and nested views
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Use Case Example: Data Lakehouse Architecture
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -- Bronze: Raw data (no watermark)CREATE TABLE bronze_events
> >>>>>>>>>>> (raw_data
> >>>>>>>>>>>>>> STRING, ingestion_time TIMESTAMP(3)) WITH (...);
> >>>>>>>>>>>>>> -- Silver: Cleaned data with watermarkCREATE VIEW
> >> silver_events
> >>>>>>>>>>>>>> WATERMARK FOR event_time AS event_time - INTERVAL '10'
> >> SECONDAS
> >>>>>>>>>>> SELECT
> >>>>>>>>>>>>>>       CAST(JSON_VALUE(raw_data, '$.event_time') AS
> >> TIMESTAMP(3))
> >>>>>>> AS
> >>>>>>>>>>>>> event_time,
> >>>>>>>>>>>>>>       JSON_VALUE(raw_data, '$.user_id') AS user_idFROM
> >>>>>>>>>>>>>> bronze_eventsWHERE JSON_VALUE(raw_data, '$.event_time') IS
> >> NOT
> >>>>>>> NULL;
> >>>>>>>>>>>>>> -- Gold: AggregationsSELECT TUMBLE_START(event_time,
> INTERVAL
> >>>> '1'
> >>>>>>>>>>>>>> HOUR), COUNT(*)FROM silver_eventsGROUP BY TUMBLE(event_time,
> >>>>>>> INTERVAL
> >>>>>>>>>>>>>> '1' HOUR);
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Reference Materials
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>      - FLIP Document: FLIP-XXX: Support Watermark in Flink
> SQL
> >>>>>>> View
> >>>>>>>>>>>>>>      <
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>
> >>
> https://docs.google.com/document/d/1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y/edit?usp=sharing
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>      - JIRA Issue:
> >>>>>>> https://issues.apache.org/jira/browse/FLINK-39062
> >>>>>>>>>>>>>>      - Implementation POC:
> >>>>>>>>>>>>>>      - [FLINK-39062][table] Support WATERMARK clause in
> CREATE
> >>>>>>> VIEW
> >>>>>>>>>>>>> statement
> >>>>>>>>>>>>>>      <https://github.com/apache/flink/pull/27571>
> >>>>>>>>>>>>>>      - [FLINK-39062][table] Support ALTER VIEW SET WATERMARK
> >>>>>>> syntax
> >>>>>>>>>>>>>>      <https://github.com/apache/flink/pull/27570>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Implementation Timeline
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Estimated 6-8 weeks covering parser layer, planner layer,
> >>>> catalog
> >>>>>>>>>>>>>> integration, and comprehensive testing.
> >>>>>>>>>>>>>> Request for Feedback
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> This enhancement would significantly improve Flink's support
> >> for
> >>>>>>>>>>>> layered
> >>>>>>>>>>>>>> data architectures and flexible event-time processing. I'm
> >> happy
> >>>>>>> to
> >>>>>>>>>>>>> provide
> >>>>>>>>>>>>>> more details or start a formal FLIP process if the community
> >>>> sees
> >>>>>>>>>>> value
> >>>>>>>>>>>>> in
> >>>>>>>>>>>>>> this proposal.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Looking forward to the community's feedback!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best regards,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Feat Zhang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>    FLIP-XXX: Support Watermark in Flink SQL View
> >>>>>>>>>>>>>> <
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>
> >>
> https://drive.google.com/open?id=1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>
> >>
> >
>
>

Reply via email to