Hi everyone,

due to private reasons, Ramin is currently not able to followup on this thread. Let me try to steer the discussion until he is back.

Let me also try to give more background and thoughts:

Overall this FLIP is just a SQL API to what is possible already with DataStream API and Flink CLI commands.

In DataStream API one can set the specific offsets in code but still submit the jar and restore from a savepoint. In those cases, the state (not the code definition) is used for the offsets. The Flink CLI offers commands for restoring state and skipping parts of it via allowNonRestoredState. But overall the validation is very thin, because Flink does not actively check whether the previous pipeline was actually stateful or whether the UID of the Kafka source has been changed and thus offsets of code take precedence over offsets in savepoint.

We are not planning to change the engine significantly for this FLIP. Many combinations won't be supported initially, but the long-term goal is to support more combinations and of course more stateful evolutions eventually. In the mid-term, I only see the PTF_ONLY case as realistic, because PTFs are self-contained and identifiable via a UID. Optimizer rules that reorder the topology make other state retention policies difficult to implement.

Nevertheless, both START_MODE and STATE_RETENTION are two dimensions that help us in capturing a user's intent for "where to start?" and "what to memorize?". With this intent we can improve the validation layer and inform users that changing a streaming query significantly without reprocessing won't work. Many users often do not understand the limitations of query evolution. The goal is also to make realities more explicit.

And as the FLIP also mentions in a small side note:

"capability relies on the Catalog and Connector implementation"

So this FLIP wraps existing API for state retention, but at the same time also makes it possible for catalog/connector implementors to come up with their custom logic for how RESUME_OR_FROM_BEGINNING or FROM_BEGINNING is handled. For example, in case of a FROM_BEGINNING the catalog could instruct the source to assign a new uid, in which case the offsets take precendence over the savepoint.

Looking forward to your thoughts.

Cheers,
Timo




One can specify the offset


On 09.02.26 04:01, Xuyang wrote:
OK, I now understand why PTF_ONLY was introduced. Additionally, I have a few 
more questions I want to clarify:
1. Since we've planed to exposed APIs that allow users to combine 
STATE_RETENTION and START_MODE, we should clearly document how these settings 
interact—specifically, their effective behaviors and potential pitfalls. This 
includes aspects users must handle themselves, such as inconsistencies between 
state and source data, or operators potentially consuming redundant records. To 
that end, should we complete a matrix covering all possible combinations of 
these parameters, annotate it with clear user guidance, and push it in the 
official documentation?
2. I also have a concern, similar to Ron’s question, regarding the fact that 
the offset in the source is effectively part of the job’s state. The source 
behavior becomes ambiguous due to conflicting offset semantics between these 
two parameters:
a) STATE_RETENTION = ALL and START_MODE = BEGINNING
b) STATE_RETENTION = NONE and START_MODE = RESUME_OR_...
By the way, I saw your response to Ron’s question. I’m not very familiar with 
the Kafka connector, but from the code snippet, it appears that the offsets 
committed to Kafka (based on the latest checkpoint) are done asynchronously [1] 
and without guaranteed success [2].
I’m still not quite clear on what you meant by "decouple offset management from 
state management"—is there a specific plan or design goal behind this? In my view, 
offsets naturally belong to the job’s state. The reason is that external systems (like 
Kafka) aren’t well-suited for tracking offsets across multiple checkpoints—they typically 
only retain the latest committed offset. Relying solely on external storage to record the 
last consumed position can be unreliable and limits our ability to restore from arbitrary 
checkpoints.
3. Friendly reminder: please incorporate key details from discussion in this 
mail list into the FLIP document such as behavioral differences between 
streaming and batch modes.


[1] 
https://github.com/apache/flink-connector-kafka/blob/cdbd635171fc4322ba7182eb93e920472d6d9d91/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L258
[2] 
https://github.com/apache/flink-connector-kafka/blob/cdbd635171fc4322ba7182eb93e920472d6d9d91/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L153



--

     Best!
     Xuyang



At 2026-02-06 21:08:29, "Ramin Gharib" <[email protected]> wrote:
Hi Xuyang,

