Hi,

No concerns at all, just a clarifying question from my side: to detect out-of-order records, I need both the new and the old timestamp. I suppose I get the new record's timestamp via the timestamp extractor. Can I not get the old one the same way, from the old record that is passed down to the aggregation after KIP-904?

Thanks,
Lucas

On Tue, Apr 11, 2023 at 5:35 AM Matthias J. Sax <mj...@apache.org> wrote:
>
> Thanks.
>
> One question: for the repartition topic format change, do we want to
> re-use flag=2, or should we introduce flag=3, and determine when
> compiling the DSL into the Topology if we want/need to include the
> timestamp, and if not, use format version=2 to avoid unnecessary overhead?
>
>
> -Matthias
>
> On 4/10/23 5:47 PM, Victoria Xia wrote:
> > Hi everyone,
> >
> > While wrapping up the implementation for KIP-914, I have discovered that
> > two more DSL processors require semantic updates in the presence of
> > versioned tables:
> >
> >    - The table filter processor has an optimization to drop nulls if the
> >      previous filtered value is also null. When the upstream table is
> >      versioned, this optimization should be disabled in order to preserve
> >      proper version history in the presence of out-of-order data.
> >    - When performing an aggregation over a versioned table, only the latest
> >      value by timestamp (per key) should be included in the final aggregate
> >      value. This is not happening today in the presence of out-of-order data,
> >      due to the way that TableSourceNodes call `get(key)` in order to
> >      determine the "old value" which is to be removed from the aggregate as
> >      part of applying an update. To fix this, aggregations should ignore
> >      out-of-order records when aggregating versioned tables.
> >       - In order to implement this change, table aggregate processors need
> >         a way to determine whether a record is out-of-order or not. This
> >         cannot be done by querying the source table value getter as that
> >         store belongs to a different subtopology (because a repartition
> >         occurs before aggregation). As such, an additional timestamp must
> >         be included in the repartition topic. The 3.5 release already
> >         includes an update to the repartition topic format (with upgrade
> >         implications properly handled) via KIP-904
> >         <https://cwiki.apache.org/confluence/display/KAFKA/KIP-904%3A+Kafka+Streams+-+Guarantee+subtractor+is+called+before+adder+if+key+has+not+changed>,
> >         so making an additional change to the repartition topic format to
> >         add a timestamp comes at no additional cost to users.
> >
> >
> > I have updated the KIP
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores>
> > itself with more detail about each of these changes. Please let me know if
> > there are any concerns. In the absence of dissent, I'd like to include
> > these changes along with the rest of KIP-914 in the 3.5 release.
> >
> > Apologies for not noticing these additional semantics implications earlier,
> > Victoria
> >
> > ---------- Forwarded message ---------
> > From: Victoria Xia <victoria....@confluent.io>
> > Date: Wed, Mar 22, 2023 at 10:08 AM
> > Subject: Re: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores
> > To: <dev@kafka.apache.org>
> >
> >
> > Thanks for voting, everyone! We have three binding yes votes with no
> > objections during four full days of voting. I will close the vote and mark
> > the KIP as accepted, right in time for the 3.5 release.
> >
> > Thanks,
> > Victoria
> >
> > On Wed, Mar 22, 2023 at 7:11 AM Bruno Cadonna <cado...@apache.org> wrote:
> >
> >> +1 (binding)
> >>
> >> Thanks Victoria!
> >>
> >> Best,
> >> Bruno
> >>
> >> On 20.03.23 17:13, Matthias J. Sax wrote:
> >>> +1 (binding)
> >>>
> >>> On 3/20/23 9:05 AM, Guozhang Wang wrote:
> >>>> +1, thank you Victoria!
> >>>>
> >>>> On Sat, Mar 18, 2023 at 8:27 AM Victoria Xia
> >>>> <victoria....@confluent.io.invalid> wrote:
> >>>>>
> >>>>> Hi all,
> >>>>>
> >>>>> I'd like to start a vote on KIP-914 for updating the Kafka Streams join
> >>>>> processors to use proper timestamp-based semantics in applications with
> >>>>> versioned stores:
> >>>>>
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores
> >>>>>
> >>>>> To avoid compatibility concerns, I'd like to include the changes from
> >>>>> this KIP together with KIP-889
> >>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores>
> >>>>> (for introducing versioned stores) in the upcoming 3.5 release. I will
> >>>>> close the vote on the 3.5 KIP deadline, March 22, if there are no
> >>>>> objections before then.
> >>>>>
> >>>>> Thanks,
> >>>>> Victoria
> >>
> >
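
For anyone following the thread, the "ignore out-of-order records when aggregating" idea quoted above could be sketched roughly as below. This is a toy model in plain Java, not Kafka Streams code: the class OutOfOrderAwareSum, the Update type and its fields are all hypothetical names, and it assumes each update arriving at the aggregation carries the timestamps of both the old and the new value (which is the open question in this thread). It also assumes the grouping key does not change between the old and the new value.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: a toy sum aggregation, not the Kafka Streams implementation. */
final class OutOfOrderAwareSum {

    /** Hypothetical stand-in for a table update read from the repartition topic. */
    static final class Update {
        final Long oldValue;      // null if the key had no previous value
        final Long newValue;      // null if the update is a delete
        final long oldTimestamp;  // timestamp of the value being replaced (unused when oldValue is null)
        final long newTimestamp;  // timestamp of the incoming value

        Update(Long oldValue, Long newValue, long oldTimestamp, long newTimestamp) {
            this.oldValue = oldValue;
            this.newValue = newValue;
            this.oldTimestamp = oldTimestamp;
            this.newTimestamp = newTimestamp;
        }
    }

    private final Map<String, Long> aggregates = new HashMap<>();

    /** Applies the update; returns false if it was dropped as out-of-order. */
    boolean apply(String groupKey, Update update) {
        // Assumption: an update is out-of-order when it is older than the value it would replace.
        if (update.oldValue != null && update.newTimestamp < update.oldTimestamp) {
            return false;
        }
        long aggregate = aggregates.getOrDefault(groupKey, 0L);
        if (update.oldValue != null) {
            aggregate -= update.oldValue;  // subtractor first
        }
        if (update.newValue != null) {
            aggregate += update.newValue;  // then adder
        }
        aggregates.put(groupKey, aggregate);
        return true;
    }

    /** Current aggregate for the group, or 0 if the group has not been seen. */
    long get(String groupKey) {
        return aggregates.getOrDefault(groupKey, 0L);
    }
}
```

In this sketch an update whose new timestamp is older than the timestamp of the value it would replace leaves the aggregate untouched, so only the latest value by timestamp per key contributes to the sum.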