Hi Zakelly,

I have removed the ReducingMergeState.set method to minimize the changes to
public APIs. The updated FLIP is at the same link:
https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing

Community members,
This proposal extends FrocksDB/Flink so that applications can use RocksDB's
associative merge operators. It should help write-heavy workloads and let
users build associative data structures in a streaming fashion.

Please review; I am happy to answer any questions.

Best,
-Soumitra.

On Tue, May 19, 2026 at 8:56 PM Soumitra Kumar <[email protected]>
wrote:

> Hi Zakelly,
>
> Thanks for your review and +1!
>
> On Tue, May 19, 2026 at 8:19 AM Zakelly Lan <[email protected]> wrote:
>
>> Thanks for the FLIP! I'd +1 for the direction of enhancing the reducing and
>> aggregating state. It's important that we can leverage RocksDB's merge
>> operators to eliminate unnecessary `get()` calls. However, I have a few
>> questions:
>>
>> 1. I see you will introduce `AbstractAssociativeMergeOperator` on the
>> frocksdb side. How would a user pass this instance to the RocksDB state
>> backend, and what is its relationship with Flink's `ReduceFunction` or
>> `AggregateFunction`? I would suggest that we 'translate' the user's
>> `ReduceFunction` into a frocksdb merge operator, so that on the Flink side
>> we keep the original experience. WDYT?
>>
>
> Yes, that is the whole idea. Since ReduceFunction and AggregateFunction
> are existing primitives and they are associative and serializable, I am
> building on them.
>
> This is how they are wired. There are two new classes in Flink,
> RocksDBReducingMergeOperator and RocksDBAggregatingMergeOperator, both
> implementing AbstractAssociativeMergeOperator. The ColumnFamily is configured
> with one of them: RocksDBReducingMergeOperator for ReducingMergeState and
> RocksDBAggregatingMergeOperator for AggregatingMergeState. These classes
> receive the callback from frocksdb, handle the serde, and call the
> user-defined ReduceFunction or AggregateFunction.
>
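
The wiring described above can be pictured with a minimal, self-contained
sketch. It does not use the real Flink or frocksdb classes: every type name
below (SketchReduceFunction, SketchAssociativeMergeOperator, SketchSerde,
SketchReducingMergeOperator, LongSerde) and the merge(key, existing, operand)
signature are assumptions made for illustration only. The actual
AbstractAssociativeMergeOperator API is the one defined in the FLIP.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for Flink's ReduceFunction<T>.
interface SketchReduceFunction<T> {
    T reduce(T a, T b) throws Exception;
}

// Hypothetical stand-in for the proposed frocksdb callback; the real
// signature may differ.
interface SketchAssociativeMergeOperator {
    byte[] merge(byte[] key, byte[] existingValue, byte[] operand);
}

// Hypothetical serializer/deserializer pair for the state value type.
interface SketchSerde<T> {
    byte[] serialize(T value);
    T deserialize(byte[] bytes);
}

// Adapter: receives the byte-level callback, handles the serde, and
// delegates the actual combination to the user's reduce function.
final class SketchReducingMergeOperator<T> implements SketchAssociativeMergeOperator {
    private final SketchReduceFunction<T> reduceFunction;
    private final SketchSerde<T> serde;

    SketchReducingMergeOperator(SketchReduceFunction<T> reduceFunction, SketchSerde<T> serde) {
        this.reduceFunction = reduceFunction;
        this.serde = serde;
    }

    @Override
    public byte[] merge(byte[] key, byte[] existingValue, byte[] operand) {
        // No existing value yet: the operand becomes the value.
        if (existingValue == null) {
            return operand;
        }
        try {
            T merged = reduceFunction.reduce(
                    serde.deserialize(existingValue), serde.deserialize(operand));
            return serde.serialize(merged);
        } catch (Exception e) {
            throw new IllegalStateException("reduce function failed during merge", e);
        }
    }
}

// Example serde for Long values (8-byte big-endian encoding).
final class LongSerde implements SketchSerde<Long> {
    @Override
    public byte[] serialize(Long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    @Override
    public Long deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}
```

With a summing reduce function such as (a, b) -> a + b, merging the
serialized values 40 and 2 yields the serialized value 42, and the user code
never sees the byte arrays.
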
>
>>
>> 2. I read that you introduce a new `ReducingMergeState` with a new `set()`
>> method compared with `ReducingState`. Is this necessary? If we intend to
>> optimize performance solely through the use of a merge operator, it is
>> not needed, right? I do not recommend introducing too many public APIs, as
>> this would force us to consider their semantics. For example, in changelog
>> stream processing, if the `merge` operation were to permit state-setting
>> operations, it would complicate potential future retraction (or reverse
>> merge/aggregating) operations. WDYT?
>>
>
> ReducingMergeState.set is just a shortcut for clear-add. I agree with your
> point and will remove it.
>
> Best,
> -Soumitra.
>
>
>> On Tue, May 5, 2026 at 11:50 AM Soumitra Kumar <[email protected]>
>> wrote:
>>
>> > Dear Community Members,
>> >
>> > I want to start a discussion on the two tickets I filed recently:
>> > Add support for Java based AssociativeMergeOperator via JNI
>> > <https://issues.apache.org/jira/browse/FLINK-39455>
>> > Support ReducingMergeState and AggregatingMergeState backed by Java based
>> > associative merge operators
>> > <https://issues.apache.org/jira/browse/FLINK-39456>
>> >
>> > Copying the motivation from the FLIP doc
>> > <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>:
>> >
>> > Flink supports RocksDBReducingState and RocksDBAggregatingState state
>> > variables that do a synchronous read-modify-write on every add call. While
>> > this works well in many scenarios, it can be expensive for write-heavy
>> > workloads and may become a bottleneck.
>> > RocksDB's AssociativeMergeOperator is a storage-level primitive designed
>> > for commutative and associative operations: integer counters, set union,
>> > list append, approximate sketches, top-K structures, Bloom filters, and
>> > similar patterns. However, frocksdb (the RocksDB fork used in Flink) does
>> > not support Java-based associative merge operators.
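
To make the contrast between read-modify-write and merge concrete, here is a
toy in-memory sketch. It is not RocksDB and not the FLIP's implementation:
ToyMergeStore and its methods are hypothetical names, and folding the pending
operands on read stands in, very roughly, for what RocksDB does on get,
flush, or compaction. The point is only the number of reads needed per add.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical toy store; names are illustrative only.
final class ToyMergeStore {
    private final Map<String, Long> base = new HashMap<>();
    private final Map<String, List<Long>> pendingOperands = new HashMap<>();
    private final BinaryOperator<Long> mergeFn;

    // Counts how many times a value had to be read back.
    int reads = 0;

    ToyMergeStore(BinaryOperator<Long> mergeFn) {
        this.mergeFn = mergeFn;
    }

    // Read-modify-write: every add reads the current value first.
    void addWithReadModifyWrite(String key, long delta) {
        Long current = get(key);
        base.put(key, current == null ? delta : mergeFn.apply(current, delta));
    }

    // Merge-style add: record the operand without reading anything.
    void addWithMerge(String key, long delta) {
        pendingOperands.computeIfAbsent(key, k -> new ArrayList<>()).add(delta);
    }

    // Reading folds any pending operands into the stored value.
    Long get(String key) {
        reads++;
        Long result = base.get(key);
        List<Long> operands = pendingOperands.remove(key);
        if (operands != null) {
            for (Long operand : operands) {
                result = (result == null) ? operand : mergeFn.apply(result, operand);
            }
            base.put(key, result);
        }
        return result;
    }
}
```

In this toy model, 1000 adds through addWithReadModifyWrite cost 1000 reads,
while 1000 adds through addWithMerge cost none until the value is actually
requested. This relies on the merge function being associative, so the
operands can be folded later in one pass.
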
>> >
>> > This FLIP has two parts:
>> > 1. Support for Java based AssociativeMergeOperator in frocksdb via JNI
>> > 2. Support ReducingMergeState and AggregatingMergeState backed by Java
>> > based associative merge operators
>> >
>> > The first part proposes exposing the associative merge operator as a Java
>> > class in frocksdb with minimal JNI overhead. RocksDB can call these
>> > operators during flushing and compaction.
>> > The second part leverages the frocksdb support developed in the first part
>> > to support ReducingMergeState and AggregatingMergeState state variables
>> > with user-defined ReduceFunction and AggregateFunction on the RocksDB
>> > backend.
>> >
>> > This enhancement opens up a powerful feature of rocksdb to Java. Flink
>> > users can use it to build interesting associative data structures
>> > on streaming data. I have added benchmark details from a prototype
>> > implementation in the FLIP doc.
>> >
>> > Looking forward to feedback.
>> >
>> > FLIP in Google doc
>> > <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>
>> >
>> > Best,
>> > -Soumitra.
>> >
>>
>