Thanks for the detailed questions. You've highlighted some important edge
cases.

It is important to note that STATE_RETENTION (especially PTF_ONLY) is
designed as a "power user" feature. It provides granular control, but with
that comes the responsibility to understand the implications of mixing old
state with new logic.

Here are the clarifications for your specific points:

1. Potential Inconsistencies with PTF_ONLY This is a very valid point. When
using surgical state retention (PTF_ONLY), it is indeed the user's
responsibility to ensure that the retained state is semantically compatible
with the new query logic. Flink guarantees technical consistency (the bytes
will load), but semantic consistency (e.g., "Does this old window state
make sense with my new filter?") is up to the user.
If a user cannot guarantee this safety, they should default to
STATE_RETENTION = NONE to ensure correctness. This aligns with how the
"State Processor API" or allowNonRestoredState works in the DataStream
API—it is a sharp tool for experts.

2. Scope of Granularity (JOIN_ONLY, AGG_ONLY vs PTF) That is an interesting
idea, but we have technical reasons for focusing on PTFs first.
First, standard SQL operators (Joins, Aggregates) are generated by the
planner, and their internal state structure and UIDs are tightly coupled to
the physical plan. Making these "evolvable" by name would require a massive
overhaul of how Flink SQL assigns UIDs. Second, an option like JOIN_ONLY
would presuppose that the Join operator itself supports state evolution
(e.g., handling topology changes while keeping state). Currently, standard
SQL operators do not provide this guarantee.
PTFs, on the other hand, act as "black boxes" with explicit, user-defined
state and stable naming, making them the ideal candidate for this initial
version. We are currently unsure of the Return on Investment for making
internal SQL operators addressable at this stage.

3. Interaction between STATE_RETENTION = ALL and START_MODE =
FROM_BEGINNING We debated this combination extensively. The motivation for
supporting this is to enable Bootstrap scenarios.
For example, a user might want to reprocess historical data to fix a data
quality issue (the "From Beginning" part) but effectively "merge" this
fixed history into their existing, valuable running aggregates (the "All
State" part). This acts like an idempotent Upsert pipeline: you keep the
continuity of the live view while correcting the underlying dataset.
Ultimately, we view the validity of these combinations as a concern for the
Catalog Implementation. The Catalog should validate the intent and reject
combinations it cannot support.

4. STATE_RETENTION Behavior in Batch Mode In Batch mode, STATE_RETENTION
has no semantic meaning. While we understand the desire for a unified API,
we believe that silently ignoring the flag creates dangerous ambiguity.
To follow the "Fail Fast" principle, we prefer to throw an exception if
this clause is used in Batch mode. This is analogous to how Flink handles
state restoration mismatches: if a user requests a specific state behavior
(e.g., restoring a savepoint) that cannot be fulfilled by the current job
graph, Flink throws an exception rather than silently dropping the state
(unless explicitly overridden by flags like allowNonRestoredState).
Similarly, if a user requests STATE_RETENTION=ALL, and the Batch engine
cannot fulfill it, we should fail explicitly rather than silently ignoring
the user's intent.

Best,

Ramin

On Mon, Dec 1, 2025 at 6:23 PM Ramin Gharib <[email protected]> wrote:

Hi everyone,

I would like to start a discussion on FLIP-557: Granular Control over Data
Reprocessing and State Retention in Materialized Table Evolution [1].

Currently, ALTER MATERIALIZED TABLE forces a full job restart and
discards state, which is inefficient for many evolution scenarios. FLIP-557
proposes decoupling data scope from state management by introducing two new
optional clauses:
1. START_MODE*:* Controls the data processing window (e.g., FROM_BEGINNING,
RESUME_OR_...).

2. STATE_RETENTION*:* Controls how existing state is handled (e.g., NONE,
PTF_ONLY).

This gives users explicit control over cost and correctness during table
evolution.

For more details, please refer to the FLIP [1].

Looking forward to your feedback and thoughts!

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-557%3A+Granular+Control+over+Data+Reprocessing+and+State+Retention+in+Materialized+Table+Evolution

Best regards,

Ramin Gharib


Reply via email to