Hi Yang,

Thank you again for the valuable proposal. I understand that, in the checkpointing mechanism, the stored offset has already been advanced to the changelog offset of the last checkpoint. My question is specifically about how undo (retraction) is handled for aggregation functions such as MAX and MIN. For SUM or PRODUCT, the undo semantics are relatively straightforward, since the retracted value can be subtracted (or, for PRODUCT, divided, ignoring the zero case) out of the aggregate. For MAX/MIN, however, once the current maximum (or minimum) is retracted, the previous value cannot be recovered from the aggregated result alone, so it is less clear to me how this is implemented. Could you please elaborate on this in more detail?
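To make the distinction concrete, here is a minimal Python sketch of my own. It is not taken from FIP-21, and the class names are hypothetical. It contrasts SUM, which can retract by subtraction, with a MAX that stores only the current maximum and therefore cannot retract it. It also shows one common workaround that keeps a count per distinct value (similar in spirit to Flink's retractable MAX aggregate), at the cost of storing more state.

```python
from collections import Counter


class SumAgg:
    """SUM is invertible: retracting a value is just subtraction."""

    def __init__(self):
        self.total = 0

    def add(self, v):
        self.total += v

    def retract(self, v):
        self.total -= v


class NaiveMaxAgg:
    """MAX that stores only the current maximum, so it cannot undo it."""

    def __init__(self):
        self.current = None

    def add(self, v):
        self.current = v if self.current is None else max(self.current, v)

    def retract(self, v):
        if v == self.current:
            # The previous maximum was never stored, so there is nothing to fall back to.
            raise ValueError("cannot retract the current max without history")
        # Retracting a smaller value leaves the maximum unchanged.


class RetractableMaxAgg:
    """MAX that keeps a count per distinct value, so any value can be retracted."""

    def __init__(self):
        self.counts = Counter()

    def add(self, v):
        self.counts[v] += 1

    def retract(self, v):
        # Counter returns 0 for missing keys without inserting them.
        if self.counts[v] == 0:
            raise KeyError(f"value {v!r} was never added")
        self.counts[v] -= 1
        if self.counts[v] == 0:
            del self.counts[v]

    @property
    def current(self):
        return max(self.counts) if self.counts else None
```

The per-value-count approach brings back state that grows with the number of distinct values per key, which is exactly what I am wondering about for the storage-side design: does the undo path for MAX/MIN rely on something like this, or on restoring the previous row image from the changelog?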
On 2025/12/05 02:45:07 Yang Wang wrote:
> Hi all,
>
> Aggregation computation is one of the most common requirements in real-time
> data processing, widely used in scenarios like real-time OLAP, reporting,
> and monitoring. However, the current approach of performing aggregation at
> the Flink compute layer faces significant state management challenges that
> limit system scalability and performance.
>
> As the number of unique keys grows, Flink's aggregation state expands
> linearly, leading to several critical issues:
> - State bloat consuming massive memory and disk resources
> - Long checkpoint durations affecting RTO and causing backpressure
> - Expensive state redistribution making scaling difficult
> - Limited data scale a single job can handle
>
> By pushing aggregation down to the storage engine layer, we can
> fundamentally address these issues. Aggregation state would migrate from
> Flink State Backend to Fluss storage, making Flink jobs nearly stateless.
> This approach leverages Fluss's LSM structure for efficient storage,
> enables low-latency primary key queries on aggregation results, and
> significantly reduces resource consumption.
>
> Additionally, Apache Paimon has already implemented a comprehensive
> Aggregation Merge Engine. As Flink's streaming storage layer collaborating
> with Paimon in the stream-lake integration architecture, Fluss needs to
> align this core capability to ensure users can seamlessly switch between
> stream tables and lake tables.
>
> To implement this, I'd like to propose FIP-21: Aggregation Merge Engine [1].
> <https://cwiki.apache.org/confluence/display/FLUSS/FIP-21%3A+Aggregation+Merge+Engine>
>
> This proposal introduces:
> - An extensible aggregation framework supporting 11 common functions (sum,
> max, min, product, listagg, bool_and, bool_or, etc.) with field-level
> configuration
> - Exactly-once semantics through changelog-based undo recovery mechanism
> - Column lock coordination enabling multi-job concurrent writes without
> conflicts
>
> Any feedback and suggestions are welcome!
>
> [1]:
> https://cwiki.apache.org/confluence/display/FLUSS/FIP-21%3A+Aggregation+Merge+Engine
>
> Best regards,
> Yang
