Hi Roman,

Thanks for the proposal! The SinkUpsertMaterializer sometimes becomes a bottleneck in our production, so I'd +1 to optimize it. I have several questions regarding your design:

1. Could you elaborate on the ADAPTIVE mode? Is the switch between VALUE and MAP performed per stream key, based on that key's list size, or for all keys at once when the average list size reaches the given thresholds?

2. Is it necessary to maintain the `prevSeqNo` and `nextSeqNo` pointers to link all the nodes? I assume they are needed for traversal, but I don't see that in the pseudo-code. Would `MapState.iterator` also be feasible?

3. I see that two `RowData` copies are stored for each record: one in `rowToSqn` and another in the node held by `sqnToNode`. I guess the first is for upsert keys. Could this be optimized to a single copy in the non-upsert-key scenario?

4. For the TTL mechanism, I would suggest an "event-time based TTL", which allows the user to specify the insertion time for each insert/update operation, together with a manually controllable `TtlTimeProvider` (instead of just system time). This would be beneficial in many cases, WDYT?

5. Does the current RocksDB benchmark involve significant state size and I/O pressure?

Best,
Zakelly

On Thu, Aug 28, 2025 at 7:11 AM Roman Khachatryan <ro...@apache.org> wrote:

> Hi everyone,
>
> I would like to start a discussion about FLIP-544 SinkUpsertMaterializer V2
> [1].
>
> SinkUpsertMaterializer is an operator in Flink that reconciles out of order
> changelog events before sending them to an upsert sink. In some cases (that
> we see in our production), performance of this operator degrades
> exponentially, depending on the input data.
> This FLIP proposes a new implementation that is optimized for such cases
> and serves as a synchronization point for other efforts in that area.
>
> Looking forward to feedback.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-544%3A+SinkUpsertMaterializer+V2
>
> Regards,
> Roman