Hi Junfan,

Undo operations are not generated via a retract mechanism. Instead, they
are derived by reversing the changelog.

For a primary-key table, every row-level change is fully captured in the
changelog, including both the -U (update-before) and +U (update-after)
records. By replaying those changes in reverse, we can restore the
historical values directly, without any special retract computation for
functions like MAX/MIN.

Of course, in practice the undo computation is heavily optimized and will
skip changes that do not need to be reversed.
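
To make the idea concrete, here is a minimal Python sketch. It is not the
actual Fluss implementation. The record shape (kind, key, value), the +I/-D
kinds, and the undo() helper are assumptions made up for illustration. It
naively replays every change after the checkpoint offset in reverse and
skips nothing:

```python
from typing import Dict, List, Tuple

# Hypothetical record shape: (change kind, primary key, aggregated value).
Change = Tuple[str, str, int]


def undo(table: Dict[str, int], changelog: List[Change], checkpoint_offset: int) -> None:
    """Roll `table` back to its state at `checkpoint_offset` by walking the
    later changelog entries in reverse and flipping each one."""
    for kind, key, value in reversed(changelog[checkpoint_offset:]):
        if kind == "+I":
            # An insert is undone by deleting the key.
            table.pop(key, None)
        elif kind == "+U":
            # Drop the new value; the paired -U (next in reverse order)
            # writes the old value back.
            table.pop(key, None)
        elif kind in ("-U", "-D"):
            # The record carries the old row, so it is restored as-is.
            table[key] = value
        else:
            raise ValueError(f"unknown change kind: {kind}")


# A MAX column for key "k": 5 at the checkpoint, then raised to 9 and to 12.
changelog: List[Change] = [
    ("+I", "k", 5),                    # offset 0, covered by the checkpoint
    ("-U", "k", 5), ("+U", "k", 9),    # after the checkpoint
    ("+I", "j", 3),
    ("-U", "k", 9), ("+U", "k", 12),
]
table = {"k": 12, "j": 3}              # current state of the table

undo(table, changelog, checkpoint_offset=1)
print(table)
```

Note that MAX is never recomputed here: the old value 5 comes straight out
of the -U record, which is why no retract logic is needed.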

Best regards,
Yang

Junfan Zhang <[email protected]> wrote on Tue, Dec 23, 2025 at 16:41:

> Hi Yang,
>
> Thank you again for the valuable proposal. I understand that, in the
> checkpointing mechanism, the stored offset has already been advanced to the
> last checkpoint changelog offset.
> My question is specifically about how undo (retraction) is handled for
> aggregation functions such as MAX and MIN. For aggregations like SUM or
> PRODUCT, the undo semantics are relatively straightforward, but it is less
> clear to me how this is implemented for MAX/MIN.
> Could you please elaborate on this in more detail?
>
> On 2025/12/05 02:45:07 Yang Wang wrote:
> > Hi all,
> >
> > Aggregation computation is one of the most common requirements in
> > real-time data processing, widely used in scenarios like real-time OLAP,
> > reporting, and monitoring. However, the current approach of performing
> > aggregation at the Flink compute layer faces significant state management
> > challenges that limit system scalability and performance.
> >
> > As the number of unique keys grows, Flink's aggregation state expands
> > linearly, leading to several critical issues:
> > - State bloat consuming massive memory and disk resources
> > - Long checkpoint durations affecting RTO and causing backpressure
> > - Expensive state redistribution making scaling difficult
> > - Limited data scale a single job can handle
> >
> > By pushing aggregation down to the storage engine layer, we can
> > fundamentally address these issues. Aggregation state would migrate from
> > Flink State Backend to Fluss storage, making Flink jobs nearly stateless.
> > This approach leverages Fluss's LSM structure for efficient storage,
> > enables low-latency primary key queries on aggregation results, and
> > significantly reduces resource consumption.
> >
> > Additionally, Apache Paimon has already implemented a comprehensive
> > Aggregation Merge Engine. As Flink's streaming storage layer collaborating
> > with Paimon in the stream-lake integration architecture, Fluss needs to
> > align this core capability to ensure users can seamlessly switch between
> > stream tables and lake tables.
> >
> > To implement this, I'd like to propose FIP-21: Aggregation Merge Engine
> > [1].
> >
> > This proposal introduces:
> > - An extensible aggregation framework supporting 11 common functions
> > (sum, max, min, product, listagg, bool_and, bool_or, etc.) with
> > field-level configuration
> > - Exactly-once semantics through changelog-based undo recovery mechanism
> > - Column lock coordination enabling multi-job concurrent writes without
> > conflicts
> >
> > Any feedback and suggestions are welcome!
> >
> > [1]: https://cwiki.apache.org/confluence/display/FLUSS/FIP-21%3A+Aggregation+Merge+Engine
> >
> > Best regards,
> > Yang
> >
>
