Hi Soumitra,

Thanks for the FLIP! +1 for the direction of enhancing the reducing and aggregating state. It's important that we can leverage RocksDB's merge operators to eliminate unnecessary `get()` calls. However, I have a few questions:

1. I see you will introduce `AbstractAssociativeMergeOperator` on the frocksdb side. How would a user pass this instance to the RocksDB state backend, and what is its relationship with Flink's `ReduceFunction` or `AggregateFunction`? I would suggest we 'translate' the user's `ReduceFunction` into a frocksdb merge operator, so that Flink keeps the original user experience. WDYT?

2. I read that you introduce a new `ReducingMergeState` with a new `set()` method compared with `ReducingState`. Is this necessary? If we intend to optimize performance solely through the use of a merge operator, it should not be needed, right? I do not recommend introducing too many public APIs, as this would force us to consider their semantics. For example, in changelog stream processing, if the `merge` operation were to permit state-setting operations, it would complicate potential future retraction (or reverse merge/aggregating) operations. WDYT?

Best,
Zakelly

On Tue, May 5, 2026 at 11:50 AM Soumitra Kumar <[email protected]> wrote:

> Dear Community Members,
>
> I want to start discussion on the two tickets I filed recently:
> Add support for Java based AssociativeMergeOperator via JNI
> <https://issues.apache.org/jira/browse/FLINK-39455>
> Support ReducingMergeState and AggregatingMergeState backed by Java based
> associative merge operators
> <https://issues.apache.org/jira/browse/FLINK-39456>
>
> Copying the motivation from the FLIP doc
> <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>:
>
> Flink supports RocksDBReducingState and RocksDBAggregatingState state
> variables that do a synchronous read-modify-write on every add call. While
> this works great in many scenarios, for write-heavy workloads this can be
> expensive and may become a bottleneck.
> RocksDB's AssociativeMergeOperator is a storage-level primitive designed
> for commutative and associative operations: integer counters, set union,
> list append, approximate sketches, top-K structures, Bloom filter, and
> similar patterns. However, frocksdb (the RocksDB fork used in Flink) does
> not support Java based associative merge operators.
>
> This FLIP has two parts:
> 1. Support for Java based AssociativeMergeOperator in frocksdb via JNI
> 2. Support ReducingMergeState and AggregatingMergeState backed by Java
> based associative merge operators
>
> The first part proposes exposing the associative merge operator as a Java
> class in frocksdb with minimal JNI overhead. RocksDB can call these
> operators during flushing and compaction.
> The second part leverages the frocksdb support developed in the first part
> to support ReducingMergeState and AggregatingMergeState state variables
> with user defined ReduceFunction and AggregateFunction using rocksdb
> backend.
>
> This enhancement opens up a powerful feature of rocksdb to Java. Flink
> users can use it to build interesting associative data structures
> on streaming data. I have added benchmark details from a prototype
> implementation in the FLIP doc.
>
> Looking forward to feedback.
>
> FLIP in Google doc
> <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>
>
> Best,
> -Soumitra.
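
To make the 'translate' suggestion in question 1 concrete, here is a minimal sketch of how a user-supplied reduce function might be wrapped as a byte-level associative merge. Everything in it is hypothetical and not taken from the FLIP or from frocksdb: the local `ReduceFunction` interface only mirrors the shape of Flink's, `Codec` stands in for a type serializer, and `ReduceFunctionMergeAdapter` is an invented name. How such an adapter would actually be registered with RocksDB over JNI is exactly the open question.

```java
import java.nio.ByteBuffer;

// Stand-in with the same shape as Flink's ReduceFunction, so the sketch compiles on its own.
interface ReduceFunction<T> {
    T reduce(T value1, T value2) throws Exception;
}

// Hypothetical byte-level codec; in Flink this role would be played by a serializer.
interface Codec<T> {
    byte[] encode(T value);
    T decode(byte[] bytes);
}

// Hypothetical adapter: exposes a user ReduceFunction as a merge over raw bytes,
// which is the form a storage-level merge callback would need.
final class ReduceFunctionMergeAdapter<T> {
    private final ReduceFunction<T> reduceFunction;
    private final Codec<T> codec;

    ReduceFunctionMergeAdapter(ReduceFunction<T> reduceFunction, Codec<T> codec) {
        this.reduceFunction = reduceFunction;
        this.codec = codec;
    }

    // 'existing' may be null when the key has no value yet; the operand then becomes the value.
    byte[] merge(byte[] existing, byte[] operand) {
        if (existing == null) {
            return operand;
        }
        try {
            T merged = reduceFunction.reduce(codec.decode(existing), codec.decode(operand));
            return codec.encode(merged);
        } catch (Exception e) {
            throw new IllegalStateException("User reduce function failed during merge", e);
        }
    }
}

// Fixed-width big-endian encoding of a Long, used only for the demo.
final class LongCodec implements Codec<Long> {
    public byte[] encode(Long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    public Long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}

final class MergeSketch {
    public static void main(String[] args) {
        LongCodec codec = new LongCodec();
        ReduceFunctionMergeAdapter<Long> adapter =
                new ReduceFunctionMergeAdapter<>((a, b) -> a + b, codec);

        // Two blind "adds" folded together without the caller reading the state first.
        byte[] state = adapter.merge(null, codec.encode(3L));
        state = adapter.merge(state, codec.encode(4L));
        System.out.println(codec.decode(state)); // prints 7
    }
}
```

With this shape the user keeps writing a plain `ReduceFunction` and only the backend sees the merge form. The sketch assumes the function is associative, which a merge operator relies on because operands may be combined in groups during flush and compaction.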
