Hi Junfan,

Undo operations are not generated via a retract mechanism. Instead, they are derived by reversing the changelog.
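The changelog-reversal idea can be sketched roughly as follows. This is a minimal, hypothetical model, not actual Fluss code: the `undo` function, the dict-based table, and the tuple changelog format are all illustrative, with ops named after Flink's RowKind (+I insert, -U update-before, +U update-after, -D delete).

```python
def undo(table, changelog):
    """Roll back `table` (dict: pk -> value) by replaying `changelog`
    (list of (op, pk, value) tuples, oldest first) in reverse."""
    for op, pk, value in reversed(changelog):
        if op == "+I":
            table.pop(pk, None)   # undo an insert: remove the row
        elif op == "+U":
            pass                  # handled by the paired -U record
        elif op == "-U":
            table[pk] = value     # restore the pre-update value
        elif op == "-D":
            table[pk] = value     # restore the deleted row
    return table

# Example: a MAX field for key "a" moved from 5 to 9, emitting -U/+U.
# Undoing that update just re-applies the -U value; MAX is never
# recomputed from the raw inputs.
undo({"a": 9}, [("-U", "a", 5), ("+U", "a", 9)])  # -> {"a": 5}
```

Because the -U record already carries the old value, the rollback is a plain write, which is why non-invertible functions like MAX/MIN need no special retract logic.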
For a primary-key table, every row-level change is fully captured in the changelog, including both -U and +U records. Flipping those changes therefore rolls the table back to its historical values directly, with no special retract computation needed for functions like MAX/MIN. In practice, the undo computation is heavily optimized and skips changes that do not need to be reversed.

Best regards,
Yang

Junfan Zhang <[email protected]> wrote on Tue, Dec 23, 2025 at 16:41:

> Hi Yang,
>
> Thank you again for the valuable proposal. I understand that, in the
> checkpointing mechanism, the stored offset has already been advanced to the
> last checkpoint changelog offset.
> My question is specifically about how undo (retraction) is handled for
> aggregation functions such as MAX and MIN. For aggregations like SUM or
> PRODUCT, the undo semantics are relatively straightforward, but it is less
> clear to me how this is implemented for MAX/MIN.
> Could you please elaborate on this in more detail?
>
> On 2025/12/05 02:45:07 Yang Wang wrote:
> > Hi all,
> >
> > Aggregation computation is one of the most common requirements in real-time
> > data processing, widely used in scenarios like real-time OLAP, reporting,
> > and monitoring. However, the current approach of performing aggregation at
> > the Flink compute layer faces significant state management challenges that
> > limit system scalability and performance.
> >
> > As the number of unique keys grows, Flink's aggregation state expands
> > linearly, leading to several critical issues:
> > - State bloat consuming massive memory and disk resources
> > - Long checkpoint durations affecting RTO and causing backpressure
> > - Expensive state redistribution making scaling difficult
> > - Limited data scale a single job can handle
> >
> > By pushing aggregation down to the storage engine layer, we can
> > fundamentally address these issues. Aggregation state would migrate from
> > the Flink State Backend to Fluss storage, making Flink jobs nearly stateless.
> > This approach leverages Fluss's LSM structure for efficient storage,
> > enables low-latency primary-key queries on aggregation results, and
> > significantly reduces resource consumption.
> >
> > Additionally, Apache Paimon has already implemented a comprehensive
> > Aggregation Merge Engine. As Flink's streaming storage layer collaborating
> > with Paimon in the stream-lake integration architecture, Fluss needs to
> > align with this core capability to ensure users can seamlessly switch between
> > stream tables and lake tables.
> >
> > To implement this, I'd like to propose FIP-21: Aggregation Merge Engine [1].
> >
> > This proposal introduces:
> > - An extensible aggregation framework supporting 11 common functions (sum,
> > max, min, product, listagg, bool_and, bool_or, etc.) with field-level
> > configuration
> > - Exactly-once semantics through a changelog-based undo recovery mechanism
> > - Column lock coordination enabling multi-job concurrent writes without
> > conflicts
> >
> > Any feedback and suggestions are welcome!
> >
> > [1]:
> > https://cwiki.apache.org/confluence/display/FLUSS/FIP-21%3A+Aggregation+Merge+Engine
> >
> > Best regards,
> > Yang
