Hi Zakelly,

I have removed the ReducingMergeState.set method to minimize changes to the public APIs. The modified FLIP is at the same link:
https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing
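With `set()` gone, a caller that wants to overwrite the accumulated value can use the two calls that remain: `clear()` followed by `add()`. That pair is exactly what `set()` was a shortcut for. The sketch below is a minimal illustration, assuming a hypothetical in-memory stand-in (`InMemoryReducingState`, with `BinaryOperator` in place of Flink's `ReduceFunction`). It is not Flink's state class. In a merge-backed store, the same pair would presumably become a delete followed by a single merge operand.

```java
import java.util.function.BinaryOperator;

public class ClearThenAdd {

    // Hypothetical stand-in for a reducing state; NOT Flink's ReducingState.
    static final class InMemoryReducingState<T> {
        private final BinaryOperator<T> reducer;
        private T value; // null means "no value yet"

        InMemoryReducingState(BinaryOperator<T> reducer) {
            this.reducer = reducer;
        }

        T get() {
            return value;
        }

        // Folds the new element into the current value.
        void add(T element) {
            value = (value == null) ? element : reducer.apply(value, element);
        }

        void clear() {
            value = null;
        }
    }

    // What a set(v) shortcut would do, expressed with the remaining calls.
    static <T> void overwrite(InMemoryReducingState<T> state, T element) {
        state.clear();
        state.add(element);
    }

    public static void main(String[] args) {
        InMemoryReducingState<Long> sum = new InMemoryReducingState<>(Long::sum);
        sum.add(3L);
        sum.add(4L);
        System.out.println(sum.get()); // 7
        overwrite(sum, 10L);
        System.out.println(sum.get()); // 10
    }
}
```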
Community members,

This proposal is a valuable addition to frocksdb/Flink: it lets applications harness the power of RocksDB's merge operators. It will help write-heavy workloads and let users build associative data structures in a streaming fashion. Please review; I am happy to answer any questions.

Best,
-Soumitra.

On Tue, May 19, 2026 at 8:56 PM Soumitra Kumar wrote:

> Hi Zakelly,
>
> Thanks for your review and +1!
>
> On Tue, May 19, 2026 at 8:19 AM Zakelly Lan wrote:
>
>> Thanks for the FLIP! I'd +1 the direction of enhancing the reducing and aggregating state. It's important that we can leverage RocksDB's merge operators to eliminate unnecessary `get()` calls. However, I have a few questions:
>>
>> 1. I see you will introduce `AbstractAssociativeMergeOperator` on the frocksdb side. How would users pass this instance to the RocksDB state backend, and what is its relationship with Flink's `ReduceFunction` or `AggregateFunction`? I would suggest we 'translate' the user's `ReduceFunction` into a frocksdb merge operator, so that Flink users keep the original experience. WDYT?
>
> Yes, that is the whole idea. Since ReduceFunction and AggregateFunction are existing primitives, and they are associative and serializable, I am building on them.
>
> This is how they are wired. There are two new classes in Flink, RocksDBReducingMergeOperator and RocksDBAggregatingMergeOperator, both implementing AbstractAssociativeMergeOperator. Each column family is configured with one of them: RocksDBReducingMergeOperator for ReducingMergeState, and RocksDBAggregatingMergeOperator for AggregatingMergeState. These classes receive the callback from frocksdb, handle serialization and deserialization, and call the user-defined ReduceFunction or AggregateFunction.
>
>> 2. I see you introduce a new `ReducingMergeState` with a new `set()` method compared with `ReducingState`. Is this necessary? If we intend to optimize performance solely through the use of a merge operator, it is not, right? I do not recommend introducing too many public APIs, as each one forces us to consider its semantics. For example, in changelog stream processing, if the `merge` operation were to permit state-setting operations, it would complicate potential future retraction (reverse merge/aggregation) operations. WDYT?
>
> ReducingMergeState.set is just a shortcut for clear followed by add. I agree with your point; I will remove it.
>
> Best,
> -Soumitra.
>
>> On Tue, May 5, 2026 at 11:50 AM Soumitra Kumar wrote:
>>
>> > Dear Community Members,
>> >
>> > I want to start a discussion on the two tickets I filed recently:
>> > Add support for Java based AssociativeMergeOperator via JNI
>> > <https://issues.apache.org/jira/browse/FLINK-39455>
>> > Support ReducingMergeState and AggregatingMergeState backed by Java based associative merge operators
>> > <https://issues.apache.org/jira/browse/FLINK-39456>
>> >
>> > Copying the motivation from the FLIP doc <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>:
>> >
>> > Flink supports RocksDBReducingState and RocksDBAggregatingState state variables, which perform a synchronous read-modify-write on every add call. While this works well in many scenarios, for write-heavy workloads it can be expensive and may become a bottleneck.
>> > RocksDB's AssociativeMergeOperator is a storage-level primitive designed for associative operations: integer counters, set union, list append, approximate sketches, top-K structures, Bloom filters, and similar patterns. However, frocksdb (the RocksDB fork used in Flink) does not support Java-based associative merge operators.
>> >
>> > This FLIP has two parts:
>> > 1. Support for Java-based AssociativeMergeOperator in frocksdb via JNI
>> > 2. Support for ReducingMergeState and AggregatingMergeState backed by Java-based associative merge operators
>> >
>> > The first part proposes exposing the associative merge operator as a Java class in frocksdb with minimal JNI overhead. RocksDB can call these operators during flushing and compaction.
>> > The second part builds on the frocksdb support from the first part to provide ReducingMergeState and AggregatingMergeState state variables with user-defined ReduceFunction and AggregateFunction on the RocksDB backend.
>> >
>> > This enhancement exposes a powerful RocksDB feature to Java. Flink users can use it to build interesting associative data structures on streaming data. I have added benchmark details from a prototype implementation to the FLIP doc.
>> >
>> > Looking forward to feedback.
>> >
>> > FLIP in Google doc
>> > <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>
>> >
>> > Best,
>> > -Soumitra.
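The mechanism discussed in this thread can be illustrated with a toy, in-memory sketch. `add()` becomes a blind write of a merge operand instead of a read-modify-write. The operands are folded later by an operator that deserializes them, calls the user's reduce function, and serializes the result. Everything here is hypothetical and simplified:

- `AssociativeMerge`, `ReducingMergeSketch`, and `ToyStore` are not the frocksdb or Flink classes proposed in the FLIP.
- `BinaryOperator<Long>` stands in for Flink's `ReduceFunction<T>`, whose method is `T reduce(T value1, T value2)`.
- An 8-byte big-endian long stands in for Flink's serializers.

Real RocksDB folds merge operands during flush and compaction and also on reads. The toy models only `get()` and an explicit `compact()`.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class MergeOperatorSketch {

    // Hypothetical operator contract: combine two serialized values into one.
    interface AssociativeMerge {
        byte[] merge(byte[] existing, byte[] operand);
    }

    // Hypothetical analogue of a reducing merge operator: deserialize both
    // sides, call the user's reduce function, serialize the result.
    static final class ReducingMergeSketch implements AssociativeMerge {
        private final BinaryOperator<Long> reduceFunction;

        ReducingMergeSketch(BinaryOperator<Long> reduceFunction) {
            this.reduceFunction = reduceFunction;
        }

        @Override
        public byte[] merge(byte[] existing, byte[] operand) {
            return encode(reduceFunction.apply(decode(existing), decode(operand)));
        }
    }

    static byte[] encode(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    // Toy key-value store: merge() only appends an operand (no read);
    // operands are folded lazily on get() or compact().
    static final class ToyStore {
        private final Map<String, byte[]> base = new HashMap<>();
        private final Map<String, List<byte[]>> pending = new HashMap<>();
        private final AssociativeMerge op;

        ToyStore(AssociativeMerge op) {
            this.op = op;
        }

        void merge(String key, byte[] operand) {
            pending.computeIfAbsent(key, k -> new ArrayList<>()).add(operand);
        }

        // Folds the base value (if any) with all pending operands, in order.
        byte[] get(String key) {
            byte[] acc = base.get(key);
            for (byte[] operand : pending.getOrDefault(key, List.of())) {
                acc = (acc == null) ? operand : op.merge(acc, operand);
            }
            return acc;
        }

        // Collapses pending operands into a single base value per key.
        void compact() {
            for (String key : new ArrayList<>(pending.keySet())) {
                base.put(key, get(key));
            }
            pending.clear();
        }

        int pendingOperands(String key) {
            return pending.getOrDefault(key, List.of()).size();
        }
    }

    public static void main(String[] args) {
        ToyStore store = new ToyStore(new ReducingMergeSketch(Long::sum));
        for (int i = 1; i <= 5; i++) {
            store.merge("clicks", encode(i)); // blind write, no read-modify-write
        }
        System.out.println(store.pendingOperands("clicks")); // 5
        System.out.println(decode(store.get("clicks")));     // 15
        store.compact();
        System.out.println(store.pendingOperands("clicks")); // 0
        System.out.println(decode(store.get("clicks")));     // 15
    }
}
```

Because the reduce function is associative, folding operands in batches during compaction and folding them all at read time give the same result. This is why the toy returns 15 both before and after `compact()`.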
