Hi Zakelly,

Thanks for your review and +1!

On Tue, May 19, 2026 at 8:19 AM Zakelly Lan <[email protected]> wrote:

> Thanks for the FLIP! I'd +1 for the direction of enhancing the reducing and
> aggregating state. It's important that we could leverage Rocksdb's merge
> operators to eliminate unnecessary `get()`. However I have a few questions:
>
> 1. I see you will introduce `AbstractAssociativeMergeOperator` in
> frocksdb side, so how could user pass this instance to the RocksDB State
> backend and what is the relationship with the Flink's `ReduceFunction` or
> `AggregateFunction`. I would suggest we may 'translate' user's
> `ReduceFunction`  into some frocksdb's merge operator, thus for flink we
> still maintain the original experience. WDYT?
>

Yes, that is the whole idea. ReduceFunction and AggregateFunction are
existing primitives that are already associative and serializable, so I am
building on them.

This is how they are wired. Flink gets two new classes,
RocksDBReducingMergeOperator and RocksDBAggregatingMergeOperator, both
implementing AbstractAssociativeMergeOperator. Each column family is
configured with one of them: RocksDBReducingMergeOperator for
ReducingMergeState, and RocksDBAggregatingMergeOperator for
AggregatingMergeState. These classes receive the callback from frocksdb,
handle the serde, and call the user-defined ReduceFunction or
AggregateFunction.
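
To make the wiring concrete, here is a rough sketch of the reducing case. It
is not the prototype code. ReduceFn, Serde, LongSerde,
AssociativeMergeOperatorSketch and ReducingMergeOperatorSketch are
hypothetical stand-ins so the example compiles on its own, and the merge()
signature is an assumption about what the frocksdb callback might look like.

```java
import java.nio.ByteBuffer;

// Stand-in for Flink's ReduceFunction, reduced to the one method used here.
interface ReduceFn<T> {
    T reduce(T left, T right) throws Exception;
}

// Hypothetical stand-in for a serializer; Flink would use a TypeSerializer.
interface Serde<T> {
    byte[] serialize(T value);
    T deserialize(byte[] bytes);
}

// Assumed shape of the frocksdb-side base class. The real signature is
// whatever the FLIP ends up defining.
abstract class AssociativeMergeOperatorSketch {
    // `existing` is null when the key has no value yet.
    abstract byte[] merge(byte[] key, byte[] existing, byte[] operand);
}

// Sketch of the reducing variant: deserialize both sides, call the user
// function, and serialize the result back for RocksDB.
final class ReducingMergeOperatorSketch<T> extends AssociativeMergeOperatorSketch {
    private final ReduceFn<T> reduceFn;
    private final Serde<T> serde;

    ReducingMergeOperatorSketch(ReduceFn<T> reduceFn, Serde<T> serde) {
        this.reduceFn = reduceFn;
        this.serde = serde;
    }

    @Override
    byte[] merge(byte[] key, byte[] existing, byte[] operand) {
        if (existing == null) {
            return operand; // first value for this key, nothing to reduce
        }
        try {
            T merged = reduceFn.reduce(serde.deserialize(existing), serde.deserialize(operand));
            return serde.serialize(merged);
        } catch (Exception e) {
            throw new RuntimeException("merge failed", e);
        }
    }
}

// Simple fixed-width serializer used only for this example.
final class LongSerde implements Serde<Long> {
    @Override
    public byte[] serialize(Long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    @Override
    public Long deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}
```

The user still writes only the reduce function (for example a Long sum); the
operator owns the byte-level plumbing, so the Flink-facing experience stays
the same as with ReducingState.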


>
> 2. I read you introduce a new `ReducingMergeState` with a new `set()`
> method compared with `ReducingState`, is this necessary? I mean if we
> intend to optimize performance solely through the use of a merge operator,
> this is not necessary, right? I do not recommend introducing too many
> public APIs, as this would force us to consider their
> semantics. Specifically for example, in changelog stream processing, if the
> `merge` operation were to permit state-setting operations, it would
> complicate potential future retraction (or reverse merge/aggregating)
> operations. WDYT?
>

ReducingMergeState.set is just a shortcut for a clear followed by an add. I
agree with your point and will remove it.

Best,
-Soumitra.


> On Tue, May 5, 2026 at 11:50 AM Soumitra Kumar <[email protected]>
> wrote:
>
> > Dear Community Members,
> >
> > I want to start discussion on the two tickets I filed recently:
> > Add support for Java based AssociativeMergeOperator via JNI
> > <https://issues.apache.org/jira/browse/FLINK-39455>
> > Support ReducingMergeState and AggregatingMergeState backed by Java based
> > associative merge operators
> > <https://issues.apache.org/jira/browse/FLINK-39456>
> >
> > Copying the motivation from the FLIP doc
> > <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>:
> >
> > Flink supports RocksDBReducingState and RocksDBAggregatingState state
> > variables that do a synchronous read-modify-write on every add call.
> > While this works great in many scenarios, for write-heavy workloads this
> > can be expensive and may become a bottleneck.
> > RocksDB's AssociativeMergeOperator is a storage-level primitive designed
> > for commutative and associative operations — integer counters, set union,
> > list append, approximate sketches, top-K structures, Bloom filter, and
> > similar patterns. However, frocksdb (the RocksDB fork used in Flink) does
> > not support Java based associative merge operators.
> >
> > This FLIP has two parts:
> > 1. Support for Java based AssociativeMergeOperator in frocksdb via JNI
> > 2. Support ReducingMergeState and AggregatingMergeState backed by Java
> > based associative merge operators
> >
> > The first part proposes exposing the associative merge operator as a Java
> > class in frocksdb with minimal JNI overhead. RocksDB can call these
> > operators during flushing and compaction.
> > The second part leverages the frocksdb support developed in the first
> > part to support ReducingMergeState and AggregatingMergeState state
> > variables with user defined ReduceFunction and AggregateFunction using
> > rocksdb backend.
> >
> > This enhancement opens up a powerful feature of rocksdb to Java. Flink
> > users can use it to build interesting associative data structures
> > on streaming data. I have added benchmark details from a prototype
> > implementation in the FLIP doc.
> >
> > Looking forward to feedback.
> >
> > FLIP in Google doc
> > <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>
> >
> > Best,
> > -Soumitra.
> >
>
