+1 to the optimization plan and to marking the new options as experimental. Optimizing state access after data expansion will help ease performance bottlenecks when this operator can't be fully avoided.

Thanks for pushing this forward!

Best,
Lincoln Lee

On Fri, Sep 12, 2025 at 16:38, Roman Khachatryan <ro...@apache.org> wrote:

> Hey Rui,
>
> Thanks for clarifying.
>
> In my opinion, the flexibility is necessary here for a number of reasons (e.g. payload size may vary per job, TM disk cache size is different, even user priorities might be different).
>
> We can mark these options as @Experimental for now to be able to change them in the future. And/or, the improved switching algorithm can be just a new strategy, without the need to touch these options.
>
> > I don't have a strong opinion on this, if it's really necessary to retain flexibility.
>
> In that case, I'd like to start the vote today or on Monday unless there's any further feedback.
>
> Regards,
> Roman
>
> On Fri, Sep 12, 2025, 10:07 Rui Fan <1996fan...@gmail.com> wrote:
>
> > Hey Roman,
> >
> > Thanks for your explanation for the first 2 questions, it is clear to me!
> >
> > About the 3rd question:
> >
> > > I totally agree that configuration options add complexity and increase maintenance burden, but in my opinion this is a trade-off between flexibility and complexity.
> >
> > Totally agree that it is a trade-off.
> >
> > > In this case, I don't see how the minimum flexibility can be achieved without adding those options. On the other hand, providing sensible defaults makes it unnecessary to adjust these settings for most of the users.
> >
> > IIUC, it is hard to provide unified sensible defaults since the RocksDB and HashMap state backends expect different thresholds.
> >
> > > Could you elaborate what do you mean by list size and how is it different from threshold.high and threshold.low proposed in the FLIP?
> >
> > What I mean is that we do not expose the threshold.high and threshold.low configuration options, but hard-code them inside the Flink code to select different thresholds depending on whether RocksDB or HashMap is used.
> >
> > Following is the pseudocode:
> >
> > ```
> > private static final int ROCKSDB_HIGH_THRESHOLD = 50;
> > private static final int ROCKSDB_LOW_THRESHOLD = 40;
> > private static final int HASHMAP_HIGH_THRESHOLD = 400;
> > private static final int HASHMAP_LOW_THRESHOLD = 300;
> >
> > int highThreshold = isRocksdb ? ROCKSDB_HIGH_THRESHOLD : HASHMAP_HIGH_THRESHOLD;
> > int lowThreshold = isRocksdb ? ROCKSDB_LOW_THRESHOLD : HASHMAP_LOW_THRESHOLD;
> > ```
> >
> > In the short term, users lose flexibility, but the code and usability are simpler. Most users do not need to adjust thresholds for HashMap and RocksDB.
> >
> > In the long term, if Flink wants to introduce switching strategies other than thresholds, it can switch directly internally without considering the compatibility of configuration options.
> >
> > I don't have a strong opinion on this, if it's really necessary to retain flexibility.
> >
> > Best,
> > Rui
> >
> > On Thu, Sep 11, 2025 at 11:02 PM Roman Khachatryan <ro...@apache.org> wrote:
> >
> > > Hey Rui,
> > >
> > > Thanks for your feedback and questions!
> > >
> > > > 1. State Compatibility:
> > > >
> > > > I would like to confirm if the underlying stored data structure is the same in the following two scenarios:
> > > >
> > > > - A ValueState stored in LEGACY mode.
> > > > - A ValueState stored in VALUE mode (as used by the ADAPTIVE strategy when below the threshold).
> > > >
> > > > The FLIP mentions that VALUE - similar to LEGACY, but I do not fully know what the difference is. If their physical storage format is identical, it seems there might be an opportunity to provide state compatibility.
> > >
> > > No, in these two cases, the state is not compatible. This is because in VALUE mode, a timestamp is stored along with every record, plus metadata stores equalizer and hash function generators.
> > >
> > > The FLIP targets only new jobs.
> > > In the future, it is possible to add a migration path. The motivation is simplicity and reduction of risks (during state migration).
> > >
> > > > 2. Forward Compatibility for TTL
> > > >
> > > > The FLIP mentions that TTL is a follow-up feature, that "storing the timestamps along with every record will be implemented from the get go". This implies a future state schema evolution. Could you elaborate on how the V1 serializer will be designed to handle this, ensuring that adopting V1 of this feature won't require another state migration when TTL support is added in V2?
> > >
> > > There's no migration necessary to add TTL support for V2: the timestamps will already be there, and there's no need for extra data. For V1 (the current implementation), a migration path would be needed.
> > >
> > > > 3. Adaptive Threshold Configuration
> > > >
> > > > FLIP introduces adaptive.threshold.high and adaptive.threshold.low. This adds a configuration burden on users. To set these values correctly, users need to understand the underlying performance characteristics. For example, the FLIP recommends very different thresholds for RocksDB and Heap backends, which highlights this complexity.
> > > >
> > > > Furthermore, from historical experience, it is easy to expose a new configuration, but much harder to change or remove it later due to user compatibility concerns. If we don't expose these parameters now, it would allow the internal adaptive strategy to be improved in the future without any compatibility issues.
> > >
> > > I totally agree that configuration options add complexity and increase maintenance burden, but in my opinion this is a trade-off between flexibility and complexity. In this case, I don't see how the minimum flexibility can be achieved without adding those options.
> > > On the other hand, providing sensible defaults makes it unnecessary to adjust these settings for most of the users.
> > >
> > > > In the first version, I think list size might be enough as an internal strategy, and we can dynamically choose different thresholds for heap and rocksdb.
> > >
> > > Could you elaborate what do you mean by list size and how is it different from threshold.high and threshold.low proposed in the FLIP?
> > >
> > > Regards,
> > > Roman
> > >
> > > On Wed, Sep 3, 2025 at 12:32 PM Rui Fan <1996fan...@gmail.com> wrote:
> > >
> > > > Hey all,
> > > >
> > > > Thanks for driving this great proposal and valuable discussion! I have a few questions related to compatibility and usability.
> > > >
> > > > 1. State Compatibility:
> > > >
> > > > The compatibility of state between the old and new modes is crucial for migrating existing jobs. I would like to confirm if the underlying stored data structure is the same in the following two scenarios:
> > > >
> > > > - A ValueState stored in LEGACY mode.
> > > > - A ValueState stored in VALUE mode (as used by the ADAPTIVE strategy when below the threshold).
> > > >
> > > > The FLIP mentions that VALUE - similar to LEGACY, but I do not fully know what the difference is. If their physical storage format is identical, it seems there might be an opportunity to provide state compatibility.
> > > >
> > > > 2. Forward Compatibility for TTL
> > > >
> > > > The FLIP mentions that TTL is a follow-up feature, that "storing the timestamps along with every record will be implemented from the get go". This implies a future state schema evolution.
> > > > Could you elaborate on how the V1 serializer will be designed to handle this, ensuring that adopting V1 of this feature won't require another state migration when TTL support is added in V2?
> > > >
> > > > 3. Adaptive Threshold Configuration
> > > >
> > > > FLIP introduces adaptive.threshold.high and adaptive.threshold.low. This adds a configuration burden on users. To set these values correctly, users need to understand the underlying performance characteristics. For example, the FLIP recommends very different thresholds for RocksDB and Heap backends, which highlights this complexity.
> > > >
> > > > Furthermore, from historical experience, it is easy to expose a new configuration, but much harder to change or remove it later due to user compatibility concerns. If we don't expose these parameters now, it would allow the internal adaptive strategy to be improved in the future without any compatibility issues.
> > > >
> > > > For example, future policies could be based not only on list length but also on the size of RowData in their cost models. Furthermore, operators could even monitor the time it takes to access state when processing a specific key, triggering a mode switch only when latency exceeds a dynamic baseline. This not only makes operators truly "adaptive" but also greatly simplifies user configuration.
> > > >
> > > > In the first version, I think list size might be enough as an internal strategy, and we can dynamically choose different thresholds for heap and rocksdb.
> > > >
> > > > Looking forward to your feedback!
> > > >
> > > > Best,
> > > > Rui
> > > >
> > > > On Mon, Sep 1, 2025 at 3:13 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > > >
> > > > > Thanks Roman for driving this. I highly support this effort.
> > > > >
> > > > > Big +1 from my side.
> > > > >
> > > > > Best,
> > > > > Piotrek
> > > > >
> > > > > On Sun, Aug 31, 2025 at 21:58, Roman Khachatryan <ro...@apache.org> wrote:
> > > > >
> > > > > > Hi Zakelly,
> > > > > >
> > > > > > > So I assume there is a conditional branch to determine whether current stream key is in VALUE or MAP mode, and this also involves some state access right?
> > > > > >
> > > > > > Yes, that's a good question; it might add one more state access. To mitigate that to some extent, the results of the 1st access to ValueState can be cached and be used later for add/retract (in case it's not empty). (Such caching is already included in the benchmark results.)
> > > > > >
> > > > > > > Have you evaluated the implementation relying on the orderliness of RocksDB's kv? RocksDB's `MapState.iterator().next()` retrieves the first entry of the map in binary order. We could reverse the order of seq by generating it from Long.MAX to 0, and thus the first entry retrieved would be the last one added. I see you mention this in 'Rejected Alternatives' but I'm still curious whether this could achieve an improvement. To my knowledge, the iterator might be less efficient since it could not leverage bloom filters as point-lookup does and the cache is unnecessary if only the first entry is needed (of course we could remove that cache). It's not immediately clear which approach is better, as each has its trade-offs.
> > > > > >
> > > > > > We tried a similar approach of using an iterator.
> > > > > > The benefits are diminished there by slow iteration (plus, iterator creation is also slow). Because of that, the performance was similar to the current implementation. We didn't compare the two approaches side by side though.
> > > > > >
> > > > > > > I'd also suggest testing scenarios with retraction rates below 100%, as that may better reflect real-world workloads IIUC.
> > > > > >
> > > > > > I mostly focused on 100% because that's where I saw regression: the fewer retractions, the longer the list, the worse performance of ValueState. I'll re-run the benchmark with lower rates.
> > > > > >
> > > > > > > I think it's a good chance to push forward the discussion/design of binary sorted map state (FLIP-220)[1] since this seems to be a good application scenario. But I also think it's acceptable if we do some hack to only rely on RocksDB's state implicitly rather than waiting for the new state primitives, if it is truly beneficial.
> > > > > >
> > > > > > I agree, probably not for this FLIP (because of the above reasons), but for many other cases it would be definitely beneficial to expose the sortedness of RocksDB in some way. Multi-way join (FLIP-516) [1] is one such example.
> > > > > >
> > > > > > > By saying event-time based TTL, I meant to make it easier for users to understand. The event-time could be defined and controlled/advanced by user (SQL implementor or Flink user). E.g., event-time could be watermark progression or just record sequence (state will live through specific number of records), or even be advanced by special control records for Datastream users. This kind of user-controlled time advancement is what I said "manually controllable". Such flexibility could be broadly beneficial.
> > > > > >
> > > > > > Thanks for clarifying, special control records is an interesting idea, and I think it should be easy to implement.
> > > > > >
> > > > > > > We've encountered cases where state expiration is inconsistent between upstream and downstream SQL operators. With event-time based TTL, operators could share a synchronized notion of time, allowing them to expire state in a more coordinated way.
> > > > > >
> > > > > > Yeah, that's a pity that Flink doesn't have event-time TTL. But to solve cross-operator TTL inconsistency, we'd need to change multiple operators (or state backend). I'm not sure we can efficiently support event time TTL for a general case.
> > > > > >
> > > > > > P.S.: have a good vacation! :)
> > > > > >
> > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-516%3A+Multi-Way+Join+Operator
> > > > > >
> > > > > > Regards,
> > > > > > Roman
> > > > > >
> > > > > > On Fri, Aug 29, 2025 at 8:24 AM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Roman,
> > > > > > >
> > > > > > > Thanks for the answers!
> > > > > > > Please see my comments below:
> > > > > > >
> > > > > > > > The switch is performed under each stream key individually, when its specific list size reaches a threshold.
> > > > > > >
> > > > > > > So I assume there is a conditional branch to determine whether current stream key is in VALUE or MAP mode, and this also involves some state access right?
> > > > > > >
> > > > > > > > Yes, those pointers are necessary, I couldn't find a way to get rid of them:
> > > > > > > > - prevSeqNo is used to emit penultimate element when retracting the last one;
> > > > > > > > - nextSeqNo is used to keep prevSeqNo correct when retracting an item from the middle
> > > > > > >
> > > > > > > Oh I see. Have you evaluated the implementation relying on the orderliness of RocksDB's kv? RocksDB's `MapState.iterator().next()` retrieves the first entry of the map in binary order. We could reverse the order of seq by generating it from Long.MAX to 0, and thus the first entry retrieved would be the last one added. I see you mention this in 'Rejected Alternatives' but I'm still curious whether this could achieve an improvement. To my knowledge, the iterator might be less efficient since it could not leverage bloom filters as point-lookup does and the cache is unnecessary if only the first entry is needed (of course we could remove that cache). It's not immediately clear which approach is better, as each has its trade-offs.
> > > > > > > I'd also suggest testing scenarios with retraction rates below 100%, as that may better reflect real-world workloads IIUC.
> > > > > > >
> > > > > > > I think it's a good chance to push forward the discussion/design of binary sorted map state (FLIP-220)[1] since this seems to be a good application scenario. But I also think it's acceptable if we do some hack to only rely on RocksDB's state implicitly rather than waiting for the new state primitives, if it is truly beneficial.
> > > > > > >
> > > > > > > [1] https://cwiki.apache.org/confluence/x/Xo_FD
> > > > > > >
> > > > > > > > I agree, I think that event-time based TTL is more useful in general (I specified processing time as a default to make it less surprising for the users).
> > > > > > > >
> > > > > > > > I don't immediately see the potential usages of a manually controllable TtlTimeProvider - do you have any use cases in mind?
> > > > > > >
> > > > > > > By saying event-time based TTL, I meant to make it easier for users to understand. The event-time could be defined and controlled/advanced by user (SQL implementor or Flink user). E.g., event-time could be watermark progression or just record sequence (state will live through specific number of records), or even be advanced by special control records for Datastream users. This kind of user-controlled time advancement is what I said "manually controllable". Such flexibility could be broadly beneficial.
> > > > > > >
> > > > > > > We've encountered cases where state expiration is inconsistent between upstream and downstream SQL operators.
> > > > > > > With event-time based TTL, operators could share a synchronized notion of time, allowing them to expire state in a more coordinated way.
> > > > > > >
> > > > > > > Looking forward to your reply! P.S. I'm on vacation for the next few days, so I'll follow up later :) .
> > > > > > >
> > > > > > > Best,
> > > > > > > Zakelly
> > > > > > >
> > > > > > > On Fri, Aug 29, 2025 at 2:52 AM Roman Khachatryan <ro...@apache.org> wrote:
> > > > > > >
> > > > > > > > Hi Zakelly,
> > > > > > > >
> > > > > > > > Thanks for the feedback!
> > > > > > > >
> > > > > > > > > 1. Could you elaborate more about the ADAPTIVE mode? Is the switch between VALUE and MAP performed under each stream key considering each list size, or is it performed for all keys if the average list size reaches the given thresholds?
> > > > > > > >
> > > > > > > > The switch is performed under each stream key individually, when its specific list size reaches a threshold.
> > > > > > > >
> > > > > > > > > 2. Is it necessary to maintain pointers 'prevSeqNo' and 'nextSeqNo' to link all the nodes? I assume there should be a traversal need but I don't see that in pseudo-code.
> > > > > > > >
> > > > > > > > Yes, those pointers are necessary, I couldn't find a way to get rid of them:
> > > > > > > > - prevSeqNo is used to emit penultimate element when retracting the last one;
> > > > > > > > - nextSeqNo is used to keep prevSeqNo correct when retracting an item from the middle
> > > > > > > >
> > > > > > > > > And is `MapState.iterator` also feasible?
> > > > > > > >
> > > > > > > > Yes, in fact, the ADAPTIVE strategy uses an iterator to move the entries between MAP and VALUE.
> > > > > > > >
> > > > > > > > > 3. I see there are two `RowData` stored for one record, one is in `rowToSqn` and another is in `sqnToNode`'s node. I guess the first is for upsert-keys. Would it be optimized to single copy for a non-upsert-key scenario?
> > > > > > > >
> > > > > > > > That's an interesting idea! I'll try to dig into it deeper when open-sourcing or as a follow-up.
> > > > > > > >
> > > > > > > > > 4. For the TTL mechanism part, I would suggest an 'event-time based ttl', which allows the user to specify insertion time for each insert/update operation and a manually controllable `TtlTimeProvider` (instead of just system time). This would be beneficial for many cases, WDYT?
> > > > > > > >
> > > > > > > > I agree, I think that event-time based TTL is more useful in general (I specified processing time as a default to make it less surprising for the users).
> > > > > > > >
> > > > > > > > I don't immediately see the potential usages of a manually controllable TtlTimeProvider - do you have any use cases in mind?
> > > > > > > >
> > > > > > > > > 5. Does the current RocksDB benchmark involve significant state size and I/O pressure?
> > > > > > > >
> > > > > > > > No, in the micro-benchmark the state wasn't too big (in order of megabytes); it was bottlenecked by RocksDB put/get operations, however. I also performed a benchmark on a cluster with a larger state size (in order of gigabytes) and got similar results.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Roman
> > > > > > > >
> > > > > > > > On Thu, Aug 28, 2025 at 11:38 AM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Roman,
> > > > > > > > >
> > > > > > > > > Thanks for the proposal! The SinkUpsertMaterializer sometimes becomes a bottleneck in our production, so I'd +1 to optimize it. I have several questions regarding your design:
> > > > > > > > >
> > > > > > > > > 1. Could you elaborate more about the ADAPTIVE mode? Is the switch between VALUE and MAP performed under each stream key considering each list size, or is it performed for all keys if the average list size reaches the given thresholds?
> > > > > > > > > 2. Is it necessary to maintain pointers 'prevSeqNo' and 'nextSeqNo' to link all the nodes? I assume there should be a traversal need but I don't see that in pseudo-code. And is `MapState.iterator` also feasible?
> > > > > > > > > 3. I see there are two `RowData` stored for one record, one is in `rowToSqn` and another is in `sqnToNode`'s node. I guess the first is for upsert-keys. Would it be optimized to single copy for a non-upsert-key scenario?
> > > > > > > > > 4. For the TTL mechanism part, I would suggest an 'event-time based ttl', which allows the user to specify insertion time for each insert/update operation and a manually controllable `TtlTimeProvider` (instead of just system time). This would be beneficial for many cases, WDYT?
> > > > > > > > > 5. Does the current RocksDB benchmark involve significant state size and I/O pressure?
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Zakelly
> > > > > > > > >
> > > > > > > > > On Thu, Aug 28, 2025 at 7:11 AM Roman Khachatryan <ro...@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > > Hi everyone,
> > > > > > > > > >
> > > > > > > > > > I would like to start a discussion about FLIP-544 SinkUpsertMaterializer V2 [1].
> > > > > > > > > >
> > > > > > > > > > SinkUpsertMaterializer is an operator in Flink that reconciles out of order changelog events before sending them to an upsert sink. In some cases (that we see in our production), performance of this operator degrades exponentially, depending on the input data.
> > > > > > > > > > This FLIP proposes a new implementation that is optimized for such cases and serves as a synchronization point for other efforts in that area.
> > > > > > > > > >
> > > > > > > > > > Looking forward to feedback.
> > > > > > > > > >
> > > > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-544%3A+SinkUpsertMaterializer+V2
> > > > > > > > > >
> > > > > > > > > > Regards,
> > > > > > > > > > Roman
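
The per-key switching discussed in this thread (a key moves from VALUE to MAP at a high threshold and back at a low one) can be sketched roughly as below. This is a minimal illustration under stated assumptions, not the FLIP-544 implementation: the class `AdaptiveModeSketch`, the method `onSizeChanged`, and the inclusive comparisons are hypothetical, and the threshold numbers are simply the ones from Rui's pseudocode.

```java
/** Hypothetical sketch of per-key VALUE/MAP switching with two thresholds. */
final class AdaptiveModeSketch {

    enum Mode { VALUE, MAP }

    // Illustrative numbers copied from the pseudocode in the thread.
    static final int ROCKSDB_HIGH_THRESHOLD = 50;
    static final int ROCKSDB_LOW_THRESHOLD = 40;
    static final int HASHMAP_HIGH_THRESHOLD = 400;
    static final int HASHMAP_LOW_THRESHOLD = 300;

    private final int highThreshold;
    private final int lowThreshold;

    // Mode of a single stream key; every key would track its own mode.
    private Mode mode = Mode.VALUE;

    AdaptiveModeSketch(boolean isRocksDb) {
        this.highThreshold = isRocksDb ? ROCKSDB_HIGH_THRESHOLD : HASHMAP_HIGH_THRESHOLD;
        this.lowThreshold = isRocksDb ? ROCKSDB_LOW_THRESHOLD : HASHMAP_LOW_THRESHOLD;
    }

    /**
     * Re-evaluates the mode after this key's list size changed.
     * Assumption: the switch happens when the size reaches a threshold (inclusive).
     */
    Mode onSizeChanged(int listSize) {
        if (mode == Mode.VALUE && listSize >= highThreshold) {
            mode = Mode.MAP;
        } else if (mode == Mode.MAP && listSize <= lowThreshold) {
            mode = Mode.VALUE;
        }
        // Sizes strictly between the thresholds keep the current mode.
        return mode;
    }
}
```

Keeping a gap between the two thresholds means a key whose list size hovers around one value does not flip between modes on every add or retract, which matters because each flip moves entries between the two state layouts.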