Hi Zakelly,

Thanks for the feedback!
> 1. Could you elaborate more about the ADAPTIVE mode? Is the switch between
> VALUE and MAP performed under each stream key considering each list size,
> or is it performed for all keys if the average list size reaches the given
> thresholds?

The switch is performed under each stream key individually, when that key's list size reaches a threshold.

> 2. Is it necessary to maintain pointers 'prevSeqNo' and 'nextSeqNo' to link
> all the nodes? I assume there should be a traversal need but I don't see
> that in pseudo-code.

Yes, those pointers are necessary; I couldn't find a way to get rid of them:
- prevSeqNo is used to emit the penultimate element when retracting the last one;
- nextSeqNo is used to keep prevSeqNo correct when retracting an item from the middle.

> And is `MapState.iterator` also feasible?

Yes; in fact, the ADAPTIVE strategy uses an iterator to move the entries between MAP and VALUE.

> 3. I see there are two `RowData` stored for one record, one is in
> `rowToSqn` and another is in `sqnToNode`'s node. I guess the first is for
> upsert-keys. Would it be optimized to single copy for a non-upsert-key
> scenario?

That's an interesting idea! I'll try to dig deeper into it when open-sourcing or as a follow-up.

> 4. For the TTL mechanism part, I would suggest an 'event-time based ttl',
> which allows the user to specify insertion time for each insert/update
> operation and a manually controllable `TtlTimeProvider` (instead of just
> system time). This would be beneficial for many cases, WDYT?

I agree that event-time based TTL is more useful in general (I specified processing time as the default to make it less surprising for users). I don't immediately see the potential usages of a manually controllable TtlTimeProvider - do you have any use cases in mind?

> 5. Does the current RocksDB benchmark involve significant state size and
> I/O pressure?
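To make the per-key switch from question (1) concrete, here is a rough, in-memory sketch. All names, thresholds, and the use of plain Java collections are assumptions for illustration only; the actual FLIP operates on Flink's ValueState/MapState, and the real thresholds are configurable:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the ADAPTIVE mode: each stream key independently
// stores its list of row versions either as a single serialized value
// (small lists) or as one map entry per row (large lists), and switches
// between the two representations when its own size crosses a threshold.
public class AdaptiveStore {
    enum Mode { VALUE, MAP }

    static final int TO_MAP_THRESHOLD = 3;   // assumed, not the FLIP's defaults
    static final int TO_VALUE_THRESHOLD = 1;

    static class PerKey {
        Mode mode = Mode.VALUE;
        List<String> valueList = new ArrayList<>();      // stands in for ValueState
        Map<Long, String> mapEntries = new HashMap<>();  // stands in for MapState
        long nextSqn = 0;
    }

    final Map<String, PerKey> byKey = new HashMap<>();

    /** Adds a row under a key; switches that key (only) to MAP if it grew too large. */
    Mode add(String key, String row) {
        PerKey k = byKey.computeIfAbsent(key, x -> new PerKey());
        if (k.mode == Mode.VALUE) {
            k.valueList.add(row);
            if (k.valueList.size() >= TO_MAP_THRESHOLD) {
                // Move entries VALUE -> MAP for this key only
                for (String r : k.valueList) {
                    k.mapEntries.put(k.nextSqn++, r);
                }
                k.valueList.clear();
                k.mode = Mode.MAP;
            }
        } else {
            k.mapEntries.put(k.nextSqn++, row);
        }
        return k.mode;
    }

    /** Retracts a row; folds the key back to VALUE if it shrank enough,
     *  iterating the map entries (the `MapState.iterator` analogue). */
    Mode retract(String key, String row) {
        PerKey k = byKey.get(key);
        if (k == null) return null;
        if (k.mode == Mode.MAP) {
            k.mapEntries.values().remove(row);
            if (k.mapEntries.size() <= TO_VALUE_THRESHOLD) {
                Iterator<String> it = k.mapEntries.values().iterator();
                while (it.hasNext()) {
                    k.valueList.add(it.next());
                    it.remove();
                }
                k.mode = Mode.VALUE;
            }
        } else {
            k.valueList.remove(row);
        }
        return k.mode;
    }
}
```

Note how a second key stays in VALUE mode regardless of what happens to the first; there is no global average involved.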
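The role of the two pointers from question (2) can also be sketched in plain Java. This is a hypothetical illustration (class and field names are assumed; the FLIP keeps the nodes in Flink MapState, not a HashMap): retracting the latest row needs prevSeqNo to find the penultimate row to emit, and retracting a middle row needs nextSeqNo to relink its successor:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-key structure showing why prevSeqNo/nextSeqNo are kept.
public class SeqNoList {
    static class Node {
        final String row;     // stands in for the materialized RowData
        long prevSeqNo = -1;  // -1 means "no neighbor"
        long nextSeqNo = -1;
        Node(String row) { this.row = row; }
    }

    final Map<Long, Node> sqnToNode = new HashMap<>(); // MapState in the FLIP
    long lastSeqNo = -1;                               // seqNo of the latest row
    long nextSqn = 0;

    /** Appends a new row as the latest version; returns its seqNo. */
    long add(String row) {
        long sqn = nextSqn++;
        Node node = new Node(row);
        node.prevSeqNo = lastSeqNo;
        if (lastSeqNo != -1) {
            sqnToNode.get(lastSeqNo).nextSeqNo = sqn;
        }
        sqnToNode.put(sqn, node);
        lastSeqNo = sqn;
        return sqn;
    }

    /** Retracts the node with the given seqNo. Returns the row to emit
     *  downstream (the penultimate one) if the latest row was retracted,
     *  or null if no new emission is needed. */
    String retract(long sqn) {
        Node node = sqnToNode.remove(sqn);
        if (node == null) {
            return null;
        }
        // Relink neighbors so prevSeqNo/nextSeqNo stay correct without traversal
        if (node.prevSeqNo != -1) {
            sqnToNode.get(node.prevSeqNo).nextSeqNo = node.nextSeqNo;
        }
        if (node.nextSeqNo != -1) {
            sqnToNode.get(node.nextSeqNo).prevSeqNo = node.prevSeqNo;
        }
        if (sqn == lastSeqNo) {
            lastSeqNo = node.prevSeqNo;
            // prevSeqNo is exactly what makes this O(1) instead of a scan
            return lastSeqNo == -1 ? null : sqnToNode.get(lastSeqNo).row;
        }
        return null; // middle retraction: the latest row is unchanged
    }
}
```

Without the pointers, finding the penultimate element after a retraction would require scanning all entries for the key.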
No, in the micro-benchmark the state wasn't too big (on the order of megabytes); it was bottlenecked by RocksDB put/get operations, however. I also performed a benchmark on a cluster with a larger state size (on the order of gigabytes) and got similar results.

Regards,
Roman

On Thu, Aug 28, 2025 at 11:38 AM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Roman,
>
> Thanks for the proposal! The SinkUpsertMaterializer sometimes becomes a
> bottleneck in our production, so I'd +1 to optimize it. I have several
> questions regarding your design:
>
> 1. Could you elaborate more about the ADAPTIVE mode? Is the switch between
> VALUE and MAP performed under each stream key considering each list size,
> or is it performed for all keys if the average list size reaches the given
> thresholds?
> 2. Is it necessary to maintain pointers 'prevSeqNo' and 'nextSeqNo' to link
> all the nodes? I assume there should be a traversal need but I don't see
> that in pseudo-code. And is `MapState.iterator` also feasible?
> 3. I see there are two `RowData` stored for one record, one is in
> `rowToSqn` and another is in `sqnToNode`'s node. I guess the first is for
> upsert-keys. Would it be optimized to single copy for a non-upsert-key
> scenario?
> 4. For the TTL mechanism part, I would suggest an 'event-time based ttl',
> which allows the user to specify insertion time for each insert/update
> operation and a manually controllable `TtlTimeProvider` (instead of just
> system time). This would be beneficial for many cases, WDYT?
> 5. Does the current RocksDB benchmark involve significant state size and
> I/O pressure?
>
> Best,
> Zakelly
>
> On Thu, Aug 28, 2025 at 7:11 AM Roman Khachatryan <ro...@apache.org>
> wrote:
>
> > Hi everyone,
> >
> > I would like to start a discussion about FLIP-544 SinkUpsertMaterializer
> > V2 [1].
> >
> > SinkUpsertMaterializer is an operator in Flink that reconciles out of
> > order changelog events before sending them to an upsert sink. In some
> > cases (that we see in our production), performance of this operator
> > degrades exponentially, depending on the input data.
> > This FLIP proposes a new implementation that is optimized for such cases
> > and serves as a synchronization point for other efforts in that area.
> >
> > Looking forward to feedback.
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-544%3A+SinkUpsertMaterializer+V2
> >
> > Regards,
> > Roman