OK, I now understand why PTF_ONLY was introduced. Additionally, I have a few more questions I want to clarify: 1. Since we've planed to exposed APIs that allow users to combine STATE_RETENTION and START_MODE, we should clearly document how these settings interact—specifically, their effective behaviors and potential pitfalls. This includes aspects users must handle themselves, such as inconsistencies between state and source data, or operators potentially consuming redundant records. To that end, should we complete a matrix covering all possible combinations of these parameters, annotate it with clear user guidance, and push it in the official documentation? 2. I also have a concern, similar to Ron’s question, regarding the fact that the offset in the source is effectively part of the job’s state. The source behavior becomes ambiguous due to conflicting offset semantics between these two parameters: a) STATE_RETENTION = ALL and START_MODE = BEGINNING b) STATE_RETENTION = NONE and START_MODE = RESUME_OR_... By the way, I saw your response to Ron’s question. I’m not very familiar with the Kafka connector, but from the code snippet, it appears that the offsets committed to Kafka (based on the latest checkpoint) are done asynchronously [1] and without guaranteed success [2]. I’m still not quite clear on what you meant by "decouple offset management from state management"—is there a specific plan or design goal behind this? In my view, offsets naturally belong to the job’s state. The reason is that external systems (like Kafka) aren’t well-suited for tracking offsets across multiple checkpoints—they typically only retain the latest committed offset. Relying solely on external storage to record the last consumed position can be unreliable and limits our ability to restore from arbitrary checkpoints. 3. Friendly reminder: please incorporate key details from discussion in this mail list into the FLIP document such as behavioral differences between streaming and batch modes.
[1] https://github.com/apache/flink-connector-kafka/blob/cdbd635171fc4322ba7182eb93e920472d6d9d91/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L258 [2] https://github.com/apache/flink-connector-kafka/blob/cdbd635171fc4322ba7182eb93e920472d6d9d91/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L153 -- Best! Xuyang At 2026-02-06 21:08:29, "Ramin Gharib" <[email protected]> wrote: >Hi Xuyang, > >Thanks for the detailed questions. You've highlighted some important edge >cases. > >It is important to note that STATE_RETENTION (especially PTF_ONLY) is >designed as a "power user" feature. It provides granular control, but with >that comes the responsibility to understand the implications of mixing old >state with new logic. > >Here are the clarifications for your specific points: > >1. Potential Inconsistencies with PTF_ONLY This is a very valid point. When >using surgical state retention (PTF_ONLY), it is indeed the user's >responsibility to ensure that the retained state is semantically compatible >with the new query logic. Flink guarantees technical consistency (the bytes >will load), but semantic consistency (e.g., "Does this old window state >make sense with my new filter?") is up to the user. >If a user cannot guarantee this safety, they should default to >STATE_RETENTION = NONE to ensure correctness. This aligns with how the >"State Processor API" or allowNonRestoredState works in the DataStream >API—it is a sharp tool for experts. > >2. Scope of Granularity (JOIN_ONLY, AGG_ONLY vs PTF) That is an interesting >idea, but we have technical reasons for focusing on PTFs first. >First, standard SQL operators (Joins, Aggregates) are generated by the >planner, and their internal state structure and UIDs are tightly coupled to >the physical plan. Making these "evolvable" by name would require a massive >overhaul of how Flink SQL assigns UIDs. Second, an option like JOIN_ONLY >would presuppose that the Join operator itself supports state evolution >(e.g., handling topology changes while keeping state). Currently, standard >SQL operators do not provide this guarantee. >PTFs, on the other hand, act as "black boxes" with explicit, user-defined >state and stable naming, making them the ideal candidate for this initial >version. We are currently unsure of the Return on Investment for making >internal SQL operators addressable at this stage. > >3. Interaction between STATE_RETENTION = ALL and START_MODE = >FROM_BEGINNING We debated this combination extensively. The motivation for >supporting this is to enable Bootstrap scenarios. >For example, a user might want to reprocess historical data to fix a data >quality issue (the "From Beginning" part) but effectively "merge" this >fixed history into their existing, valuable running aggregates (the "All >State" part). This acts like an idempotent Upsert pipeline: you keep the >continuity of the live view while correcting the underlying dataset. >Ultimately, we view the validity of these combinations as a concern for the >Catalog Implementation. The Catalog should validate the intent and reject >combinations it cannot support. > >4. STATE_RETENTION Behavior in Batch Mode In Batch mode, STATE_RETENTION >has no semantic meaning. While we understand the desire for a unified API, >we believe that silently ignoring the flag creates dangerous ambiguity. >To follow the "Fail Fast" principle, we prefer to throw an exception if >this clause is used in Batch mode. This is analogous to how Flink handles >state restoration mismatches: if a user requests a specific state behavior >(e.g., restoring a savepoint) that cannot be fulfilled by the current job >graph, Flink throws an exception rather than silently dropping the state >(unless explicitly overridden by flags like allowNonRestoredState). >Similarly, if a user requests STATE_RETENTION=ALL, and the Batch engine >cannot fulfill it, we should fail explicitly rather than silently ignoring >the user's intent. > >Best, > >Ramin > >On Mon, Dec 1, 2025 at 6:23 PM Ramin Gharib <[email protected]> wrote: > >> Hi everyone, >> >> I would like to start a discussion on FLIP-557: Granular Control over Data >> Reprocessing and State Retention in Materialized Table Evolution [1]. >> >> Currently, ALTER MATERIALIZED TABLE forces a full job restart and >> discards state, which is inefficient for many evolution scenarios. FLIP-557 >> proposes decoupling data scope from state management by introducing two new >> optional clauses: >> 1. START_MODE*:* Controls the data processing window (e.g., FROM_BEGINNING, >> RESUME_OR_...). >> >> 2. STATE_RETENTION*:* Controls how existing state is handled (e.g., NONE, >> PTF_ONLY). >> >> This gives users explicit control over cost and correctness during table >> evolution. >> >> For more details, please refer to the FLIP [1]. >> >> Looking forward to your feedback and thoughts! >> >> [1] >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-557%3A+Granular+Control+over+Data+Reprocessing+and+State+Retention+in+Materialized+Table+Evolution >> >> Best regards, >> >> Ramin Gharib >>
