Hi Soumitra, Thanks for your reply.
I might be missing something, but I don't see why the existing mechanisms can't solve the problem, especially a simple MapState (2) or ListState (6). At the same time, I have concerns about the proposal itself (1), (4), (5):

> > 1. Is it possible that reads will become more costly in some scenarios?
> Your calculation of the cost of the proposed approach is correct. If there
> is frequent read-after-write, then the merge operator does not add value.

My point is that it doesn't just fail to improve performance in these read-after-write cases; it degrades it. So the proposal adds one more (expert-level) way to tune the runtime, which I believe we should avoid.

> > 2. Speaking more generally, could you list the motivating use cases
> > Isn't it possible to use MapState keyed by event time?
> > The sorting will come for free on RocksDB and PUT will only add the new
> > element without touching the existing ones.
> > (there is an API to check whether it's sorted or not)
> I don't know how MapState can help with generic ordering, but my knowledge on that is limited.

A common pattern is to use MapState<Long, ...>, where the keys are timestamps. Such a MapState, when backed by RocksDB, is automatically sorted by timestamp.

> > 4. Function dispatch during restore
> Great point! The Reduce/Aggregate functions in Flink are already
> serializable and in the proposal they are serialized in the savepoint and
> are restored when rocksdb is loaded. This allows rocksdb to call these
> functions during compaction before state descriptors are called. This way
> we don't need to disable compaction during restore.

That means that in case of a schema change, compactions will use the old schema, right? I'm afraid this could bypass state migration.

> > 5. Remote compactions
> TBH, I don't understand ForSt in detail to comment on this item. Since the
> proposal is exposing associative merge operators, it should not be an issue
> to support in ForSt.
> In fact, if ForSt does not support associative merge
> operators, then I will volunteer to add it, but let's get this proposal
> first.

My concern is about an external compaction component: this proposal forces it to run job-specific Java code instead of only C++ code (or whatever language the state backend uses).

> > 6. Alternative: ListState
> I can implement the sorted list using this construct, but the read will be more
> expensive than the proposal, since the sorting will happen during the read.

In the current proposal, a read will trigger sorting as well if the data has not been compacted/sorted yet, and it will add more latency than with ListState because of the extra read/write pass. A ListState solution provides the flexibility to choose when this work happens:
1. Periodically (using processing-time timers), similar to compactions
2. On reads
3. Incrementally on writes
4. Some combination of the above

Regards,
Roman

On Mon, Jun 15, 2026 at 9:13 PM Soumitra Kumar <[email protected]> wrote:
> ---------- Forwarded message ---------
> From: Soumitra Kumar <[email protected]>
> Date: Mon, Jun 15, 2026, 12:12 PM
> Subject: Re: [DISCUSS] FLIP-XXX Support ReducingMergeState and
> AggregatingMergeState backed by Java based associative merge operators
> To: <[email protected]>
>
>
> Hi Roman,
>
> I replied to your questions a while back. Let me forward the thread to
> [email protected].
>
> Best,
> -Soumitra.
>
> On Mon, Jun 15, 2026, 12:48 AM Roman Khachatryan <[email protected]> wrote:
>
> > Hello Soumitra Kumar,
> >
> > It would be great to get answers to the questions I posted above,
> > unless the problem is solved and the FLIP isn't necessary.
> >
> > Regards,
> > Roman
> >
> >
> > On Sun, Jun 14, 2026 at 9:41 PM Soumitra Kumar <[email protected]> wrote:
> >
> > > Hello Members,
> > >
> > > Thank you for your review so far. I don't have any open issues at this
> > > moment. Please let me know if there is any issue for me to
> > > clarify/address.
> > >
> > > Best,
> > > -Soumitra.
> > >
> > > On Mon, Jun 8, 2026 at 10:09 PM Soumitra Kumar <[email protected]> wrote:
> > >
> > > > Hi Han,
> > > >
> > > > I have added a section on TTL of ReducingMergeState and
> > > > AggregatingMergeState - HERE
> > > > <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?tab=t.0#heading=h.mqp1qeixcg45>,
> > > > please review.
> > > >
> > > > Best,
> > > > -Soumitra.
> > > >
> > > > On Mon, Jun 1, 2026 at 11:02 PM Soumitra Kumar <[email protected]> wrote:
> > > >
> > > >> Hi Han,
> > > >>
> > > >> Thanks for your review and encouragement!
> > > >>
> > > >> #1 - Users can migrate from ReducingState to ReducingMergeState, but it
> > > >> has to be a conscious decision, made knowing the RocksDB implications. We
> > > >> should plan to create a few how-to docs on monitoring and tuning RocksDB
> > > >> to get the best out of the merge operators. Theoretically, it is possible
> > > >> to build an automatic migration path, but I would not favor that because
> > > >> of the different runtime characteristics of ReducingState and
> > > >> ReducingMergeState. The checkpoints/savepoints for
> > > >> ReducingMergeState/AggregatingMergeState state variables will serialize
> > > >> the Reduce/Aggregate function as well.
> > > >>
> > > >> #2 - "Will this introduce different semantics when State TTL is enabled"
> > > >> - Can you elaborate on this? TBH, I have not planned the details of the
> > > >> TTL of ReducingMergeState/AggregatingMergeState variables yet, but the TTL
> > > >> should be applied on the variable, not on individual operands. I will add
> > > >> a section on TTL of these variables in the FLIP.
> > > >>
> > > >> Best,
> > > >> -Soumitra.
> > > >>
> > > >> On Mon, Jun 1, 2026 at 3:03 AM Han Yin <[email protected]> wrote:
> > > >>
> > > >>> Hi Soumitra,
> > > >>> Thanks for the FLIP. The ability to leverage RocksDB merge operators in
> > > >>> Reducing/Aggregating state is a really meaningful improvement.
> > > >>>
> > > >>> I share similar concerns about the user interface with the previous
> > > >>> comments:
> > > >>> • If new state types are introduced, can users migrate their
> > > >>> existing jobs from ReducingState to ReducingMergeState? Since the core
> > > >>> logic of the ReduceFunction remains the same, one would expect a
> > > >>> straightforward migration path. If yes, will checkpoints/savepoints
> > > >>> remain compatible across this switch (and back)?
> > > >>> • Will this introduce different semantics when State TTL is enabled?
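For illustration, the MapState (2) and ListState (6) alternatives discussed above can be modeled in plain Java. This is a hedged sketch, not Flink code: `SortedStateSketch`, `TimestampKeyedMap`, `AppendThenSortList` and `Event` are hypothetical names, a `TreeMap` stands in for a RocksDB-backed `MapState<Long, ...>` keyed by timestamp (assuming non-negative timestamps), and an `ArrayList` stands in for `ListState`. It only shows where the sorting work happens, not the actual state-backend costs.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Stdlib-only model of two alternatives from this thread; NOT Flink API code.
// All class and method names here are hypothetical.
public class SortedStateSketch {

    /** Stand-in for a RocksDB-backed MapState<Long, List<String>> keyed by event time. */
    static final class TimestampKeyedMap {
        // TreeMap iterates keys in ascending order, mimicking RocksDB's sorted keys
        // (assumption: timestamps are non-negative).
        private final TreeMap<Long, List<String>> byTimestamp = new TreeMap<>();

        // A put only touches the entry for its own timestamp, never the other elements.
        void put(long timestamp, String value) {
            byTimestamp.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
        }

        // Iteration is already ordered, so there is no sorting step at read time.
        List<String> readInOrder() {
            List<String> out = new ArrayList<>();
            for (List<String> values : byTimestamp.values()) {
                out.addAll(values);
            }
            return out;
        }
    }

    record Event(long timestamp, String value) {}

    /** Stand-in for ListState<Event>: cheap appends, with the sorting work deferred. */
    static final class AppendThenSortList {
        private final List<Event> events = new ArrayList<>();
        private boolean sorted = true;

        // Append without reading or reordering existing elements.
        void add(Event e) {
            if (!events.isEmpty() && events.get(events.size() - 1).timestamp() > e.timestamp()) {
                sorted = false;
            }
            events.add(e);
        }

        // Option 1: sort periodically, e.g. from a processing-time timer, similar to compaction.
        void compact() {
            if (!sorted) {
                events.sort(Comparator.comparingLong(Event::timestamp)); // List.sort is stable
                sorted = true;
            }
        }

        // Option 3: keep the list sorted incrementally on each write (linear-time insert).
        void addInOrder(Event e) {
            compact();
            int i = events.size();
            while (i > 0 && events.get(i - 1).timestamp() > e.timestamp()) {
                i--;
            }
            events.add(i, e);
        }

        // Option 2: sort on read, but only if no compaction ran since an out-of-order append.
        List<String> readInOrder() {
            compact();
            List<String> out = new ArrayList<>();
            for (Event e : events) {
                out.add(e.value());
            }
            return out;
        }
    }

    public static void main(String[] args) {
        TimestampKeyedMap map = new TimestampKeyedMap();
        map.put(30L, "c");
        map.put(10L, "a");
        map.put(20L, "b");
        System.out.println(map.readInOrder()); // [a, b, c]

        AppendThenSortList list = new AppendThenSortList();
        list.add(new Event(30L, "c"));
        list.add(new Event(10L, "a"));
        list.add(new Event(20L, "b"));
        System.out.println(list.readInOrder()); // [a, b, c]
    }
}
```

Option 4 (some combination) falls out of the same model: append with `add`, call `compact` from a timer, and let `readInOrder` sort whatever arrived out of order since the last compaction.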
