Hi Zakelly,

Thanks for the feedback!

> 1. Could you elaborate more about the ADAPTIVE mode? Is the switch between
> VALUE and MAP performed under each stream key considering each list size,
> or is it performed for all keys if the average list size reaches the given
> thresholds?

The switch is performed under each stream key individually, when that
key's list size reaches the threshold.
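For illustration, the per-key decision could look roughly like the sketch below. All names and threshold values here are hypothetical, not the FLIP's actual API; the point is only that the mode and size are tracked per stream key:

```java
// Illustrative sketch of a per-key ADAPTIVE switch; names/thresholds are made up.
enum Mode { VALUE, MAP }

class AdaptiveState {
    private static final int UPGRADE_THRESHOLD = 16;   // assumed example value
    private static final int DOWNGRADE_THRESHOLD = 4;  // assumed example value

    Mode mode = Mode.VALUE; // each stream key has its own AdaptiveState

    // Called after each add/retract for the current stream key only;
    // other keys keep their own mode, unaffected by this key's list size.
    Mode updateMode(int listSize) {
        if (mode == Mode.VALUE && listSize > UPGRADE_THRESHOLD) {
            mode = Mode.MAP;   // migrate this key's list into MapState
        } else if (mode == Mode.MAP && listSize < DOWNGRADE_THRESHOLD) {
            mode = Mode.VALUE; // migrate this key's entries back into ValueState
        }
        return mode;
    }
}
```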

> 2. Is it necessary to maintain pointers 'prevSeqNo' and 'nextSeqNo' to
link
> all the nodes? I assume there should be a traversal need but I don't see
> that in pseudo-code.

Yes, those pointers are necessary; I couldn't find a way to get rid of them:
- prevSeqNo is used to emit the penultimate element when retracting the last one;
- nextSeqNo is used to keep prevSeqNo correct when retracting an item from
the middle.
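To make the two cases concrete, here is a simplified, self-contained sketch (a plain HashMap standing in for sqnToNode, Strings standing in for RowData; not the FLIP's actual code):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of sqnToNode with prev/next sequence-number links.
class Node {
    long prevSeqNo = -1; // -1 means "none"
    long nextSeqNo = -1;
    String row;
    Node(String row) { this.row = row; }
}

class LinkedNodes {
    final Map<Long, Node> sqnToNode = new HashMap<>();
    long lastSeqNo = -1;
    long nextSqn = 0;

    long append(String row) {
        long sqn = nextSqn++;
        Node n = new Node(row);
        n.prevSeqNo = lastSeqNo;
        if (lastSeqNo != -1) sqnToNode.get(lastSeqNo).nextSeqNo = sqn;
        sqnToNode.put(sqn, n);
        lastSeqNo = sqn;
        return sqn;
    }

    // Returns the row to emit downstream (the new last element), or null.
    String retract(long sqn) {
        Node n = sqnToNode.remove(sqn);
        // nextSeqNo keeps the successor's prevSeqNo correct on middle removal:
        if (n.nextSeqNo != -1) sqnToNode.get(n.nextSeqNo).prevSeqNo = n.prevSeqNo;
        if (n.prevSeqNo != -1) sqnToNode.get(n.prevSeqNo).nextSeqNo = n.nextSeqNo;
        if (sqn == lastSeqNo) {
            lastSeqNo = n.prevSeqNo;
            // prevSeqNo lets us find the penultimate element without a scan:
            return lastSeqNo != -1 ? sqnToNode.get(lastSeqNo).row : null;
        }
        return null;
    }
}
```

Retracting from the middle only fixes up the neighbors' pointers; retracting the last element follows prevSeqNo to emit the new last element in O(1).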

> And is `MapState.iterator` also feasible?
Yes, in fact, the ADAPTIVE strategy uses an iterator to move the entries
between MAP and VALUE.
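As a rough illustration of that migration step (a plain Map standing in for Flink's MapState, which exposes a similar iterator; names are illustrative):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class Migration {
    // Drains a per-key map into an in-memory list when downgrading from
    // MAP to VALUE mode; entries are moved, not kept in both places.
    static List<String> drainToList(Map<Long, String> mapState) {
        List<String> values = new ArrayList<>();
        Iterator<Map.Entry<Long, String>> it = mapState.entrySet().iterator();
        while (it.hasNext()) {
            values.add(it.next().getValue());
            it.remove();
        }
        return values;
    }
}
```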

> 3. I see there are two `RowData` stored for one record, one is in
> `rowToSqn` and another is in `sqnToNode`'s node. I guess the first is for
> upsert-keys. Would it be optimized to single copy for a non-upsert-key
> scenario?

That's an interesting idea! I'll try to dig into it deeper when
open-sourcing or as a follow-up.

> 4. For the TTL mechanism part, I would suggest an 'event-time based ttl',
> which allows the user to specify insertion time for each insert/update
> operation and a manually controllable `TtlTimeProvider` (instead of just
> system time). This would be beneficial for many cases, WDYT?

I agree, I think that event-time based TTL is more useful in general
(I specified processing time as a default to make it less surprising for
the users).

I don't immediately see the potential usages of a manually controllable
TtlTimeProvider - do you have any use cases in mind?

> 5. Does the current RocksDB benchmark involve significant state size and
> I/O pressure?

No, in the micro-benchmark the state wasn't too big (on the order of
megabytes); it was, however, bottlenecked by RocksDB put/get operations.
I also performed a benchmark on a cluster with a larger state size
(on the order of gigabytes) and got similar results.


Regards,
Roman


On Thu, Aug 28, 2025 at 11:38 AM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Roman,
>
> Thanks for the proposal! The SinkUpsertMaterializer sometimes becomes a
> bottleneck in our production, so I'd +1 to optimize it. I have several
> questions regarding your design:
>
> 1. Could you elaborate more about the ADAPTIVE mode? Is the switch between
> VALUE and MAP performed under each stream key considering each list size,
> or is it performed for all keys if the average list size reaches the given
> thresholds?
> 2. Is it necessary to maintain pointers 'prevSeqNo' and 'nextSeqNo' to link
> all the nodes? I assume there should be a traversal need but I don't see
> that in pseudo-code. And is `MapState.iterator` also feasible?
> 3. I see there are two `RowData` stored for one record, one is in
> `rowToSqn` and another is in `sqnToNode`'s node. I guess the first is for
> upsert-keys. Would it be optimized to single copy for a non-upsert-key
> scenario?
> 4. For the TTL mechanism part, I would suggest an 'event-time based ttl',
> which allows the user to specify insertion time for each insert/update
> operation and a manually controllable `TtlTimeProvider` (instead of just
> system time). This would be beneficial for many cases, WDYT?
> 5. Does the current RocksDB benchmark involve significant state size and
> I/O pressure?
>
>
> Best,
> Zakelly
>
> On Thu, Aug 28, 2025 at 7:11 AM Roman Khachatryan <ro...@apache.org>
> wrote:
>
> > Hi everyone,
> >
> > I would like to start a discussion about FLIP-544 SinkUpsertMaterializer
> V2
> > [1].
> >
> > SinkUpsertMaterializer is an operator in Flink that reconciles out of
> order
> > changelog events before sending them to an upsert sink. In some cases
> (that
> > we see in our production), performance of this operator degrades
> > exponentially, depending on the input data.
> > This FLIP proposes a new implementation that is optimized for such cases
> > and serves as a synchronization point for other efforts in that area.
> >
> > Looking forward to feedback.
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-544%3A+SinkUpsertMaterializer+V2
> >
> >
> > Regards,
> > Roman
> >
>
